Publishing Logstash events to Kafka adds a durable handoff point between pipeline processing and downstream consumers, which makes it easier to absorb indexing delays, fan out one event stream to several systems, and keep ingestion decoupled from the final destination.
The kafka output plugin acts as a Kafka producer. It uses bootstrap_servers to fetch broker metadata, opens produce connections to the broker addresses that Kafka advertises back to the client, and writes each event to the topic named by topic_id. The default codec is plain, so adding codec => json is the usual way to send the full event structure rather than only the message text plus a few default fields.
Current Logstash packages already bundle Kafka input and output support through logstash-integration-kafka, so the first task is usually to confirm plugin availability rather than install a separate gem. Run package-based validation under the logstash service account, because Logstash 9.x blocks superuser runs by default. On the broker side, Kafka 3.6+ can reject delayed events when log.message.timestamp.type stays at CreateTime and the message timestamp falls outside the broker or topic acceptance window.
$ sudo /usr/share/logstash/bin/logstash-plugin list --verbose logstash-output-kafka
Using bundled JDK: /usr/share/logstash/jdk
logstash-integration-kafka (11.8.4)
├── logstash-input-kafka
└── logstash-output-kafka
Current releases usually report the bundled integration package instead of a standalone logstash-output-kafka gem. If the command returns no Kafka output entry, install it with sudo /usr/share/logstash/bin/logstash-plugin install logstash-output-kafka and then re-run the list command.
Related: How to install Logstash plugins
$ sudo install -d -o logstash -g logstash -m 0750 /var/lib/logstash/examples
$ sudo install -o logstash -g logstash -m 0640 /dev/null /var/lib/logstash/examples/kafka-output.log
The example uses a separate file input so the Kafka output can be tested without changing an existing production input.
input {
  file {
    path => ["/var/lib/logstash/examples/kafka-output.log"]
    start_position => "end"
    sincedb_path => "/var/lib/logstash/kafka-output-demo.sincedb"
    tags => ["kafka_output_demo"]
  }
}
output {
  if "kafka_output_demo" in [tags] {
    kafka {
      id => "kafka_demo_output"
      bootstrap_servers => "kafka-1.example.net:9092,kafka-2.example.net:9092"
      topic_id => "logstash-events"
      codec => json
      acks => "1"
      client_id => "logstash-kafka-demo"
      # security_protocol => "SASL_SSL"
      # sasl_mechanism => "SCRAM-SHA-512"
      # sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='logstash' password='${KAFKA_PASSWORD}';"
      # ssl_truststore_location => "/etc/logstash/kafka.client.truststore.jks"
      # ssl_truststore_password => "${KAFKA_TRUSTSTORE_PASSWORD}"
    }
  }
}
The topic shown in topic_id must already exist unless the Kafka cluster explicitly allows topic auto-creation, and the Logstash client must be allowed to produce to that topic.
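Where auto-creation is disabled, the topic can be created ahead of time from any host that has the Kafka command-line tools. The following is a minimal sketch, assuming the tool is on the PATH as kafka-topics.sh; the ensure_topic helper, the KAFKA_TOPICS variable, and the partition and replica counts are illustrative assumptions, not values taken from the pipeline above.

```shell
# Assumption: KAFKA_TOPICS names the topic tool on this host
# (kafka-topics.sh in the Apache tarball, kafka-topics in some packages).
KAFKA_TOPICS="${KAFKA_TOPICS:-kafka-topics.sh}"

# Hypothetical helper: create the topic only when it is missing.
# $1 = bootstrap server, $2 = topic, $3 = partitions, $4 = replication factor
ensure_topic() {
  "$KAFKA_TOPICS" --bootstrap-server "$1" \
    --create --if-not-exists --topic "$2" \
    --partitions "$3" --replication-factor "$4"
}

# Example call; the partition and replica counts are placeholders:
# ensure_topic kafka-1.example.net:9092 logstash-events 3 2
```

On a cluster that requires SASL or TLS, the tool also needs client properties passed with --command-config, and the account used must be allowed to create topics.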
bootstrap_servers is only the metadata bootstrap list. The brokers must advertise listener hostnames and ports that the Logstash host can actually reach, or produce requests will fail after the initial metadata lookup.
The default Kafka output codec is plain. Setting codec => json sends the full event, and assigning an explicit id makes the output easy to identify in Logstash node statistics.
Kafka 3.6+ can reject events when log.message.timestamp.type stays at CreateTime and the event timestamp is too old or too far in the future. If retention and timestamp validation should use broker arrival time instead, set the broker or topic to LogAppendTime.
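The acceptance window can be pictured as a range check around the broker clock. The sketch below is an illustration only, not broker code. It assumes the window is bounded by the Kafka 3.6+ settings log.message.timestamp.before.max.ms (how old a timestamp may be) and log.message.timestamp.after.max.ms (how far ahead it may be); the function name ts_in_window and the sample numbers are hypothetical.

```shell
# Hypothetical helper: succeeds when an event timestamp falls inside the window.
# $1 = event timestamp (ms since epoch)
# $2 = broker clock (ms since epoch)
# $3 = maximum allowed age in ms (before.max.ms)
# $4 = maximum allowed lead in ms (after.max.ms)
ts_in_window() {
  [ "$1" -ge $(( $2 - $3 )) ] && [ "$1" -le $(( $2 + $4 )) ]
}

# Example with small made-up numbers: broker clock 1000000, 60 s back, 5 s ahead.
if ts_in_window 950000 1000000 60000 5000; then
  echo "accepted"
else
  echo "rejected"
fi
```

With CreateTime, the value checked is the timestamp carried by the produced record, so late or replayed events are the ones most likely to fall outside a narrow window.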
$ sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash --path.data /tmp/logstash-kafka-output-configtest --config.test_and_exit -f /etc/logstash/conf.d/60-kafka-output.conf
Using bundled JDK: /usr/share/logstash/jdk
[2026-04-08T00:12:03,879][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"9.3.2", "jruby.version"=>"jruby 9.4.13.0 (3.1.4) 2025-06-10 9938a3461f OpenJDK 64-Bit Server VM 21.0.10+7-LTS on 21.0.10+7-LTS +indy +jit"}
##### snipped #####
[2026-04-08T00:12:09,483][INFO ][logstash.javapipeline ][main] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
Configuration OK
[2026-04-08T00:12:09,485][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
This validation confirms the pipeline syntax and plugin settings only. It does not prove that broker DNS, topic ACLs, SASL credentials, TLS trust, or advertised listeners are correct.
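Because the configuration test never touches the network, a quick TCP probe from the Logstash host can catch unreachable brokers before the service is restarted. This is a rough sketch, assuming bash and the coreutils timeout command are available; can_connect and probe_brokers are hypothetical helper names. It only checks the addresses passed to it, so advertised listeners that differ from the bootstrap list still need their own probe.

```shell
# Hypothetical helper: succeeds when a TCP connection to host:port opens within 3 seconds.
# $1 = host, $2 = port; relies on bash's /dev/tcp pseudo-device.
can_connect() {
  timeout 3 bash -c 'exec 3<>"/dev/tcp/$0/$1"' "$1" "$2" 2>/dev/null
}

# Print one ok/FAIL line per host:port argument.
probe_brokers() {
  for hp in "$@"; do
    if can_connect "${hp%:*}" "${hp##*:}"; then
      echo "ok   $hp"
    else
      echo "FAIL $hp"
    fi
  done
}

# Example call with the brokers from bootstrap_servers:
# probe_brokers kafka-1.example.net:9092 kafka-2.example.net:9092
```

A successful probe shows only that the port accepts connections. It says nothing about topic ACLs, SASL credentials, or TLS trust.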
Current Logstash 9.x releases refuse superuser runs by default. Running the same command as root fails unless allow_superuser is explicitly enabled in /etc/logstash/logstash.yml.
$ sudo systemctl restart logstash
Restarting the service restarts every pipeline in the instance and can briefly pause ingestion while inputs and outputs reopen.
$ sudo systemctl status logstash --no-pager
● logstash.service - logstash
Loaded: loaded (/usr/lib/systemd/system/logstash.service; enabled; preset: enabled)
Active: active (running) since Wed 2026-04-08 00:13:09 UTC; 4s ago
Main PID: 21405 (java)
Tasks: 112 (limit: 28486)
Memory: 657.3M
##### snipped #####
An active service confirms that Logstash started and accepted the pipeline configuration. It does not by itself prove that Kafka is accepting produced events.
$ printf 'kafka-output-test %s\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)" | sudo tee -a /var/lib/logstash/examples/kafka-output.log
kafka-output-test 2026-04-08T00:13:49Z
With start_position set to end, append the test line after the pipeline is running so the file input treats it as new data instead of replaying old content.
$ curl -s 'http://localhost:9600/_node/stats/pipelines/main?pretty' | grep -nA4 '"id" : "kafka_demo_output"'
92: "id" : "kafka_demo_output",
93- "events" : {
94- "out" : 2,
95- "duration_in_millis" : 620,
96- "in" : 2
Any non-zero or rising events.out value for the named kafka output shows that Logstash is handing events to the Kafka producer successfully.
With the default API settings, the monitoring API binds to 127.0.0.1 and takes the first free port in the 9600-9700 range, so a single instance normally answers on http://localhost:9600/.
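For scripted checks, the events.out counter can be pulled out as a bare number. The helper below is a sketch that assumes the pretty-printed layout shown in the sample output, with the "id" line appearing before the plugin's "out" line; events_out_for is a hypothetical name, and a real JSON parser is the more robust choice where one is installed.

```shell
# Hypothetical helper: print events.out for one plugin id.
# $1 = plugin id; reads pretty-printed node stats JSON on stdin.
events_out_for() {
  awk -v id="$1" '
    # Remember when the line naming the requested plugin id has been seen
    index($0, "\"id\" : \"" id "\"") { found = 1; next }
    # The first "out" counter after that line belongs to the plugin
    found && /"out" :/ { gsub(/[^0-9]/, "", $0); print; exit }
  '
}

# Example call against the local monitoring API:
# curl -s 'http://localhost:9600/_node/stats/pipelines/main?pretty' | events_out_for kafka_demo_output
```

Running the call twice, before and after appending a test line, shows whether the counter is rising.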
$ kafka-console-consumer.sh --bootstrap-server kafka-1.example.net:9092 --topic logstash-events --from-beginning --max-messages 1 --timeout-ms 10000
{"log":{"file":{"path":"/var/lib/logstash/examples/kafka-output.log"}},"@timestamp":"2026-04-08T00:13:50.596Z","@version":"1","host":{"name":"logstash-1.example.net"},"message":"kafka-output-test 2026-04-08T00:13:49Z","tags":["kafka_output_demo"],"event":{"original":"kafka-output-test 2026-04-08T00:13:49Z"}}
This final check is optional because it requires Kafka command-line access on a broker host or an administration workstation. It confirms the JSON event content in addition to the Logstash node counters.
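When the consumer output is long, the test event can be picked out with a text match instead of by eye. This is a small sketch that assumes the compact JSON form shown above, with no spaces around the colons; has_demo_event is a hypothetical helper, and it does plain string matching, not JSON parsing.

```shell
# Hypothetical helper: succeeds when stdin contains an event whose message
# starts with the given marker and that carries the demo tag.
# $1 = message prefix to look for
has_demo_event() {
  grep -F "\"message\":\"$1" | grep -qF '"tags":["kafka_output_demo"]'
}

# Example call; the consumer flags are the ones used in the check above:
# kafka-console-consumer.sh --bootstrap-server kafka-1.example.net:9092 \
#   --topic logstash-events --from-beginning --timeout-ms 10000 \
#   | has_demo_event 'kafka-output-test' && echo "test event found"
```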