How to ingest CSV files with Logstash

Importing CSV files with Logstash turns spreadsheet exports and application dumps into searchable Elasticsearch documents, which is useful for bulk loading reference data, user lists, or report output without rewriting the source format by hand.

The file input can treat each finished CSV as a content-complete file: in read mode it reads the file once and hands each row to the csv filter, and the elasticsearch output indexes the parsed fields. Defining the column list explicitly keeps header handling deterministic and avoids the single-worker requirement that Elastic still documents for header autodetection.

Read mode always starts from the beginning of newly discovered files, records progress in a sincedb file, and deletes completed files by default unless a different file_completed_action is set. The pipeline below keeps file metadata in ECS form under log.file.path, but disables ECS only on the csv filter so the parsed CSV columns stay as flat fields like name, email, and role.

Steps to ingest CSV files with Logstash:

  1. Create directories for inbound CSV files and Logstash state.
    $ sudo install -d -o logstash -g logstash -m 755 /var/lib/logstash/input /var/lib/logstash/state
  2. Place a complete CSV file in the watched directory.
    $ sudo tee /var/lib/logstash/input/users.csv <<'CSV'
    name,email,role
    Ava Jensen,ava@example.net,admin
    Noah Patel,noah@example.net,viewer
    CSV

    Copy or move finished CSV files into the watched directory. Editing a file in place while the file input is reading it can ingest partial rows, and the logstash user needs read permission on the final file.
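    One way to guarantee the hand-off is atomic is to stage the file on the same filesystem and rename it into place. A minimal sketch, using temporary directories as stand-ins for the staging and watched paths (on a real host these would be directories like /var/lib/logstash/input, owned by the logstash user):

```shell
# Stand-ins for a staging directory and the watched directory; both must be
# on the same filesystem for the rename to be atomic.
staging=$(mktemp -d)
watched=$(mktemp -d)

# Write the complete CSV where the file input is not watching...
printf 'name,email,role\nAva Jensen,ava@example.net,admin\n' > "$staging/users.csv"

# ...then rename it into place: mv within one filesystem is a single
# rename(2) call, so a reader never observes a half-written file.
mv "$staging/users.csv" "$watched/users.csv"
```

    A cross-filesystem mv degrades to copy-then-delete, which reintroduces the partial-read window, so keep the staging directory on the same mount as the watched directory.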

  3. Create a dedicated CSV pipeline at /etc/logstash/conf.d/20-csv.conf.
    input {
      file {
        path => "/var/lib/logstash/input/*.csv"
        mode => "read"
        file_completed_action => "log"
        file_completed_log_path => "/var/lib/logstash/state/csv-completed.log"
        sincedb_path => "/var/lib/logstash/state/csv-ingest.sincedb"
        ecs_compatibility => "v8"
      }
    }
    
    filter {
      csv {
        columns => ["name", "email", "role"]
        skip_header => true
        ecs_compatibility => "disabled"
      }
    }
    
    output {
      elasticsearch {
        hosts => ["http://elasticsearch.example.net:9200"]
        index => "users-%{+YYYY.MM.dd}"
      }
    }

    Current Elastic documentation states that read mode ignores start_position and always reads each newly discovered file from the beginning. The default file_completed_action is delete, so this example changes it to log to keep the source CSV on disk after ingestion.

    The sincedb_path value must be a file path, not a directory. Deleting the sincedb file causes the same CSV to be reread and can create duplicate documents.
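    If rereads are a realistic failure mode, duplicates can be made harmless by deriving a deterministic document ID from each row, for example with the fingerprint filter. A sketch under the assumption that file path plus raw line uniquely identifies a row; the field names follow the pipeline above:

```
filter {
  fingerprint {
    source => ["[log][file][path]", "message"]
    concatenate_sources => true
    method => "SHA256"
    target => "[@metadata][row_id]"
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch.example.net:9200"]
    index => "users-%{+YYYY.MM.dd}"
    document_id => "%{[@metadata][row_id]}"
  }
}
```

    Note that with a daily index pattern, a reread on a later day still writes into a new index; use a fixed index name if reruns must overwrite rather than accumulate.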

    Current csv filter documentation requires pipeline.workers: 1 only when autodetect_column_names is used. Defining columns explicitly keeps header skipping deterministic without forcing a single worker on the whole node.
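    If autodetect_column_names were ever needed, the single-worker requirement can be scoped to just the CSV pipeline in pipelines.yml rather than applied node-wide. A sketch assuming the pipeline file from step 3 (the pipeline.id is an arbitrary label):

```
# /etc/logstash/pipelines.yml
- pipeline.id: csv-ingest
  path.config: "/etc/logstash/conf.d/20-csv.conf"
  pipeline.workers: 1
```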

    Use separate pipelines or conditional branches for CSV files that do not share the same column order, delimiter, or quoting rules. Numeric or date columns also need explicit typing, for example the csv filter's convert option or a date filter, before indexing.
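    As an example of that typing, the csv filter can cast columns inline while a date filter parses timestamps into proper date fields. A sketch with hypothetical signup_date and logins columns; the date format string is an assumption about the source data:

```
filter {
  csv {
    columns => ["name", "signup_date", "logins"]
    skip_header => true
    convert => { "logins" => "integer" }
    ecs_compatibility => "disabled"
  }
  date {
    match => ["signup_date", "yyyy-MM-dd"]
    target => "signup_date"
  }
}
```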

  4. Test the pipeline configuration for errors.
    $ sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash --path.data /tmp/logstash-configtest --config.test_and_exit
    Using bundled JDK: /usr/share/logstash/jdk
    Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties
    Configuration OK
    [2026-04-07T14:21:08,214][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
  5. Restart the Logstash service to load the CSV pipeline.
    $ sudo systemctl restart logstash

    Restarting Logstash briefly pauses ingestion while pipelines stop and reload.

  6. Confirm the file input finished reading the CSV.
    $ sudo cat /var/lib/logstash/state/csv-completed.log
    /var/lib/logstash/input/users.csv

    The completion log is appended to over time, so rotate or truncate it as part of normal housekeeping if many CSV files are imported.
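    One way to automate that housekeeping is a logrotate rule. A sketch assuming the completion-log path from step 3; copytruncate keeps the file input's open handle valid across rotations (the drop-in filename is an arbitrary choice):

```
# /etc/logrotate.d/logstash-csv-completed
/var/lib/logstash/state/csv-completed.log {
    weekly
    rotate 4
    compress
    missingok
    notifempty
    copytruncate
}
```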

  7. Verify the parsed rows were indexed in Elasticsearch.
    $ curl --silent --show-error --fail \
      "http://elasticsearch.example.net:9200/users-*/_count?q=log.file.path:%22/var/lib/logstash/input/users.csv%22&pretty"
    {
      "count" : 2,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      }
    }

    This query uses log.file.path because the example keeps ECS-compatible file metadata enabled on the file input. If that plugin runs with ECS disabled in another pipeline, query path instead.

    Replace _count with _search and add &_source_includes=name,email,role when the parsed fields need inspection instead of a simple row count.

    Replace the hosts value, index pattern, and file path filter to match the real pipeline and destination cluster.