Azure Blob 存储输入编辑

使用 azure blob storage 输入 读取存储在 Azure 云上的容器中的文件内容。该输入可以配置为在使用和不使用轮询的情况下工作,但目前,如果禁用轮询,它将只执行一次性传递,列出文件内容并结束进程。对于大多数情况,通常建议使用轮询,即使在处理大量文件时轮询的成本很高。

为了减少错误并确保稳定的处理环境,此输入采用以下功能

  1. 处理 azure blob 容器时,如果突然发生任何中断,该进程将能够从上次成功保存状态的文件后恢复处理。
  2. 如果某些文件发生任何错误,将记录相应的错误,但其他文件将继续正常处理。
  3. 如果发生任何导致主线程停止的重大错误,将生成相应的日志,描述该错误。

注意:目前仅支持 JSONNDJSON blob/文件格式。Blob/文件也可以进行 gzip 压缩。至于身份验证类型,我们目前支持 共享访问密钥连接字符串

下面给出了一个示例配置,其中对每个字段都有详细说明:-

filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  containers:
  - name: container_1
    max_workers: 3
    poll: true
    poll_interval: 10s
  - name: container_2
    max_workers: 3
    poll: true
    poll_interval: 10s

说明: 上面给出的 配置 描述了一个基本的 blob 存储配置,它有两个名为 container_1container_2 的容器。这些容器中的每一个都有自己的属性,例如 namemax_workerspollpoll_interval。这些属性在下方有详细说明。现在让我们试着理解这个配置是如何工作的。

为了让 azure blob 存储输入识别它需要读取和处理的文件,它需要指定容器名称。我们可以根据需要设置任意数量的容器。我们还可以在根级别配置属性 max_workerspollpoll_interval,然后这些属性将应用于所有没有明确指定任何这些属性的容器。

如果在根级别指定了属性 max_workerspollpoll_interval,则仍然可以在容器级别使用不同的值覆盖这些属性,从而提供广泛的灵活性和定制。下面的示例展示了这种行为。

在接收到此配置后,azure blob 存储输入将连接到服务并使用给定的 account_nameauth.shared_credentials.account_key 获取 ServiceClient,然后它将生成两个主要的 go 例程,每个容器一个。在此之后,每个例程(线程)将初始化一个调度器,该调度器将依次使用 max_workers 值初始化一个内存中的工作线程池(线程池),其中有 3 个可用的 工作线程。基本上,这相当于两个工作线程池实例,每个容器一个,每个实例有 3 个工作线程。这些 工作线程 将负责执行处理文件的 作业(在本例中为读取和输出文件的内容)。

调度器负责调度作业,并在每次迭代中使用池中 最大可用工作线程 数来决定要检索和处理的文件数。这保持了工作分配的效率。调度器使用 poll_interval 属性值来决定每次迭代后等待多长时间。每次迭代都包含处理一定数量的文件,该数量由 最大可用工作线程 数决定。

示例响应:-

 {
    "@timestamp": "2022-07-25T07:00:18.544Z",
    "@metadata": {
        "beat": "filebeat",
        "type": "_doc",
        "version": "8.4.0",
        "_id": "beatscontainer-data_3.json-worker-1"
    },
    "message": "{\n    \"id\": 3,\n    \"title\": \"Samsung Universe 9\",\n    \"description\": \"Samsung's new variant which goes beyond Galaxy to the Universe\",\n    \"price\": 1249,\n    \"discountPercentage\": 15.46,\n    \"rating\": 4.09,\n    \"stock\": 36,\n    \"brand\": \"Samsung\",\n    \"category\": \"smartphones\",\n    \"thumbnail\": \"https://dummyjson.com/image/i/products/3/thumbnail.jpg\",\n    \"images\": [\n        \"https://dummyjson.com/image/i/products/3/1.jpg\"\n    ]\n}",
    "cloud": {
        "provider": "azure"
    },
    "input": {
        "type": "azure-blob-storage"
    },
    "log": {
        "offset": 200,
        "file": {
            "path": "https://beatsblobstorage1.blob.core.windows.net/beatscontainer/data_3.json"
        }
    },
    "azure": {
        "storage": {
            "container": {
                "name": "beatscontainer"
            },
            "blob": {
                "content_type": "application/json",
                "name": "data_3.json"
            }
        }
    },
    "event": {
        "kind": "publish_data"
    }
}

从上面的响应中我们可以看到,message 字段包含原始的字符串化数据。

一些关键属性如下:-

  1. message:原始字符串化的 blob 数据。
  2. log.file.path:blob 在 azure 云中的路径。
  3. azure.storage.blob.container.name:读取文件的容器的名称。
  4. azure.storage.blob.object.name:已读取的文件/blob 的名称。
  5. azure.storage.blob.object.content_type:文件/blob 的内容类型。您可以在此处找到支持的内容类型。

现在让我们更详细地探讨一下配置属性。

支持的属性:-

account_name编辑

此属性是各种内部操作(如身份验证、创建服务客户端和 blob 客户端)所需的,这些操作在内部用于各种处理目的。

auth.shared_credentials.account_key编辑

此属性包含 访问密钥,该密钥可在 Azure 云上的相应存储帐户下的 访问密钥 部分找到。一个存储帐户可以包含多个容器,它们都将使用此公共访问密钥。

auth.connection_string.uri编辑

此属性包含 连接字符串,该字符串可在 Azure 云上的相应存储帐户下的 访问密钥 部分找到。一个存储帐户可以包含多个容器,它们都将使用此公共连接字符串。

我们只需要指定 auth.shared_credentials.account_keyauth.connection_string.uri 中的一个即可用于身份验证。如果两个属性都指定了,则将使用配置中先出现的那个。

storage_url编辑

如果需要,请使用此属性指定自定义存储 URL。默认情况下,它指向 azure 云存储。只有在需要连接到提供 blob 存储的不同环境时才使用此选项。

URL 格式: {{protocol}}://{{account_name}}.{{storage_uri}}。此属性位于配置的根级别,而不是任何容器块内。

containers编辑

此属性包含有关特定容器的详细信息,如 namemax_workerspollpoll_interval。属性 name 是特定于容器的,因为它描述了容器名称,而字段 max_workerspollpoll_interval 可以存在于容器级别和根级别。此属性在内部表示为一个数组,因此我们可以根据需要添加任意数量的容器。

name编辑

这是容器的一个特定子字段。它指定了容器名称。

max_workers编辑

此属性定义了在工作线程池(线程池)中为处理读取文件内容的作业分配的最大工作线程数(go 例程/轻量级线程)。工作线程数越多,实现的并发量就越大。由于内部 SDK 限制,每个容器最多可以定义 5000 个工作线程。此属性可以在配置的根级别和容器级别指定。如果同时指定了这两个级别的值,则容器级别的值将始终优先,并覆盖根级别的值。

poll编辑

此属性通知调度器是否继续轮询新文件。其默认值为 false,因此,如果没有明确指定,它将不会继续轮询。此属性可以在配置的根级别和容器级别指定。如果同时指定了这两个级别的值,则容器级别的值将始终优先,并覆盖根级别的值。

poll_interval编辑

此属性定义了内部调度器对下一组 blob/文件进行轮询调用之前等待的最长时间。它可以用以下格式定义:{{x}}s{{x}}m{{x}}h,其中 s = 秒m = 分钟h = 小时。值 {{x}} 可以是我们希望的任何值。例如:10s 表示我们希望每 10 秒进行一次轮询。如果没有为此指定值,则默认情况下,它将初始化为 300 秒。此属性可以在配置的根级别和容器级别指定。如果同时指定了这两个级别的值,则容器级别的值将始终优先,并覆盖根级别的值。

file_selectors编辑

如果 Azure Blob 存储容器包含 Filebeat 不应处理的文件对应的 Blob,则可以使用 file_selectors 来限制下载的文件。这是一个基于 regex 模式的选择器列表。regex 应该匹配 Blob 名称,或者应该是 Blob 名称的一部分(最好是前缀)。regex 语法与 Go 编程语言中使用的语法相同。与任何配置的正则表达式不匹配的文件将不会被处理。此属性可以在配置的根级别和容器级别指定。如果同时指定了两个级别,则容器级别值将始终优先,并覆盖根级别值。

filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  containers:
  - name: container_1
    file_selectors:
    - regex: '/Monitoring/'
    - regex: 'docs/'
    - regex: '/Security-Logs/'

expand_event_list_from_fieldedit

如果使用此输入的文件集希望接收捆绑在特定字段或对象数组下的多条消息,则可以指定 expand_event_list_from_field 的配置选项。此设置将能够将组值下的消息拆分为单独的事件。例如,如果您有 JSON 格式的日志,并且在 JSON 对象 "Records" 下找到了事件。要将事件拆分为单独的事件,可以将配置选项 expand_event_list_from_field 设置为 "Records"。此属性可以在配置的根级别和容器级别指定。如果同时指定了两个级别,则容器级别值将始终优先,并覆盖根级别值。

{
    "Records": [
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:51:00Z",
            "region": "us-east-1",
            "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE",
        },
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:52:00Z",
            "region": "us-east-1",
            "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE",
        }
    ]
}
filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  containers:
  - name: container_1
    expand_event_list_from_field: Records

此属性仅适用于 JSON 文件格式。如果文件在根级别有一个对象数组,则不需要指定此属性。根级别的对象数组会自动拆分为单独的事件。如果由于某些意外错误而发生故障或输入崩溃,则处理将从最后一个成功处理的文件/Blob 恢复。

timestamp_epochedit

此属性可用于筛选出时间戳早于指定值的文件/Blob。此属性的值应采用 Unix epoch(秒)格式。时间戳值将与从 Blob 元数据中获取的 LastModified Timestamp 进行比较。此属性可以在配置的根级别和容器级别指定。如果同时指定了两个级别,则容器级别值将始终优先,并覆盖根级别值。

filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  containers:
  - name: container_1
    timestamp_epoch: 1627233600

以下示例配置将进一步解释容器级别的属性覆盖:-

情况 1

此处,container_1 使用根级别属性,而 container_2 覆盖这些值

filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  max_workers: 10
  poll: true
  poll_interval: 15s
  containers:
  - name: container_1
  - name: container_2
    max_workers: 3
    poll: true
    poll_interval: 10s

说明: 在此配置中,container_1max_workerspollpoll_interval 中没有定义子属性。它从根级别继承这些字段的值,即 max_workers = 10poll = truepoll_interval = 15 seconds。但是,container_2 定义了这些字段,它将使用这些值,而不是使用根值。

情况 2

此处,container_1container_2 都覆盖根值

filebeat.inputs:
  - type: azure-blob-storage
    id: my-azureblobstorage-id
    enabled: true
    account_name: some_account
    auth.shared_credentials.account_key: some_key
    max_workers: 10
    poll: true
    poll_interval: 15s
    containers:
    - name: container_1
      max_workers: 5
      poll: true
      poll_interval: 10s
    - name: container_2
      max_workers: 5
      poll: true
      poll_interval: 10s

说明: 在此配置中,即使我们在根级别指定了 max_workers = 10poll = truepoll_interval = 15s,但两个容器都将使用其子属性中定义的各自值覆盖这些值。

欢迎任何有助于我们进一步优化此输入的反馈。请随时针对任何错误或功能请求提交 Github 问题。