Consuming events from Kafka with Logstash provides a durable ingestion layer for logs, metrics, and application events, smoothing traffic spikes and decoupling producers from downstream outputs.

The kafka input plugin uses the Kafka consumer protocol to join a consumer group, receive partition assignments, and commit offsets back to Kafka so restarts resume from the last committed position rather than re-reading entire topics.

Broker reachability, topic ACLs, and matching security settings (SSL or SASL) must be in place before enabling ingestion. Consumer options such as group_id and auto_offset_reset determine where a consumer starts reading, which affects replay behavior and potential duplicates during backfills or restarts.

Steps to configure a Logstash Kafka input:

  1. Install the Kafka input plugin if it is not already present.
    $ sudo /usr/share/logstash/bin/logstash-plugin install logstash-input-kafka
    Using bundled JDK: /usr/share/logstash/jdk
    Validating logstash-input-kafka
    ERROR: Installation aborted, plugin 'logstash-input-kafka' is already provided by 'logstash-integration-kafka'

    Logstash 8.19 bundles the Kafka input and output plugins in logstash-integration-kafka, so the installer aborts with the message above when they are already present and no further action is needed.
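
To avoid running the installer at all, the plugin listing can be checked first. The sketch below is a minimal, hedged example: the function name has_kafka_integration is illustrative, and it assumes the listing prints the integration package name on its own line, as recent Logstash releases do.

```shell
# Succeeds if the plugin listing on stdin mentions the Kafka integration package.
# The function name is illustrative, not part of Logstash.
has_kafka_integration() {
  grep -q 'logstash-integration-kafka'
}

# Example usage on a host with Logstash installed (path taken from the step above):
#   /usr/share/logstash/bin/logstash-plugin list --verbose | has_kafka_integration \
#     && echo "Kafka plugins are already bundled"
```

If the check succeeds, the install command in this step can be skipped.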

  2. Create a pipeline configuration file at /etc/logstash/conf.d/30-kafka-input.conf.
    input {
      kafka {
        bootstrap_servers => "kafka-1.example.net:9092,kafka-2.example.net:9092"
        topics => ["logs"]
        group_id => "logstash-consumer"
        client_id => "logstash-01"
        decorate_events => true
    
        # auto_offset_reset => "earliest"
    
        # security_protocol => "SASL_SSL"
        # sasl_mechanism => "SCRAM-SHA-512"
        # sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='logstash' password='${KAFKA_PASSWORD}';"
        # ssl_truststore_location => "/etc/logstash/kafka.client.truststore.jks"
        # ssl_truststore_password => "${KAFKA_TRUSTSTORE_PASSWORD}"
      }
    }
    
    output {
      elasticsearch {
        hosts => ["http://elasticsearch.example.net:9200"]
        index => "kafka-events-%{+YYYY.MM.dd}"
      }
    }

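    group_id identifies the consumer group: offsets are tracked per group, and Logstash instances sharing a group_id split the topic's partitions among themselves. decorate_events adds Kafka metadata such as topic, partition, and offset under [@metadata][kafka] for troubleshooting; @metadata fields are not sent to outputs unless copied into the event.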

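    Avoid storing Kafka credentials in plaintext configuration. The ${KAFKA_PASSWORD} and ${KAFKA_TRUSTSTORE_PASSWORD} references in the example are resolved at startup from the Logstash keystore or from environment variables; use whichever secret store the deployment supports.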
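
A quick lint can catch literal passwords before the file is deployed. This is a rough sketch, not a complete secret scanner: the function name plaintext_secret_lines is hypothetical, and the pattern only flags quoted values after a setting ending in "password" that do not begin with a ${...} reference.

```shell
# Print (with line numbers) any line where a password value is a literal
# rather than a ${VAR} reference. Exits non-zero when nothing is found.
# The function name is hypothetical; the regex is a heuristic.
plaintext_secret_lines() {
  grep -nE "password *(=>|=) *[\"'][^\$]" "$1"
}

# Example usage against the file created in this step:
#   plaintext_secret_lines /etc/logstash/conf.d/30-kafka-input.conf \
#     && echo "literal password found, move it to the keystore or environment"
```

Commented-out lines are flagged as well, which is usually desirable because commented credentials still leak.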

    auto_offset_reset applies only when the group has no committed offset. Setting it to earliest for a new group_id backfills the entire retained topic, which can cause a large, sustained burst of ingestion.
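
Backfill progress can be followed from the Kafka side by summing consumer lag for the group. The sketch below assumes the kafka-consumer-groups.sh tool from the Apache Kafka distribution is available on some host and that its --describe output has a header row containing a LAG column; the function name total_lag is illustrative.

```shell
# Sum the LAG column of `kafka-consumer-groups.sh --describe` output read from stdin.
# The column is located by its header name, so column order does not matter.
total_lag() {
  awk '
    # Until the header row is seen, look for the LAG column and skip the line.
    !col { for (i = 1; i <= NF; i++) if ($i == "LAG") col = i; next }
    # Partitions without a committed offset show "-" and are ignored.
    $col ~ /^[0-9]+$/ { sum += $col }
    END { print sum + 0 }
  '
}

# Example usage (broker address and group taken from the configuration above;
# a secured cluster also needs --command-config with client properties):
#   kafka-consumer-groups.sh --bootstrap-server kafka-1.example.net:9092 \
#     --describe --group logstash-consumer | total_lag
```

A total that falls toward zero over repeated runs suggests the backfill is catching up; a total that keeps growing suggests Logstash cannot keep pace with producers.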

  3. Test the pipeline configuration.
    $ sudo /usr/share/logstash/bin/logstash --path.settings /etc/logstash --config.test_and_exit
    Using bundled JDK: /usr/share/logstash/jdk
    Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties
    [2026-01-08T08:41:08,315][WARN ][logstash.runner          ] NOTICE: Running Logstash as a superuser is strongly discouraged as it poses a security risk. Set 'allow_superuser' to false for better security.
    ##### snipped #####
    Configuration OK
    [2026-01-08T08:41:08,693][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

    This test validates only the structure of the Logstash configuration; Kafka connectivity is not checked until the pipeline starts.
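
Since the configuration test does not contact Kafka, a basic TCP check of each bootstrap server can rule out network problems before the restart. This is a sketch under assumptions: it relies on an nc (netcat) build that supports -z and -w, the function names are illustrative, and host:port parsing does not handle IPv6 literals. A successful TCP connect does not verify TLS, SASL credentials, or ACLs.

```shell
# Split a bootstrap_servers string ("host1:port1,host2:port2") into "host port" lines.
split_bootstrap_servers() {
  printf '%s\n' "$1" | tr ',' '\n' | while read -r entry; do
    [ -n "$entry" ] || continue
    printf '%s %s\n' "${entry%:*}" "${entry##*:}"
  done
}

# Try a TCP connection to every broker and report the result for each one.
check_brokers() {
  split_bootstrap_servers "$1" | while read -r host port; do
    if nc -z -w 3 "$host" "$port" 2>/dev/null; then
      echo "reachable: $host:$port"
    else
      echo "UNREACHABLE: $host:$port"
    fi
  done
}

# Example usage with the brokers from the pipeline configuration:
#   check_brokers "kafka-1.example.net:9092,kafka-2.example.net:9092"
```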

  4. Restart the Logstash service.
    $ sudo systemctl restart logstash

    A restart briefly stops active pipelines and should be scheduled appropriately for high-throughput environments.

  5. Check the Logstash logs for Kafka authentication or consumer assignment errors.
    $ sudo journalctl -u logstash -n 50 --no-pager
    Jan 08 08:41:25 host logstash[7703]: [2026-01-08T08:41:25,518][INFO ][org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer][main][f47b62ebc8d2ec043cace78c99ca7a7e92937f0f9e73979e24591a178e23c88b] [Consumer clientId=logstash-01-0, groupId=logstash-consumer] Subscribed to topic(s): logs
    Jan 08 08:41:25 host logstash[7703]: [2026-01-08T08:41:25,690][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][f47b62ebc8d2ec043cace78c99ca7a7e92937f0f9e73979e24591a178e23c88b] [Consumer clientId=logstash-01-0, groupId=logstash-consumer] Discovered group coordinator kafka-1.example.net:9092 (id: 2147483646 rack: null)
    Jan 08 08:41:25 host logstash[7703]: [2026-01-08T08:41:25,703][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][f47b62ebc8d2ec043cace78c99ca7a7e92937f0f9e73979e24591a178e23c88b] [Consumer clientId=logstash-01-0, groupId=logstash-consumer] Successfully joined group with generation Generation{generationId=3, memberId='logstash-01-0-a57fe128-746e-4c66-af0c-464adef22d94', protocol='range'}
    ##### snipped #####

    If the system journal is unavailable, check /var/log/logstash/logstash-plain.log for startup and plugin errors.
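
Long startup logs are easier to scan with a filter. The sketch below assumes the default Logstash log layout shown in this step ([timestamp][LEVEL][logger]...); the function names are illustrative. One helper keeps only WARN and ERROR lines from Kafka-related loggers, the other reports whether the consumer joined its group.

```shell
# Keep only WARN/ERROR lines whose logger name mentions kafka
# (for example org.apache.kafka.* or logstash.inputs.kafka).
kafka_problems() {
  grep -E '\]\[(WARN|ERROR) *\]\[[^]]*kafka'
}

# Succeeds if the log on stdin shows the consumer joining its group.
joined_group() {
  grep -q 'Successfully joined group'
}

# Example usage:
#   sudo journalctl -u logstash --no-pager | kafka_problems
#   sudo journalctl -u logstash --no-pager | joined_group && echo "consumer joined"
#   kafka_problems < /var/log/logstash/logstash-plain.log
```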

  6. Check pipeline stats for event flow.
    $ curl -s 'http://127.0.0.1:9600/_node/stats/pipelines?pretty' | grep -n '"events"' -A 8 | head -n 20
    17:      "events" : {
    18-        "duration_in_millis" : 139,
    19-        "in" : 3,
    20-        "filtered" : 3,
    21-        "out" : 3,
    22-        "queue_push_duration_in_millis" : 0
    23-      },
    24-      "flow" : {
    25-        "filter_throughput" : {
    --
    60:          "events" : {
    61-            "out" : 3,
    62-            "queue_push_duration_in_millis" : 0
    63-          }
    64-        } ],
    65-        "codecs" : [ {
    66-          "id" : "plain_a7dd84fd-73d6-4936-99e1-c478b7ea7b93",
    67-          "encode" : {
    68-            "duration_in_millis" : 0,
    ##### snipped #####

    In and out counters that increase across repeated requests indicate successful consumption and output. The monitoring API binds to the first free port in the default range 9600-9700, so the port may differ if 9600 was already in use or if api.http.port is set in /etc/logstash/logstash.yml.
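
To compare counters over time without reading the full JSON, the first occurrence of a counter can be pulled out of the pretty-printed response. This sketch assumes the layout shown in the output above, where the first "in" and "out" values belong to the pipeline-level events block; the function name first_counter is illustrative, and a JSON-aware tool is a sturdier choice when several pipelines are defined.

```shell
# Print the first numeric value of the named counter ("in", "out", "filtered")
# from pretty-printed stats JSON on stdin.
first_counter() {
  sed -n "s/^ *\"$1\" : \([0-9][0-9]*\),*\$/\1/p" | head -n 1
}

# Example usage: sample the "in" counter twice, ten seconds apart.
#   url='http://127.0.0.1:9600/_node/stats/pipelines?pretty'
#   before=$(curl -s "$url" | first_counter in)
#   sleep 10
#   after=$(curl -s "$url" | first_counter in)
#   echo "events received in interval: $((after - before))"
```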