Creating an ingest pipeline in Elasticsearch lets you normalize, enrich, or sanitize documents before they are written to an index, which keeps downstream searches, dashboards, and alerts working against consistent field names and values.
Pipelines are cluster-wide objects that execute only on nodes with the ingest role, so the cluster needs at least one such node. On secured clusters, managing pipelines also requires the manage_pipeline cluster privilege, and recent self-managed installs enable security automatically on first startup, so the API is typically served over HTTPS and requires credentials unless that setup was skipped or disabled.
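On such a secured cluster, a request might look like the following sketch. The user, password, and certificate path are placeholders; on Debian/RPM package installs the generated CA commonly lives at /etc/elasticsearch/certs/http_ca.crt, but verify the path for your deployment.

```shell
# List all ingest pipelines over HTTPS with basic auth.
# --cacert points at the CA certificate generated at first startup;
# substitute your own user, password, and certificate path.
curl -sS --cacert /etc/elasticsearch/certs/http_ca.crt \
  --user elastic:YOUR_PASSWORD \
  "https://localhost:9200/_ingest/pipeline?pretty"
```

An API key header (-H "Authorization: ApiKey BASE64_KEY") works in place of --user when you prefer key-based access.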
The _ingest/pipeline API creates and replaces pipelines by ID, and successful changes take effect immediately. Reusing an existing pipeline ID therefore changes ingest behavior for every request or index setting that references it, so test with _simulate before indexing live documents, and set a version in the pipeline definition together with the if_version query parameter when you need optimistic concurrency protection on updates.
Steps to create an ingest pipeline in Elasticsearch:
- Create the ingest pipeline definition.
$ curl -sS -H "Content-Type: application/json" -X PUT "http://localhost:9200/_ingest/pipeline/logs-normalize?pretty" -d '{ "description": "Normalize log level and add service name", "processors": [ { "lowercase": { "field": "level" } }, { "set": { "field": "service", "value": "api" } } ] }'
{
  "acknowledged" : true
}

On current self-managed deployments, replace http://localhost:9200 with your HTTPS endpoint and add authentication such as --user username:password or -H "Authorization: ApiKey BASE64_KEY".
A PUT request with an existing pipeline ID replaces that pipeline immediately. When you need change control, add a pipeline version and update it with the if_version query parameter.
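A sketch of that flow, reusing the logs-normalize pipeline from above (the version numbers are illustrative): store the pipeline with an explicit version, then require a match on the next update. If the stored version no longer matches if_version, Elasticsearch rejects the request with a conflict instead of silently overwriting someone else's change.

```shell
# Store the pipeline with an explicit version field.
curl -sS -H "Content-Type: application/json" -X PUT \
  "http://localhost:9200/_ingest/pipeline/logs-normalize" \
  -d '{
    "version": 2,
    "description": "Normalize log level and add service name",
    "processors": [
      { "lowercase": { "field": "level" } },
      { "set": { "field": "service", "value": "api" } }
    ]
  }'

# Replace it only if the stored version is still 2; the body must
# carry a new version so later updates can be guarded the same way.
curl -sS -H "Content-Type: application/json" -X PUT \
  "http://localhost:9200/_ingest/pipeline/logs-normalize?if_version=2" \
  -d '{
    "version": 3,
    "description": "Normalize log level and add service name",
    "processors": [
      { "lowercase": { "field": "level" } },
      { "set": { "field": "service", "value": "api" } }
    ]
  }'
```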
- Fetch the stored pipeline definition to confirm the description and processor list.
$ curl -sS "http://localhost:9200/_ingest/pipeline/logs-normalize?filter_path=*.description,*.processors&pretty"
{
  "logs-normalize" : {
    "description" : "Normalize log level and add service name",
    "processors" : [
      {
        "lowercase" : {
          "field" : "level"
        }
      },
      {
        "set" : {
          "field" : "service",
          "value" : "api"
        }
      }
    ]
  }
}

- Simulate the pipeline with a sample document before sending live traffic through it.
$ curl -sS -H "Content-Type: application/json" -X POST "http://localhost:9200/_ingest/pipeline/logs-normalize/_simulate?filter_path=docs.doc._source&pretty" -d '{ "docs": [ { "_source": { "level": "ERROR", "message": "timeout" } } ] }'
{
  "docs" : [
    {
      "doc" : {
        "_source" : {
          "message" : "timeout",
          "level" : "error",
          "service" : "api"
        }
      }
    }
  ]
}

The _simulate endpoint returns transformed documents without indexing them, which is the safest first check for processor order, field names, and output shape.
- Index a document through the pipeline with a fixed document ID.
$ curl -sS -H "Content-Type: application/json" -X PUT "http://localhost:9200/app-ingest-demo-000001/_doc/1?pipeline=logs-normalize&refresh=wait_for&filter_path=_index,_id,result&pretty" -d '{ "level": "WARN", "message": "retry" }'
{
  "_index" : "app-ingest-demo-000001",
  "_id" : "1",
  "result" : "created"
}

The pipeline query parameter also works with bulk indexing requests when the same transform should apply to a batch.
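The batch case can be sketched as follows, reusing the demo index and pipeline from above (the document IDs and bodies are illustrative). The _bulk API takes newline-delimited JSON, so the request uses the x-ndjson content type, --data-binary to preserve newlines, and a trailing newline after the last line.

```shell
# Send two documents through the same pipeline in one bulk request.
# Each document is an action line followed by a source line.
curl -sS -H "Content-Type: application/x-ndjson" -X POST \
  "http://localhost:9200/app-ingest-demo-000001/_bulk?pipeline=logs-normalize" \
  --data-binary $'{ "index": { "_id": "2" } }\n{ "level": "INFO", "message": "started" }\n{ "index": { "_id": "3" } }\n{ "level": "ERROR", "message": "timeout" }\n'
```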
- Retrieve the stored document to confirm the pipeline changed the indexed fields.
$ curl -sS "http://localhost:9200/app-ingest-demo-000001/_doc/1?filter_path=_source&pretty"
{
  "_source" : {
    "level" : "warn",
    "service" : "api",
    "message" : "retry"
  }
}

This confirms the pipeline was not only stored, but also executed during indexing.
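When every document written to an index should pass through the pipeline without callers remembering the ?pipeline= query parameter, the index can reference it directly via the index.default_pipeline setting. A minimal sketch using the demo index from above:

```shell
# Attach the pipeline as the index default; indexing requests that do
# not name a pipeline will now run logs-normalize automatically.
curl -sS -H "Content-Type: application/json" -X PUT \
  "http://localhost:9200/app-ingest-demo-000001/_settings" \
  -d '{ "index.default_pipeline": "logs-normalize" }'
```

A request that does pass an explicit ?pipeline= still overrides the default, which is useful for one-off reprocessing.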
Mohd Shakir Zakaria is a cloud architect with deep roots in software development and open-source advocacy. Certified in AWS, Red Hat, VMware, ITIL, and Linux, he specializes in designing and managing robust cloud and on-premises infrastructures.
