Attachment processor

The attachment processor lets Elasticsearch extract file attachments in common formats (such as PPT, XLS, and PDF) by using the Apache text extraction library Tika.

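The source field must be base64-encoded binary data. If you do not want to incur the overhead of converting back and forth between base64, you can use the CBOR format instead of JSON and specify the field as a bytes array instead of a string representation. The processor then skips the base64 decoding.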
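To put a number on the base64 overhead mentioned above, here is a minimal sketch using only the Python standard library. The helper name `base64_length` and the 1024-byte stand-in payload are illustrative assumptions, not part of Elasticsearch.

```python
import base64


def base64_length(raw_size: int) -> int:
    """Length in characters of the padded base64 encoding of raw_size bytes."""
    # Every group of up to 3 input bytes becomes 4 output characters.
    return 4 * ((raw_size + 2) // 3)


# 1024 arbitrary bytes standing in for a binary attachment (hypothetical data).
raw = bytes(range(256)) * 4
encoded = base64.b64encode(raw)

# The encoded form is roughly one third larger than the raw bytes.
print(len(raw), len(encoded), base64_length(len(raw)))
```

Sending the raw bytes in a CBOR byte string avoids both this size increase and the encode/decode work on each side.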

Using the attachment processor in a pipeline

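Table 4. Attachment options

| Name | Required | Default | Description |
| field | yes | - | The field to get the base64-encoded field from |
| target_field | no | attachment | The field that will hold the attachment information |
| indexed_chars | no | 100000 | The number of characters used for extraction, to prevent huge fields. Use -1 for no limit. |
| indexed_chars_field | no | null | Name of a field from which the number of characters used for extraction can be overridden. See indexed_chars. |
| properties | no | all properties | Array of properties to select to be stored. Can be content, title, name, author, keywords, date, content_type, content_length, language |
| ignore_missing | no | false | If true and field does not exist, the processor quietly exits without modifying the document |
| remove_binary | no | false | If true, the binary field is removed from the document |
| resource_name | no | | Field containing the name of the resource to decode. If specified, the processor passes this resource name to the underlying Tika library to enable resource name based detection. |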
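The options above can be assembled into a processor definition programmatically. The following is a minimal sketch, not an official client helper: the function name `attachment_processor` is hypothetical, and it only builds the dictionary that would go into the `processors` array of a pipeline.

```python
from typing import Any, Dict, List, Optional


def attachment_processor(
    field: str,
    target_field: str = "attachment",
    indexed_chars: int = 100000,
    indexed_chars_field: Optional[str] = None,
    properties: Optional[List[str]] = None,
    ignore_missing: bool = False,
    remove_binary: bool = False,
    resource_name: Optional[str] = None,
) -> Dict[str, Any]:
    """Build an attachment processor definition using the documented defaults."""
    if not field:
        # `field` is the only option without a usable default.
        raise ValueError("field is required")

    options: Dict[str, Any] = {
        "field": field,
        "target_field": target_field,
        "indexed_chars": indexed_chars,
        "ignore_missing": ignore_missing,
        "remove_binary": remove_binary,
    }
    # Options without a concrete default are only sent when explicitly set.
    if indexed_chars_field is not None:
        options["indexed_chars_field"] = indexed_chars_field
    if properties is not None:
        options["properties"] = list(properties)
    if resource_name is not None:
        options["resource_name"] = resource_name
    return {"attachment": options}


processor = attachment_processor("data", properties=["content", "title"])
print(processor)
```

The returned dictionary can be passed as one element of the `processors` list in a `put_pipeline` call.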

Example

To attach a file to a JSON document, you must first encode the file as a base64 string. On Unix-like systems, you can do this with the base64 command:

base64 -in myfile.rtf

The command returns the base64-encoded string for the file. The following base64 string is for an .rtf file containing the text Lorem ipsum dolor sit amet:

e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=
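If the base64 command is not available, the same encoding can be done in Python. This is a minimal sketch using only the standard library. The helper name `encode_file` is an assumption, and the sample bytes are simply what the base64 string above decodes to, written to a temporary file so the example is self-contained.

```python
import base64
import tempfile
from pathlib import Path


def encode_file(path: Path) -> str:
    """Return the contents of the file at `path` as a base64 string."""
    return base64.b64encode(path.read_bytes()).decode("ascii")


# The bytes that the sample base64 string decodes to (a tiny RTF document).
SAMPLE_RTF = b"{\\rtf1\\ansi\r\nLorem ipsum dolor sit amet\r\n\\par }"

with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "myfile.rtf"
    sample.write_bytes(SAMPLE_RTF)
    encoded = encode_file(sample)

print(encoded)
```

The resulting string is what goes into the document's `data` field.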

Use an attachment processor to decode the string and extract the file's properties:

resp = client.ingest.put_pipeline(
    id="attachment",
    description="Extract attachment information",
    processors=[
        {
            "attachment": {
                "field": "data",
                "remove_binary": False
            }
        }
    ],
)
print(resp)

resp1 = client.index(
    index="my-index-000001",
    id="my_id",
    pipeline="attachment",
    document={
        "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0="
    },
)
print(resp1)

resp2 = client.get(
    index="my-index-000001",
    id="my_id",
)
print(resp2)
response = client.ingest.put_pipeline(
  id: 'attachment',
  body: {
    description: 'Extract attachment information',
    processors: [
      {
        attachment: {
          field: 'data',
          remove_binary: false
        }
      }
    ]
  }
)
puts response

response = client.index(
  index: 'my-index-000001',
  id: 'my_id',
  pipeline: 'attachment',
  body: {
    data: 'e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0='
  }
)
puts response

response = client.get(
  index: 'my-index-000001',
  id: 'my_id'
)
puts response
PUT _ingest/pipeline/attachment
{
  "description" : "Extract attachment information",
  "processors" : [
    {
      "attachment" : {
        "field" : "data",
        "remove_binary": false
      }
    }
  ]
}
PUT my-index-000001/_doc/my_id?pipeline=attachment
{
  "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0="
}
GET my-index-000001/_doc/my_id

The document's attachment object contains the extracted properties of the file:

{
  "found": true,
  "_index": "my-index-000001",
  "_id": "my_id",
  "_version": 1,
  "_seq_no": 22,
  "_primary_term": 1,
  "_source": {
    "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=",
    "attachment": {
      "content_type": "application/rtf",
      "language": "ro",
      "content": "Lorem ipsum dolor sit amet",
      "content_length": 28
    }
  }
}

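Keeping the binary data as a field within the document can consume a lot of resources. It is highly recommended to remove that field from the document. Set remove_binary to true to remove the field automatically.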
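To illustrate what removing the binary field saves, the sketch below compares the stored `_source` from the response above with and without the `data` field. It only mimics the effect of `remove_binary` on the stored document and is not how Elasticsearch implements it. The helper name `without_binary` is hypothetical.

```python
import json
from typing import Any, Dict

# The `_source` from the example response, with the binary field kept.
stored_source: Dict[str, Any] = {
    "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=",
    "attachment": {
        "content_type": "application/rtf",
        "language": "ro",
        "content": "Lorem ipsum dolor sit amet",
        "content_length": 28,
    },
}


def without_binary(source: Dict[str, Any], field: str = "data") -> Dict[str, Any]:
    """Return a copy of `source` with the binary field dropped."""
    return {key: value for key, value in source.items() if key != field}


trimmed = without_binary(stored_source)

# Compare the serialized sizes of the two variants.
print(len(json.dumps(stored_source)), len(json.dumps(trimmed)))
```

For real attachments the binary field is usually far larger than the extracted properties, so the difference is much more pronounced than in this small sample.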

Exported fields

The fields that can be extracted from a document are:

  • content,
  • title,
  • author,
  • keywords,
  • date,
  • content_type,
  • content_length,
  • language,
  • modified,
  • format,
  • identifier,
  • contributor,
  • coverage,
  • modifier,
  • creator_tool,
  • publisher,
  • relation,
  • rights,
  • source,
  • type,
  • description,
  • print_date,
  • metadata_date,
  • latitude,
  • longitude,
  • altitude,
  • rating,
  • comments

To extract only certain attachment fields, specify the properties array:

resp = client.ingest.put_pipeline(
    id="attachment",
    description="Extract attachment information",
    processors=[
        {
            "attachment": {
                "field": "data",
                "properties": [
                    "content",
                    "title"
                ],
                "remove_binary": False
            }
        }
    ],
)
print(resp)
response = client.ingest.put_pipeline(
  id: 'attachment',
  body: {
    description: 'Extract attachment information',
    processors: [
      {
        attachment: {
          field: 'data',
          properties: [
            'content',
            'title'
          ],
          remove_binary: false
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/attachment
{
  "description" : "Extract attachment information",
  "processors" : [
    {
      "attachment" : {
        "field" : "data",
        "properties": [ "content", "title" ],
        "remove_binary": false
      }
    }
  ]
}
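The effect of the properties array can be pictured as a filter over whatever was extracted. The sketch below is an illustration only: the function name `select_properties` is hypothetical, and the assumption that a requested property is simply absent when the file does not provide it is inferred from the earlier example response, which lists only the properties found in the file.

```python
from typing import Any, Dict, List, Optional


def select_properties(
    extracted: Dict[str, Any], properties: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Keep only the requested properties; keep everything if none are requested."""
    if properties is None:
        return dict(extracted)
    return {name: extracted[name] for name in properties if name in extracted}


# Properties extracted from the sample RTF file in the earlier example.
extracted = {
    "content_type": "application/rtf",
    "language": "ro",
    "content": "Lorem ipsum dolor sit amet",
    "content_length": 28,
}

# The sample file has no title, so only `content` remains.
selected = select_properties(extracted, ["content", "title"])
print(selected)
```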

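Extracting content from binary data is a resource-intensive operation. It is highly recommended to run pipelines that use this processor on a dedicated ingest node.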

Using the attachment processor with CBOR

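To avoid encoding and decoding JSON to and from base64, you can instead pass CBOR data to the attachment processor. For example, the following request creates the cbor-attachment pipeline, which uses an attachment processor.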

resp = client.ingest.put_pipeline(
    id="cbor-attachment",
    description="Extract attachment information",
    processors=[
        {
            "attachment": {
                "field": "data",
                "remove_binary": False
            }
        }
    ],
)
print(resp)
response = client.ingest.put_pipeline(
  id: 'cbor-attachment',
  body: {
    description: 'Extract attachment information',
    processors: [
      {
        attachment: {
          field: 'data',
          remove_binary: false
        }
      }
    ]
  }
)
puts response
PUT _ingest/pipeline/cbor-attachment
{
  "description" : "Extract attachment information",
  "processors" : [
    {
      "attachment" : {
        "field" : "data",
        "remove_binary": false
      }
    }
  ]
}

The following Python script passes CBOR data to an HTTP indexing request that includes the cbor-attachment pipeline. The HTTP request headers use a content-type of application/cbor.

Not all Elasticsearch clients support custom HTTP request headers.

import cbor2
import requests

file = 'my-file'
headers = {'content-type': 'application/cbor'}

with open(file, 'rb') as f:
  doc = {
    'data': f.read()
  }
  requests.put(
    'https://127.0.0.1:9200/my-index-000001/_doc/my_id?pipeline=cbor-attachment',
    data=cbor2.dumps(doc),
    headers=headers
  )
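To show why CBOR avoids the base64 step, the sketch below hand-encodes a one-field document the way the CBOR specification (RFC 8949) lays it out: a map header, a text-string key, and a byte-string value carrying the raw bytes. It is a deliberately limited illustration that handles only this single shape and is not a replacement for cbor2. The names `_head` and `encode_document` are hypothetical.

```python
def _head(major_type: int, length: int) -> bytes:
    """Encode a CBOR item head: major type plus a length argument."""
    prefix = major_type << 5
    if length < 24:
        return bytes([prefix | length])
    if length < 2**8:
        return bytes([prefix | 24]) + length.to_bytes(1, "big")
    if length < 2**16:
        return bytes([prefix | 25]) + length.to_bytes(2, "big")
    if length < 2**32:
        return bytes([prefix | 26]) + length.to_bytes(4, "big")
    return bytes([prefix | 27]) + length.to_bytes(8, "big")


def encode_document(field: str, payload: bytes) -> bytes:
    """Encode {field: payload} as a CBOR map with a single byte-string value."""
    key = field.encode("utf-8")
    return (
        _head(5, 1)                          # major type 5: map with one pair
        + _head(3, len(key)) + key           # major type 3: text string key
        + _head(2, len(payload)) + payload   # major type 2: raw byte string
    )


payload = b"{\\rtf1\\ansi\r\nLorem ipsum dolor sit amet\r\n\\par }"
body = encode_document("data", payload)

# The payload travels unmodified inside the body, with a few header bytes added.
print(len(payload), len(body), payload in body)
```

Because the file bytes are embedded as-is, neither the client nor the processor has to perform a base64 conversion.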

Limiting the number of extracted characters

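To prevent extracting too many characters and overloading node memory, the number of characters used for extraction is limited to 100000 by default. You can change this value by setting indexed_chars. Use -1 for no limit, but if you do, make sure that your node has enough heap to extract the content of very large documents.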

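You can also define this limit per document by reading it from a given field. If the document has that field, its value overrides the indexed_chars setting. To name this field, define the indexed_chars_field setting.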

For example:

resp = client.ingest.put_pipeline(
    id="attachment",
    description="Extract attachment information",
    processors=[
        {
            "attachment": {
                "field": "data",
                "indexed_chars": 11,
                "indexed_chars_field": "max_size",
                "remove_binary": False
            }
        }
    ],
)
print(resp)

resp1 = client.index(
    index="my-index-000001",
    id="my_id",
    pipeline="attachment",
    document={
        "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0="
    },
)
print(resp1)

resp2 = client.get(
    index="my-index-000001",
    id="my_id",
)
print(resp2)
response = client.ingest.put_pipeline(
  id: 'attachment',
  body: {
    description: 'Extract attachment information',
    processors: [
      {
        attachment: {
          field: 'data',
          indexed_chars: 11,
          indexed_chars_field: 'max_size',
          remove_binary: false
        }
      }
    ]
  }
)
puts response

response = client.index(
  index: 'my-index-000001',
  id: 'my_id',
  pipeline: 'attachment',
  body: {
    data: 'e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0='
  }
)
puts response

response = client.get(
  index: 'my-index-000001',
  id: 'my_id'
)
puts response
PUT _ingest/pipeline/attachment
{
  "description" : "Extract attachment information",
  "processors" : [
    {
      "attachment" : {
        "field" : "data",
        "indexed_chars" : 11,
        "indexed_chars_field" : "max_size",
        "remove_binary": false
      }
    }
  ]
}
PUT my-index-000001/_doc/my_id?pipeline=attachment
{
  "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0="
}
GET my-index-000001/_doc/my_id

Returns this:

{
  "found": true,
  "_index": "my-index-000001",
  "_id": "my_id",
  "_version": 1,
  "_seq_no": 35,
  "_primary_term": 1,
  "_source": {
    "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=",
    "attachment": {
      "content_type": "application/rtf",
      "language": "is",
      "content": "Lorem ipsum",
      "content_length": 11
    }
  }
}
resp = client.ingest.put_pipeline(
    id="attachment",
    description="Extract attachment information",
    processors=[
        {
            "attachment": {
                "field": "data",
                "indexed_chars": 11,
                "indexed_chars_field": "max_size",
                "remove_binary": False
            }
        }
    ],
)
print(resp)

resp1 = client.index(
    index="my-index-000001",
    id="my_id_2",
    pipeline="attachment",
    document={
        "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=",
        "max_size": 5
    },
)
print(resp1)

resp2 = client.get(
    index="my-index-000001",
    id="my_id_2",
)
print(resp2)
response = client.ingest.put_pipeline(
  id: 'attachment',
  body: {
    description: 'Extract attachment information',
    processors: [
      {
        attachment: {
          field: 'data',
          indexed_chars: 11,
          indexed_chars_field: 'max_size',
          remove_binary: false
        }
      }
    ]
  }
)
puts response

response = client.index(
  index: 'my-index-000001',
  id: 'my_id_2',
  pipeline: 'attachment',
  body: {
    data: 'e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=',
    max_size: 5
  }
)
puts response

response = client.get(
  index: 'my-index-000001',
  id: 'my_id_2'
)
puts response
PUT _ingest/pipeline/attachment
{
  "description" : "Extract attachment information",
  "processors" : [
    {
      "attachment" : {
        "field" : "data",
        "indexed_chars" : 11,
        "indexed_chars_field" : "max_size",
        "remove_binary": false
      }
    }
  ]
}
PUT my-index-000001/_doc/my_id_2?pipeline=attachment
{
  "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=",
  "max_size": 5
}
GET my-index-000001/_doc/my_id_2

Returns this:

{
  "found": true,
  "_index": "my-index-000001",
  "_id": "my_id_2",
  "_version": 1,
  "_seq_no": 40,
  "_primary_term": 1,
  "_source": {
    "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=",
    "max_size": 5,
    "attachment": {
      "content_type": "application/rtf",
      "language": "sl",
      "content": "Lorem",
      "content_length": 5
    }
  }
}
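The override rule illustrated by the two documents above can be sketched in a few lines of Python. This models the behavior described in this section rather than Elasticsearch's actual implementation. The function names `effective_limit` and `truncate` are hypothetical.

```python
from typing import Any, Dict, Optional


def effective_limit(
    document: Dict[str, Any],
    indexed_chars: int = 100000,
    indexed_chars_field: Optional[str] = None,
) -> int:
    """Pick the per-document limit if the document carries one, else the pipeline setting."""
    if indexed_chars_field is not None and indexed_chars_field in document:
        return int(document[indexed_chars_field])
    return indexed_chars


def truncate(text: str, limit: int) -> str:
    """Apply the character limit; -1 means no limit."""
    return text if limit == -1 else text[:limit]


text = "Lorem ipsum dolor sit amet"

# Document without max_size: the pipeline's indexed_chars (11) applies.
first = truncate(text, effective_limit({}, 11, "max_size"))

# Document with max_size = 5: the document's own value wins.
second = truncate(text, effective_limit({"max_size": 5}, 11, "max_size"))

print(repr(first), repr(second))
```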

Using the attachment processor with arrays

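To use the attachment processor within an array of attachments, the foreach processor is required. This makes it possible to run the attachment processor on the individual elements of the array.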

For example, given the following source:

{
  "attachments" : [
    {
      "filename" : "ipsum.txt",
      "data" : "dGhpcyBpcwpqdXN0IHNvbWUgdGV4dAo="
    },
    {
      "filename" : "test.txt",
      "data" : "VGhpcyBpcyBhIHRlc3QK"
    }
  ]
}

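In this case, we want to process the data field in each element of the attachments field and insert the properties into the document, so the following foreach processor is used: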

resp = client.ingest.put_pipeline(
    id="attachment",
    description="Extract attachment information from arrays",
    processors=[
        {
            "foreach": {
                "field": "attachments",
                "processor": {
                    "attachment": {
                        "target_field": "_ingest._value.attachment",
                        "field": "_ingest._value.data",
                        "remove_binary": False
                    }
                }
            }
        }
    ],
)
print(resp)

resp1 = client.index(
    index="my-index-000001",
    id="my_id",
    pipeline="attachment",
    document={
        "attachments": [
            {
                "filename": "ipsum.txt",
                "data": "dGhpcyBpcwpqdXN0IHNvbWUgdGV4dAo="
            },
            {
                "filename": "test.txt",
                "data": "VGhpcyBpcyBhIHRlc3QK"
            }
        ]
    },
)
print(resp1)

resp2 = client.get(
    index="my-index-000001",
    id="my_id",
)
print(resp2)
response = client.ingest.put_pipeline(
  id: 'attachment',
  body: {
    description: 'Extract attachment information from arrays',
    processors: [
      {
        foreach: {
          field: 'attachments',
          processor: {
            attachment: {
              target_field: '_ingest._value.attachment',
              field: '_ingest._value.data',
              remove_binary: false
            }
          }
        }
      }
    ]
  }
)
puts response

response = client.index(
  index: 'my-index-000001',
  id: 'my_id',
  pipeline: 'attachment',
  body: {
    attachments: [
      {
        filename: 'ipsum.txt',
        data: 'dGhpcyBpcwpqdXN0IHNvbWUgdGV4dAo='
      },
      {
        filename: 'test.txt',
        data: 'VGhpcyBpcyBhIHRlc3QK'
      }
    ]
  }
)
puts response

response = client.get(
  index: 'my-index-000001',
  id: 'my_id'
)
puts response
PUT _ingest/pipeline/attachment
{
  "description" : "Extract attachment information from arrays",
  "processors" : [
    {
      "foreach": {
        "field": "attachments",
        "processor": {
          "attachment": {
            "target_field": "_ingest._value.attachment",
            "field": "_ingest._value.data",
            "remove_binary": false
          }
        }
      }
    }
  ]
}
PUT my-index-000001/_doc/my_id?pipeline=attachment
{
  "attachments" : [
    {
      "filename" : "ipsum.txt",
      "data" : "dGhpcyBpcwpqdXN0IHNvbWUgdGV4dAo="
    },
    {
      "filename" : "test.txt",
      "data" : "VGhpcyBpcyBhIHRlc3QK"
    }
  ]
}
GET my-index-000001/_doc/my_id

Returns this:

{
  "_index" : "my-index-000001",
  "_id" : "my_id",
  "_version" : 1,
  "_seq_no" : 50,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "attachments" : [
      {
        "filename" : "ipsum.txt",
        "data" : "dGhpcyBpcwpqdXN0IHNvbWUgdGV4dAo=",
        "attachment" : {
          "content_type" : "text/plain; charset=ISO-8859-1",
          "language" : "en",
          "content" : "this is\njust some text",
          "content_length" : 24
        }
      },
      {
        "filename" : "test.txt",
        "data" : "VGhpcyBpcyBhIHRlc3QK",
        "attachment" : {
          "content_type" : "text/plain; charset=ISO-8859-1",
          "language" : "en",
          "content" : "This is a test",
          "content_length" : 16
        }
      }
    ]
  }
}

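Note that target_field needs to be set; otherwise the default value is used, which is the top-level field attachment. The properties on this top-level field will contain the value of the first attachment only. By specifying target_field on _ingest._value, the properties are correctly associated with the right attachment.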
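The per-element association can be pictured with a small simulation. The sketch below walks the attachments array and writes each result next to the element it came from, which is what targeting `_ingest._value.attachment` achieves. It is an illustration under stated assumptions: plain base64 decoding stands in for Tika, which only works here because the sample files are plain text, and the function name `simulate_foreach` is hypothetical.

```python
import base64
import copy
from typing import Any, Dict

source: Dict[str, Any] = {
    "attachments": [
        {"filename": "ipsum.txt", "data": "dGhpcyBpcwpqdXN0IHNvbWUgdGV4dAo="},
        {"filename": "test.txt", "data": "VGhpcyBpcyBhIHRlc3QK"},
    ]
}


def simulate_foreach(document: Dict[str, Any], array_field: str = "attachments") -> Dict[str, Any]:
    """Attach extracted content to each array element, leaving the input untouched."""
    result = copy.deepcopy(document)
    for element in result[array_field]:
        # Stand-in for Tika: decode the base64 payload as UTF-8 text.
        text = base64.b64decode(element["data"]).decode("utf-8")
        # Equivalent of target_field = _ingest._value.attachment.
        element["attachment"] = {"content": text.strip()}
    return result


processed = simulate_foreach(source)
for element in processed["attachments"]:
    print(element["filename"], repr(element["attachment"]["content"]))
```

Each element ends up with its own attachment object, so no result is lost or attributed to the wrong file.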