Google Cloud Storage 输入编辑

使用 google cloud storage 输入 读取存储在 Google Cloud 上存储桶中的文件内容。该输入可以配置为在启用和不启用轮询的情况下工作,但目前,如果禁用轮询,它将只执行一次性传递,列出文件内容并结束进程。轮询通常是大多数情况下的建议,即使在处理大量文件时它可能会变得昂贵。

为了减少错误并确保稳定的处理环境,此输入采用以下功能

  1. 在处理 google cloud 存储桶时,如果突然出现任何中断,该过程将能够从其成功保存状态的最后一个文件之后恢复。
  2. 如果某些文件出现任何错误,它们将被适当地记录,但其他文件将继续正常处理。
  3. 如果发生任何导致主线程停止的重大错误,将生成相应的日志,描述该错误。

目前仅支持 JSONNDJSON 对象/文件格式。对象/文件也可以进行 gzip 压缩。“JSON 凭据密钥”和“凭据文件”是支持的身份验证类型。如果数组作为对象/文件的根对象存在,则它会自动拆分为单个对象并进行处理。如果文件/对象的下载失败或中断,则会重试下载 2 次。这目前不是用户可配置的。

下面给出了一个示例配置,并对每个字段进行了详细说明:-

filebeat.inputs:
- type: gcs
  id: my-gcs-id
  enabled: true
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  parse_json: true
  buckets:
  - name: gcs-test-new
    max_workers: 3
    poll: true
    poll_interval: 15s
    bucket_timeout: 60s
  - name: gcs-test-old
    max_workers: 3
    poll: true
    poll_interval: 10s
    bucket_timeout: 30s

说明: 上面给出的 配置 描述了一个基本的 gcs 配置,它有两个名为 gcs-test-newgcs-test-old 的存储桶。这些存储桶中的每一个都有自己的属性,例如 namemax_workerspollpoll_intervalbucket_timeout。这些属性在下面有详细的解释。现在让我们试着了解一下这个配置是如何工作的。

为了让 google cloud storage 输入标识它需要读取和处理的文件,它需要指定存储桶名称。我们可以根据需要添加任意数量的存储桶。我们还可以在根级别配置属性 max_workerspollpoll_intervalbucket_timeout,然后这些属性将应用于所有未明确指定任何这些属性的存储桶。

如果在根级别指定了属性 max_workerspollpoll_intervalbucket_timeout,则仍然可以在存储桶级别使用不同的值覆盖这些属性,从而提供广泛的灵活性和定制化。下面的例子下面展示了这种行为。

在接收到此配置后,google cloud storage 输入将连接到服务并使用给定的 bucket_nameauth.credentials_file 检索 存储客户端,然后它将生成两个主要的 go-routines,每个存储桶一个。在此之后,每个例程(线程)将初始化一个调度器,该调度器将依次使用 max_workers 值初始化一个内存工作池(线程池),其中有 3 个可用的 工作线程。基本上,这相当于工作池的两个实例,每个存储桶一个,每个实例有 3 个工作线程。这些 工作线程 将负责执行处理文件的 作业(在本例中是读取和输出文件的内容)。

调度器负责调度作业,并在每次迭代时使用池中 最大可用工作线程数 来决定要检索和处理的文件数量。这保持了工作分配的效率。调度器使用 poll_interval 属性值来决定每次迭代后等待多长时间。如果对存储桶列表 api 的调用超过了给定的值,则使用 bucket_timeout 值使其超时。每次迭代都包括处理一定数量的文件,这由 最大可用工作线程数 值决定。

示例响应:-

{
  "@timestamp": "2022-09-01T13:54:24.588Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.5.0",
    "_id": "gcs-test-new-data_3.json-worker-1"
  },
  "log": {
    "offset": 200,
    "file": {
      "path": "gs://gcs-test-new/data_3.json"
    }
  },
  "input": {
    "type": "gcs"
  },
  "message": "{\n    \"id\": 1,\n    \"title\": \"iPhone 9\",\n    \"description\": \"An apple mobile which is nothing like apple\",\n    \"price\": 549,\n    \"discountPercentage\": 12.96,\n    \"rating\": 4.69,\n    \"stock\": 94,\n    \"brand\": \"Apple\",\n    \"category\": \"smartphones\",\n    \"thumbnail\": \"https://dummyjson.com/image/i/products/1/thumbnail.jpg\",\n    \"images\": [\n        \"https://dummyjson.com/image/i/products/1/1.jpg\",\n        \"https://dummyjson.com/image/i/products/1/2.jpg\",\n        \"https://dummyjson.com/image/i/products/1/3.jpg\",\n        \"https://dummyjson.com/image/i/products/1/4.jpg\",\n        \"https://dummyjson.com/image/i/products/1/thumbnail.jpg\"\n    ]\n}\n",
  "cloud": {
    "provider": "goole cloud"
  },
  "gcs": {
    "storage": {
      "bucket": {
        "name": "gcs-test-new"
      },
      "object": {
        "name": "data_3.json",
        "content_type": "application/json",
        "json_data": [
          {
            "id": 1,
            "discountPercentage": 12.96,
            "rating": 4.69,
            "brand": "Apple",
            "price": 549,
            "category": "smartphones",
            "thumbnail": "https://dummyjson.com/image/i/products/1/thumbnail.jpg",
            "description": "An apple mobile which is nothing like apple",
            "title": "iPhone 9",
            "stock": 94,
            "images": [
              "https://dummyjson.com/image/i/products/1/1.jpg",
              "https://dummyjson.com/image/i/products/1/2.jpg",
              "https://dummyjson.com/image/i/products/1/3.jpg",
              "https://dummyjson.com/image/i/products/1/4.jpg",
              "https://dummyjson.com/image/i/products/1/thumbnail.jpg"
            ]
          }
        ]
      }
    }
  },
  "event": {
    "kind": "publish_data"
  }
}

从上面的响应中我们可以看到,message 字段包含原始的字符串化数据,而 gcs.storage.object.data 包含对象化的数据。

一些关键属性如下:-

  1. message:原始的字符串化对象数据。
  2. log.file.path:对象在 google cloud 中的路径。
  3. gcs.storage.bucket.name:读取文件的存储桶的名称。
  4. gcs.storage.object.name:已读取的文件/对象的名称。
  5. gcs.storage.object.content_type:文件/对象的内容类型。您可以在此处找到支持的内容类型。
  6. gcs.storage.object.json_data:对象化的 json 文件数据,表示文件的内容。

现在让我们更详细地探讨一下配置属性。

支持的属性:-

project_id编辑

此属性是各种内部操作所必需的,这些操作涉及身份验证、创建存储客户端和日志记录,这些操作在内部用于各种处理目的。

auth.credentials_json.account_key编辑

此属性包含 json 服务帐户凭据字符串,可以从 google cloud 控制台生成,参考:https://cloud.google.com/iam/docs/creating-managing-service-account-keys,在相应的存储帐户下。单个存储帐户可以包含多个存储桶,它们都将使用此通用服务帐户访问密钥。

auth.credentials_file.path编辑

此属性包含 服务帐户凭据文件,可以从 google cloud 控制台生成,参考:https://cloud.google.com/iam/docs/creating-managing-service-account-keys,在相应的存储帐户下。单个存储帐户可以包含多个存储桶,它们都将使用此通用服务帐户凭据文件。

我们只需要指定 auth.credentials_json.account_keyauth.credentials_file.path 中的一个用于身份验证。如果同时指定了这两个属性,则将使用配置中首先出现的那个属性。

buckets编辑

此属性包含有关特定存储桶的详细信息,如 namemax_workerspollpoll_intervalbucket_timeout。属性 name 是特定于存储桶的,因为它描述了存储桶名称,而字段 max_workerspollpoll_intervalbucket_timeout 可以存在于存储桶级别和根级别。此属性在内部表示为一个数组,因此我们可以根据需要添加任意数量的存储桶。

name编辑

这是存储桶的特定子字段。它指定存储桶名称。

bucket_timeout编辑

此属性定义了在没有收到响应的情况下(例如:读取文件/列出文件)存储桶操作将放弃并停止的最长时间。它可以用以下格式定义:{{x}}s{{x}}m{{x}}h,其中 s = 秒m = 分钟h = 小时。值 {{x}} 可以是我们希望的任何值。如果没有为此指定值,则默认情况下将其初始化为 50 秒。此属性可以在配置的根级别和存储桶级别指定。如果同时指定了这两个级别,则存储桶级别的值将始终优先并覆盖根级别的值。

max_workers编辑

此属性定义了为处理读取文件内容的作业在工作池(线程池)中分配的最大工作线程数(go 例程/轻量级线程)。工作线程越多,意味着实现的并发量越大。由于内部 sdk 的限制,每个存储桶最多可以定义 5000 个工作线程。此属性可以在配置的根级别和存储桶级别指定。如果同时指定了这两个级别,则存储桶级别的值将始终优先并覆盖根级别的值。

poll编辑

此属性通知调度器是否继续轮询新文件。其默认值为 false,因此如果没有明确指定,它将不会继续轮询。此属性可以在配置的根级别和存储桶级别指定。如果同时指定了这两个级别,则存储桶级别的值将始终优先并覆盖根级别的值。

poll_interval编辑

此属性定义内部调度程序对下一组对象/文件进行轮询调用的最长时间。它可以使用以下格式定义:{{x}}s{{x}}m{{x}}h,其中 s = 秒m = 分钟h = 小时。值 {{x}} 可以是任何我们希望的值。例如:10s 表示我们希望每 10 秒进行一次轮询。如果未为此指定任何值,则默认情况下将其初始化为 300 秒。此属性可以在配置的根级别和存储桶级别指定。如果同时指定了这两个级别,则存储桶级别的值将始终优先,并覆盖根级别的值。

parse_json编辑

此属性通知发布者是否解析和对象化 JSON 数据。默认情况下,此属性设置为 false,因为处理高度嵌套的 JSON 数据可能会很昂贵。如果此属性设置为 false,则响应中的 gcs.storage.object.json_data 字段将为空数组。此属性仅适用于 JSON 对象,对其他类型的对象无效。此属性可以在配置的根级别和存储桶级别指定。如果同时指定了这两个级别,则存储桶级别的值将始终优先,并覆盖根级别的值。

file_selectors编辑

如果 GCS 存储桶中的对象对应于 Filebeat 不应处理的文件,则可以使用 file_selectors 来限制下载的文件。这是一个选择器列表,这些选择器基于正则表达式模式。正则表达式应与对象名称匹配,或者应该是对象名称的一部分(最好是前缀)。使用的正则表达式语法是 [RE2](https://github.com/google/re2/wiki/Syntax)。与任何配置的表达式不匹配的文件将不会被处理。此属性可以在配置的根级别和容器级别指定。如果同时指定了这两个级别,则容器级别的值将始终优先,并覆盖根级别的值。

filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  buckets:
  - name: obs-bucket
    max_workers: 3
    poll: true
    poll_interval: 15s
    bucket_timeout: 60s
    file_selectors:
    - regex: '/Monitoring/'
    - regex: 'docs/'
    - regex: '/Security-Logs/'

expand_event_list_from_field编辑

如果使用此输入的文件集希望接收捆绑在特定字段或对象数组下的多条消息,则可以指定 expand_event_list_from_field 的配置选项。此设置将能够将组值下的消息拆分为单独的事件。例如,如果您有 JSON 格式的日志,并且在 JSON 对象“Records”下找到了事件。要将事件拆分为单独的事件,可以将配置选项 expand_event_list_from_field 设置为“Records”。此属性可以在配置的根级别和容器级别指定。如果同时指定了这两个级别,则容器级别的值将始终优先,并覆盖根级别的值。

{
    "Records": [
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:51:00Z",
            "region": "us-east-1",
            "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE",
        },
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:52:00Z",
            "region": "us-east-1",
            "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE",
        }
    ]
}
filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  buckets:
  - name: obs-bucket
    max_workers: 3
    poll: true
    poll_interval: 15s
    bucket_timeout: 60s
    expand_event_list_from_field: Records

parse_json 设置与 expand_event_list_from_field 不兼容。如果启用,它将被忽略。此属性仅适用于 JSON 文件格式。如果文件在根级别有一个对象数组,则不需要指定此属性。根级别的对象数组会自动拆分为单独的事件。如果由于某些意外错误而导致失败或输入崩溃,处理将从最后一个成功处理的文件或对象恢复。

timestamp_epoch编辑

此属性可用于过滤掉时间戳早于指定值的文件和对象。此属性的值应采用 Unix epoch(秒)格式。时间戳值将与从对象元数据中获取的 object.Updated 字段进行比较。此属性可以在配置的根级别和容器级别指定。如果同时指定了这两个级别,则容器级别的值将始终优先,并覆盖根级别的值。

filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  buckets:
  - name: obs-bucket
    max_workers: 3
    poll: true
    poll_interval: 15s
    bucket_timeout: 60s
    timestamp_epoch: 1630444800

以下示例配置将进一步说明存储桶级别对属性的覆盖:

情况 1

此处,bucket_1 使用根级别属性,而 bucket_2 覆盖这些值

filebeat.inputs:
- type: gcs
  id: my-gcs-id
  enabled: true
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  max_workers: 10
  poll: true
  poll_interval: 15s
  buckets:
  - name: bucket_1
  - name: bucket_2
    max_workers: 3
    poll: true
    poll_interval: 10s

说明: 在此配置中,bucket_1max_workerspollpoll_interval 中没有定义子属性。它从根级别继承这些字段的值,即 max_workers = 10poll = truepoll_interval = 15 秒。但是,bucket_2 定义了这些字段,它将使用这些值,而不是使用根值。

情况 2

此处,bucket_1bucket_2 都覆盖根值

filebeat.inputs:
  - type: gcs
    id: my-gcs-id
    enabled: true
    project_id: my_project_id
    auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
    max_workers: 10
    poll: true
    poll_interval: 15s
    buckets:
    - name: bucket_1
      max_workers: 5
      poll: true
      poll_interval: 10s
    - name: bucket_2
      max_workers: 5
      poll: true
      poll_interval: 10s

说明: 在此配置中,即使我们在根级别指定了 max_workers = 10poll = truepoll_interval = 15s,这两个存储桶也会使用其各自定义为子属性的值来覆盖这些值。

欢迎任何有助于我们进一步优化此输入的反馈。对于任何错误或功能请求,请随时在 GitHub 上提交问题。