Consuming Kafka topics with Logstash places a durable buffer between event producers and downstream parsing or indexing, so traffic spikes, Logstash restarts, and short network interruptions can be absorbed without dropping data at the edge.

Current Logstash releases ship the kafka input as part of the bundled Kafka integration. The input joins a Kafka consumer group, stores offsets in Kafka, and can attach topic, partition, offset, and timestamp details under [@metadata][kafka] when decorate_events is set to basic or extended.

Set explicit bootstrap_servers, topics, group_id, and client_id values instead of relying on the defaults localhost:9092, ["logstash"], and logstash (used for both group_id and client_id). For a new group_id, auto_offset_reset => "earliest" replays every record still retained in the topic. Any broker that requires encryption or authentication also needs matching security_protocol, sasl_*, and ssl_* settings, with secrets kept out of plain-text pipeline files, for example in the Logstash keystore or in environment variables referenced as ${NAME}.
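Settings left at their defaults are easy to miss in a long pipeline file. The following Python snippet is a minimal sketch of a pre-flight review, assuming the kafka block's settings have already been copied into a dictionary by hand (Logstash does not provide such a parser). The function names settings_left_at_default and secrets_in_plain_text are hypothetical, and the defaults table only restates the values quoted above.

```python
"""Hypothetical pre-flight review of kafka input settings copied into a dict."""

# Defaults as quoted in the text above; re-check them against the kafka input
# documentation for the plugin version actually installed.
KAFKA_INPUT_DEFAULTS = {
    "bootstrap_servers": "localhost:9092",
    "topics": ["logstash"],
    "group_id": "logstash",
    "client_id": "logstash",
}


def settings_left_at_default(settings: dict) -> list[str]:
    """Return setting names that are missing or identical to the plugin default."""
    flagged = []
    for name, default in KAFKA_INPUT_DEFAULTS.items():
        # A missing key means the plugin silently falls back to the default.
        if settings.get(name, default) == default:
            flagged.append(name)
    return flagged


def secrets_in_plain_text(settings: dict) -> list[str]:
    """Return secret-bearing settings whose values contain no ${VAR} reference."""
    flagged = []
    for name, value in settings.items():
        holds_secret = name.endswith("_password") or name == "sasl_jaas_config"
        if holds_secret and "${" not in str(value):
            flagged.append(name)
    return flagged
```

Both checks are deliberately shallow: they catch forgotten keys and literal passwords, not wrong broker names or mismatched security settings.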

Steps to configure a Kafka input in Logstash:

  1. Create a dedicated pipeline file such as /etc/logstash/conf.d/30-kafka-input.conf.
    input {
      kafka {
        id => "kafka_logs"
        bootstrap_servers => "kafka-1.example.net:9092,kafka-2.example.net:9092"
        topics => ["logs"]
        group_id => "logstash-consumer"
        client_id => "logstash-01"
        auto_offset_reset => "earliest"
        decorate_events => "basic"
    
        # security_protocol => "SASL_SSL"
        # sasl_mechanism => "SCRAM-SHA-512"
        # sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='logstash' password='${KAFKA_PASSWORD}';"
        # ssl_truststore_location => "/etc/logstash/kafka.client.truststore.p12"
        # ssl_truststore_password => "${KAFKA_TRUSTSTORE_PASSWORD}"
        # ssl_truststore_type => "PKCS12"
      }
    }
    
    output {
      stdout {
        codec => rubydebug { metadata => true }
      }
    }

    Because the kafka input ships inside the bundled logstash-integration-kafka plugin, a separate logstash-plugin install logstash-input-kafka step is usually unnecessary. The temporary stdout output with metadata => true prints each event together with its [@metadata] fields, which makes validation easier. Outside of that debug view, the Kafka fields stay under [@metadata][kafka] and are not sent to outputs unless a filter copies them into the event.

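    auto_offset_reset is only consulted when the consumer group has no committed offset for a partition, or when the committed offset no longer exists on the broker. With "earliest", a new group_id therefore backfills the entire retained topic on first start. Use "latest" instead when only records produced after startup should be consumed.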
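    The reset policy only matters when the group has no usable committed offset. The Python snippet below is a minimal model of that decision for one partition, not the Kafka client's code; starting_offset and its parameter names are hypothetical, and the fallback of "latest" assumes the Kafka consumer's own default applies when auto_offset_reset is left unset.

```python
"""Hypothetical model of where a consumer group starts reading one partition."""
from typing import Optional


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    auto_offset_reset: str = "latest") -> int:
    """Return the first offset fetched for a partition.

    committed: offset the group last committed, or None for a new group_id
    log_start: earliest offset still retained on the broker
    log_end:   offset the next produced record will receive
    auto_offset_reset: defaults to "latest", the Kafka consumer default
    """
    # A committed offset that still exists always wins; the policy is ignored.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No usable committed offset (new group, or retention deleted it).
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    # "none" makes the real consumer raise an error instead of picking a position.
    raise ValueError(f"no usable committed offset and auto_offset_reset={auto_offset_reset!r}")
```

    A committed offset that retention has already deleted is treated like a missing one, so a long outage combined with "latest" jumps straight to the end of the log and skips the records that are still retained.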

  2. Test the pipeline configuration before restarting the service.
    $ sudo /usr/share/logstash/bin/logstash --path.settings /etc/logstash --config.test_and_exit
    Using bundled JDK: /usr/share/logstash/jdk
    Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties
    [2026-04-08T00:11:42,435][INFO ][logstash.runner          ] Log4j configuration path used is: /etc/logstash/log4j2.properties
    [2026-04-08T00:11:42,455][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"9.3.2", "jruby.version"=>"jruby 9.4.13.0 (3.1.4) 2025-06-10 9938a3461f OpenJDK 64-Bit Server VM 21.0.10+7-LTS on 21.0.10+7-LTS +indy +jit [aarch64-linux]"}
    ##### snipped #####
    [2026-04-08T00:11:44,198][INFO ][logstash.javapipeline    ][main] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
    Configuration OK
    [2026-04-08T00:11:44,199][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

    The syntax test validates the pipeline and plugin settings, but Kafka reachability and broker authentication are only proven after the service starts.
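    Before restarting, a plain TCP check from the Logstash host can rule out DNS, routing, and firewall problems. The Python sketch below assumes bootstrap_servers uses host:port entries without bracketed IPv6 literals; parse_bootstrap_servers and unreachable_brokers are hypothetical helpers, and a successful connect says nothing about TLS or SASL authentication.

```python
"""Hypothetical pre-restart check: does each broker accept a TCP connection?"""
import socket


def parse_bootstrap_servers(value: str) -> list[tuple[str, int]]:
    """Split "host1:9092,host2:9092" into (host, port) pairs.

    Assumes every entry is host:port; bracketed IPv6 literals are not handled.
    """
    pairs = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing comma
        host, _, port = entry.rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def unreachable_brokers(value: str, timeout: float = 3.0) -> list[str]:
    """Return the host:port entries that fail a plain TCP connect."""
    failed = []
    for host, port in parse_bootstrap_servers(value):
        try:
            # Proves only that the port is open; TLS and SASL are not exercised.
            with socket.create_connection((host, port), timeout=timeout):
                pass
        except OSError:  # DNS failure, refusal, and timeout all land here
            failed.append(f"{host}:{port}")
    return failed
```

    For example, unreachable_brokers("kafka-1.example.net:9092,kafka-2.example.net:9092") returns an empty list when every broker accepts a connection.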

  3. Restart the Logstash service so it loads the new Kafka input.
    $ sudo systemctl restart logstash

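    A restart briefly stops every pipeline. Producers keep writing to Kafka in the meantime, and once Logstash rejoins the group it resumes from the committed offsets, so the backlog is consumed rather than lost as long as it stays within the topic's retention.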
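    The backlog that builds up during a restart is consumer lag: the log end offset minus the group's committed offset, per partition. kafka-consumer-groups.sh with --describe --group logstash-consumer reports these values directly in its CURRENT-OFFSET, LOG-END-OFFSET, and LAG columns; the Python sketch below only illustrates the arithmetic, with hypothetical function names and offsets supplied by hand.

```python
"""Hypothetical illustration of per-partition consumer lag arithmetic."""
from typing import Optional


def consumer_lag(committed: dict[int, int], log_end: dict[int, int]) -> dict[int, Optional[int]]:
    """Return log_end - committed per partition, or None when nothing is committed."""
    lag: dict[int, Optional[int]] = {}
    for partition, end in log_end.items():
        if partition in committed:
            # Records produced while Logstash was down accumulate in this gap.
            lag[partition] = end - committed[partition]
        else:
            # No committed offset yet: auto_offset_reset decides the start position.
            lag[partition] = None
    return lag


def total_lag(lag: dict[int, Optional[int]]) -> int:
    """Sum the known per-partition lag values."""
    return sum(value for value in lag.values() if value is not None)
```

    A total that shrinks over successive samples after the restart indicates that Logstash is catching up.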

  4. Review recent Logstash journal lines and confirm the consumer joins the group and receives records from the topic.
    $ sudo journalctl --unit=logstash --since "5 min ago" --no-pager --lines=40
    Apr 08 00:11:46 logstash-01 logstash[24831]: [2026-04-08T00:11:46,204][INFO ][org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer][main][kafka_logs] [Consumer clientId=logstash-01-0, groupId=logstash-consumer] Subscribed to topic(s): logs
    Apr 08 00:11:49 logstash-01 logstash[24831]: [2026-04-08T00:11:49,626][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][kafka_logs] [Consumer clientId=logstash-01-0, groupId=logstash-consumer] Discovered group coordinator kafka-1.example.net:9092 (id: 2147483647 rack: null)
    Apr 08 00:11:49 logstash-01 logstash[24831]: [2026-04-08T00:11:49,698][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][kafka_logs] [Consumer clientId=logstash-01-0, groupId=logstash-consumer] Successfully joined group with generation Generation{generationId=1, memberId='logstash-01-0-5591f50f-c2c1-4db5-aca7-99409f05acb2', protocol='range'}
    Apr 08 00:14:06 logstash-01 logstash[24831]:        "message" => "kafka input live check"

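    If the service logs to files instead of the system journal, /var/log/logstash/logstash-plain.log contains the same consumer-group lines. The rubydebug event output is written to standard output, so it appears only in the journal or wherever the service's stdout is redirected. On an idle topic, no event is printed until a producer, such as kafka-console-producer.sh, writes a record to logs.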
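    Long journals or log files are easier to check with a small filter. The Python sketch below scans log lines for the "Subscribed to topic(s)" and "Successfully joined group" messages shown above; the regular expressions are assumptions based on those sample lines and may need adjusting for other Kafka client versions, and consumer_status is a hypothetical helper.

```python
"""Hypothetical check for Kafka consumer-group lines in Logstash logs."""
import re
from typing import Iterable

# Patterns derived from the sample Kafka client log lines shown above.
SUBSCRIBED = re.compile(r"groupId=([^\]]+)\] Subscribed to topic\(s\): (.+)$")
JOINED = re.compile(r"groupId=([^\]]+)\] Successfully joined group with generation")


def consumer_status(lines: Iterable[str], group_id: str) -> dict:
    """Report the topics subscribed and whether the group join succeeded."""
    status = {"subscribed_topics": [], "joined": False}
    for raw in lines:
        line = raw.rstrip("\n")
        match = SUBSCRIBED.search(line)
        if match and match.group(1) == group_id:
            status["subscribed_topics"] = [t.strip() for t in match.group(2).split(",")]
        match = JOINED.search(line)
        if match and match.group(1) == group_id:
            status["joined"] = True
    return status
```

    For example, passing an open handle to /var/log/logstash/logstash-plain.log with the group ID logstash-consumer should report ["logs"] and a successful join once the consumer has started.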

  5. Query the Logstash monitoring API and confirm the kafka_logs input shows rising event counters.
    $ curl -s "http://localhost:9600/_node/stats/pipelines/main?pretty=true"
    {
      "pipelines" : {
        "main" : {
          "events" : {
            "filtered" : 1,
            "out" : 1,
            "in" : 1,
            "queue_push_duration_in_millis" : 0
          },
          "plugins" : {
            "inputs" : [ {
              "id" : "kafka_logs",
              "name" : "kafka",
              "events" : {
                "out" : 1,
                "queue_push_duration_in_millis" : 0
              }
            } ]
          }
        }
      }
    }

    A rising events.out counter on the kafka_logs input confirms that records are being consumed, and rising pipeline-level in and out counters confirm that events are reaching the configured output. The API is enabled by default and binds to 127.0.0.1 on the first free port in the 9600-9700 range; if /etc/logstash/logstash.yml changes api.http.host, api.http.port, or the API authentication settings, adjust the host, port, or credentials in the request to match.
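    Confirming that a counter is rising takes two samples. The Python sketch below reads the same stats endpoint with urllib from the standard library, assuming the default unauthenticated API on localhost:9600 and the pipeline main; kafka_input_events_out, fetch_stats, and counter_is_rising are hypothetical helpers, and basic authentication or TLS on the API would need extra request handling.

```python
"""Hypothetical check that the kafka_logs input's events.out counter is rising."""
import json
import time
import urllib.request

# Assumes the default unauthenticated API on localhost:9600 and the pipeline "main".
STATS_URL = "http://localhost:9600/_node/stats/pipelines/main"


def kafka_input_events_out(stats: dict, pipeline: str = "main",
                           plugin_id: str = "kafka_logs") -> int:
    """Return events.out for the input plugin with the given id."""
    for plugin in stats["pipelines"][pipeline]["plugins"]["inputs"]:
        if plugin["id"] == plugin_id:
            return plugin.get("events", {}).get("out", 0)
    raise KeyError(f"input {plugin_id!r} not found in pipeline {pipeline!r}")


def fetch_stats(url: str = STATS_URL, timeout: float = 5.0) -> dict:
    """Download and decode the node stats JSON document."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.load(response)


def counter_is_rising(interval: float = 10.0) -> bool:
    """Sample events.out twice and report whether it increased in between."""
    first = kafka_input_events_out(fetch_stats())
    time.sleep(interval)
    second = kafka_input_events_out(fetch_stats())
    return second > first
```

    counter_is_rising only returns True when records actually arrive on the topic during the interval, so an idle topic reads as False even when the input is healthy.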