Publishing Logstash events to Kafka adds a durable handoff point between pipeline processing and downstream consumers, which makes it easier to absorb indexing delays, fan out one event stream to several systems, and keep ingestion decoupled from the final destination.

The kafka output plugin acts as a Kafka producer. It uses bootstrap_servers to fetch broker metadata, opens produce connections to the broker addresses that Kafka advertises back to the client, and writes each event to the topic named by topic_id. The default codec is plain, so adding codec => json is the usual way to send the full event structure rather than only the message text plus a few default fields.
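
Expressed as configuration, that behaviour needs only three settings. The sketch below uses the same placeholder hostnames and topic name as the fuller, conditional example in step 3:

    output {
      kafka {
        # Metadata bootstrap list; Kafka returns the advertised broker addresses from here.
        bootstrap_servers => "kafka-1.example.net:9092,kafka-2.example.net:9092"
        # Topic every event from this output is written to.
        topic_id => "logstash-events"
        # Serialize the whole event as JSON instead of the plain-codec text rendering.
        codec => json
      }
    }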

Current Logstash packages already bundle Kafka input and output support through logstash-integration-kafka, so the first task is usually to confirm plugin availability instead of installing a separate gem. Run validation under the logstash service account, because Logstash 9.x blocks superuser runs by default. Broker timestamp handling is also worth keeping in mind: Kafka 3.6+ brokers can reject delayed events when log.message.timestamp.type stays at CreateTime and the message timestamp falls outside the broker or topic acceptance window.

Steps to configure Logstash output to Kafka:

  1. Check whether the Kafka output is already available in the current Logstash installation.
    $ sudo /usr/share/logstash/bin/logstash-plugin list --verbose logstash-output-kafka
    Using bundled JDK: /usr/share/logstash/jdk
    logstash-integration-kafka (11.8.4)
     ├── logstash-input-kafka
     └── logstash-output-kafka

    Current releases usually report the bundled integration package instead of a standalone logstash-output-kafka gem. If the command returns no Kafka output entry, install it with sudo /usr/share/logstash/bin/logstash-plugin install logstash-output-kafka and then re-run the list command.

  2. Create a dedicated example input path that the logstash service account can read.
    $ sudo install -d -o logstash -g logstash -m 0750 /var/lib/logstash/examples
    $ sudo install -o logstash -g logstash -m 0640 /dev/null /var/lib/logstash/examples/kafka-output.log

    The example uses a separate file input so the Kafka output can be tested without changing an existing production input.

  3. Create a pipeline configuration file at /etc/logstash/conf.d/60-kafka-output.conf.
    input {
      file {
        path => ["/var/lib/logstash/examples/kafka-output.log"]
        start_position => "end"
        sincedb_path => "/var/lib/logstash/kafka-output-demo.sincedb"
        tags => ["kafka_output_demo"]
      }
    }
    
    output {
      if "kafka_output_demo" in [tags] {
        kafka {
          id => "kafka_demo_output"
          bootstrap_servers => "kafka-1.example.net:9092,kafka-2.example.net:9092"
          topic_id => "logstash-events"
          codec => json
          acks => "1"
          client_id => "logstash-kafka-demo"
    
          # security_protocol => "SASL_SSL"
          # sasl_mechanism => "SCRAM-SHA-512"
          # sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='logstash' password='${KAFKA_PASSWORD}';"
          # ssl_truststore_location => "/etc/logstash/kafka.client.truststore.jks"
          # ssl_truststore_password => "${KAFKA_TRUSTSTORE_PASSWORD}"
        }
      }
    }

    The topic shown in topic_id must already exist unless the Kafka cluster explicitly allows topic auto-creation, and the Logstash client must be allowed to produce to that topic.
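
    If the topic does not exist and auto-creation is disabled, it can be created in advance from a host with the Kafka CLI tools. The partition and replication counts below are placeholder assumptions, not recommendations:
    $ kafka-topics.sh --bootstrap-server kafka-1.example.net:9092 --create --topic logstash-events --partitions 3 --replication-factor 2
    $ kafka-topics.sh --bootstrap-server kafka-1.example.net:9092 --describe --topic logstash-events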

    bootstrap_servers is only the metadata bootstrap list. The brokers must advertise listener hostnames and ports that the Logstash host can actually reach, or produce requests will fail after the initial metadata lookup.
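
    A quick way to catch unreachable advertised listeners from the Logstash host is a plain TCP check against each broker hostname and port, assuming a netcat variant with the -z option is installed:
    $ nc -vz kafka-1.example.net 9092
    $ nc -vz kafka-2.example.net 9092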

The default Kafka output codec is plain. Setting codec => json sends the full event, and assigning an explicit id makes the output easy to identify in Logstash node statistics.

    Kafka 3.6+ can reject events when log.message.timestamp.type stays at CreateTime and the event timestamp is too old or too far in the future. If retention and timestamp validation should use broker arrival time instead, set the broker or topic to LogAppendTime.
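
    For a single topic, the switch to broker arrival time can be made with kafka-configs.sh instead of changing the broker-wide default; the topic name matches the earlier example:
    $ kafka-configs.sh --bootstrap-server kafka-1.example.net:9092 --entity-type topics --entity-name logstash-events --alter --add-config message.timestamp.type=LogAppendTime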

  4. Test the pipeline configuration with the packaged settings directory and a temporary data path.
    $ sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash --path.data /tmp/logstash-kafka-output-configtest --config.test_and_exit -f /etc/logstash/conf.d/60-kafka-output.conf
    Using bundled JDK: /usr/share/logstash/jdk
    [2026-04-08T00:12:03,879][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"9.3.2", "jruby.version"=>"jruby 9.4.13.0 (3.1.4) 2025-06-10 9938a3461f OpenJDK 64-Bit Server VM 21.0.10+7-LTS on 21.0.10+7-LTS +indy +jit"}
    ##### snipped #####
    [2026-04-08T00:12:09,483][INFO ][logstash.javapipeline    ][main] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
    Configuration OK
    [2026-04-08T00:12:09,485][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

    This validation confirms the pipeline syntax and plugin settings only. It does not prove that broker DNS, topic ACLs, SASL credentials, TLS trust, or advertised listeners are correct.
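
    If the commented SASL_SSL settings are enabled, a separate TLS handshake check from the Logstash host can rule out truststore problems before Logstash is restarted; the TLS listener port 9093 here is an assumption:
    $ openssl s_client -connect kafka-1.example.net:9093 -brief </dev/null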

    Current Logstash 9.x releases refuse superuser runs by default. Running the same command as root fails unless allow_superuser is explicitly enabled in /etc/logstash/logstash.yml.
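
    If a root run is unavoidable in a particular environment, the block can be lifted by adding the following line to /etc/logstash/logstash.yml, although staying on the logstash service account remains the cleaner option:
    allow_superuser: true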

  5. Restart the Logstash service so the updated pipeline is loaded.
    $ sudo systemctl restart logstash

    Restarting the service restarts every pipeline in the instance and can briefly pause ingestion while inputs and outputs reopen.
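
    As a lighter-weight alternative for later configuration edits, automatic pipeline reloading can be enabled in /etc/logstash/logstash.yml so that changes under /etc/logstash/conf.d are picked up without a full service restart; the interval shown is only an example:
    config.reload.automatic: true
    config.reload.interval: 30s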

  6. Confirm the Logstash service returned to an active state.
    $ sudo systemctl status logstash --no-pager
    ● logstash.service - logstash
         Loaded: loaded (/usr/lib/systemd/system/logstash.service; enabled; preset: enabled)
         Active: active (running) since Tue 2026-04-08 00:13:09 UTC; 4s ago
       Main PID: 21405 (java)
          Tasks: 112 (limit: 28486)
         Memory: 657.3M
    ##### snipped #####

    An active service confirms that Logstash started and accepted the pipeline configuration. It does not by itself prove that Kafka is accepting produced events.
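
    Producer-side failures such as unreachable advertised listeners or rejected credentials normally surface in the Logstash log, so scanning the journal shortly after the restart is a useful follow-up:
    $ sudo journalctl -u logstash --since "5 minutes ago" --no-pager | grep -iE 'kafka|error|warn'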

  7. Append a fresh test line to the example input file so the pipeline has a new event to publish.
    $ printf 'kafka-output-test %s\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)" | sudo tee -a /var/lib/logstash/examples/kafka-output.log
    kafka-output-test 2026-04-08T00:13:49Z

    With start_position set to end, append the test line after the pipeline is running so the file input treats it as new data instead of replaying old content.

  8. Check the Logstash node statistics for the named Kafka output.
    $ curl -s http://localhost:9600/_node/stats/pipelines/main?pretty | grep -nA4 '"id" : "kafka_demo_output"'
    92:          "id" : "kafka_demo_output",
    93-          "events" : {
    94-            "out" : 2,
    95-            "duration_in_millis" : 620,
    96-            "in" : 2

    Any non-zero or rising events.out value for the named kafka output shows that Logstash is handing events to the Kafka producer successfully.

    The monitoring API commonly listens on http://localhost:9600/ when the default API settings are in use.
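
    If jq is installed, the same counters can be extracted directly instead of grepping the pretty-printed output; the JSON path below matches the stats layout of recent Logstash releases but may shift slightly between versions:
    $ curl -s http://localhost:9600/_node/stats/pipelines/main | jq '.pipelines.main.plugins.outputs[] | select(.id == "kafka_demo_output") | .events'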

  9. If Kafka CLI access is available, read one message from the topic to confirm the serialized payload.
    $ kafka-console-consumer.sh --bootstrap-server kafka-1.example.net:9092 --topic logstash-events --from-beginning --max-messages 1 --timeout-ms 10000
    {"log":{"file":{"path":"/var/lib/logstash/examples/kafka-output.log"}},"@timestamp":"2026-04-08T00:13:50.596Z","@version":"1","host":{"name":"logstash-1.example.net"},"message":"kafka-output-test 2026-04-08T00:13:49Z","tags":["kafka_output_demo"],"event":{"original":"kafka-output-test 2026-04-08T00:13:49Z"}}

    This final check is optional because it requires Kafka command-line access on a broker host or an administration workstation. It confirms the JSON event content in addition to the Logstash node counters.
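
    Where the Apache Kafka scripts are not installed but kcat (formerly kafkacat) is available, an equivalent one-message read looks like this:
    $ kcat -b kafka-1.example.net:9092 -t logstash-events -C -o beginning -c 1 -e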