Consuming events from Kafka with Logstash provides a durable ingestion layer for logs, metrics, and application events, smoothing traffic spikes and decoupling producers from downstream outputs.
The kafka input plugin uses the Kafka consumer protocol to join a consumer group, receive partition assignments, and commit offsets back to Kafka so restarts resume from the last committed position rather than re-reading entire topics.
Broker reachability, topic ACLs, and matching security settings (SSL or SASL) must be in place before enabling ingestion, and consumer options such as group_id and auto_offset_reset influence replay behavior and potential duplicates during backfills or restarts.
$ sudo /usr/share/logstash/bin/logstash-plugin install logstash-input-kafka Using bundled JDK: /usr/share/logstash/jdk Validating logstash-input-kafka ERROR: Installation aborted, plugin 'logstash-input-kafka' is already provided by 'logstash-integration-kafka'
Logstash 8.19 bundles the Kafka input/output plugins via logstash-integration-kafka, so installation is skipped when already present.
input {
kafka {
bootstrap_servers => "kafka-1.example.net:9092,kafka-2.example.net:9092"
topics => ["logs"]
group_id => "logstash-consumer"
client_id => "logstash-01"
decorate_events => true
# auto_offset_reset => "earliest"
# security_protocol => "SASL_SSL"
# sasl_mechanism => "SCRAM-SHA-512"
# sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='logstash' password='${KAFKA_PASSWORD}';"
# ssl_truststore_location => "/etc/logstash/kafka.client.truststore.jks"
# ssl_truststore_password => "${KAFKA_TRUSTSTORE_PASSWORD}"
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch.example.net:9200"]
index => "kafka-events-%{+YYYY.MM.dd}"
}
}
group_id controls offset tracking and parallelism, and decorate_events adds Kafka metadata such as topic, partition, and offset for troubleshooting.
Avoid storing Kafka credentials in plaintext configuration, and prefer environment variables or a secure secret store supported by the deployment.
Setting auto_offset_reset to earliest for a new group_id can backfill the entire topic and significantly increase ingestion rate.
$ sudo /usr/share/logstash/bin/logstash --path.settings /etc/logstash --config.test_and_exit Using bundled JDK: /usr/share/logstash/jdk Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties [2026-01-08T08:41:08,315][WARN ][logstash.runner ] NOTICE: Running Logstash as a superuser is strongly discouraged as it poses a security risk. Set 'allow_superuser' to false for better security. ##### snipped ##### Configuration OK [2026-01-08T08:41:08,693][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
The syntax test validates Logstash configuration structure, and Kafka connectivity is validated when the pipeline starts.
$ sudo systemctl restart logstash
A restart briefly stops active pipelines and should be scheduled appropriately for high-throughput environments.
$ sudo journalctl -u logstash -n 50 --no-pager
Jan 08 08:41:25 host logstash[7703]: [2026-01-08T08:41:25,518][INFO ][org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer][main][f47b62ebc8d2ec043cace78c99ca7a7e92937f0f9e73979e24591a178e23c88b] [Consumer clientId=logstash-01-0, groupId=logstash-consumer] Subscribed to topic(s): logs
Jan 08 08:41:25 host logstash[7703]: [2026-01-08T08:41:25,690][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][f47b62ebc8d2ec043cace78c99ca7a7e92937f0f9e73979e24591a178e23c88b] [Consumer clientId=logstash-01-0, groupId=logstash-consumer] Discovered group coordinator kafka-1.example.net:9092 (id: 2147483646 rack: null)
Jan 08 08:41:25 host logstash[7703]: [2026-01-08T08:41:25,703][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][f47b62ebc8d2ec043cace78c99ca7a7e92937f0f9e73979e24591a178e23c88b] [Consumer clientId=logstash-01-0, groupId=logstash-consumer] Successfully joined group with generation Generation{generationId=3, memberId='logstash-01-0-a57fe128-746e-4c66-af0c-464adef22d94', protocol='range'}
##### snipped #####
If the system journal is unavailable, check /var/log/logstash/logstash-plain.log for startup and plugin errors.
$ curl -s http://127.0.0.1:9600/_node/stats/pipelines?pretty | grep -n '"events"' -A 8 | head -n 20
17: "events" : {
18- "duration_in_millis" : 139,
19- "in" : 3,
20- "filtered" : 3,
21- "out" : 3,
22- "queue_push_duration_in_millis" : 0
23- },
24- "flow" : {
25- "filter_throughput" : {
--
60: "events" : {
61- "out" : 3,
62- "queue_push_duration_in_millis" : 0
63- }
64- } ],
65- "codecs" : [ {
66- "id" : "plain_a7dd84fd-73d6-4936-99e1-c478b7ea7b93",
67- "encode" : {
68- "duration_in_millis" : 0,
##### snipped #####
Increasing in and out counters indicate successful consumption and output, and the monitoring API port may differ if http.port is changed from its default range in /etc/logstash/logstash.yml.