Ingest pipelines in Elasticsearch transform documents before they are written to an index, keeping fields consistent across sources and reducing the parsing work left to downstream consumers. Normalizing at ingest prevents messy inputs from leaking into dashboards, alerts, and aggregations.

A pipeline is created and updated through the _ingest/pipeline API and executed during indexing by ingest-capable nodes. Each pipeline contains an ordered list of processors that mutate _source fields, and it can be referenced per request with the pipeline query parameter or applied automatically via index settings such as index.default_pipeline.
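
As a minimal sketch of the second option, assuming the logs-normalize pipeline and the logs-2025.01 index from the steps below already exist, a default pipeline is attached through the index settings API:

    $ curl -s -H "Content-Type: application/json" -X PUT "http://localhost:9200/logs-2025.01/_settings?pretty" -d '{
      "index.default_pipeline": "logs-normalize"
    }'
    {
      "acknowledged" : true
    }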

Pipeline IDs are cluster-wide, and a PUT request overwrites any existing pipeline with the same ID. Clusters with security enabled typically require HTTPS and authentication for API calls, and a failing processor can reject documents during indexing, so validating changes with _simulate is the safest first check.
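
On a secured cluster, the same calls need TLS and credentials. An illustrative sketch, where the certificate path and the elastic user are placeholders for whatever the deployment actually uses:

    $ curl -s --cacert /path/to/http_ca.crt -u elastic -X GET "https://localhost:9200/_ingest/pipeline?pretty"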

Steps to create an ingest pipeline in Elasticsearch:

  1. Create an ingest pipeline with processors.
    $ curl -s -H "Content-Type: application/json" -X PUT "http://localhost:9200/_ingest/pipeline/logs-normalize?pretty" -d '{
      "description": "Normalize log level and add service name",
      "processors": [
        { "lowercase": { "field": "level" } },
        { "set": { "field": "service", "value": "api" } }
      ]
    }'
    {
      "acknowledged" : true
    }

    A PUT request replaces an existing pipeline with the same ID, changing ingest behavior for every request that references it.
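
    Note also that the lowercase processor fails when its target field is absent, which rejects the document. If incoming logs may lack a level field, one hedged guard is the processor's ignore_missing flag, shown here as a drop-in replacement for the first entry in the processors array:

    { "lowercase": { "field": "level", "ignore_missing": true } }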

  2. Simulate the pipeline with a sample document.
    $ curl -s -H "Content-Type: application/json" -X POST "http://localhost:9200/_ingest/pipeline/logs-normalize/_simulate?filter_path=docs.doc._source&pretty" -d '{
      "docs": [
        { "_source": { "level": "ERROR", "message": "timeout" } }
      ]
    }'
    {
      "docs" : [
        {
          "doc" : {
            "_source" : {
              "message" : "timeout",
              "level" : "error",
              "service" : "api"
            }
          }
        }
      ]
    }

    The _simulate endpoint returns the transformed documents without indexing them, so processor behavior can be validated without writing to any index.
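
    When a longer pipeline misbehaves, the endpoint's verbose flag reports each processor's output separately, which narrows down the failing step. A sketch reusing the same sample document:

    $ curl -s -H "Content-Type: application/json" -X POST "http://localhost:9200/_ingest/pipeline/logs-normalize/_simulate?verbose=true&pretty" -d '{
      "docs": [
        { "_source": { "level": "ERROR", "message": "timeout" } }
      ]
    }'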

  3. Index a document using the pipeline.
    $ curl -s -H "Content-Type: application/json" -X POST "http://localhost:9200/logs-2025.01/_doc?pipeline=logs-normalize&filter_path=result&pretty" -d '{
      "level": "WARN",
      "message": "retry"
    }'
    {
      "result" : "created"
    }

    The pipeline query parameter can also be used on Bulk API requests to process batches through the same ingest pipeline.
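
    A hedged sketch of that batch form, indexing two documents with the newline-delimited body the Bulk API expects (the trailing newline is required); on success the filtered response reduces to an errors flag of false:

    $ curl -s -H "Content-Type: application/x-ndjson" -X POST "http://localhost:9200/logs-2025.01/_bulk?pipeline=logs-normalize&filter_path=errors&pretty" --data-binary $'{ "index": {} }\n{ "level": "INFO", "message": "started" }\n{ "index": {} }\n{ "level": "DEBUG", "message": "probe" }\n'
    {
      "errors" : false
    }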

  4. Fetch the stored pipeline definition for confirmation.
    $ curl -s -X GET "http://localhost:9200/_ingest/pipeline/logs-normalize?filter_path=*.description,*.processors&pretty"
    {
      "logs-normalize" : {
        "description" : "Normalize log level and add service name",
        "processors" : [
          {
            "lowercase" : {
              "field" : "level"
            }
          },
          {
            "set" : {
              "field" : "service",
              "value" : "api"
            }
          }
        ]
      }
    }
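
As a final hedged check that documents are actually flowing through the pipeline, node ingest statistics expose per-pipeline counters such as count and failed (the node ID key in the response will vary by cluster):

    $ curl -s -X GET "http://localhost:9200/_nodes/stats/ingest?filter_path=nodes.*.ingest.pipelines.logs-normalize&pretty"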