Consuming events from Kafka with Logstash provides a durable ingestion layer for logs, metrics, and application events, smoothing traffic spikes and decoupling producers from downstream outputs.
The kafka input plugin uses the Kafka consumer protocol to join a consumer group, receive partition assignments, and commit offsets back to Kafka so restarts resume from the last committed position rather than re-reading entire topics.
Broker reachability, topic ACLs, and matching security settings (SSL or SASL) must be in place before enabling ingestion, and consumer options such as group_id and auto_offset_reset influence replay behavior and potential duplicates during backfills or restarts.
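Once the pipeline below is running, the group's committed offsets can be inspected with Kafka's own tooling to confirm where a restart would resume. A minimal check, assuming the Kafka CLI scripts are installed on a broker host (the path and .sh suffix vary by distribution):

$ kafka-consumer-groups.sh --bootstrap-server kafka-1.example.net:9092 --describe --group logstash-consumer

The CURRENT-OFFSET, LOG-END-OFFSET, and LAG columns show how far the group trails each partition, which is the position a restarted Logstash resumes from.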
Steps to configure a Logstash Kafka input:
- Install the Kafka input plugin.
$ sudo /usr/share/logstash/bin/logstash-plugin install logstash-input-kafka
Using bundled JDK: /usr/share/logstash/jdk
Validating logstash-input-kafka
ERROR: Installation aborted, plugin 'logstash-input-kafka' is already provided by 'logstash-integration-kafka'
Logstash 8.19 bundles the Kafka input/output plugins via logstash-integration-kafka, so installation is skipped when already present.
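To confirm the bundled integration and its version, list the installed plugins; the grep filter here is only for readability:

$ sudo /usr/share/logstash/bin/logstash-plugin list --verbose | grep kafka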
- Create a pipeline configuration file at /etc/logstash/conf.d/30-kafka-input.conf.
input {
  kafka {
    bootstrap_servers => "kafka-1.example.net:9092,kafka-2.example.net:9092"
    topics => ["logs"]
    group_id => "logstash-consumer"
    client_id => "logstash-01"
    decorate_events => true
    # auto_offset_reset => "earliest"
    # security_protocol => "SASL_SSL"
    # sasl_mechanism => "SCRAM-SHA-512"
    # sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='logstash' password='${KAFKA_PASSWORD}';"
    # ssl_truststore_location => "/etc/logstash/kafka.client.truststore.jks"
    # ssl_truststore_password => "${KAFKA_TRUSTSTORE_PASSWORD}"
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch.example.net:9200"]
    index => "kafka-events-%{+YYYY.MM.dd}"
  }
}

group_id controls offset tracking and parallelism, and decorate_events adds Kafka metadata such as topic, partition, and offset for troubleshooting.
Avoid storing Kafka credentials in plaintext configuration, and prefer environment variables or a secure secret store supported by the deployment.
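The Logstash keystore is one such store; values added to it resolve ${VAR} references in pipeline configuration, such as the ${KAFKA_PASSWORD} reference above. A minimal sketch:

$ sudo /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash create
$ sudo /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add KAFKA_PASSWORD

The add command prompts for the secret value, so it never appears in the .conf file itself.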
Setting auto_offset_reset to earliest for a new group_id (or any group with no committed offsets) replays the topic from the start of its retention window, which can backfill a large volume of events and cause a temporary spike in ingestion load.
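For a deliberate, controlled replay, the group's offsets can instead be rewound explicitly with the Kafka CLI tools. A hedged example; stop Logstash first, since offsets can only be reset while the group has no active members, and keep --dry-run until the plan looks right:

$ kafka-consumer-groups.sh --bootstrap-server kafka-1.example.net:9092 --group logstash-consumer --topic logs --reset-offsets --to-earliest --dry-run

Replace --dry-run with --execute to apply the reset.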
- Test the pipeline configuration.
$ sudo /usr/share/logstash/bin/logstash --path.settings /etc/logstash --config.test_and_exit
Using bundled JDK: /usr/share/logstash/jdk
Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties
[2026-01-08T08:41:08,315][WARN ][logstash.runner          ] NOTICE: Running Logstash as a superuser is strongly discouraged as it poses a security risk. Set 'allow_superuser' to false for better security.
##### snipped #####
Configuration OK
[2026-01-08T08:41:08,693][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
The syntax test validates only the structure of the Logstash configuration; Kafka connectivity is not checked until the pipeline actually starts.
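Broker reachability can be verified separately before starting the pipeline, for example with a simple TCP probe (assuming nc is installed; for TLS listeners, openssl s_client also verifies the certificate chain):

$ nc -vz kafka-1.example.net 9092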
- Restart the Logstash service.
$ sudo systemctl restart logstash
A restart briefly stops active pipelines and should be scheduled appropriately for high-throughput environments.
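For pipeline configuration changes alone, automatic reload avoids full restarts; enabling it in /etc/logstash/logstash.yml makes Logstash re-read pipeline files on change:

config.reload.automatic: true
config.reload.interval: 3s

Changes to logstash.yml itself or to JVM options still require a service restart.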
- Check the Logstash logs for Kafka authentication or consumer assignment errors.
$ sudo journalctl -u logstash -n 50 --no-pager
Jan 08 08:41:25 host logstash[7703]: [2026-01-08T08:41:25,518][INFO ][org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer][main][f47b62ebc8d2ec043cace78c99ca7a7e92937f0f9e73979e24591a178e23c88b] [Consumer clientId=logstash-01-0, groupId=logstash-consumer] Subscribed to topic(s): logs
Jan 08 08:41:25 host logstash[7703]: [2026-01-08T08:41:25,690][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][f47b62ebc8d2ec043cace78c99ca7a7e92937f0f9e73979e24591a178e23c88b] [Consumer clientId=logstash-01-0, groupId=logstash-consumer] Discovered group coordinator kafka-1.example.net:9092 (id: 2147483646 rack: null)
Jan 08 08:41:25 host logstash[7703]: [2026-01-08T08:41:25,703][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][f47b62ebc8d2ec043cace78c99ca7a7e92937f0f9e73979e24591a178e23c88b] [Consumer clientId=logstash-01-0, groupId=logstash-consumer] Successfully joined group with generation Generation{generationId=3, memberId='logstash-01-0-a57fe128-746e-4c66-af0c-464adef22d94', protocol='range'}
##### snipped #####

If the system journal is unavailable, check /var/log/logstash/logstash-plain.log for startup and plugin errors.
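Authentication and broker failures can be surfaced quickly by filtering the journal for common failure markers; a simple example:

$ sudo journalctl -u logstash --since "15 min ago" --no-pager | grep -iE 'error|sasl|authenticat|disconnect'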
- Check pipeline stats for event flow.
$ curl -s http://127.0.0.1:9600/_node/stats/pipelines?pretty | grep -n '"events"' -A 8 | head -n 20
17:      "events" : {
18-        "duration_in_millis" : 139,
19-        "in" : 3,
20-        "filtered" : 3,
21-        "out" : 3,
22-        "queue_push_duration_in_millis" : 0
23-      },
24-      "flow" : {
25-        "filter_throughput" : {
--
60:        "events" : {
61-          "out" : 3,
62-          "queue_push_duration_in_millis" : 0
63-        }
64-      } ],
65-      "codecs" : [ {
66-        "id" : "plain_a7dd84fd-73d6-4936-99e1-c478b7ea7b93",
67-        "encode" : {
68-          "duration_in_millis" : 0,
##### snipped #####

Increasing in and out counters indicate successful consumption and output. The monitoring API port may differ if api.http.port (http.port in older releases) is changed from its default range in /etc/logstash/logstash.yml.
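If jq is available, the counters can be extracted directly instead of grepping pretty-printed JSON; a minimal sketch assuming the default pipeline name of main:

$ curl -s http://127.0.0.1:9600/_node/stats/pipelines | jq '.pipelines.main.events'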
