提取管道编辑

提取管道允许您在索引之前对数据执行常见的转换。例如,您可以使用管道删除字段、从文本中提取值以及丰富数据。

管道由一系列称为 处理器 的可配置任务组成。每个处理器按顺序运行,对传入的文档进行特定的更改。处理器运行完毕后,Elasticsearch 会将转换后的文档添加到您的数据流或索引中。

Ingest pipeline diagram

您可以使用 Kibana 的 提取管道 功能或 提取 API 创建和管理提取管道。Elasticsearch 将管道存储在 集群状态 中。

先决条件编辑

  • 具有 ingest 节点角色的节点处理管道处理。要使用提取管道,您的集群必须至少有一个具有 ingest 角色的节点。对于繁重的提取负载,我们建议创建 专用提取节点
  • 如果启用了 Elasticsearch 安全功能,则您必须具有 manage_pipeline 集群权限 才能管理提取管道。要使用 Kibana 的 提取管道 功能,您还需要 cluster:monitor/nodes/info 集群权限。
  • 包含 enrich 处理器的管道需要进行额外的设置。请参阅 丰富您的数据

创建和管理管道编辑

在 Kibana 中,打开主菜单,然后单击 堆栈管理 > 提取管道。在列表视图中,您可以

  • 查看管道列表并深入了解详细信息
  • 编辑或克隆现有管道
  • 删除管道
Kibana’s Ingest Pipelines list view

要创建管道,请单击 创建管道 > 新建管道。有关示例教程,请参阅 示例:解析日志

使用 从 CSV 新建管道 选项,您可以使用 CSV 创建一个提取管道,将自定义数据映射到 Elastic 通用架构 (ECS)。将您的自定义数据映射到 ECS 可以更轻松地搜索数据,并允许您重复使用来自其他数据集的可视化。要开始使用,请查看 将自定义数据映射到 ECS

您还可以使用 提取 API 创建和管理管道。以下 创建管道 API 请求创建一个包含两个 set 处理器,后跟一个 lowercase 处理器的管道。处理器按指定的顺序依次运行。

response = client.ingest.put_pipeline(
  id: 'my-pipeline',
  body: {
    description: 'My optional pipeline description',
    processors: [
      {
        set: {
          description: 'My optional processor description',
          field: 'my-long-field',
          value: 10
        }
      },
      {
        set: {
          description: "Set 'my-boolean-field' to true",
          field: 'my-boolean-field',
          value: true
        }
      },
      {
        lowercase: {
          field: 'my-keyword-field'
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/my-pipeline
{
  "description": "My optional pipeline description",
  "processors": [
    {
      "set": {
        "description": "My optional processor description",
        "field": "my-long-field",
        "value": 10
      }
    },
    {
      "set": {
        "description": "Set 'my-boolean-field' to true",
        "field": "my-boolean-field",
        "value": true
      }
    },
    {
      "lowercase": {
        "field": "my-keyword-field"
      }
    }
  ]
}

管理管道版本编辑

创建或更新管道时,您可以指定一个可选的 version 整数。您可以将此版本号与 if_version 参数一起使用,以有条件地更新管道。指定 if_version 参数后,成功更新会增加管道的版本。

PUT _ingest/pipeline/my-pipeline-id
{
  "version": 1,
  "processors": [ ... ]
}

要使用 API 取消设置 version 编号,请在不指定 version 参数的情况下替换或更新管道。

测试管道编辑

在生产环境中使用管道之前,我们建议您使用示例文档对其进行测试。在 Kibana 中创建或编辑管道时,单击 添加文档。在 文档 选项卡中,提供示例文档,然后单击 运行管道

Test a pipeline in Kibana

您还可以使用 模拟管道 API 测试管道。您可以在请求路径中指定已配置的管道。例如,以下请求测试 my-pipeline

response = client.ingest.simulate(
  id: 'my-pipeline',
  body: {
    docs: [
      {
        _source: {
          "my-keyword-field": 'FOO'
        }
      },
      {
        _source: {
          "my-keyword-field": 'BAR'
        }
      }
    ]
  }
)
puts response
POST _ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}

或者,您可以在请求正文中指定管道及其处理器。

response = client.ingest.simulate(
  body: {
    pipeline: {
      processors: [
        {
          lowercase: {
            field: 'my-keyword-field'
          }
        }
      ]
    },
    docs: [
      {
        _source: {
          "my-keyword-field": 'FOO'
        }
      },
      {
        _source: {
          "my-keyword-field": 'BAR'
        }
      }
    ]
  }
)
puts response
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "lowercase": {
          "field": "my-keyword-field"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}

API 返回转换后的文档

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "my-keyword-field": "foo"
        },
        "_ingest": {
          "timestamp": "2099-03-07T11:04:03.000Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "my-keyword-field": "bar"
        },
        "_ingest": {
          "timestamp": "2099-03-07T11:04:04.000Z"
        }
      }
    }
  ]
}

将管道添加到索引请求编辑

使用 pipeline 查询参数将管道应用于 单个批量 索引请求中的文档。

response = client.index(
  index: 'my-data-stream',
  pipeline: 'my-pipeline',
  body: {
    "@timestamp": '2099-03-07T11:04:05.000Z',
    "my-keyword-field": 'foo'
  }
)
puts response

response = client.bulk(
  index: 'my-data-stream',
  pipeline: 'my-pipeline',
  body: [
    {
      create: {}
    },
    {
      "@timestamp": '2099-03-07T11:04:06.000Z',
      "my-keyword-field": 'foo'
    },
    {
      create: {}
    },
    {
      "@timestamp": '2099-03-07T11:04:07.000Z',
      "my-keyword-field": 'bar'
    }
  ]
)
puts response
POST my-data-stream/_doc?pipeline=my-pipeline
{
  "@timestamp": "2099-03-07T11:04:05.000Z",
  "my-keyword-field": "foo"
}

PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }

您还可以将 pipeline 参数与 按查询更新重新索引 API 一起使用。

response = client.update_by_query(
  index: 'my-data-stream',
  pipeline: 'my-pipeline'
)
puts response

response = client.reindex(
  body: {
    source: {
      index: 'my-data-stream'
    },
    dest: {
      index: 'my-new-data-stream',
      op_type: 'create',
      pipeline: 'my-pipeline'
    }
  }
)
puts response
POST my-data-stream/_update_by_query?pipeline=my-pipeline

POST _reindex
{
  "source": {
    "index": "my-data-stream"
  },
  "dest": {
    "index": "my-new-data-stream",
    "op_type": "create",
    "pipeline": "my-pipeline"
  }
}

设置默认管道编辑

使用 index.default_pipeline 索引设置来设置默认管道。如果未指定 pipeline 参数,Elasticsearch 会将此管道应用于索引请求。

设置最终管道编辑

使用 index.final_pipeline 索引设置来设置最终管道。Elasticsearch 会在请求或默认管道之后应用此管道,即使两者均未指定。

Beats 的管道编辑

要将提取管道添加到 Elastic Beat,请在 <BEAT_NAME>.ymloutput.elasticsearch 下指定 pipeline 参数。例如,对于 Filebeat,您将在 filebeat.yml 中指定 pipeline

output.elasticsearch:
  hosts: ["localhost:9200"]
  pipeline: my-pipeline

Fleet 和 Elastic Agent 的管道编辑

Elastic Agent 集成附带默认的提取管道,这些管道在索引之前对数据进行预处理和丰富。 Fleet 使用包含 管道索引设置索引模板 应用这些管道。Elasticsearch 根据 流的命名方案 将这些模板与您的 Fleet 数据流相匹配。

每个默认集成管道都会调用一个不存在的、未版本化的 *@custom 提取管道。如果未更改,则此管道调用对您的数据没有影响。但是,您可以修改此调用以创建用于集成的自定义管道,这些管道在升级后仍然存在。有关详细信息,请参阅 教程:使用自定义提取管道转换数据

Fleet 不为 自定义日志 集成提供默认的提取管道,但您可以使用 索引模板自定义配置 为此集成指定管道。

选项 1:索引模板

  1. 创建测试 您的提取管道。将您的管道命名为 logs-<dataset-name>-default。这使得跟踪集成管道变得更容易。

    例如,以下请求为 my-app 数据集创建一个管道。管道的名称为 logs-my_app-default

    PUT _ingest/pipeline/logs-my_app-default
    {
      "description": "Pipeline for `my_app` dataset",
      "processors": [ ... ]
    }
  2. 创建一个 索引模板,其中包含 index.default_pipelineindex.final_pipeline 索引设置中的管道。确保模板已 启用数据流。模板的索引模式应与 logs-<dataset-name>-* 匹配。

    您可以使用 Kibana 的 索引管理 功能或 创建索引模板 API 创建此模板。

    例如,以下请求创建一个与 logs-my_app-* 匹配的模板。该模板使用包含 index.default_pipeline 索引设置的组件模板。

    # Creates a component template for index settings
    PUT _component_template/logs-my_app-settings
    {
      "template": {
        "settings": {
          "index.default_pipeline": "logs-my_app-default",
          "index.lifecycle.name": "logs"
        }
      }
    }
    
    # Creates an index template matching `logs-my_app-*`
    PUT _index_template/logs-my_app-template
    {
      "index_patterns": ["logs-my_app-*"],
      "data_stream": { },
      "priority": 500,
      "composed_of": ["logs-my_app-settings", "logs-my_app-mappings"]
    }
  3. 在 Fleet 中添加或编辑 自定义日志 集成时,单击 配置集成 > 自定义日志文件 > 高级选项
  4. 数据集名称中,指定数据集的名称。Fleet 会将集成的新数据添加到生成的 logs-<dataset-name>-default 数据流中。

    例如,如果数据集的名称为 my_app,则 Fleet 会将新数据添加到 logs-my_app-default 数据流中。

    Set up custom log integration in Fleet
  5. 使用滚动 API滚动数据流。这可确保 Elasticsearch 将索引模板及其管道设置应用于集成的任何新数据。

    response = client.indices.rollover(
      alias: 'logs-my_app-default'
    )
    puts response
    POST logs-my_app-default/_rollover/

选项 2:自定义配置

  1. 创建测试 您的提取管道。将您的管道命名为 logs-<dataset-name>-default。这使得跟踪集成管道变得更容易。

    例如,以下请求为 my-app 数据集创建一个管道。管道的名称为 logs-my_app-default

    PUT _ingest/pipeline/logs-my_app-default
    {
      "description": "Pipeline for `my_app` dataset",
      "processors": [ ... ]
    }
  2. 在 Fleet 中添加或编辑 自定义日志 集成时,单击 配置集成 > 自定义日志文件 > 高级选项
  3. 数据集名称中,指定数据集的名称。Fleet 会将集成的新数据添加到生成的 logs-<dataset-name>-default 数据流中。

    例如,如果数据集的名称为 my_app,则 Fleet 会将新数据添加到 logs-my_app-default 数据流中。

  4. 自定义配置中,在 pipeline 策略设置中指定管道。

    Custom pipeline configuration for custom log integration

Elastic Agent 独立版

如果运行 Elastic Agent 独立版,则可以使用包含 index.default_pipelineindex.final_pipeline 索引设置的索引模板来应用管道。或者,可以在 elastic-agent.yml 配置中指定 pipeline 策略设置。请参阅安装独立 Elastic Agent

搜索索引的管道编辑

当您为搜索用例创建 Elasticsearch 索引时(例如,使用网络爬虫连接器),这些索引会自动使用特定的摄取管道进行设置。这些处理器有助于优化搜索内容。有关更多信息,请参阅搜索中的摄取管道

在处理器中访问源字段编辑

处理器对传入文档的源字段具有读写访问权限。要在处理器中访问字段键,请使用其字段名称。以下 set 处理器访问 my-long-field

response = client.ingest.put_pipeline(
  id: 'my-pipeline',
  body: {
    processors: [
      {
        set: {
          field: 'my-long-field',
          value: 10
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "my-long-field",
        "value": 10
      }
    }
  ]
}

您也可以在前面加上 _source 前缀。

response = client.ingest.put_pipeline(
  id: 'my-pipeline',
  body: {
    processors: [
      {
        set: {
          field: '_source.my-long-field',
          value: 10
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "_source.my-long-field",
        "value": 10
      }
    }
  ]
}

使用点符号访问对象字段。

如果文档包含扁平化对象,请使用dot_expander处理器先对其进行扩展。其他摄取处理器无法访问扁平化对象。

response = client.ingest.put_pipeline(
  id: 'my-pipeline',
  body: {
    processors: [
      {
        dot_expander: {
          description: "Expand 'my-object-field.my-property'",
          field: 'my-object-field.my-property'
        }
      },
      {
        set: {
          description: "Set 'my-object-field.my-property' to 10",
          field: 'my-object-field.my-property',
          value: 10
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "dot_expander": {
        "description": "Expand 'my-object-field.my-property'",
        "field": "my-object-field.my-property"
      }
    },
    {
      "set": {
        "description": "Set 'my-object-field.my-property' to 10",
        "field": "my-object-field.my-property",
        "value": 10
      }
    }
  ]
}

一些处理器参数支持Mustache模板片段。要在模板片段中访问字段值,请将字段名称括在三个花括号中:{{{field-name}}}。您可以使用模板片段动态设置字段名称。

response = client.ingest.put_pipeline(
  id: 'my-pipeline',
  body: {
    processors: [
      {
        set: {
          description: "Set dynamic '<service>' field to 'code' value",
          field: '{{{service}}}',
          value: '{{{code}}}'
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Set dynamic '<service>' field to 'code' value",
        "field": "{{{service}}}",
        "value": "{{{code}}}"
      }
    }
  ]
}

在处理器中访问元数据字段编辑

处理器可以通过名称访问以下元数据字段

  • _index
  • _id
  • _routing
  • _dynamic_templates
response = client.ingest.put_pipeline(
  id: 'my-pipeline',
  body: {
    processors: [
      {
        set: {
          description: "Set '_routing' to 'geoip.country_iso_code' value",
          field: '_routing',
          value: '{{{geoip.country_iso_code}}}'
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Set '_routing' to 'geoip.country_iso_code' value",
        "field": "_routing",
        "value": "{{{geoip.country_iso_code}}}"
      }
    }
  ]
}

使用 Mustache 模板片段访问元数据字段值。例如,{{{_routing}}} 检索文档的路由值。

response = client.ingest.put_pipeline(
  id: 'my-pipeline',
  body: {
    processors: [
      {
        set: {
          description: 'Use geo_point dynamic template for address field',
          field: '_dynamic_templates',
          value: {
            address: 'geo_point'
          }
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Use geo_point dynamic template for address field",
        "field": "_dynamic_templates",
        "value": {
          "address": "geo_point"
        }
      }
    }
  ]
}

上面的 set 处理器告诉 ES,如果该字段尚未在索引的映射中定义,则对字段 address 使用名为 geo_point 的动态模板。如果已在批量请求中定义,则此处理器会覆盖字段 address 的动态模板,但对批量请求中定义的其他动态模板没有影响。

如果您自动生成文档 ID,则不能在处理器中使用 {{{_id}}}。Elasticsearch 在摄取后分配自动生成的 _id 值。

在处理器中访问摄取元数据编辑

摄取处理器可以使用 _ingest 键添加和访问摄取元数据。

与源字段和元数据字段不同,Elasticsearch 默认情况下不索引摄取元数据字段。Elasticsearch 还允许以 _ingest 键开头的源字段。如果您的数据包含此类源字段,请使用 _source._ingest 访问它们。

默认情况下,管道仅创建 _ingest.timestamp 摄取元数据字段。此字段包含 Elasticsearch 收到文档索引请求的时间戳。要索引 _ingest.timestamp 或其他摄取元数据字段,请使用 set 处理器。

response = client.ingest.put_pipeline(
  id: 'my-pipeline',
  body: {
    processors: [
      {
        set: {
          description: "Index the ingest timestamp as 'event.ingested'",
          field: 'event.ingested',
          value: '{{{_ingest.timestamp}}}'
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Index the ingest timestamp as 'event.ingested'",
        "field": "event.ingested",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}

处理管道故障编辑

管道的处理器按顺序运行。默认情况下,当其中一个处理器失败或遇到错误时,管道处理将停止。

要忽略处理器故障并运行管道的其余处理器,请将 ignore_failure 设置为 true

response = client.ingest.put_pipeline(
  id: 'my-pipeline',
  body: {
    processors: [
      {
        rename: {
          description: "Rename 'provider' to 'cloud.provider'",
          field: 'provider',
          target_field: 'cloud.provider',
          ignore_failure: true
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "ignore_failure": true
      }
    }
  ]
}

使用 on_failure 参数指定在处理器故障后立即运行的处理器列表。如果指定了 on_failure,则 Elasticsearch 之后会运行管道的其余处理器,即使 on_failure 配置为空也是如此。

response = client.ingest.put_pipeline(
  id: 'my-pipeline',
  body: {
    processors: [
      {
        rename: {
          description: "Rename 'provider' to 'cloud.provider'",
          field: 'provider',
          target_field: 'cloud.provider',
          on_failure: [
            {
              set: {
                description: "Set 'error.message'",
                field: 'error.message',
                value: "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
                override: false
              }
            }
          ]
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false
            }
          }
        ]
      }
    }
  ]
}

嵌套 on_failure 处理器列表以进行嵌套错误处理。

response = client.ingest.put_pipeline(
  id: 'my-pipeline',
  body: {
    processors: [
      {
        rename: {
          description: "Rename 'provider' to 'cloud.provider'",
          field: 'provider',
          target_field: 'cloud.provider',
          on_failure: [
            {
              set: {
                description: "Set 'error.message'",
                field: 'error.message',
                value: "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
                override: false,
                on_failure: [
                  {
                    set: {
                      description: "Set 'error.message.multi'",
                      field: 'error.message.multi',
                      value: 'Document encountered multiple ingest errors',
                      override: true
                    }
                  }
                ]
              }
            }
          ]
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false,
              "on_failure": [
                {
                  "set": {
                    "description": "Set 'error.message.multi'",
                    "field": "error.message.multi",
                    "value": "Document encountered multiple ingest errors",
                    "override": true
                  }
                }
              ]
            }
          }
        ]
      }
    }
  ]
}

您还可以为管道指定 on_failure。如果未设置 on_failure 值的处理器失败,则 Elasticsearch 会将此管道级参数用作回退。Elasticsearch 不会尝试运行管道的其余处理器。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{ _index }}}"
      }
    }
  ]
}

有关管道故障的其他信息可能在文档元数据字段 on_failure_messageon_failure_processor_typeon_failure_processor_tagon_failure_pipeline 中提供。这些字段只能从 on_failure 块中访问。

以下示例使用元数据字段在文档中包含有关管道故障的信息。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Record error information",
        "field": "error_information",
        "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
      }
    }
  ]
}

有条件地运行处理器编辑

每个处理器都支持一个可选的 if 条件,编写为Painless 脚本。如果提供,则处理器仅在 if 条件为 true 时运行。

if 条件脚本在 Painless 的摄取处理器上下文中运行。在 if 条件下,ctx 值为只读。

response = client.ingest.put_pipeline(
  id: 'my-pipeline',
  body: {
    processors: [
      {
        drop: {
          description: "Drop documents with 'network.name' of 'Guest'",
          if: "ctx?.network?.name == 'Guest'"
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents with 'network.name' of 'Guest'",
        "if": "ctx?.network?.name == 'Guest'"
      }
    }
  ]
}

如果启用了script.painless.regex.enabled集群设置,则可以在 if 条件脚本中使用正则表达式。有关支持的语法,请参阅Painless 正则表达式

如果可能,请避免使用正则表达式。昂贵的正则表达式会降低索引速度。

response = client.ingest.put_pipeline(
  id: 'my-pipeline',
  body: {
    processors: [
      {
        set: {
          description: "If 'url.scheme' is 'http', set 'url.insecure' to true",
          if: 'ctx.url?.scheme =~ /^http[^s]/',
          field: 'url.insecure',
          value: true
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "If 'url.scheme' is 'http', set 'url.insecure' to true",
        "if": "ctx.url?.scheme =~ /^http[^s]/",
        "field": "url.insecure",
        "value": true
      }
    }
  ]
}

您必须在单行上将 if 条件指定为有效的 JSON。但是,您可以使用Kibana 控制台的三引号语法来编写和调试更大的脚本。

如果可能,请避免使用复杂或昂贵的 if 条件脚本。昂贵的条件脚本会降低索引速度。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                if (tag.toLowerCase().contains('prod')) {
                  return false;
                }
              }
            }
            return true;
        """
      }
    }
  ]
}

您还可以指定存储的脚本作为 if 条件。

PUT _scripts/my-prod-tag-script
{
  "script": {
    "lang": "painless",
    "source": """
      Collection tags = ctx.tags;
      if(tags != null){
        for (String tag : tags) {
          if (tag.toLowerCase().contains('prod')) {
            return false;
          }
        }
      }
      return true;
    """
  }
}

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": { "id": "my-prod-tag-script" }
      }
    }
  ]
}

传入文档通常包含对象字段。如果处理器脚本尝试访问其父对象不存在的字段,则 Elasticsearch 会返回 NullPointerException。要避免这些异常,请使用空安全运算符(例如 ?.),并将脚本编写为空安全。

例如,ctx.network?.name.equalsIgnoreCase('Guest') 不是空安全的。ctx.network?.name 可以返回 null。将脚本重写为 'Guest'.equalsIgnoreCase(ctx.network?.name),这是空安全的,因为 Guest 始终为非 null。

如果无法将脚本重写为空安全,请包含显式的空检查。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that contain 'network.name' of 'Guest'",
        "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
      }
    }
  ]
}

有条件地应用管道编辑

if 条件与pipeline处理器结合使用,可以根据您的条件将其他管道应用于文档。您可以将此管道用作用于配置多个数据流或索引的索引模板中的默认管道

response = client.ingest.put_pipeline(
  id: 'one-pipeline-to-rule-them-all',
  body: {
    processors: [
      {
        pipeline: {
          description: "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
          if: "ctx.service?.name == 'apache_httpd'",
          name: 'httpd_pipeline'
        }
      },
      {
        pipeline: {
          description: "If 'service.name' is 'syslog', use 'syslog_pipeline'",
          if: "ctx.service?.name == 'syslog'",
          name: 'syslog_pipeline'
        }
      },
      {
        fail: {
          description: "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
          if: "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
          message: 'This pipeline requires service.name to be either `syslog` or `apache_httpd`'
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/one-pipeline-to-rule-them-all
{
  "processors": [
    {
      "pipeline": {
        "description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "description": "If 'service.name' is 'syslog', use 'syslog_pipeline'",
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}

获取管道使用情况统计信息编辑

使用节点统计信息API 获取全局和每个管道的摄取统计信息。使用这些统计信息可以确定哪些管道运行最频繁或花费的处理时间最多。

response = client.nodes.stats(
  metric: 'ingest',
  filter_path: 'nodes.*.ingest'
)
puts response
GET _nodes/stats/ingest?filter_path=nodes.*.ingest