Azure Blob 存储输入
编辑Azure Blob 存储输入
编辑使用 azure blob storage input
从存储在 Azure 云上的容器中的文件中读取内容。 可以配置输入以使用轮询或不使用轮询,但目前,如果禁用轮询,它将仅执行一次性传递,列出文件内容并结束进程。 通常建议在大多数情况下使用轮询,即使在处理大量文件时可能会变得昂贵。
为了减轻错误并确保稳定的处理环境,此输入采用以下功能
- 在处理 Azure Blob 容器时,如果突然发生任何中断,该进程将能够在上次处理并成功保存状态的文件后恢复。
- 如果某些文件发生任何错误,将适当地记录这些错误,但其余文件将继续正常处理。
- 如果发生任何导致主线程停止的重大错误,将适当地生成日志,描述所述错误。
filebeat.inputs: - type: azure-blob-storage id: my-azureblobstorage-id enabled: true account_name: some_account auth.shared_credentials.account_key: some_key containers: - name: container_1 max_workers: 3 poll: true poll_interval: 10s - name: container_2 max_workers: 3 poll: true poll_interval: 10s
解释: 上面给出的 configuration
描述了一个基本的 Blob 存储配置,其中包含两个名为 container_1
和 container_2
的容器。 这些容器中的每一个都有自己的属性,例如 name
、max_workers
、poll
和 poll_interval
。 这些属性在下面给出了详细解释。 现在,让我们尝试了解此配置的工作原理。
为了使 Azure Blob 存储输入能够识别它需要读取和处理的文件,它将需要指定容器名称。 我们可以根据需要添加任意数量的容器。 我们还可以在根级别配置属性 max_workers
、poll
和 poll_interval
,然后将其应用于未显式指定任何这些属性的所有容器。
如果在根级别指定了属性 max_workers
、poll
和 poll_interval
,则仍然可以在容器级别使用不同的值覆盖这些属性,从而提供广泛的灵活性和自定义。 下面的示例展示了此行为。
接收到此配置后,Azure Blob 存储输入将连接到服务并使用给定的 account_name
和 auth.shared_credentials.account_key
检索 ServiceClient
,然后它将为每个容器生成两个主要的 go-routine。 此后,这些例程(线程)中的每一个都将初始化一个调度程序,该调度程序将反过来使用 max_workers
值来初始化一个内存中的工作线程池(线程池),其中包含 3
个可用的 workers
。 基本上,这等同于两个工作线程池实例,每个容器一个,每个池都有 3 个工作线程。 这些 workers
将负责执行处理文件的 jobs
(在本例中为读取并输出文件内容)。
调度程序负责调度作业,并在每次迭代时使用池中 最大可用工作线程
来决定要检索和处理的文件数量。 这可以保持工作负载分配的效率。 调度程序使用 poll_interval
属性值来决定每次迭代后等待多长时间。 每次迭代都包括处理一定数量的文件,由 最大可用工作线程
值决定。
示例响应:-
{ "@timestamp": "2022-07-25T07:00:18.544Z", "@metadata": { "beat": "filebeat", "type": "_doc", "version": "8.4.0", "_id": "beatscontainer-data_3.json-worker-1" }, "message": "{\n \"id\": 3,\n \"title\": \"Samsung Universe 9\",\n \"description\": \"Samsung's new variant which goes beyond Galaxy to the Universe\",\n \"price\": 1249,\n \"discountPercentage\": 15.46,\n \"rating\": 4.09,\n \"stock\": 36,\n \"brand\": \"Samsung\",\n \"category\": \"smartphones\",\n \"thumbnail\": \"https://dummyjson.com/image/i/products/3/thumbnail.jpg\",\n \"images\": [\n \"https://dummyjson.com/image/i/products/3/1.jpg\"\n ]\n}", "cloud": { "provider": "azure" }, "input": { "type": "azure-blob-storage" }, "log": { "offset": 200, "file": { "path": "https://beatsblobstorage1.blob.core.windows.net/beatscontainer/data_3.json" } }, "azure": { "storage": { "container": { "name": "beatscontainer" }, "blob": { "content_type": "application/json", "name": "data_3.json" } } }, "event": { "kind": "publish_data" } }
正如我们从上面的响应中看到的那样,message
字段包含原始字符串化的数据。
以下是一些关键属性:-
- message:原始字符串化的 Blob 数据。
- log.file.path:Azure 云中 Blob 的路径。
- azure.storage.blob.container.name:从中读取文件的容器的名称。
- azure.storage.blob.object.name:已读取的文件/Blob 的名称。
- azure.storage.blob.object.content_type:文件/Blob 的内容类型。 您可以在此处找到支持的内容类型。
现在,让我们更详细地探讨一下配置属性。
account_name
编辑此属性是各种内部操作所必需的,这些操作与身份验证、创建服务客户端和 Blob 客户端有关,这些客户端在内部用于各种处理目的。
auth.oauth2
编辑此属性包含 Microsoft Entra ID RBAC 身份验证凭据,用于安全连接到 Azure Blob 存储。auth.oauth2
属性包含以下子属性
-
client_id
:Azure Entra ID 应用程序的客户端 ID。 -
client_secret
:Azure Entra ID 应用程序的客户端密钥。 -
tenant_id
:Azure Entra ID 应用程序的租户 ID。
下面给出了一个使用 auth.oauth2
的示例配置
filebeat.inputs: - type: azure-blob-storage account_name: some_account auth.oauth2: client_id: "some_client_id" client_secret: "some_client_secret" tenant_id: "some_tenant_id" containers: - name: container_1 max_workers: 3 poll: true poll_interval: 10s
有关如何设置 auth.oauth2
凭据的信息,请参阅 Azure 文档此处
根据我们的内部测试,服务主体似乎需要至少 blobOwner 的访问级别才能读取 Blob。 如果您在访问级别方面遇到任何问题,请确保将访问级别设置为 blobOwner。
auth.shared_credentials.account_key
编辑此属性包含在 Azure 云上的相应存储帐户下的 访问密钥
部分下找到的 访问密钥。 单个存储帐户可以包含多个容器,并且它们都将使用此公共访问密钥。
auth.connection_string.uri
编辑此属性包含在 Azure 云上的相应存储帐户下的 访问密钥
部分下找到的 连接字符串。 单个存储帐户可以包含多个容器,并且它们都将使用此公共连接字符串。
我们只需要指定 auth.shared_credentials.account_key
或 auth.connection_string.uri
中的一个用于身份验证目的。 如果同时指定了这两个属性,则将使用配置中首先出现的属性。
storage_url
编辑如果需要,可以使用此属性指定自定义存储 URL。 默认情况下,它指向 Azure 云存储。 仅当需要连接到提供 Blob 存储的不同环境时,才使用此属性。
URL 格式:{{protocol}}://{{account_name}}.{{storage_uri}}
。 此属性位于配置的根级别,而不是任何容器块内。
containers
编辑此属性包含有关特定容器的详细信息,例如 name
、max_workers
、poll
和 poll_interval
。 属性 name
特定于容器,因为它描述了容器名称,而字段 max_workers
、poll
和 poll_interval
可以同时存在于容器级别和根级别。 此属性在内部表示为一个数组,因此我们可以根据需要添加任意数量的容器。
name
编辑这是容器的特定子字段。 它指定容器名称。
max_workers
编辑此属性定义分配给工作线程池的最大工作线程数,用于处理读取文件内容的作业。 它可以在配置的根级别和容器级别指定。 如果同时指定了容器级别值和根级别值,则容器级别值将覆盖根级别值。 较大的工作线程数并不一定会提高吞吐量,应根据文件数量、正在处理的文件的大小以及可用资源仔细调整此参数。 将 max_workers
增加到非常高的值可能会导致资源利用率问题,并可能导致处理瓶颈。 通常建议最多使用 2000
个工作线程。 非常低的 max_worker
计数会大大增加获取 Blob 所需的网络调用次数,这可能会导致处理瓶颈。
输入会计算工作负载分配的批处理大小,以确保所有工作线程之间的工作负载均匀。 这意味着,对于给定的 max_workers
参数值,输入将计算该设置的最佳批处理大小。max_workers
值应根据诸如要处理的文件总数、可用系统资源和网络带宽等因素进行配置。
示例
- 设置
max_workers=3
会导致每个请求获取3 个 Blob
(批处理大小 = 3),然后将其分配给3 个工作线程
。 - 设置
max_workers=100
会导致每个请求获取100 个 Blob
(批处理大小 = 100),并将其分配给100 个工作线程
。
poll
编辑此属性通知调度程序是否继续轮询新文件。 此属性的默认值为 false
,因此如果不明确指定,它将不会继续轮询。 此属性可以在配置的根级别以及容器级别指定。 如果同时指定了容器级别值和根级别值,则容器级别值将始终优先并覆盖根级别值。
poll_interval
编辑此属性定义内部调度器在下次轮询调用以获取下一组 blob/文件后将等待的最长时间。它可以定义为以下格式:{{x}}s
、{{x}}m
、{{x}}h
,其中 s = 秒
、m = 分钟
,h = 小时
。值 {{x}}
可以是任何我们希望的值。例如:10s
表示我们希望每 10 秒进行一次轮询。如果未为此指定值,则默认初始化为 300 秒
。此属性可以在配置的根级别和容器级别指定。如果同时指定了根级别和容器级别的值,容器级别的值将始终具有优先级并覆盖根级别的值。
encoding
编辑用于读取包含国际字符的数据的文件编码。这仅适用于非 JSON 日志。请参阅 encoding
。
decoding
编辑文件解码选项用于指定将用于解码文件内容的编解码器。这可以应用于任何文件流数据。以下显示了一个示例配置
目前支持的编解码器如下:-
- CSV: 此编解码器解码 RFC 4180 CSV 数据流。
the CSV codec
编辑CSV
编解码器用于解码 RFC 4180 CSV 数据流。在不使用其他选项的情况下启用编解码器将使用默认的编解码器选项。
decoding.codec.csv.enabled: true
CSV 编解码器支持五个子属性来控制 CSV 解码的各个方面。comma
属性指定 CSV 格式使用的字段分隔符。如果未指定,则使用逗号字符,
。comment
属性指定应解释为注释标记的字符。如果指定,则以该字符开头的行将被忽略。comma
和 comment
都必须是单个字符。lazy_quotes
属性控制如何处理字段中的引号。如果 lazy_quotes
为 true,则引号可能出现在未加引号的字段中,并且未加倍的引号可能出现在加引号的字段中。trim_leading_space
属性指定应忽略前导空格,即使 comma
字符是空格。有关上述配置属性行为的完整详细信息,请参阅 CSV 解码器 文档。fields_names
属性可用于指定数据的列名。如果它不存在,则字段名称从数据的第一行非注释行获取。字段数必须与字段名称数匹配。
以下显示了一个示例配置
decoding.codec.csv.enabled: true decoding.codec.csv.comma: "\t" decoding.codec.csv.comment: "#"
file_selectors
编辑如果 Azure blob 存储容器将包含与 Filebeat 不应处理的文件相对应的 blob,则可以使用 file_selectors
来限制下载的文件。这是一个基于 regex
模式的选择器列表。regex
应与 blob 名称匹配,或者应该是 blob 名称的一部分(理想情况下是前缀)。regex
语法与 Go 编程语言中使用的语法相同。不匹配任何配置的正则表达式的文件将不会被处理。此属性可以在配置的根级别和容器级别指定。如果同时指定了根级别和容器级别的值,容器级别的值将始终具有优先级并覆盖根级别的值。
filebeat.inputs: - type: azure-blob-storage id: my-azureblobstorage-id enabled: true account_name: some_account auth.shared_credentials.account_key: some_key containers: - name: container_1 file_selectors: - regex: '/Monitoring/' - regex: 'docs/' - regex: '/Security-Logs/'
file_selectors
操作在代理本地执行。代理将下载所有文件,然后根据 file_selectors
对其进行筛选。如果文件数量非常多,这可能会导致处理瓶颈。建议仅在文件数量有限或有充足资源可用时才使用此属性。
expand_event_list_from_field
编辑如果使用此输入的 file-set 期望在特定字段或对象数组下接收多个捆绑的消息,则可以指定 expand_event_list_from_field
的配置选项。此设置能够将组值下的消息拆分为单独的事件。例如,如果您的日志采用 JSON 格式,并且事件在 JSON 对象“Records”下找到。要将事件拆分为单独的事件,可以将配置选项 expand_event_list_from_field
设置为 “Records”。此属性可以在配置的根级别和容器级别指定。如果同时指定了根级别和容器级别的值,容器级别的值将始终具有优先级并覆盖根级别的值。
{ "Records": [ { "eventVersion": "1.07", "eventTime": "2019-11-14T00:51:00Z", "region": "us-east-1", "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE", }, { "eventVersion": "1.07", "eventTime": "2019-11-14T00:52:00Z", "region": "us-east-1", "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE", } ] }
filebeat.inputs: - type: azure-blob-storage id: my-azureblobstorage-id enabled: true account_name: some_account auth.shared_credentials.account_key: some_key containers: - name: container_1 expand_event_list_from_field: Records
此属性仅适用于 JSON 文件格式。如果文件在根级别具有对象数组,则无需指定此属性。根级别的对象数组将自动拆分为单独的事件。如果由于某些意外错误而发生故障或输入崩溃,处理将从上次成功处理的文件/blob 继续。
timestamp_epoch
编辑此属性可用于过滤掉时间戳早于指定值的文件/blob。此属性的值应采用 Unix epoch
(秒)格式。时间戳值与从 blob 元数据获得的 LastModified Timestamp
进行比较。此属性可以在配置的根级别和容器级别指定。如果同时指定了根级别和容器级别的值,容器级别的值将始终具有优先级并覆盖根级别的值。
filebeat.inputs: - type: azure-blob-storage id: my-azureblobstorage-id enabled: true account_name: some_account auth.shared_credentials.account_key: some_key containers: - name: container_1 timestamp_epoch: 1627233600
Azure Blob 存储 API 不提供直接根据时间戳筛选文件的方法,因此输入将下载所有文件,然后根据时间戳筛选它们。如果文件数量非常多,这可能会导致处理瓶颈。建议仅在文件数量有限或有充足资源可用时才使用此属性。
案例 - 1
此处 container_1
使用根级别属性,而 container_2
覆盖这些值
filebeat.inputs: - type: azure-blob-storage id: my-azureblobstorage-id enabled: true account_name: some_account auth.shared_credentials.account_key: some_key max_workers: 10 poll: true poll_interval: 15s containers: - name: container_1 - name: container_2 max_workers: 3 poll: true poll_interval: 10s
解释:在此配置中,container_1
在 max_workers
、poll
和 poll_interval
中没有定义任何子属性。它从根级别继承这些字段的值,即 max_workers = 10
、poll = true
和 poll_interval = 15 秒
。但是,container_2
定义了这些字段,它将使用这些值而不是使用根值。
案例 - 2
此处 container_1
和 container_2
都覆盖了根值
filebeat.inputs: - type: azure-blob-storage id: my-azureblobstorage-id enabled: true account_name: some_account auth.shared_credentials.account_key: some_key max_workers: 10 poll: true poll_interval: 15s containers: - name: container_1 max_workers: 5 poll: true poll_interval: 10s - name: container_2 max_workers: 5 poll: true poll_interval: 10s
解释:在此配置中,即使我们在根级别指定了 max_workers = 10
、poll = true
和 poll_interval = 15s
,两个容器都会使用其各自的子属性中定义的值来覆盖这些值。
欢迎任何反馈,这将有助于我们进一步优化此输入。如有任何错误或功能请求,请随时打开 github 问题。