Ingesting CSV files with Logstash turns exported reports and spreadsheet-friendly data into searchable documents in Elasticsearch, enabling fast lookup, filtering, and aggregations without manual reformatting.

CSV ingestion in Logstash is handled by a pipeline in which the file input reads each CSV row as an event, the csv filter splits the row into named fields, and the elasticsearch output indexes the resulting document into an index built from the configured index name.
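
To see just the split step in isolation, a throwaway pipeline can read one row from stdin and print the parsed event; the column names below are only an example, and --path.data points at a scratch directory so the run does not collide with the installed service:

  $ echo 'Ava Jensen,ava@example.net,admin' | \
      /usr/share/logstash/bin/logstash --path.data /tmp/logstash-csvtest \
      -e 'filter { csv { columns => ["name","email","role"] } } output { stdout { codec => rubydebug } }'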

Parsing depends on a consistent column order, delimiter, and quoting, and values are parsed as strings unless type conversion is configured with the csv filter's convert option. The file input honors start_position only on a file's first read; after that its sincedb state controls what gets read, and clearing the configured sincedb_path forces a re-read that can create duplicate documents. The default pipeline also loads all /etc/logstash/conf.d/*.conf files together, so unrelated inputs and outputs should be removed, disabled, or guarded with conditionals to keep CSV imports isolated.
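
For instance, a semicolon-delimited file with a numeric column could be handled with explicit options in the csv filter; the separator, column names, and target type below are placeholders to adapt:

  filter {
    csv {
      separator => ";"                        # default delimiter is ","
      columns   => ["name", "email", "logins"]
      convert   => { "logins" => "integer" }  # values remain strings unless converted
    }
  }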

Steps to ingest CSV files with Logstash:

  1. Create a directory for CSV uploads.
    $ sudo install -d -m 755 /var/lib/logstash/input
  2. Place a CSV file for ingestion in the input directory.
    $ sudo tee /var/lib/logstash/input/users.csv <<'CSV'
    name,email,role
    Ava Jensen,ava@example.net,admin
    Noah Patel,noah@example.net,viewer
    CSV

    Move completed CSV files into the watched directory to avoid indexing partially written rows, and ensure the logstash service user has read permission on the CSV file.
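
    For example, the file can be staged on the same filesystem with the right ownership and mode, then moved into place once complete (the staging path and group here are examples):

    $ sudo install -d -o root -g logstash -m 750 /var/lib/logstash/staging
    $ sudo install -o root -g logstash -m 640 /tmp/users.csv /var/lib/logstash/staging/users.csv
    $ sudo mv /var/lib/logstash/staging/users.csv /var/lib/logstash/input/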

  3. Create a pipeline configuration for CSV ingestion at /etc/logstash/conf.d/20-csv.conf.
    input {
      file {
        path => "/var/lib/logstash/input/*.csv"
        # start_position applies only to files without existing sincedb state
        start_position => "beginning"
        sincedb_path => "/var/lib/logstash/sincedb-csv"
      }
    }
    filter {
      # Parse only events read from the users CSV file
      if [log][file][path] == "/var/lib/logstash/input/users.csv" {
        csv {
          # Take field names from the header row, then drop that row
          autodetect_column_names => true
          skip_header => true
        }
      }
    }
    output {
      if [log][file][path] == "/var/lib/logstash/input/users.csv" {
        elasticsearch {
          hosts => ["http://elasticsearch.example.net:9200"]
          # Write to a dated index per day of ingestion
          index => "users-%{+YYYY.MM.dd}"
        }
      }
    }

    Update the file path checks to match the CSV being ingested, and replace hosts with the actual Elasticsearch endpoint (including authentication or TLS if required). Because autodetect_column_names takes field names from the first row the filter sees, the csv filter requires pipeline.workers to be set to 1 for that option; with multiple workers, list the columns explicitly via the columns option instead. Deleting /var/lib/logstash/sincedb-csv forces a re-import from the beginning and can create duplicate documents.
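
    For a secured cluster, the elasticsearch output could look roughly like the sketch below; the user name, keystore variable, and CA path are placeholders, and newer plugin versions spell the CA option ssl_certificate_authorities instead of cacert:

    output {
      if [log][file][path] == "/var/lib/logstash/input/users.csv" {
        elasticsearch {
          hosts    => ["https://elasticsearch.example.net:9200"]
          index    => "users-%{+YYYY.MM.dd}"
          user     => "logstash_csv_writer"              # placeholder user with write privileges
          password => "${ES_PWD}"                        # resolved from the Logstash keystore or environment
          cacert   => "/etc/logstash/certs/http_ca.crt"  # CA that signed the Elasticsearch HTTP certificate
        }
      }
    }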

  4. Test the pipeline configuration for errors.
    $ sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash --path.data /tmp/logstash-configtest --config.test_and_exit
    Configuration OK
  5. Restart the Logstash service to load the CSV pipeline.
    $ sudo systemctl restart logstash
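
    Following the service log right after the restart shows whether the new pipeline started and the file was picked up; depending on the logging setup this lands in the journal, the file below, or both:

    $ sudo journalctl -u logstash -f
    $ sudo tail -f /var/log/logstash/logstash-plain.log
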
  6. Verify documents were indexed.
    $ curl -s "http://elasticsearch.example.net:9200/users-*/_search?q=log.file.path:\"/var/lib/logstash/input/users.csv\"&pretty"
    {
      "hits" : {
        "total" : {
          "value" : 2,
          "relation" : "eq"
        },
        "hits" : [
          {
            "_index" : "users-2026.01.07",
            "_source" : {
              "name" : "Ava Jensen",
              "email" : "ava@example.net",
              "role" : "admin"
            }
          },
          {
            "_index" : "users-2026.01.07",
            "_source" : {
              "name" : "Noah Patel",
              "email" : "noah@example.net",
              "role" : "viewer"
            }
          }
        ]
      }
    }
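
    As a quicker spot check, a document count against the index pattern should match the number of data rows in the file (two for the sample above):

    $ curl -s "http://elasticsearch.example.net:9200/users-*/_count?pretty"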