Consuming Kafka topics with Logstash places a durable buffer between event producers and downstream parsing or indexing, so spikes, restarts, and short network interruptions can be absorbed without dropping data at the edge.
Current Logstash releases ship the kafka input as part of the bundled Kafka integration. The input joins a Kafka consumer group, stores offsets in Kafka, and can attach topic, partition, offset, and timestamp details under [@metadata][kafka] when decorate_events is set to basic or extended.
Set explicit bootstrap_servers, topics, group_id, and client_id values instead of relying on defaults such as localhost:9092, ["logstash"], and logstash. For a new group_id, auto_offset_reset => "earliest" replays existing records from the earliest offset still retained in the topic. Any broker that requires encryption or authentication also needs matching security_protocol, sasl_*, and ssl_* settings, with secrets kept outside plain-text pipeline files.
input {
  kafka {
    id => "kafka_logs"
    bootstrap_servers => "kafka-1.example.net:9092,kafka-2.example.net:9092"
    topics => ["logs"]
    group_id => "logstash-consumer"
    client_id => "logstash-01"
    auto_offset_reset => "earliest"
    decorate_events => "basic"
    # security_protocol => "SASL_SSL"
    # sasl_mechanism => "SCRAM-SHA-512"
    # sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='logstash' password='${KAFKA_PASSWORD}';"
    # ssl_truststore_location => "/etc/logstash/kafka.client.truststore.p12"
    # ssl_truststore_password => "${KAFKA_TRUSTSTORE_PASSWORD}"
    # ssl_truststore_type => "PKCS12"
  }
}

output {
  stdout {
    codec => rubydebug { metadata => true }
  }
}
Current Logstash releases already bundle the Kafka input inside logstash-integration-kafka, so a separate logstash-plugin install logstash-input-kafka step is usually unnecessary. The temporary stdout output and metadata => true setting make validation easier, and the Kafka fields remain under [@metadata][kafka] unless a filter copies them into the event.
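Outputs do not serialize [@metadata], so the Kafka details have to be copied into regular fields if they should be kept with the event. A minimal sketch of such a filter, assuming decorate_events is set to basic or extended; the [kafka][...] target field names are illustrative choices, not names the plugin requires:

```
filter {
  # Copy selected Kafka details out of @metadata so they reach the output.
  # The [kafka][...] target names are hypothetical; use names that fit your schema.
  mutate {
    copy => {
      "[@metadata][kafka][topic]"     => "[kafka][topic]"
      "[@metadata][kafka][partition]" => "[kafka][partition]"
      "[@metadata][kafka][offset]"    => "[kafka][offset]"
    }
  }
}
```

With the temporary stdout output still in place, the copied values should then appear under kafka in the printed event as well as under @metadata.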
For a new group_id, auto_offset_reset => "earliest" can backfill the entire topic. Use latest instead when only new records should be consumed.
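For a consumer group that should skip the backlog, the same input can be written with latest. This is a sketch that reuses the hosts and names from the example above; the group_id shown is a hypothetical new group, because the setting only takes effect when the group has no committed offsets to resume from:

```
input {
  kafka {
    id => "kafka_logs"
    bootstrap_servers => "kafka-1.example.net:9092,kafka-2.example.net:9092"
    topics => ["logs"]
    # Hypothetical new group name; a group that already has committed
    # offsets resumes from those offsets instead.
    group_id => "logstash-consumer-tail"
    client_id => "logstash-01"
    # Start at the end of each partition and consume only new records.
    auto_offset_reset => "latest"
    decorate_events => "basic"
  }
}
```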
$ sudo /usr/share/logstash/bin/logstash --path.settings /etc/logstash --config.test_and_exit
Using bundled JDK: /usr/share/logstash/jdk
Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties
[2026-04-08T00:11:42,435][INFO ][logstash.runner ] Log4j configuration path used is: /etc/logstash/log4j2.properties
[2026-04-08T00:11:42,455][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"9.3.2", "jruby.version"=>"jruby 9.4.13.0 (3.1.4) 2025-06-10 9938a3461f OpenJDK 64-Bit Server VM 21.0.10+7-LTS on 21.0.10+7-LTS +indy +jit [aarch64-linux]"}
##### snipped #####
[2026-04-08T00:11:44,198][INFO ][logstash.javapipeline ][main] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
Configuration OK
[2026-04-08T00:11:44,199][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
The syntax test validates the pipeline and plugin settings, but Kafka reachability and broker authentication are only proven after the service starts.
$ sudo systemctl restart logstash
A restart briefly stops active pipelines, so upstream producers may continue filling Kafka while Logstash reconnects and catches up.
$ sudo journalctl --unit=logstash --since "5 min ago" --no-pager --lines=40
Apr 08 00:11:46 logstash-01 logstash[24831]: [2026-04-08T00:11:46,204][INFO ][org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer][main][kafka_logs] [Consumer clientId=logstash-01-0, groupId=logstash-consumer] Subscribed to topic(s): logs
Apr 08 00:11:49 logstash-01 logstash[24831]: [2026-04-08T00:11:49,626][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][kafka_logs] [Consumer clientId=logstash-01-0, groupId=logstash-consumer] Discovered group coordinator kafka-1.example.net:9092 (id: 2147483647 rack: null)
Apr 08 00:11:49 logstash-01 logstash[24831]: [2026-04-08T00:11:49,698][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][kafka_logs] [Consumer clientId=logstash-01-0, groupId=logstash-consumer] Successfully joined group with generation Generation{generationId=1, memberId='logstash-01-0-5591f50f-c2c1-4db5-aca7-99409f05acb2', protocol='range'}
Apr 08 00:14:06 logstash-01 logstash[24831]: "message" => "kafka input live check"
If the service writes to files instead of the system journal, check /var/log/logstash/logstash-plain.log for the same consumer-group and event output lines.
$ curl -s "http://localhost:9600/_node/stats/pipelines/main?pretty=true"
{
  "pipelines" : {
    "main" : {
      "events" : {
        "filtered" : 1,
        "out" : 1,
        "in" : 1,
        "queue_push_duration_in_millis" : 0
      },
      "plugins" : {
        "inputs" : [ {
          "id" : "kafka_logs",
          "name" : "kafka",
          "events" : {
            "out" : 1,
            "queue_push_duration_in_millis" : 0
          }
        } ]
      }
    }
  }
}
Increasing in and out counters confirm that the input is consuming records and the pipeline is forwarding them to the configured output. Current Logstash builds expose the API by default; if /etc/logstash/logstash.yml changes api.http.host, api.http.port, or API authentication settings, query that URL instead.