Azure Blob存储输入

编辑

使用azure blob storage input 从 Azure 云中容器中存储的文件读取内容。该输入可以配置为使用和不使用轮询的方式工作,但是目前,如果禁用轮询,它只会执行一次性通过,列出文件内容并结束进程。即使处理大量文件可能会很昂贵,但在大多数情况下仍然建议使用轮询。

为了减少错误并确保稳定的处理环境,此输入采用了以下功能:

  1. 在处理 Azure Blob 容器时,如果突然出现任何中断,该进程将能够在它处理的最后一个文件之后恢复,并且能够成功保存状态。
  2. 如果某些文件出现任何错误,它们将被适当记录,但其余文件将继续正常处理。
  3. 如果出现任何阻止主线程的主要错误,日志将被适当生成,描述该错误。

注意:目前仅支持JSONNDJSON Blob/文件格式。Blob/文件也可能是 gzip 压缩的。至于身份验证类型,我们目前支持共享访问密钥连接字符串

下面给出了一个带有每个字段详细说明的示例配置:-

filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  containers:
  - name: container_1
    max_workers: 3
    poll: true
    poll_interval: 10s
  - name: container_2
    max_workers: 3
    poll: true
    poll_interval: 10s

说明: 上面给出的配置描述了一个基本的 Blob 存储配置,其中包含名为container_1container_2的两个容器。这些容器中的每一个都有其自己的属性,例如namemax_workerspollpoll_interval。这些属性的详细说明在下面给出。现在让我们尝试了解此配置的工作原理。

为了让 Azure Blob 存储输入识别它需要读取和处理的文件,需要指定容器名称。我们可以根据需要添加任意数量的容器。我们还可以配置max_workerspollpoll_interval根级别的属性,这些属性将应用于未显式指定这些属性的所有容器。

如果在根级别指定了属性max_workerspollpoll_interval,这些属性仍然可以在容器级别使用不同的值进行覆盖,从而提供广泛的灵活性和自定义功能。下面的示例显示了此行为。

在收到此配置后,Azure Blob 存储输入将使用给定的account_nameauth.shared_credentials.account_key连接到服务并检索ServiceClient,然后它将为每个容器生成两个主要的 goroutine。此后,每个例程(线程)将初始化一个调度程序,该调度程序将反过来使用max_workers值来使用3个可用worker初始化一个内存中的工作池(线程池)。基本上,这相当于两个工作池实例,每个容器一个,每个都有 3 个工作器。这些worker将负责执行处理文件的job(在本例中为读取和输出文件内容)。

调度程序负责调度作业,并在每次迭代中使用池中的最大可用工作器来确定要检索和处理的文件数量。这保持了工作分配效率。调度程序使用poll_interval属性值来确定每次迭代后等待多长时间。每次迭代都包括处理一定数量的文件,由最大可用工作器值决定。

示例响应:-

 {
    "@timestamp": "2022-07-25T07:00:18.544Z",
    "@metadata": {
        "beat": "filebeat",
        "type": "_doc",
        "version": "8.4.0",
        "_id": "beatscontainer-data_3.json-worker-1"
    },
    "message": "{\n    \"id\": 3,\n    \"title\": \"Samsung Universe 9\",\n    \"description\": \"Samsung's new variant which goes beyond Galaxy to the Universe\",\n    \"price\": 1249,\n    \"discountPercentage\": 15.46,\n    \"rating\": 4.09,\n    \"stock\": 36,\n    \"brand\": \"Samsung\",\n    \"category\": \"smartphones\",\n    \"thumbnail\": \"https://dummyjson.com/image/i/products/3/thumbnail.jpg\",\n    \"images\": [\n        \"https://dummyjson.com/image/i/products/3/1.jpg\"\n    ]\n}",
    "cloud": {
        "provider": "azure"
    },
    "input": {
        "type": "azure-blob-storage"
    },
    "log": {
        "offset": 200,
        "file": {
            "path": "https://beatsblobstorage1.blob.core.windows.net/beatscontainer/data_3.json"
        }
    },
    "azure": {
        "storage": {
            "container": {
                "name": "beatscontainer"
            },
            "blob": {
                "content_type": "application/json",
                "name": "data_3.json"
            }
        }
    },
    "event": {
        "kind": "publish_data"
    }
}

从上面的响应中可以看出,message字段包含原始的字符串化数据。

一些关键属性如下:-

  1. message:原始字符串化的 Blob 数据。
  2. log.file.path:Azure 云中 Blob 的路径。
  3. azure.storage.blob.container.name:已读取文件的容器名称。
  4. azure.storage.blob.object.name:已读取的文件/Blob 的名称。
  5. azure.storage.blob.object.content_type:文件/Blob 的内容类型。您可以在此处找到受支持的内容类型。

现在让我们更详细地探讨配置属性。

支持的属性:-

account_name

编辑

此属性对于各种内部操作是必需的,这些操作涉及身份验证、创建服务客户端和 Blob 客户端,这些客户端在各种处理目的中内部使用。

auth.oauth2

编辑

此属性包含用于安全连接到 Azure Blob 存储的 Microsoft Entra ID RBAC 身份验证凭据。auth.oauth2 属性包含以下子属性:

  1. client_id:Azure Entra ID 应用程序的客户端 ID。
  2. client_secret:Azure Entra ID 应用程序的客户端密钥。
  3. tenant_id:Azure Entra ID 应用程序的租户 ID。

下面给出了一个包含auth.oauth2的示例配置:

filebeat.inputs:
- type: azure-blob-storage
  account_name: some_account
  auth.oauth2:
    client_id: "some_client_id"
    client_secret: "some_client_secret"
    tenant_id: "some_tenant_id"
  containers:
  - name: container_1
    max_workers: 3
    poll: true
    poll_interval: 10s

如何在 Azure 文档此处中设置auth.oauth2凭据。

根据我们的内部测试,似乎我们需要至少blobOwner访问级别才能使服务主体能够读取 Blob。如果您在访问级别方面遇到任何问题,请确保访问级别设置为blobOwner

auth.shared_credentials.account_key

编辑

此属性包含在 Azure 云中相应存储帐户的访问密钥部分下找到的访问密钥。单个存储帐户可以包含多个容器,它们都将使用此公共访问密钥。

auth.connection_string.uri

编辑

此属性包含在 Azure 云中相应存储帐户的访问密钥部分下找到的连接字符串。单个存储帐户可以包含多个容器,它们都将使用此公共连接字符串。

我们只需要指定auth.shared_credentials.account_keyauth.connection_string.uri中的一个用于身份验证。如果同时指定这两个属性,则将使用配置中首先出现的属性。

storage_url

编辑

如果需要,可以使用此属性指定自定义存储 URL。默认情况下,它指向 Azure 云存储。只有在需要连接到提供 Blob 存储的不同环境时才使用此属性。

URL 格式:{{protocol}}://{{account_name}}.{{storage_uri}}。此属性位于配置的根级别,而不是任何容器块内。

containers

编辑

此属性包含有关特定容器的详细信息,例如namemax_workerspollpoll_intervalname属性特定于容器,因为它描述了容器名称,而字段max_workerspollpoll_interval既可以在容器级别也可以在根级别存在。此属性在内部表示为数组,因此我们可以根据需要添加任意数量的容器。

name

编辑

这是容器的特定子字段。它指定容器名称。

max_workers

编辑

此属性定义在工作池(线程池)中分配的最大工作器(goroutine/轻量级线程)数量,用于处理读取文件内容的作业。更多的工作器意味着实现更高的并发性。由于内部 SDK 的限制,每个容器最多可以定义5000个工作器。此属性可以在配置的根级别以及容器级别指定。如果同时指定了容器级别值和根级别值,则容器级别值始终优先并覆盖根级别值。

poll

编辑

此属性通知调度程序是否继续轮询新文件。其默认值为false,因此如果未显式指定,则不会继续轮询。此属性可以在配置的根级别以及容器级别指定。如果同时指定了容器级别值和根级别值,则容器级别值始终优先并覆盖根级别值。

poll_interval

编辑

此属性定义内部调度程序在发出轮询调用以获取下一组 Blob/文件后,允许的最大时间。它可以采用以下格式定义:{{x}}s{{x}}m{{x}}h,其中 s = 秒m = 分钟h = 小时{{x}} 的值可以是任何我们想要的值。例如:10s 表示我们希望每 10 秒进行一次轮询。如果未为此指定值,则默认初始化为 300 秒。此属性可以在配置的根级别和容器级别指定。如果同时指定两者,则容器级别的值始终优先并覆盖根级别的值。

encoding

编辑

用于读取包含国际字符的数据的文件编码。这仅适用于非 JSON 日志。参见 encoding

decoding

编辑

文件解码选项用于指定用于解码文件内容的编解码器。这可以应用于任何文件流数据。下面显示一个示例配置。

当前支持的编解码器如下:

  1. CSV:此编解码器解码 RFC 4180 CSV 数据流。

the CSV codec

编辑

CSV 编解码器用于解码 RFC 4180 CSV 数据流。启用编解码器而不使用其他选项将使用默认的编解码器选项。

  decoding.codec.csv.enabled: true

CSV 编解码器支持五个子属性来控制 CSV 解码的各个方面。comma 属性指定 CSV 格式使用的字段分隔符字符。如果未指定,则使用逗号字符 ,comment 属性指定应解释为注释标记的字符。如果指定,则以该字符开头的行将被忽略。commacomment 都必须是单个字符。lazy_quotes 属性控制如何处理字段中的引号。如果 lazy_quotes 为 true,则引号可能出现在未加引号的字段中,而未加倍的引号可能出现在加引号的字段中。trim_leading_space 属性指定应忽略前导空格,即使 comma 字符为空格也是如此。有关上述配置属性行为的完整详细信息,请参阅 CSV 解码器 文档 fields_names 属性可用于指定数据的列名。如果不存在,则字段名将从第一行非注释数据行中获取。字段数必须与字段名数匹配。

下面显示一个示例配置

  decoding.codec.csv.enabled: true
  decoding.codec.csv.comma: "\t"
  decoding.codec.csv.comment: "#"

file_selectors

编辑

如果 Azure Blob 存储容器包含 Filebeat 不应处理的文件对应的 Blob,则可以使用 file_selectors 来限制下载的文件。这是一个基于 regex 模式的选择器列表。regex 应匹配 Blob 名称或应为 Blob 名称的一部分(理想情况下为前缀)。regex 语法与 Go 编程语言中使用的语法相同。不匹配任何已配置正则表达式的文件将不会被处理。此属性可以在配置的根级别和容器级别指定。如果同时指定两者,则容器级别的值始终优先并覆盖根级别的值。

filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  containers:
  - name: container_1
    file_selectors:
    - regex: '/Monitoring/'
    - regex: 'docs/'
    - regex: '/Security-Logs/'

expand_event_list_from_field

编辑

如果使用此输入的文件集预期接收多个捆绑在特定字段或对象数组下的消息,则可以指定 expand_event_list_from_field 的配置选项。此设置能够将组值下的消息拆分为单独的事件。例如,如果您的日志采用 JSON 格式,并且事件位于 JSON 对象“Records”下。要将事件拆分为单独的事件,可以将配置选项 expand_event_list_from_field 设置为“Records”。此属性可以在配置的根级别和容器级别指定。如果同时指定两者,则容器级别的值始终优先并覆盖根级别的值。

{
    "Records": [
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:51:00Z",
            "region": "us-east-1",
            "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE",
        },
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:52:00Z",
            "region": "us-east-1",
            "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE",
        }
    ]
}
filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  containers:
  - name: container_1
    expand_event_list_from_field: Records

此属性仅适用于 JSON 文件格式。如果文件在根级别具有对象数组,则不需要指定此属性。根级别的对象数组会自动拆分为单独的事件。如果由于某些意外错误而发生故障或输入崩溃,则处理将从上次成功处理的文件/Blob 恢复。

timestamp_epoch

编辑

此属性可用于过滤掉时间戳早于指定值的文件/Blob。此属性的值应为 Unix epoch(秒)格式。时间戳值将与从 Blob 元数据获得的 LastModified Timestamp 进行比较。此属性可以在配置的根级别和容器级别指定。如果同时指定两者,则容器级别的值始终优先并覆盖根级别的值。

filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  containers:
  - name: container_1
    timestamp_epoch: 1627233600

下面的示例配置将进一步解释容器级别属性的覆盖:

案例 1

这里 container_1 使用根级别属性,而 container_2 覆盖这些值。

filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  max_workers: 10
  poll: true
  poll_interval: 15s
  containers:
  - name: container_1
  - name: container_2
    max_workers: 3
    poll: true
    poll_interval: 10s

说明:在此配置中,container_1 没有定义 max_workerspollpoll_interval 的子属性。它继承这些字段的根级别值,即 max_workers = 10poll = truepoll_interval = 15 seconds。但是,container_2 定义了这些字段,它将使用这些值而不是根值。

案例 2

这里 container_1container_2 都覆盖根值。

filebeat.inputs:
  - type: azure-blob-storage
    id: my-azureblobstorage-id
    enabled: true
    account_name: some_account
    auth.shared_credentials.account_key: some_key
    max_workers: 10
    poll: true
    poll_interval: 15s
    containers:
    - name: container_1
      max_workers: 5
      poll: true
      poll_interval: 10s
    - name: container_2
      max_workers: 5
      poll: true
      poll_interval: 10s

说明:在此配置中,即使我们在根级别指定了 max_workers = 10poll = truepoll_interval = 15s,这两个容器也将使用它们各自在子属性中定义的值来覆盖这些值。

欢迎任何有助于我们进一步优化此输入的反馈。如有任何错误或功能请求,请随时在 github 上提交 issue。