Google Cloud Storage 输入
编辑Google Cloud Storage 输入
编辑使用 google cloud storage input
从存储在您 Google Cloud 上的存储桶中的文件中读取内容。输入可以配置为使用和不使用轮询,但目前,如果禁用轮询,它只会执行一次性通过,列出文件内容并结束进程。即使处理大量文件可能会变得昂贵,但在大多数情况下仍然建议使用轮询。
为了减轻错误并确保稳定的处理环境,此输入采用了以下功能
- 在处理 Google Cloud 存储桶时,如果突然出现任何中断,该过程将能够在它处理的最后一个文件之后恢复,并且能够成功保存该文件的状态。
- 如果某些文件出现任何错误,将适当地记录这些错误,但其余文件将继续正常处理。
- 如果发生任何导致主线程停止的主要错误,将适当地生成日志,描述所述错误。
filebeat.inputs: - type: gcs id: my-gcs-id enabled: true project_id: my_project_id auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json parse_json: true buckets: - name: gcs-test-new max_workers: 3 poll: true poll_interval: 15s bucket_timeout: 60s - name: gcs-test-old max_workers: 3 poll: true poll_interval: 10s bucket_timeout: 30s
说明:上面给出的此 配置
描述了一个基本的 gcs 配置,其中包含两个名为 gcs-test-new
和 gcs-test-old
的存储桶。这些存储桶中的每一个都有自己的属性,例如 name
、max_workers
、poll
、poll_interval
和 bucket_timeout
。这些属性的详细说明在 下面给出。现在让我们尝试了解此配置的工作原理。
为了使 Google Cloud Storage 输入识别它需要读取和处理的文件,需要指定存储桶名称。我们可以根据需要拥有任意数量的存储桶。我们还可以配置根级别的属性 max_workers
、poll
、poll_interval
和 bucket_timeout
,然后这些属性将应用于所有未显式指定这些属性的存储桶。
如果在根级别指定了属性 max_workers
、poll
、poll_interval
和 bucket_timeout
,则仍然可以在存储桶级别使用不同的值覆盖这些属性,从而提供广泛的灵活性和自定义。 下面的示例显示了此行为。
收到此配置后,Google Cloud Storage 输入将连接到服务并使用给定的 bucket_name
和 auth.credentials_file
检索 Storage Client
,然后它将生成两个主要的 Go 协程,每个存储桶一个。在此之后,这些协程(线程)中的每一个都将初始化一个调度程序,该调度程序又将使用 max_workers
值初始化一个内存中的工作池(线程池),其中有 3
个 worker
可用。基本上,这相当于两个工作池实例,每个存储桶一个,每个都有 3 个工作线程。这些 worker
将负责执行处理文件的 job
(在本例中,读取并输出文件内容)。
调度程序负责调度作业,并在每次迭代中使用池中的 可用最大 worker
来决定要检索和处理的文件数量。这使工作分配保持高效。调度程序使用 poll_interval
属性值来决定每次迭代后等待多长时间。 bucket_timeout
值用于在存储桶列表 API 调用超过给定值时超时。每次迭代都包括处理一定数量的文件,由 可用最大 worker
值决定。
示例响应:-
{ "@timestamp": "2022-09-01T13:54:24.588Z", "@metadata": { "beat": "filebeat", "type": "_doc", "version": "8.5.0", "_id": "gcs-test-new-data_3.json-worker-1" }, "log": { "offset": 200, "file": { "path": "gs://gcs-test-new/data_3.json" } }, "input": { "type": "gcs" }, "message": "{\n \"id\": 1,\n \"title\": \"iPhone 9\",\n \"description\": \"An apple mobile which is nothing like apple\",\n \"price\": 549,\n \"discountPercentage\": 12.96,\n \"rating\": 4.69,\n \"stock\": 94,\n \"brand\": \"Apple\",\n \"category\": \"smartphones\",\n \"thumbnail\": \"https://dummyjson.com/image/i/products/1/thumbnail.jpg\",\n \"images\": [\n \"https://dummyjson.com/image/i/products/1/1.jpg\",\n \"https://dummyjson.com/image/i/products/1/2.jpg\",\n \"https://dummyjson.com/image/i/products/1/3.jpg\",\n \"https://dummyjson.com/image/i/products/1/4.jpg\",\n \"https://dummyjson.com/image/i/products/1/thumbnail.jpg\"\n ]\n}\n", "cloud": { "provider": "goole cloud" }, "gcs": { "storage": { "bucket": { "name": "gcs-test-new" }, "object": { "name": "data_3.json", "content_type": "application/json", "json_data": [ { "id": 1, "discountPercentage": 12.96, "rating": 4.69, "brand": "Apple", "price": 549, "category": "smartphones", "thumbnail": "https://dummyjson.com/image/i/products/1/thumbnail.jpg", "description": "An apple mobile which is nothing like apple", "title": "iPhone 9", "stock": 94, "images": [ "https://dummyjson.com/image/i/products/1/1.jpg", "https://dummyjson.com/image/i/products/1/2.jpg", "https://dummyjson.com/image/i/products/1/3.jpg", "https://dummyjson.com/image/i/products/1/4.jpg", "https://dummyjson.com/image/i/products/1/thumbnail.jpg" ] } ] } } }, "event": { "kind": "publish_data" } }
从上面的响应中可以看到,message
字段包含原始的字符串化数据,而 gcs.storage.object.data
包含对象化数据。
一些关键属性如下:-
- message:原始的字符串化对象数据。
- log.file.path:Google Cloud 中对象的路径。
- gcs.storage.bucket.name:读取文件的存储桶的名称。
- gcs.storage.object.name:已读取的文件/对象的名称。
- gcs.storage.object.content_type:文件/对象的内容类型。您可以在 此处找到支持的内容类型。
- gcs.storage.object.json_data:对象化的 JSON 文件数据,表示文件的内容。
现在让我们更详细地探讨配置属性。
project_id
编辑此属性对于各种内部操作(涉及身份验证、创建存储客户端和日志记录)是必需的,这些操作在内部用于各种处理目的。
auth.credentials_json.account_key
编辑此属性包含 JSON 服务帐户凭据字符串,可以从 Google Cloud Console 生成,参考:https://cloud.google.com/iam/docs/creating-managing-service-account-keys,在相应的存储帐户下。单个存储帐户可以包含多个存储桶,并且它们都将使用此公共服务帐户访问密钥。
auth.credentials_file.path
编辑此属性包含 服务帐户凭据文件,可以从 Google Cloud Console 生成,参考:https://cloud.google.com/iam/docs/creating-managing-service-account-keys,在相应的存储帐户下。单个存储帐户可以包含多个存储桶,并且它们都将使用此公共服务帐户凭据文件。
我们只需要指定 auth.credentials_json.account_key
或 auth.credentials_file.path
中的一个用于身份验证目的。如果两个属性都指定了,则将使用配置中首先出现的那个。
buckets
编辑此属性包含有关特定存储桶的详细信息,例如 name
、max_workers
、poll
、poll_interval
和 bucket_timeout
。属性 name
特定于存储桶,因为它描述了存储桶名称,而字段 max_workers
、poll
、poll_interval
和 bucket_timeout
可以在存储桶级别和根级别都存在。此属性在内部表示为数组,因此我们可以根据需要添加任意数量的存储桶。
name
编辑这是存储桶的特定子字段。它指定存储桶名称。
bucket_timeout
编辑此属性定义存储桶操作在没有收到响应(例如:读取文件/列出文件)后将放弃并停止的最长时间。它可以用以下格式定义:{{x}}s
、{{x}}m
、{{x}}h
,其中 s = 秒
、m = 分钟
和 h = 小时
。值 {{x}}
可以是任何我们想要的数字。如果未为此指定值,则默认情况下初始化为 50 秒
。此属性可以在配置的根级别以及存储桶级别都指定。如果同时指定了两个级别,则存储桶级别的值将始终优先并覆盖根级别的值。应使用的 bucket_timeout
值取决于文件的大小和网络速度。如果超时时间太短,输入将无法完全读取文件,并且日志中将出现 context_deadline_exceeded
错误。如果超时时间太长,输入将等待很长时间才能读取文件,这可能导致输入速度变慢。在设置这两个值时,应考虑 bucket_timeout
和 poll_interval
之间的比率。较低的 poll_interval
和非常高的 bucket_timeout
会导致资源利用率问题,因为调度操作将在每个轮询迭代中生成。如果先前的轮询操作仍在运行,这可能会导致并发运行的操作,从而导致随着时间的推移出现瓶颈。
max_workers
编辑此属性定义用于处理读取文件内容的作业的工作池(线程池)中分配的最大工作程序(goroutine/轻量级线程)数量。此属性可以在配置的根级别和存储桶级别都指定。如果同时指定了两个级别,则存储桶级别的值会覆盖根级别的值。工作程序数量的增加不一定能提高吞吐量,应根据要处理的文件数量、文件大小和可用资源仔细调整。将max_workers
增加到非常高的值可能会导致资源利用率问题,并可能导致处理瓶颈。通常建议最大限制为2000
个工作程序。max_worker
计数过低会大大增加获取对象的网络调用次数,这可能导致处理瓶颈。
当前,max_workers
的值与batch_size
相关联,以确保工作负载在所有goroutine之间均匀分布。这确保了输入能够以有效的方式处理文件。此batch_size
确定在一个调用中将获取多少个对象。应根据要读取的文件数量、可用资源和网络速度设置max_workers
值。例如,max_workers=3
表示每次分页请求总共获取3
个GCS对象,并在3个goroutine
之间分配;max_workers=100
表示每次分页请求获取100
个GCS对象,并在100个goroutine
之间分配。
poll
编辑此属性通知调度程序是否继续轮询新文件。其默认值为false
,因此如果没有明确指定,它不会继续轮询。此属性可以在配置的根级别和存储桶级别都指定。如果同时指定了两个级别,则存储桶级别的值始终优先并覆盖根级别的值。
poll_interval
编辑此属性定义内部调度程序在下次轮询下一组对象/文件之前需要等待的最长时间。它可以用以下格式定义:{{x}}s
、{{x}}m
、{{x}}h
,其中s = 秒
、m = 分钟
和h = 小时
。{{x}}
的值可以是任何我们想要的。例如:10s
表示我们希望每10秒轮询一次。如果未为此指定值,则默认初始化为300秒
。此属性可以在配置的根级别和存储桶级别都指定。如果同时指定了两个级别,则存储桶级别的值始终优先并覆盖根级别的值。poll_interval
应设置为等于bucket_timeout
的值。这将确保在当前存储桶都已处理完毕之前不会启动另一个调度操作。如果poll_interval
设置为小于bucket_timeout
的值,则输入将在当前操作完成之前启动另一个调度操作,这可能导致随着时间的推移出现瓶颈。较低的poll_interval
可以提高输入速度,但会消耗更多资源。
某些极端情况可能需要为poll_interval
和bucket_timeout
设置不同的值。例如,如果文件非常大且网络速度很慢,则应将bucket_timeout
的值设置为高于poll_interval
的值。这将确保轮询操作不会等待太长时间来读取文件,并在当前操作仍在处理时转到下一个迭代。这将确保更高的吞吐量和更好的资源利用率。
parse_json
编辑此属性通知发布者是否解析和对象化JSON数据。默认情况下,此属性设置为false
,因为处理高度嵌套的JSON数据可能会很昂贵。如果将其设置为false
,则响应中的gcs.storage.object.json_data字段将包含一个空数组。此属性仅适用于JSON对象,对其他类型的对象没有影响。此属性可以在配置的根级别和存储桶级别都指定。如果同时指定了两个级别,则存储桶级别的值始终优先并覆盖根级别的值。
encoding
编辑用于读取包含国际字符的数据的文件编码。这仅适用于非JSON日志。请参阅encoding
。
decoding
编辑文件解码选项用于指定用于解码文件内容的编解码器。这可以应用于任何文件流数据。下面显示了一个示例配置
当前支持的编解码器如下所示:
- CSV:此编解码器解码RFC 4180 CSV数据流。
the CSV codec
编辑CSV
编解码器用于解码RFC 4180 CSV数据流。启用编解码器而没有其他选项将使用默认的编解码器选项。
decoding.codec.csv.enabled: true
CSV编解码器支持五个子属性来控制CSV解码的各个方面。comma
属性指定CSV格式使用的字段分隔符字符。如果未指定,则使用逗号字符,
。comment
属性指定应解释为注释标记的字符。如果指定了此属性,则以该字符开头的行将被忽略。comma
和comment
都必须是单个字符。lazy_quotes
属性控制如何处理字段中的引号。如果lazy_quotes
为true,则引号可能会出现在未加引号的字段中,未加倍的引号可能会出现在加引号的字段中。trim_leading_space
属性指定即使comma
字符为空格,也应忽略前导空格。有关前面配置属性行为的完整详细信息,请参阅CSV解码器文档fields_names
属性可用于指定数据的列名。如果不存在,则从第一行非注释数据行获取字段名。字段数必须与字段名数匹配。
下面显示了一个示例配置
decoding.codec.csv.enabled: true decoding.codec.csv.comma: "\t" decoding.codec.csv.comment: "#"
file_selectors
编辑如果GCS存储桶中的对象对应于Filebeat不应该处理的文件,则可以使用file_selectors
来限制下载的文件。这是一个基于正则表达式模式的选择器列表。正则表达式应匹配对象名称或应为对象名称的一部分(理想情况下是前缀)。使用的正则表达式语法为[RE2](https://github.com/google/re2/wiki/Syntax)。与任何已配置表达式都不匹配的文件将不会被处理。此属性可以在配置的根级别和容器级别都指定。如果同时指定了两个级别,则容器级别的值始终优先并覆盖根级别的值。
filebeat.inputs: - type: gcs project_id: my_project_id auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json buckets: - name: obs-bucket max_workers: 3 poll: true poll_interval: 15s bucket_timeout: 60s file_selectors: - regex: '/Monitoring/' - regex: 'docs/' - regex: '/Security-Logs/'
file_selectors
操作是在代理本地执行的,因此使用此选项会导致代理下载所有文件,然后对其进行过滤。如果文件数量非常多,这可能会导致处理瓶颈。建议仅在文件数量有限或有充足资源可用时使用此属性。
expand_event_list_from_field
编辑如果使用此输入的文件集期望接收捆绑在特定字段下或对象数组下的多个消息,则可以指定expand_event_list_from_field
的配置选项。此设置将能够将组值下的消息拆分为单独的事件。例如,如果您的日志采用JSON格式,并且事件位于JSON对象“Records”下。要将事件拆分为单独的事件,可以将配置选项expand_event_list_from_field
设置为“Records”。此属性可以在配置的根级别和容器级别都指定。如果同时指定了两个级别,则容器级别的值始终优先并覆盖根级别的值。
{ "Records": [ { "eventVersion": "1.07", "eventTime": "2019-11-14T00:51:00Z", "region": "us-east-1", "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE", }, { "eventVersion": "1.07", "eventTime": "2019-11-14T00:52:00Z", "region": "us-east-1", "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE", } ] }
filebeat.inputs: - type: gcs project_id: my_project_id auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json buckets: - name: obs-bucket max_workers: 3 poll: true poll_interval: 15s bucket_timeout: 60s expand_event_list_from_field: Records
parse_json
设置与expand_event_list_from_field
不兼容。如果启用,它将被忽略。此属性仅适用于JSON文件格式。如果文件在根级别具有对象数组,则无需指定此属性。根级别的对象数组会自动拆分为单独的事件。如果由于某些意外错误导致发生故障或输入崩溃,则处理将从上次成功处理的文件或对象继续。
timestamp_epoch
编辑此属性可用于过滤掉时间戳早于指定值的文件和对象。此属性的值应采用Unixepoch
(秒)格式。时间戳值与从对象元数据获得的object.Updated
字段进行比较。此属性可以在配置的根级别和容器级别都指定。如果同时指定了两个级别,则容器级别的值始终优先并覆盖根级别的值。
filebeat.inputs: - type: gcs project_id: my_project_id auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json buckets: - name: obs-bucket max_workers: 3 poll: true poll_interval: 15s bucket_timeout: 60s timestamp_epoch: 1630444800
GCS API没有提供基于时间戳直接过滤文件的方法,因此输入将下载所有文件,然后根据时间戳对其进行过滤。如果文件数量非常多,这可能会导致处理瓶颈。建议仅在文件数量有限或有充足资源可用时使用此属性。此选项垂直扩展,而不是水平扩展。
案例1
此处bucket_1
使用根级别属性,而bucket_2
覆盖这些值
filebeat.inputs: - type: gcs id: my-gcs-id enabled: true project_id: my_project_id auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json max_workers: 10 poll: true poll_interval: 15s buckets: - name: bucket_1 - name: bucket_2 max_workers: 3 poll: true poll_interval: 10s
说明:在此配置中,bucket_1
在max_workers
、poll
和poll_interval
中没有定义任何子属性。它继承了来自根级别的这些字段的值,即max_workers = 10
、poll = true
和poll_interval = 15秒
。但是,bucket_2
定义了这些字段,它将使用这些值,而不是使用根值。
案例2
这里 bucket_1
和 bucket_2
都覆盖了根级别的值
filebeat.inputs: - type: gcs id: my-gcs-id enabled: true project_id: my_project_id auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json max_workers: 10 poll: true poll_interval: 15s buckets: - name: bucket_1 max_workers: 5 poll: true poll_interval: 10s - name: bucket_2 max_workers: 5 poll: true poll_interval: 10s
解释: 在此配置中,即使我们在根级别指定了 max_workers = 10
、poll = true
和 poll_interval = 15s
,这两个桶也会使用它们各自定义在其子属性中的值覆盖这些值。
欢迎任何有助于我们进一步优化此输入的反馈。如有任何错误或功能请求,请随时在 GitHub 上提交 issue。