Importing CSV files with Logstash turns spreadsheet exports and application dumps into searchable Elasticsearch documents, which is useful for bulk loading reference data, user lists, or report output without rewriting the source format by hand.
In read mode, the file input treats each finished CSV as a complete file, reads it once, and hands each row to the csv filter before the elasticsearch output indexes the parsed fields. Defining the column list explicitly keeps header handling deterministic and avoids the single-worker requirement that Elastic still documents for header autodetection.
Read mode always starts from the beginning of newly discovered files, records progress in a sincedb file, and deletes completed files by default unless a different file_completed_action is set. The pipeline below keeps file metadata in ECS form under log.file.path, but disables ECS only on the csv filter so the parsed CSV columns stay as flat fields like name, email, and role.
$ sudo install -d -o logstash -g logstash -m 755 /var/lib/logstash/input /var/lib/logstash/state
$ sudo tee /var/lib/logstash/input/users.csv <<'CSV'
name,email,role
Ava Jensen,ava@example.net,admin
Noah Patel,noah@example.net,viewer
CSV
Copy or move finished CSV files into the watched directory. Editing a file in place while the file input is reading it can ingest partial rows, and the logstash user still needs read permission on the final file.
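Partial reads can be avoided entirely by writing the file outside the watched glob and renaming it into place; a rename within one filesystem is atomic, so the file input only ever sees complete files. A minimal sketch (the temporary directories stand in for a real staging directory and /var/lib/logstash/input):

```shell
# Stage the CSV outside the watched glob, then rename it into place.
# mv within a single filesystem is atomic, so the file input never
# reads a half-written file.
staging=$(mktemp -d)   # stand-in for a real staging directory
watched=$(mktemp -d)   # stand-in for /var/lib/logstash/input

cat > "$staging/report.csv" <<'CSV'
name,email,role
Mia Chen,mia@example.net,editor
CSV

mv "$staging/report.csv" "$watched/report.csv"
```

In production the staging directory should sit on the same filesystem as the watched directory so the mv is a rename rather than a copy, and the final file must stay readable by the logstash user.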
input {
  file {
    path => "/var/lib/logstash/input/*.csv"
    mode => "read"
    file_completed_action => "log"
    file_completed_log_path => "/var/lib/logstash/state/csv-completed.log"
    sincedb_path => "/var/lib/logstash/state/csv-ingest.sincedb"
    ecs_compatibility => "v8"
  }
}

filter {
  csv {
    columns => ["name", "email", "role"]
    skip_header => true
    ecs_compatibility => "disabled"
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch.example.net:9200"]
    index => "users-%{+YYYY.MM.dd}"
  }
}
Current Elastic documentation states that read mode ignores start_position and always reads each newly discovered file from the beginning. The default file_completed_action is delete, so this example changes it to log to keep the source CSV on disk after ingestion.
The sincedb_path value must be a file path, not a directory. Deleting the sincedb file causes the same CSV to be reread and can create duplicate documents.
Current csv filter documentation requires pipeline.workers: 1 only when autodetect_column_names is used. Defining columns explicitly keeps header skipping deterministic without restricting the pipeline to a single worker.
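For contrast, the autodetect variant would look something like this sketch; per the documentation it only behaves correctly when the pipeline runs a single worker:

```
filter {
  csv {
    autodetect_column_names => true
    skip_header => true
    ecs_compatibility => "disabled"
  }
}
```

That pipeline would need pipeline.workers: 1 in its pipelines.yml entry, which is exactly the constraint the explicit column list avoids.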
Use separate pipelines or conditional branches for CSV files that do not share the same column order, delimiter, or quoting rules. Numeric or date columns also need additional filters such as convert or date before indexing.
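As a sketch of the conversion step, assuming hypothetical signup_date and logins columns, the csv filter's convert option can coerce numeric types and a date filter can parse the date into @timestamp:

```
filter {
  csv {
    columns => ["name", "email", "role", "signup_date", "logins"]
    skip_header => true
    ecs_compatibility => "disabled"
    convert => { "logins" => "integer" }
  }
  date {
    match => ["signup_date", "yyyy-MM-dd"]
    target => "@timestamp"
  }
}
```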
$ sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash --path.data /tmp/logstash-configtest --config.test_and_exit
Using bundled JDK: /usr/share/logstash/jdk
Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties
Configuration OK
[2026-04-07T14:21:08,214][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
$ sudo systemctl restart logstash
Restarting Logstash briefly pauses ingestion while pipelines stop and reload.
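If restarts become disruptive, Logstash can instead reload changed pipeline configuration on the fly; a minimal logstash.yml sketch:

```
# /etc/logstash/logstash.yml
config.reload.automatic: true
config.reload.interval: 3s
```

The same behavior is available for a single run by starting Logstash with --config.reload.automatic.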
$ sudo cat /var/lib/logstash/state/csv-completed.log
/var/lib/logstash/input/users.csv
The completion log is appended to over time, so rotate or truncate it as part of normal housekeeping if many CSV files are imported.
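One way to handle that housekeeping is a logrotate rule; a sketch, assuming the completion-log path from the pipeline above:

```
# /etc/logrotate.d/logstash-csv-completed
/var/lib/logstash/state/csv-completed.log {
    monthly
    rotate 6
    compress
    missingok
    notifempty
    copytruncate
}
```

copytruncate matters here because Logstash keeps the file open; renaming it out from under the plugin would otherwise send later completions to the rotated copy.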
$ curl --silent --show-error --fail \
"http://elasticsearch.example.net:9200/users-*/_count?q=log.file.path:%22/var/lib/logstash/input/users.csv%22&pretty"
{
"count" : 2,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
}
}
This query uses log.file.path because the example keeps ECS-compatible file metadata enabled on the file input. If that plugin runs with ECS disabled in another pipeline, query path instead.
Replace _count with _search and add &_source_includes=name,email,role when the parsed fields need inspection instead of a simple row count.
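The quoted path in the q parameter has to be percent-encoded; one way to build the _search URL, sketched here with python3 doing the encoding:

```shell
# Percent-encode the Lucene query so the quoted file path survives
# as a single q parameter, then print the resulting _search URL.
path='/var/lib/logstash/input/users.csv'
q=$(python3 -c 'import sys, urllib.parse; print(urllib.parse.quote(sys.argv[1], safe=""))' "log.file.path:\"$path\"")
echo "http://elasticsearch.example.net:9200/users-*/_search?q=${q}&_source_includes=name,email,role&pretty"
```

curl can also do the encoding itself via --get together with --data-urlencode, which avoids building the query string by hand.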
Replace the hosts value, index pattern, and file path filter to match the real pipeline and destination cluster.