Google Cloud Storage 输入

编辑

使用 google cloud storage input 从存储在 Google Cloud 上的存储桶中的文件中读取内容。该输入可以配置为使用或不使用轮询,但如果禁用轮询,它将仅执行一次数据收集,列出文件内容并结束进程。

为了减轻错误并确保稳定的处理环境,此输入采用以下功能

  1. 在处理 Google Cloud 存储桶时,如果突然发生任何中断,该进程将能够在处理完最后一个文件并成功保存状态后恢复。
  2. 如果某些文件发生任何错误,它们将被适当地记录,但其余文件将继续正常处理。
  3. 如果发生任何停止主线程的重大错误,将生成适当的日志,描述所述错误。

配置选项删除通知bucket_timeout 配置选项已从 Google Cloud Storage 输入中删除。删除此选项的目的是简化配置并使其更易于使用。bucket_timeout 选项令人困惑,并且有可能让用户错误地配置输入,从而导致意外行为。该输入现在使用更强大、更高效的方式来在内部处理存储桶超时。

目前仅支持 JSONNDJSON 对象/文件格式。对象/文件也可以进行 gzip 压缩。“JSON 凭据密钥”和“凭据文件”是支持的身份验证类型。如果数组作为对象/文件的根对象存在,它会自动拆分为单独的对象并进行处理。如果文件/对象的下载失败或中断,则会重试下载 2 次。目前,这不可由用户配置。

下面给出了每个字段的详细说明的示例配置:-

filebeat.inputs:
- type: gcs
  id: my-gcs-id
  enabled: true
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  parse_json: true
  buckets:
  - name: gcs-test-new
    max_workers: 3
    poll: true
    poll_interval: 15s
  - name: gcs-test-old
    max_workers: 3
    poll: true
    poll_interval: 10s

说明:上面给出的此 configuration 描述了一个基本的 gcs 配置,其中包含两个名为 gcs-test-newgcs-test-old 的存储桶。这些存储桶中的每一个都有自己的属性,例如 namemax_workerspollpoll_interval。这些属性的详细解释在下面给出。现在,让我们尝试了解此配置的工作原理。

为了让 Google Cloud Storage 输入识别它需要读取和处理的文件,它将需要指定存储桶名称。我们可以根据需要设置任意数量的存储桶。我们还可以配置根级别的属性 max_workerspollpoll_interval,然后将其应用于所有未明确指定这些属性的存储桶。

如果在根级别指定了属性 max_workerspollpoll_interval,则仍可以在存储桶级别使用不同的值覆盖这些属性,从而提供广泛的灵活性和自定义性。下面的示例展示了此行为。

在收到此配置后,Google Cloud Storage 输入将连接到该服务,并使用给定的 bucket_nameauth.credentials_file 检索 Storage Client,然后它将为每个存储桶生成两个主要的 Go 协程。之后,这些例程(线程)中的每一个都将初始化一个调度程序,该调度程序将反过来使用 max_workers 值来初始化一个内存中的工作池(线程池),其中有 3 个可用的 workers。基本上,这相当于两个工作池实例,每个存储桶一个,每个实例有 3 个工作线程。这些 workers 将负责执行处理文件的 jobs(在本例中为读取和输出文件内容)。

调度程序负责调度作业,并在每次迭代时使用池中 最大可用工作线程数 来确定要检索和处理的文件数量。这使得工作分配高效。调度程序使用 poll_interval 属性值来确定每次迭代后等待的时间。每次迭代都包括处理一定数量的文件,这些文件由 最大可用工作线程数 值决定。

示例响应:-

{
  "@timestamp": "2022-09-01T13:54:24.588Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.5.0",
    "_id": "gcs-test-new-data_3.json-worker-1"
  },
  "log": {
    "offset": 200,
    "file": {
      "path": "gs://gcs-test-new/data_3.json"
    }
  },
  "input": {
    "type": "gcs"
  },
  "message": "{\n    \"id\": 1,\n    \"title\": \"iPhone 9\",\n    \"description\": \"An apple mobile which is nothing like apple\",\n    \"price\": 549,\n    \"discountPercentage\": 12.96,\n    \"rating\": 4.69,\n    \"stock\": 94,\n    \"brand\": \"Apple\",\n    \"category\": \"smartphones\",\n    \"thumbnail\": \"https://dummyjson.com/image/i/products/1/thumbnail.jpg\",\n    \"images\": [\n        \"https://dummyjson.com/image/i/products/1/1.jpg\",\n        \"https://dummyjson.com/image/i/products/1/2.jpg\",\n        \"https://dummyjson.com/image/i/products/1/3.jpg\",\n        \"https://dummyjson.com/image/i/products/1/4.jpg\",\n        \"https://dummyjson.com/image/i/products/1/thumbnail.jpg\"\n    ]\n}\n",
  "cloud": {
    "provider": "goole cloud"
  },
  "gcs": {
    "storage": {
      "bucket": {
        "name": "gcs-test-new"
      },
      "object": {
        "name": "data_3.json",
        "content_type": "application/json",
        "json_data": [
          {
            "id": 1,
            "discountPercentage": 12.96,
            "rating": 4.69,
            "brand": "Apple",
            "price": 549,
            "category": "smartphones",
            "thumbnail": "https://dummyjson.com/image/i/products/1/thumbnail.jpg",
            "description": "An apple mobile which is nothing like apple",
            "title": "iPhone 9",
            "stock": 94,
            "images": [
              "https://dummyjson.com/image/i/products/1/1.jpg",
              "https://dummyjson.com/image/i/products/1/2.jpg",
              "https://dummyjson.com/image/i/products/1/3.jpg",
              "https://dummyjson.com/image/i/products/1/4.jpg",
              "https://dummyjson.com/image/i/products/1/thumbnail.jpg"
            ]
          }
        ]
      }
    }
  },
  "event": {
    "kind": "publish_data"
  }
}

正如我们从上面的响应中看到的,message 字段包含原始的字符串化数据,而 gcs.storage.object.data 包含对象化数据。

一些关键属性如下:-

  1. message:原始字符串化对象数据。
  2. log.file.path:Google Cloud 中对象的路径。
  3. gcs.storage.bucket.name:从中读取文件的存储桶的名称。
  4. gcs.storage.object.name:已读取的文件/对象的名称。
  5. gcs.storage.object.content_type:文件/对象的内容类型。您可以在此处找到支持的内容类型。
  6. gcs.storage.object.json_data:对象化的 json 文件数据,表示文件的内容。

现在,让我们更详细地探讨一下配置属性。

支持的属性:-

project_id

编辑

此属性对于身份验证、创建存储客户端和日志记录方面的各种内部操作是必需的,这些操作在内部用于各种处理目的。

auth.credentials_json.account_key

编辑

此属性包含 json 服务帐户凭据字符串,该字符串可以从 Google Cloud 控制台生成,参考:https://cloud.google.com/iam/docs/creating-managing-service-account-keys,在相应的存储帐户下。单个存储帐户可以包含多个存储桶,它们都将使用此公共服务帐户访问密钥。

auth.credentials_file.path

编辑

此属性包含 服务帐户凭据文件,该文件可以从 Google Cloud 控制台生成,参考:https://cloud.google.com/iam/docs/creating-managing-service-account-keys,在相应的存储帐户下。单个存储帐户可以包含多个存储桶,它们都将使用此公共服务帐户凭据文件。

我们只需要指定 auth.credentials_json.account_keyauth.credentials_file.path 中的一个用于身份验证。如果同时指定了这两个属性,则将使用配置中首先出现的属性。

buckets

编辑

此属性包含有关特定存储桶的详细信息,例如 namemax_workerspollpoll_intervalname 属性特定于存储桶,因为它描述了存储桶名称,而字段 max_workerspollpoll_interval 既可以存在于存储桶级别,也可以存在于根级别。此属性在内部表示为数组,因此我们可以添加任意数量的存储桶。

name

编辑

这是存储桶的一个特定子字段。它指定了存储桶名称。

max_workers

编辑

此属性定义在工作池(线程池)中分配的最大工作线程数(goroutine / 轻量级线程),用于处理读取文件内容的作业。此属性既可以在配置的根级别指定,也可以在存储桶级别指定。如果同时指定了这两个级别的值,则存储桶级别的值会覆盖根级别的值。更多的工作线程并不一定能提高吞吐量,应根据文件数量、正在处理的文件大小和可用资源仔细调整此值。将 max_workers 增加到非常高的值可能会导致资源利用率问题,并可能导致处理瓶颈。通常建议的最大上限为 2000 个工作线程。非常低的 max_worker 计数将大大增加提取对象所需的网络调用次数,这会导致处理瓶颈。

max_workers 的值目前与 batch_size 相关联,以确保工作负载在所有 goroutine 中均匀分布。这确保了输入能够以高效的方式处理文件。此 batch_size 确定在一个调用中将提取多少个对象。max_workers 值应根据要读取的文件数量、可用资源和网络速度进行设置。例如,max_workers=3 表示每个分页请求总共提取 3 个 gcs 对象并将其分配给 3 个 goroutinemax_workers=100 表示每个分页请求提取 100 个 gcs 对象并将其分配给 100 个 goroutine

poll

编辑

此属性通知调度程序是否继续轮询新文件。此属性的默认值设置为 true。此属性既可以在配置的根级别指定,也可以在存储桶级别指定。如果同时指定了这两个级别的值,则存储桶级别的值始终优先并覆盖根级别的值。

poll_interval

编辑

此属性定义内部调度程序调用下一组对象/文件的轮询请求后的最大时间量。它可以用以下格式定义:{{x}}s{{x}}m{{x}}h,其中 s = 秒m = 分钟h = 小时。值 {{x}} 可以是我们希望的任何值。示例:10s 表示我们希望每 10 秒轮询一次。如果未指定此值,则默认将其初始化为 5 分钟。此属性既可以在配置的根级别指定,也可以在存储桶级别指定。如果同时指定了这两个级别的值,则存储桶级别的值始终优先并覆盖根级别的值。使用较低的 poll_interval 可以使输入更快,但代价是更高的资源利用率。

parse_json

编辑

此属性通知发布者是否解析和对象化 JSON 数据。默认情况下,此属性设置为 false,因为处理高度嵌套的 JSON 数据可能会很耗费资源。如果将其设置为 false,则响应中的 gcs.storage.object.json_data 字段将为空数组。此属性仅适用于 JSON 对象,对其他类型的对象无效。此属性可以在配置的根级别和存储桶级别指定。如果同时指定了根级别和存储桶级别的值,则存储桶级别的值始终优先并覆盖根级别的值。

encoding

编辑

用于读取包含国际字符的数据的文件编码。这仅适用于非 JSON 日志。请参阅 encoding

decoding

编辑

文件解码选项用于指定将用于解码文件内容的编解码器。这可以应用于任何文件流数据。下面显示了一个示例配置

目前支持的编解码器如下:-

  1. CSV:此编解码器解码 RFC 4180 CSV 数据流。

CSV 编解码器

编辑

CSV 编解码器用于解码 RFC 4180 CSV 数据流。启用编解码器而不使用其他选项将使用默认的编解码器选项。

  decoding.codec.csv.enabled: true

CSV 编解码器支持五个子属性来控制 CSV 解码的各个方面。comma 属性指定 CSV 格式使用的字段分隔符。如果未指定,则使用逗号字符 ,comment 属性指定应解释为注释标记的字符。如果指定了此属性,则将忽略以该字符开头的行。commacomment 都必须是单个字符。lazy_quotes 属性控制如何处理字段中的引号。如果 lazy_quotes 为 true,则引号可能出现在未加引号的字段中,并且未加倍的引号可能出现在加引号的字段中。trim_leading_space 属性指定应忽略前导空格,即使 comma 字符是空格也是如此。有关上述配置属性行为的完整详细信息,请参阅 CSV 解码器 文档fields_names 属性可用于指定数据的列名。如果此属性不存在,则字段名称将从数据的第一条非注释行中获取。字段的数量必须与字段名称的数量匹配。

下面显示了一个示例配置

  decoding.codec.csv.enabled: true
  decoding.codec.csv.comma: "\t"
  decoding.codec.csv.comment: "#"

file_selectors

编辑

如果 GCS 存储桶中的对象对应于 Filebeat 不应处理的文件,则可以使用 file_selectors 来限制下载的文件。这是一个基于正则表达式模式的选择器列表。正则表达式应与对象名称匹配,或者应为对象名称的一部分(理想情况下为前缀)。使用的正则表达式语法是 [RE2](https://github.com/google/re2/wiki/Syntax)。不匹配任何已配置表达式的文件将不会被处理。此属性可以在配置的根级别和容器级别指定。如果同时指定了根级别和容器级别的值,则容器级别的值始终优先并覆盖根级别的值。

filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  buckets:
  - name: obs-bucket
    max_workers: 3
    poll: true
    poll_interval: 15s
    file_selectors:
    - regex: '/Monitoring/'
    - regex: 'docs/'
    - regex: '/Security-Logs/'

file_selectors 操作在代理本地执行,因此使用此选项将导致代理下载所有文件,然后对其进行筛选。如果文件数量非常多,则可能会导致处理瓶颈。建议仅在文件数量有限或有充足资源可用时才使用此属性。

expand_event_list_from_field

编辑

如果使用此输入的 file-set 期望在特定字段或对象数组下接收捆绑的多个消息,则可以指定 expand_event_list_from_field 的配置选项。此设置能够将组值下的消息拆分为单独的事件。例如,如果您的日志采用 JSON 格式,并且事件在 JSON 对象“Records”下找到。要将事件拆分为单独的事件,可以将配置选项 expand_event_list_from_field 设置为“Records”。此属性可以在配置的根级别和容器级别指定。如果同时指定了根级别和容器级别的值,则容器级别的值始终优先并覆盖根级别的值。

{
    "Records": [
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:51:00Z",
            "region": "us-east-1",
            "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE",
        },
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:52:00Z",
            "region": "us-east-1",
            "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE",
        }
    ]
}
filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  buckets:
  - name: obs-bucket
    max_workers: 3
    poll: true
    poll_interval: 15s
    expand_event_list_from_field: Records

parse_json 设置与 expand_event_list_from_field 不兼容。如果启用此设置,则会被忽略。此属性仅适用于 JSON 文件格式。如果文件在根级别具有对象数组,则无需指定此属性。根级别对象数组会自动拆分为单独的事件。如果由于某些意外错误而发生故障或输入崩溃,则处理将从上次成功处理的文件或对象恢复。

timestamp_epoch

编辑

此属性可用于筛选出时间戳早于指定值的文件和对象。此属性的值应采用 unix epoch(秒)格式。时间戳值将与从对象元数据中获取的 object.Updated 字段进行比较。此属性可以在配置的根级别和容器级别指定。如果同时指定了根级别和容器级别的值,则容器级别的值始终优先并覆盖根级别的值。

filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  buckets:
  - name: obs-bucket
    max_workers: 3
    poll: true
    poll_interval: 15s
    timestamp_epoch: 1630444800

GCS API 没有提供直接基于时间戳筛选文件的方法,因此输入将下载所有文件,然后根据时间戳进行筛选。如果文件数量非常多,则可能会导致处理瓶颈。建议仅在文件数量有限或有充足资源可用时才使用此属性。此选项是垂直扩展,而不是水平扩展。

retry

编辑

此属性可用于配置一个子属性列表,这些子属性直接控制当文件/对象的下载失败或中断时,输入应如何表现。

  • max_attempts:此属性定义可重试错误应尝试的最大重试次数(包括初始 API 调用)。此属性的默认值为 3
  • initial_backoff_duration:此属性定义初始退避时间。此属性的默认值为 1s
  • max_backoff_duration:此属性定义最大退避时间。此属性的默认值为 30s
  • backoff_multiplier:此属性定义退避乘数因子。此属性的默认值为 2

initial_backoff_durationmax_backoff_duration 属性必须具有时间单位。有效的时间单位为 nsus(或 µs)、mssmh

通过配置这些属性,用户可以灵活地控制输入在下载失败或中断时应如何表现。此属性只能在配置的根级别指定,而不能在存储桶级别指定。它统一应用于所有存储桶。

下面给出一个示例配置:-

filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  retry:
    max_attempts: 3
    initial_backoff_duration: 2s
    max_backoff_duration: 60s
    backoff_multiplier: 2
  buckets:
  - name: obs-bucket
    max_workers: 3
    poll: true
    poll_interval: 11m

下面的示例配置将进一步解释存储桶级别的属性覆盖:-

案例 - 1

在此处,bucket_1 使用根级别属性,而 bucket_2 覆盖这些值

filebeat.inputs:
- type: gcs
  id: my-gcs-id
  enabled: true
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  max_workers: 10
  poll: true
  poll_interval: 15s
  buckets:
  - name: bucket_1
  - name: bucket_2
    max_workers: 3
    poll: true
    poll_interval: 10s

说明: 在此配置中,bucket_1max_workerspollpoll_interval 中没有定义子属性。它从根级别继承这些字段的值,即 max_workers = 10poll = truepoll_interval = 15 seconds。但是,bucket_2 定义了这些字段,它将使用这些值,而不是使用根值。

案例 - 2

在此处,bucket_1bucket_2 都覆盖了根值

filebeat.inputs:
  - type: gcs
    id: my-gcs-id
    enabled: true
    project_id: my_project_id
    auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
    max_workers: 10
    poll: true
    poll_interval: 15s
    buckets:
    - name: bucket_1
      max_workers: 5
      poll: true
      poll_interval: 10s
    - name: bucket_2
      max_workers: 5
      poll: true
      poll_interval: 10s

说明: 在此配置中,即使我们在根级别指定了 max_workers = 10poll = truepoll_interval = 15s,这两个存储桶也会使用其子属性中定义的各自值来覆盖这些值。

指标

编辑

此输入在 HTTP 监视端点下公开指标。这些指标在 /inputs 路径下公开。它们可用于观察输入的活动。

指标 描述

url

输入资源的 URL。

errors_total

输入遇到的错误总数。

decode_errors_total

输入遇到的解码错误总数。

gcs_objects_requested_total

已下载的 GCS 对象总数。

gcs_objects_published_total

已处理并发布的 GCS 对象总数。

gcs_objects_listed_total

列表操作返回的 GCS 对象总数。

gcs_bytes_processed_total

已处理的 GCS 字节总数。

gcs_events_created_total

从处理 GCS 数据创建的事件总数。

gcs_failed_jobs_total

失败作业的总数。

gcs_expired_failed_jobs_total

无法恢复的已过期失败作业的总数。

gcs_objects_tracked_gauge

状态注册表中当前跟踪的对象数量(仪表)。

gcs_objects_inflight_gauge

正在处理的 GCS 对象数量(仪表)。

gcs_jobs_scheduled_after_validation

验证后计划的作业数量的直方图。

gcs_object_processing_time

以纳秒为单位的 GCS 对象处理时间(从下载开始到解析完成)的直方图。

gcs_object_size_in_bytes

已处理的 GCS 对象大小(以字节为单位)的直方图。

gcs_events_per_object

每个 GCS 对象的事件计数的直方图。

source_lag_time

源 (Updated) 时间戳与读取对象的时间之间的时间差(以纳秒为单位)的直方图。

通用输入选项

编辑

通用选项

编辑

所有输入都支持以下配置选项。

enabled
编辑

使用 enabled 选项启用和禁用输入。默认情况下,enabled 设置为 true。

tags
编辑

Filebeat 在每个已发布事件的 tags 字段中包含的标记列表。标记使您可以轻松地在 Kibana 中选择特定事件或在 Logstash 中应用条件筛选。这些标记将附加到常规配置中指定的标记列表中。

示例

filebeat.inputs:
- type: gcs
  . . .
  tags: ["json"]
fields
编辑

可选字段,您可以指定这些字段以向输出添加其他信息。例如,您可以添加一些字段用于过滤日志数据。字段可以是标量值、数组、字典或这些类型的任意嵌套组合。默认情况下,您在此处指定的字段将分组在输出文档中的 fields 子字典下。要将自定义字段存储为顶级字段,请将 fields_under_root 选项设置为 true。如果在通用配置中声明了重复的字段,则其值将被此处声明的值覆盖。

filebeat.inputs:
- type: gcs
  . . .
  fields:
    app_id: query_engine_12
fields_under_root
编辑

如果此选项设置为 true,则自定义 字段 将作为输出文档中的顶级字段存储,而不是分组在 fields 子字典下。如果自定义字段名称与 Filebeat 添加的其他字段名称冲突,则自定义字段将覆盖其他字段。

processors
编辑

应用于输入数据的一系列处理器。

有关在配置中指定处理器的信息,请参阅处理器

pipeline
编辑

为由此输入生成的事件设置的 Ingest 管道 ID。

管道 ID 也可以在 Elasticsearch 输出中配置,但此选项通常会导致更简单的配置文件。如果管道在输入和输出中都配置了,则使用输入中的选项。

keep_null
编辑

如果此选项设置为 true,则具有 null 值的字段将发布在输出文档中。默认情况下,keep_null 设置为 false

index
编辑

如果存在,此格式化字符串将覆盖来自此输入的事件的索引(对于 elasticsearch 输出),或者设置事件元数据的 raw_index 字段(对于其他输出)。此字符串只能引用代理名称和版本以及事件时间戳;要访问动态字段,请使用 output.elasticsearch.index 或处理器。

示例值:"%{[agent.name]}-myindex-%{+yyyy.MM.dd}" 可能会扩展为 "filebeat-myindex-2019.11.01"

publisher_pipeline.disable_host
编辑

默认情况下,所有事件都包含 host.name。可以将此选项设置为 true 以禁用将此字段添加到所有事件。默认值为 false

欢迎任何反馈,这将有助于我们进一步优化此输入。 如果有任何错误或功能请求,请随时打开一个 github issue。