Using an Elasticsearch ingest pipeline from Logstash moves parsing and enrichment closer to indexed data, reducing duplicated filter logic across multiple pipelines and shippers. Centralizing transforms in the cluster also helps keep outputs consistent when multiple sources write into the same indices.

Logstash sends events to Elasticsearch using the Bulk API, and the Elasticsearch output plugin can target an ingest pipeline by adding a pipeline parameter to each bulk request. Elasticsearch applies the pipeline processors (such as set, rename, grok, geoip, or date) before the document is indexed, so the stored data reflects the ingest-stage changes.

The ingest pipeline ID is case-sensitive and must exist in the destination cluster, or Elasticsearch returns indexing errors for each bulk item. Ingest processing runs on nodes with the ingest role, so CPU-heavy pipelines can reduce indexing throughput and may require capacity planning or processor tuning. Secure clusters typically require authentication for both Logstash output and verification requests, so curl examples may need credentials or an API key added.

Steps to use an Elasticsearch ingest pipeline from Logstash:

  1. Create the ingest pipeline in Elasticsearch with a stable pipeline ID.

    Prefer a long-lived ID such as normalize-logs, keeping versioning inside the pipeline definition or via a controlled rename strategy to avoid breaking Logstash outputs.

  2. Reference the pipeline in the Logstash Elasticsearch output.
    input {
      file {
        path => "/var/lib/logstash/examples/ingest-pipeline.log"
        start_position => "beginning"
        sincedb_path => "/var/lib/logstash/sincedb-ingest-pipeline"
      }
    }
    
    output {
      if [log][file][path] == "/var/lib/logstash/examples/ingest-pipeline.log" {
        elasticsearch {
          hosts => ["http://elasticsearch.example.net:9200"]
          index => "logstash-ingest-%{+YYYY.MM.dd}"
          pipeline => "normalize-logs"
          manage_template => false
        }
      }
    }

    The pipeline value supports Logstash sprintf formatting (for example pipeline => "%{[@metadata][ingest_pipeline]}") to select different ingest pipelines per event.

    A missing or mistyped pipeline ID causes Elasticsearch bulk indexing failures, producing output errors and preventing documents from being indexed as expected.

  3. Test the Logstash pipeline configuration.
    $ sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash --path.data /tmp/logstash-configtest --config.test_and_exit
    Configuration OK
  4. Restart the Logstash service.
    $ sudo systemctl restart logstash
  5. Check the Logstash service status for active state, including recent errors.
    $ sudo systemctl status logstash --no-pager -l
    ● logstash.service - logstash
         Loaded: loaded (/usr/lib/systemd/system/logstash.service; enabled; preset: enabled)
         Active: active (running) since Wed 2026-01-07 22:43:52 UTC; 4s ago
       Main PID: 39213 (java)
          Tasks: 31 (limit: 28486)
         Memory: 508.1M (peak: 508.6M)
            CPU: 15.756s
    ##### snipped #####
  6. Confirm the ingest pipeline exists.
    $ curl -s "http://elasticsearch.example.net:9200/_ingest/pipeline/normalize-logs?pretty"
    {
      "normalize-logs" : {
        "processors" : [
          {
            "set" : {
              "field" : "event.dataset",
              "value" : "app"
            }
          }
        ]
      }
    }
  7. Query Elasticsearch for a newly indexed document containing fields set by the ingest pipeline.
    $ curl -s "http://elasticsearch.example.net:9200/logstash-ingest-*/_search?pretty" -H "Content-Type: application/json" -d '
    {
      "size": 1,
      "sort": [
        {
          "@timestamp": "desc"
        }
      ],
      "_source": [
        "@timestamp",
        "event.dataset",
        "message"
      ]
    }'
    {
      "took" : 5,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 1,
          "relation" : "eq"
        },
        "max_score" : null,
        "hits" : [
          {
            "_index" : "logstash-ingest-2026.01.07",
            "_id" : "sfyhmpsBMfcBipKWoOgT",
            "_score" : null,
            "_source" : {
              "@timestamp" : "2026-01-07T22:44:01.648297835Z",
              "event" : {
                "dataset" : "app"
              },
              "message" : "ingest pipeline example log line"
            },
            "sort" : [
              1767825841648
            ]
          }
        ]
      }
    }

    Match the verification query to a field written by the ingest pipeline (for example a set or rename target) to prove the pipeline is applied at index time.