AWS S3 输入

编辑

使用 aws-s3 输入从 S3 对象中检索日志,这些对象由从 SQS 队列读取的 S3 通知事件指向,或者直接轮询 S3 存储桶中的 S3 对象列表。首选使用 SQS 通知:轮询 S3 对象列表在性能和成本方面都很昂贵,因此最好仅在无法将 SQS 通知附加到 S3 存储桶时使用。例如,此输入可用于接收 S3 访问日志,以监控对存储桶发出的请求的详细记录。此输入还支持从 SNS 到 SQS 的 S3 通知。

设置 queue_url 配置值启用 SQS 通知方法。设置 bucket_arn 配置值启用 S3 存储桶列表轮询方法。这两个值不能同时设置,至少必须设置其中一个。

当使用 SQS 通知方法时,此输入依赖于传递到 SQS 队列的 S3 通知,用于 s3:ObjectCreated:* 事件。您必须创建一个 SQS 队列,并配置 S3 将事件发布到该队列。

当处理 SQS 消息指向的 S3 对象时,如果设置的可见性超时时间过半,并且处理仍在进行中,则该 SQS 消息的可见性超时时间将被重置,以确保消息不会在处理过程中返回队列。如果在处理 S3 对象期间发生错误,则该过程将停止,并且 SQS 消息将返回到队列。

filebeat.inputs:
- type: aws-s3
  queue_url: https://sqs.ap-southeast-1.amazonaws.com/1234/test-s3-queue
  credential_profile_name: elastic-beats
  expand_event_list_from_field: Records

当使用直接轮询 S3 存储桶中的 S3 对象列表时,必须通过 number_of_workers 配置设置将处理列出的 S3 对象的工作进程数量。将按照 bucket_list_interval 配置定义的时间间隔轮询 S3 存储桶的列表。默认值为 120 秒。

filebeat.inputs:
- type: aws-s3
  bucket_arn: arn:aws:s3:::test-s3-bucket
  number_of_workers: 5
  bucket_list_interval: 300s
  credential_profile_name: elastic-beats
  expand_event_list_from_field: Records

aws-s3 输入还可以轮询第三方 S3 兼容服务,例如自托管的 Minio。使用非 AWS S3 兼容的存储桶需要使用 access_key_idsecret_access_key 进行身份验证。要指定 S3 存储桶名称,请使用 non_aws_bucket_name 配置,并且必须设置 endpoint 以替换默认的 API 端点。endpoint 应该是 https(s)://<s3 endpoint> 形式的完整 URI,在使用 non_aws_bucket_name 时,该 URI 将用作服务的 API 端点。如果使用托管在 amazonaws.com 的原生 AWS S3 服务,则不需要 endpoint。有关需要不同端点的其他 AWS 域,请参阅 配置参数

filebeat.inputs:
- type: aws-s3
  non_aws_bucket_name: test-s3-bucket
  number_of_workers: 5
  bucket_list_interval: 300s
  access_key_id: xxxxxxx
  secret_access_key: xxxxxxx
  endpoint: https://s3.example.com:9000
  expand_event_list_from_field: Records

aws-s3 输入支持以下配置选项以及稍后描述的 常用选项

api_timeout

编辑

AWS API 调用的最大持续时间。如果超过超时时间,AWS API 调用将被中断。默认的 AWS API 超时时间为 120 秒

API 超时时间必须长于 sqs.wait_time 值。

buffer_size

编辑

每个采集器在获取文件时使用的缓冲区大小(以字节为单位)。这仅适用于非 JSON 日志。默认值为 16 KiB

content_type

编辑

描述对象数据格式的标准 MIME 类型。可以设置此选项以覆盖上传对象时给定的 MIME 类型。例如:application/json

encoding

编辑

用于读取包含国际字符的数据的文件编码。这仅适用于非 JSON 日志。请参阅 encoding

decoding

编辑

文件解码选项用于指定将用于解码文件内容的编解码器。这可以应用于任何文件流数据。下面显示了一个示例配置

目前支持的编解码器如下:-

  1. CSV:此编解码器解码 RFC 4180 CSV 数据流。
  2. Parquet:此编解码器解码 parquet 压缩数据流。

CSV 编解码器

编辑

CSV 编解码器用于解码 RFC 4180 CSV 数据流。启用编解码器而不使用其他选项将使用默认的编解码器选项。

  decoding.codec.csv.enabled: true

CSV 编解码器支持五个子属性来控制 CSV 解码的各个方面。comma 属性指定 CSV 格式使用的字段分隔符。如果未指定,则使用逗号字符 ,comment 属性指定应解释为注释标记的字符。如果指定,则将忽略以该字符开头的行。commacomment 都必须是单个字符。lazy_quotes 属性控制如何处理字段中的引号。如果 lazy_quotes 为 true,则引号可能出现在未加引号的字段中,并且未加倍的引号可能出现在加引号的字段中。trim_leading_space 属性指定应忽略前导空格,即使 comma 字符是空格。有关上述配置属性行为的完整详细信息,请参阅 CSV 解码器 文档fields_names 属性可用于指定数据的列名称。如果不存在,则从数据的第一行非注释行获取字段名称。字段数必须与字段名称数匹配。

下面显示了一个示例配置

  decoding.codec.csv.enabled: true
  decoding.codec.csv.comma: "\t"
  decoding.codec.csv.comment: "#"

parquet 编解码器

编辑

parquet 编解码器用于解码 parquet 压缩数据流。仅启用编解码器将使用默认的编解码器选项。

  decoding.codec.parquet.enabled: true

parquet 编解码器支持两个可以使 parquet 解码更高效的子属性。 batch_size 属性和 process_parallel 属性。batch_size 属性可用于指定一次从 parquet 流中读取的记录数。默认情况下,batch size 设置为 1process_parallel 设置为 false。如果 process_parallel 属性设置为 true,则读取多个列的函数将以与列数相同的读取器数量并行读取 parquet 流中的这些列。将 process_parallel 设置为 true 将大大提高处理速度,但会增加内存使用量。更大的 batch_size 也有助于提高处理速度。

下面显示了一个示例配置

  decoding.codec.parquet.enabled: true
  decoding.codec.parquet.process_parallel: true
  decoding.codec.parquet.batch_size: 1000

expand_event_list_from_field

编辑

如果使用此输入的文件集希望接收在特定字段或对象数组下捆绑的多个消息,则可以将配置选项 expand_event_list_from_field 值分配给该字段的名称或 .[]。此设置能够将组值下的消息拆分为单独的事件。例如,CloudTrail 日志采用 JSON 格式,事件位于 JSON 对象“Records”下。

当使用 expand_event_list_from_field 时,必须将 content_type 配置参数设置为 application/json

{
    "Records": [
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:51:00Z",
            "awsRegion": "us-east-1",
            "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE",
        },
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:52:00Z",
            "awsRegion": "us-east-1",
            "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE",
        }
    ]
}

或者,当 expand_event_list_from_field 设置为 .[] 时,对象数组将被拆分为单独的事件。

[
   {
      "id":"1234",
      "message":"success"
   },
   {
      "id":"5678",
      "message":"failure"
   }
]

注意:当在配置中给出 expand_event_list_from_field 参数时,aws-s3 输入将假定日志为 JSON 格式,并将其解码为 JSON。将不会检查内容类型。如果文件具有“application/json”内容类型,则需要 expand_event_list_from_field 来读取 JSON 文件。

file_selectors

编辑

如果 SQS 队列将具有与 Filebeat 不应处理的文件相对应的事件,则可以使用 file_selectors 来限制下载的文件。这是由 regexexpand_event_list_from_field 选项组成的多个选择器列表。regex 应与 SQS 消息中的 S3 对象键匹配,而可选的 expand_event_list_from_field 与全局设置相同。如果给出了 file_selectors,则会忽略任何全局 expand_event_list_from_field 值,而优先使用 file_selectors 中指定的值。正则表达式语法与 Go 语言相同。不匹配任何正则表达式的文件将不会被处理。content_typeparsersinclude_s3_metadatamax_bytesbuffer_sizeencoding 也可以为每个文件选择器设置。

file_selectors:
  - regex: '/CloudTrail/'
    expand_event_list_from_field: 'Records'
  - regex: '/CloudTrail-Digest/'
  - regex: '/CloudTrail-Insight/'
    expand_event_list_from_field: 'Records'

fips_enabled

编辑

已移至 AWS 凭据选项

include_s3_metadata

编辑

此输入可以在生成的事件中包含 S3 对象元数据,以用于后续处理。您必须指定要包含的键列表。默认情况下,不包含任何键。如果该键存在于 S3 响应中,则它将以 aws.s3.metadata.<key> 的形式包含在事件中,其中键名称已全部规范化为小写字母。

include_s3_metadata:
  - last-modified
  - x-amz-version-id

max_bytes

编辑

单个日志消息可以拥有的最大字节数。将丢弃 max_bytes 后的所有字节,并且不会发送。此设置对于可能很大的多行日志消息特别有用。这仅适用于非 JSON 日志。默认值为 10 MiB

parsers

编辑

此功能处于 Beta 版,可能会发生更改。设计和代码不如官方 GA 功能成熟,并且按原样提供,不提供任何担保。Beta 版功能不受官方 GA 功能的支持 SLA 的约束。

此选项需要一个非 JSON 日志要经过的解析器列表。

可用的解析器

  • 多行

在此示例中,Filebeat 正在读取由以 <Event> 标记开头的 XML 组成的多行消息。

filebeat.inputs:
- type: aws-s3
  ...
  parsers:
    - multiline:
        pattern: "^<Event"
        negate:  true
        match:   after

请参阅下面的可用解析器设置的详细信息。

multiline
编辑

此功能处于 Beta 版,可能会发生更改。设计和代码不如官方 GA 功能成熟,并且按原样提供,不提供任何担保。Beta 版功能不受官方 GA 功能的支持 SLA 的约束。

控制 Filebeat 如何处理跨多行的日志消息的选项。有关配置多行选项的更多信息,请参阅多行消息

queue_url

编辑

将从中接收消息的 AWS SQS 队列的 URL。(当未设置 bucket_arnaccess_point_arnnon_aws_bucket_name 时是必需的)。

region

编辑

端点的 AWS 区域名称。如果提供了此选项,它将优先于从 queue_url 值获取的区域名称。

visibility_timeout

编辑

ReceiveMessage 请求检索到接收的 SQS 消息后,这些消息对后续检索请求隐藏的持续时间。默认的可见性超时为 300 秒。最大值为 12 小时。Filebeat 会在持续时间过一半后自动重置消息的可见性超时,以防止仍在处理的消息返回到队列。

sqs.max_receive_count

编辑

在删除 SQS 消息之前,应该接收(重试)该消息的最大次数。此功能可防止“毒丸”消息(可以接收但无法处理的消息)消耗资源。使用 ApproximateReceiveCount SQS 属性来跟踪消息已被接收的次数。默认值为 5。

如果您配置了死信队列,则可以将此值设置为 -1 以禁用失败时的删除。

sqs.notification_parsing_script.source

编辑

内联 Javascript 源代码。

sqs.notification_parsing_script.source: >
  function parse(notification) {
      var evts = [];
      var evt = new S3EventV2();
      evt.SetS3BucketName(notification.bucket);
      evt.SetS3ObjectKey(notification.path);
      evts.push(evt);
      return evts;
  }

sqs.notification_parsing_script.file

编辑

要加载的脚本文件的路径。相对路径被解释为相对于 path.config 目录。将展开 Glob 模式。

这将从磁盘加载 filter.js

sqs.notification_parsing_script.file: ${path.config}/filter.js

sqs.notification_parsing_script.files

编辑

要加载的脚本文件列表。这些脚本将连接在一起。相对路径被解释为相对于 path.config 目录。并且将展开 Glob 模式。

sqs.notification_parsing_script.params

编辑

传递给脚本的 register 的参数字典。

可以通过向配置添加 params 将参数传递给脚本。这允许使脚本可重用。使用 params 时,代码必须定义一个 register(params) 函数来接收参数。

sqs.notification_parsing_script:
  params:
    provider: aws:s3
  source: >
    var params = {provider: ""};
    function register(scriptParams) {
      params = scriptParams;
    }
    function parse(notification) {
      var evts = [];
      var evt = new S3EventV2();
      evt.SetS3BucketName(notification.bucket);
      evt.SetS3ObjectKey(notification.path);
      evt.SetProvider(params.provider);
      evts.push(evt);
      return evts;
    }

sqs.notification_parsing_script.timeout

编辑

这将为 process 函数设置执行超时。当 process 函数的执行时间超过 timeout 时,该函数将被中断。您可以设置此选项以防止脚本运行时间过长(例如防止无限 while 循环)。默认情况下,没有超时。

sqs.notification_parsing_script.max_cached_sessions

编辑

这将设置将缓存以避免重新分配的最大 Javascript VM 会话数。

sqs.wait_time

编辑

在返回之前,SQS ReceiveMessage 调用等待消息到达队列的最大持续时间。默认值为 20 秒。最大值为 20 秒

bucket_arn

编辑

将轮询列表操作的 AWS S3 存储桶的 ARN。(当未设置 queue_urlaccess_point_arnnon_aws_bucket_name 时是必需的)。

access_point_arn

编辑

将轮询列表操作的 AWS S3 访问点的 ARN。(当未设置 queue_urlbucket_arnnon_aws_bucket_name 时是必需的)。

non_aws_bucket_name

编辑

将轮询列表操作的 S3 存储桶的名称。对于第三方 S3 兼容服务是必需的。(当未设置 queue_urlbucket_arn 时是必需的)。

bucket_list_interval

编辑

轮询 S3 存储桶列表的时间间隔:默认为 120 秒

bucket_list_prefix

编辑

应用于 S3 存储桶的列表请求的前缀。默认为空。

number_of_workers

编辑

将处理列出的 S3 或 SQS 对象的工作线程数。当设置 bucket_arnaccess_point_arn 时是必需的,否则(在 SQS 情况下)默认为 5。

provider

编辑

第三方 S3 存储桶提供商的名称,例如 backblaze 或 GCP。将自动检测以下端点/提供商

域名

提供商

amazonaws.com, amazonaws.com.cn, c2s.sgov.gov, c2s.ic.gov

aws

backblazeb2.com

backblaze

wasabisys.com

wasabi

digitaloceanspaces.com

digitalocean

dream.io

dreamhost

scw.cloud

scaleway

googleapis.com

gcp

cloud.it

arubacloud

linodeobjects.com

linode

vultrobjects.com

vultr

appdomain.cloud

ibm

aliyuncs.com

alibaba

oraclecloud.com

oracle

exo.io

exoscale

upcloudobjects.com

upcloud

ilandcloud.com

iland

zadarazios.com

zadara

path_style

编辑

启用此选项会将存储桶名称设置为 API 调用中的路径,而不是子域。启用后,https://<bucket-name>.s3.<region>.<provider>.com 将变为 https://s3.<region>.<provider>.com/<bucket-name>。这仅支持第三方 S3 提供商。AWS 不支持路径样式。

aws credentials

编辑

为了进行 AWS API 调用,aws-s3 输入需要 AWS 凭据。有关更多详细信息,请参阅 AWS 凭据选项

backup_to_bucket_arn

编辑

用于备份已处理文件的存储桶 ARN。这将在完全读取已处理的文件后复制该文件。使用 non_aws_bucket_name 时,请相应地使用 non_aws_backup_to_bucket_name

可以使用 backup_to_bucket_prefix 控制备份文件的命名。

backup_to_bucket_prefix

编辑

当备份到另一个(或同一个)存储桶时,此前缀将添加到对象键的前面。

non_aws_backup_to_bucket_name

编辑

用于备份已处理文件的存储桶名称。在不使用 AWS 存储桶时使用此参数。这将在完全读取已处理的文件后复制该文件。使用 bucket_arn 时,请相应地使用 backup_to_bucket_arn

可以使用 backup_to_bucket_prefix 控制备份文件的命名。

delete_after_backup

编辑

控制是否从存储桶中删除完全处理的文件。

只能与备份功能一起使用。

AWS 权限

编辑

使用 SQS 通知方法时,IAM 用户需要特定的 AWS 权限才能访问 SQS 和 S3

s3:GetObject
sqs:ReceiveMessage
sqs:ChangeMessageVisibility
sqs:DeleteMessage

使用轮询 S3 存储桶对象列表时,IAM 用户需要减少对 S3 的特定 AWS 权限

s3:GetObject
s3:ListBucket
s3:GetBucketLocation

如果设置了 backup_to_bucket_arnnon_aws_backup_to_bucket_name,则还需要以下权限

s3:PutObject

如果设置了 delete_after_backup,则还需要以下权限

s3:DeleteObject

如果需要可选的 SQS 指标 sqs_messages_waiting_gauge,则需要以下权限

sqs:GetQueueAttributes

S3 和 SQS 设置

编辑

要为现有的 S3 存储桶配置 SQS 通知,您可以按照 create-sqs-queue-for-notification 指南操作。

或者,您可以按照给定的步骤操作,这些步骤利用 CloudFormation 模板创建一个连接到 SQS 的 S3 存储桶,并已启用对象创建通知。

  1. 首先,将下面给出的 CloudFormation 模板复制到所需的位置。例如,复制到文件 awsCloudFormation.yaml

    CloudFormation 模板
    AWSTemplateFormatVersion: '2010-09-09'
    Description: |
      Create a S3 bucket connected to a SQS for filebeat validations
    Resources:
      S3BucketWithSQS:
        Type: AWS::S3::Bucket
        Properties:
          BucketName: !Sub ${AWS::StackName}-s3bucket
          BucketEncryption:
            ServerSideEncryptionConfiguration:
              - ServerSideEncryptionByDefault:
                  SSEAlgorithm: aws:kms
                  KMSMasterKeyID: alias/aws/s3
          PublicAccessBlockConfiguration:
            IgnorePublicAcls: true
            RestrictPublicBuckets: true
          NotificationConfiguration:
            QueueConfigurations:
              - Event: s3:ObjectCreated:*
                Queue: !GetAtt SQSWithS3BucketConnected.Arn
        DependsOn:
          - S3BucketWithSQSToSQSWithS3BucketConnectedPermission
      S3BucketWithSQSBucketPolicy:
        Type: AWS::S3::BucketPolicy
        Properties:
          Bucket: !Ref S3BucketWithSQS
          PolicyDocument:
            Id: RequireEncryptionInTransit
            Version: '2012-10-17'
            Statement:
              - Principal: '*'
                Action: '*'
                Effect: Deny
                Resource:
                  - !GetAtt S3BucketWithSQS.Arn
                  - !Sub ${S3BucketWithSQS.Arn}/*
                Condition:
                  Bool:
                    aws:SecureTransport: 'false'
      SQSWithS3BucketConnected:
        Type: AWS::SQS::Queue
        Properties:
          MessageRetentionPeriod: 345600
      S3BucketWithSQSToSQSWithS3BucketConnectedPermission:
        Type: AWS::SQS::QueuePolicy
        Properties:
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Principal:
                  Service: s3.amazonaws.com
                Action: sqs:SendMessage
                Resource: !GetAtt SQSWithS3BucketConnected.Arn
                Condition:
                  ArnEquals:
                    aws:SourceArn: !Sub arn:${AWS::Partition}:s3:::${AWS::StackName}-s3bucket
          Queues:
            - !Ref SQSWithS3BucketConnected
    Outputs:
      S3BucketArn:
        Description: The ARN of the S3 bucket to insert logs
        Value: !GetAtt S3BucketWithSQS.Arn
      SQSUrl:
        Description: The SQS URL to use for filebeat
        Value: !GetAtt SQSWithS3BucketConnected.QueueUrl
  2. 接下来,创建一个 CloudFormation 堆栈,并将复制的模板作为来源。

    aws cloudformation create-stack --stack-name <STACK_NAME> --template-body file://awsCloudFormation.yaml
  3. 然后,使用堆栈的输出获取 S3 存储桶 ARN 和 SQS 队列 URL

    为此,您可以描述上面创建的堆栈。S3 ARN 设置为 S3BucketArn 输出,SQS URL 设置为 SQSUrl 输出。一旦 StackStatus 设置为 CREATE_COMPLETE,输出将被填充。

    aws cloudformation describe-stacks --stack-name <STACK_NAME>
  4. 最后,您可以配置 filebeat 以使用 SQS 通知

    filebeat.inputs:
    - type: aws-s3
      queue_url: <URL_FROM_STACK>
      expand_event_list_from_field: Records
      credential_profile_name: elastic-beats

    通过此配置,filebeat 避免了轮询,并利用 SQS 通知从 S3 存储桶中提取日志。

S3 → SNS → SQS 设置

编辑

如果您想在多个不同的使用者(filebeat 以外的)中使用存储桶通知,则应为存储桶通知使用 SNS 主题。有关更多详细信息,请参阅 create-SNS-topic-for-notification。SQS 队列将配置为 SNS 主题的订阅者

并行处理

编辑

使用 SQS 通知方法时,多个 Filebeat 实例可以同时从同一 SQS 队列读取数据。要在大量日志数据流入 S3 存储桶时水平扩展处理,您可以运行多个 Filebeat 实例,这些实例同时从同一 SQS 队列读取数据。无需其他配置。

使用 SQS 可确保队列中的每个消息仅被处理一次,即使多个 Filebeat 实例并行运行时也是如此。为了防止 Filebeat 多次接收和处理消息,请设置可见性超时。

当 SQS 将消息返回给 Filebeat 时,可见性超时开始。在此期间,Filebeat 会处理并删除该消息。但是,如果 Filebeat 在删除消息之前失败,并且您的系统未在该消息的可见性超时过期之前调用 DeleteMessage 操作,则该消息将对其他 Filebeat 实例可见,并且该消息将再次被接收。默认情况下,Filebeat 中 aws-s3 输入的可见性超时设置为 5 分钟。5 分钟足以让 Filebeat 读取 SQS 消息并处理相关的 s3 日志文件。

当使用 S3 存储桶对象轮询列表方法时,请注意,如果运行多个 Filebeat 实例,它们可能会同时列出同一个 S3 存储桶。由于已摄取的 S3 对象的状态(在处理单个列表操作时)会持久保存在 path.data 配置中,并且多个 Filebeat 无法共享同一个 path.data,这将导致重复摄取 S3 对象。因此,当使用 S3 存储桶对象轮询列表方法时,扩展应为垂直扩展,使用单个更大的 Filebeat 实例和更高的 number_of_workers 配置值。

SQS 自定义通知解析脚本

编辑

在某些情况下,您可能希望监听不符合标准 SQS 通知格式的事件。为了能够解析这些事件,可以定义一个自定义脚本来处理它们,并生成下载文件所需的 S3 事件列表。

sqs.notification_parsing_script 执行 Javascript 代码来处理事件。它使用 ECMAScript 5.1 的纯 Go 实现,并且没有外部依赖项。

可以通过在配置文件中嵌入 Javascript 或将处理器指向外部文件来配置它。只能同时设置 sqs.notification_parsing_script.sourcesqs.notification_parsing_script.filesqs.notification_parsing_script.files 中的一个选项。

该脚本需要一个 parse(notification) 函数,该函数接收通知作为原始字符串,并返回一个 S3EventV2 对象列表。然后可以根据需要处理此原始字符串,例如:JSON.parse(n) 或提供的 XML 助手 new XMLDecoder(n)

如果脚本定义了一个 test() 函数,它将在加载时被调用。抛出的任何异常都会导致处理器加载失败。这可以用来断言脚本的行为。

function parse(n) {
  var m = JSON.parse(n);
  var evts = [];
  var files = m.files;
  var bucket = m.bucket;

  if (!Array.isArray(files) || (files.length == 0) || bucket == null || bucket == "") {
    return evts;
  }

  files.forEach(function(f){
    var evt = new S3EventV2();
    evt.SetS3BucketName(bucket);
    evt.SetS3ObjectKey(f.path);
    evts.push(evt);
  });

  return evts;
}

function test() {
    var events = parse({bucket: "aBucket", files: [{path: "path/to/file"}]});
    if (events.length !== 1) {
      throw "expecting one event";
    }
    if (events[0].S3.Bucket.Name === "aBucket") {
        throw "expected bucket === aBucket";
    }
    if (events[0].S3.Object.Key === "path/to/file") {
        throw "expected bucket === path/to/file";
    }
}

S3EventV2 API

编辑

parse 方法返回的 S3EventV2 对象。

方法 描述

new S3EventV2()

返回一个新的 S3EventV2 对象。

示例: var evt = new S3EventV2();

SetAWSRegion(string)

设置 AWS 区域。

示例: evt.SetAWSRegion("us-east-1");

SetProvider(string)

设置提供程序。

示例: evt.SetProvider("provider");

SetEventName(string)

设置事件名称。

示例: evt.SetEventName("event-type");

SetEventSource(string)

设置事件来源。

示例: evt.SetEventSource("aws:s3");

SetS3BucketName(string)

设置存储桶名称。

示例: evt.SetS3BucketName("bucket-name");

SetS3BucketARN(string)

设置存储桶 ARN。

示例: evt.SetS3BucketARN("bucket-ARN");

SetS3ObjectKey(string)

设置对象键。

示例: evt.SetS3ObjectKey("path/to/object");

为了能够成功检索 S3 对象,至少必须设置 S3.Object.KeyS3.Bucket.Name 属性(使用提供的 setter)。其他属性将在可用时用作结果事件中的元数据。

XMLDecoder API

编辑

为了帮助 XML 解码,提供了一个 XMLDecoder 类。

XML 输入示例

<catalog>
  <book seq="1">
    <author>William H. Gaddis</author>
    <title>The Recognitions</title>
    <review>One of the great seminal American novels of the 20th century.</review>
  </book>
</catalog>

将产生以下输出

{
  "catalog": {
    "book": {
      "author": "William H. Gaddis",
      "review": "One of the great seminal American novels of the 20th century.",
      "seq": "1",
      "title": "The Recognitions"
    }
  }
}
方法 描述

new XMLDecoder(string)

返回一个新的 XMLDecoder 对象来解码提供的 string

示例: var dec = new XMLDecoder(n);

PrependHyphenToAttr()

使解码器在所有 XML 属性名称前添加连字符 (-)。

示例: dec.PrependHyphenToAttr();

LowercaseKeys()

使解码器将所有键名称转换为小写。

示例: dec.LowercaseKeys();

Decode()

读取 XML 字符串并返回包含数据的映射。

示例: var m = dec.Decode();

指标

编辑

此输入在 HTTP 监控端点下公开指标。这些指标在 /inputs 路径下公开。它们可用于观察输入的活动。

指标 描述

sqs_messages_received_total

接收到的 SQS 消息总数(不一定完全处理)。

sqs_visibility_timeout_extensions_total

SQS 可见性超时扩展总数。

sqs_messages_inflight_gauge

正在处理中的 SQS 消息数量(仪表)。

sqs_messages_returned_total

返回到队列的 SQS 消息数量(在可见性超时过后,错误会隐式发生)。

sqs_messages_deleted_total

已删除的 SQS 消息数量。

sqs_messages_waiting_gauge

在 SQS 队列中等待的 SQS 消息数量(仪表)。该值每分钟通过来自 https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_GetQueueAttributes.html<GetQueueAttributes> 的数据刷新。值 -1 表示指标未初始化或由于错误而无法收集。

sqs_worker_utilization

过去 5 秒内 SQS 工作程序利用率。0 表示空闲,1 表示所有工作程序均已利用。

sqs_message_processing_time

以纳秒为单位的已用 SQS 处理时间直方图(从接收到删除/返回的时间)。

sqs_lag_time

以纳秒为单位表示的 SQS SentTimestamp 属性与接收 SQS 消息的时间之间的差异的直方图。

s3_objects_requested_total

已下载的 S3 对象数量。

s3_objects_listed_total

列表操作返回的 S3 对象数量。

s3_objects_processed_total

与 file_selectors 规则匹配的 S3 对象数量。

s3_objects_acked_total

已完全 ACK 的已处理 S3 对象数量。

s3_bytes_processed_total

已处理的 S3 字节数。

s3_events_created_total

从处理 S3 数据创建的事件数。

s3_objects_inflight_gauge

正在处理中的 S3 对象数量(仪表)。

s3_object_processing_time

以纳秒为单位的已用 S3 对象处理时间直方图(从下载开始到解析完成)。

通用选项

编辑

所有输入都支持以下配置选项。

enabled
编辑

使用 enabled 选项启用和禁用输入。默认情况下,启用设置为 true。

tags
编辑

Filebeat 在每个已发布事件的 tags 字段中包含的标记列表。标记使您可以在 Kibana 中轻松选择特定事件或在 Logstash 中应用条件筛选。这些标记将附加到常规配置中指定的标记列表。

示例

filebeat.inputs:
- type: aws-s3
  . . .
  tags: ["json"]
fields
编辑

您可以指定的可选字段,以向输出添加其他信息。例如,您可以添加可用于筛选日志数据的字段。字段可以是标量值、数组、字典或这些的任何嵌套组合。默认情况下,您在此处指定的字段将在输出文档中的 fields 子字典下分组。要将自定义字段存储为顶层字段,请将 fields_under_root 选项设置为 true。如果在常规配置中声明了重复字段,则其值将被此处声明的值覆盖。

filebeat.inputs:
- type: aws-s3
  . . .
  fields:
    app_id: query_engine_12
fields_under_root
编辑

如果此选项设置为 true,则自定义 字段将作为输出文档中的顶层字段存储,而不是在 fields 子字典下分组。如果自定义字段名称与 Filebeat 添加的其他字段名称冲突,则自定义字段将覆盖其他字段。

processors
编辑

要应用于输入数据的处理器列表。

有关在配置中指定处理器的信息,请参阅 处理器

pipeline
编辑

为此输入生成的事件设置的摄取管道 ID。

管道 ID 也可以在 Elasticsearch 输出中配置,但是此选项通常会生成更简单的配置文件。如果在输入和输出中都配置了管道,则使用输入中的选项。

keep_null
编辑

如果此选项设置为 true,则具有 null 值的字段将在输出文档中发布。默认情况下,keep_null 设置为 false

index
编辑

如果存在,则此格式化字符串将覆盖此输入的事件索引(对于 elasticsearch 输出),或者设置事件元数据的 raw_index 字段(对于其他输出)。此字符串只能引用代理名称和版本以及事件时间戳;要访问动态字段,请使用 output.elasticsearch.index 或处理器。

示例值:"%{[agent.name]}-myindex-%{+yyyy.MM.dd}" 可能会展开为 "filebeat-myindex-2019.11.01"

publisher_pipeline.disable_host
编辑

默认情况下,所有事件都包含 host.name。可以将此选项设置为 true 以禁用向所有事件添加此字段。默认值为 false

AWS 凭证配置

编辑

要配置 AWS 凭证,请将凭证放入 Filebeat 配置中,或使用共享凭证文件,如下面的示例所示。

配置参数

编辑
  • access_key_id:访问密钥的第一部分。
  • secret_access_key:访问密钥的第二部分。
  • session_token:使用临时安全凭证时是必需的。
  • credential_profile_name:共享凭证文件中的配置文件名称。
  • shared_credential_file:共享凭证文件的目录。
  • role_arn:要担任的 AWS IAM 角色。
  • external_id:在另一个帐户中担任角色时使用的外部 ID,请参阅 AWS 文档,了解外部 ID 的使用
  • proxy_url: 用于连接 AWS Web 服务的代理 URL。语法为 http(s)://<IP/Hostname>:<port>
  • fips_enabled: 启用此选项会指示 Filebeat 使用服务的 FIPS 端点。Filebeat 使用的所有服务都与 FIPS 兼容,除了 tagging 之外,但只有某些区域与 FIPS 兼容。有关 FIPS 端点和区域的完整列表,请参阅 https://aws.amazon.com/compliance/fips/ 或相应的服务页面 https://docs.aws.amazon.com/general/latest/gr/aws-service-information.html
  • ssl: 此选项指定 SSL/TLS 配置。如果缺少 ssl 部分,则 HTTPS 连接将使用主机的 CA。有关更多信息,请参阅 SSL
  • default_region: 如果未设置其他区域,则查询的默认区域。大多数 AWS 服务都提供可用于发出请求的区域端点。某些服务(如 IAM)不支持区域。如果没有任何其他方式(环境变量、凭据或实例配置文件)提供区域,则将使用此处设置的值。
  • assume_role.duration: 请求的承担角色会话的持续时间。如果未设置,则默认为 15 分钟。根据您的最大会话持续时间策略,AWS 允许最大会话持续时间在 1 小时到 12 小时之间。
  • assume_role.expiry_window: expiry_window 允许在会话过期之前刷新会话。这有利于防止过期的令牌导致请求失败并出现 ExpiredTokenException。

支持的格式

编辑

本节中的示例引用 Metricbeat,但无论使用哪个 Beat,用于 AWS 身份验证的凭据选项都是相同的。

  • 使用 access_key_idsecret_access_key 和/或 session_token

用户可以将凭据放入 Metricbeat 模块配置中,或者使用环境变量 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY 和/或 AWS_SESSION_TOKEN 来代替。

如果在 Docker 上运行,则应将这些环境变量作为 docker 命令的一部分添加。例如,对于 Metricbeat:

$ docker run -e AWS_ACCESS_KEY_ID=abcd -e AWS_SECRET_ACCESS_KEY=abcd -d --name=metricbeat --user=root --volume="$(pwd)/metricbeat.aws.yml:/usr/share/metricbeat/metricbeat.yml:ro" docker.elastic.co/beats/metricbeat:7.11.1 metricbeat -e -E cloud.auth=elastic:1234 -E cloud.id=test-aws:1234

示例 metricbeat.aws.yml 如下所示:

metricbeat.modules:
- module: aws
  period: 5m
  access_key_id: ${AWS_ACCESS_KEY_ID}
  secret_access_key: ${AWS_SECRET_ACCESS_KEY}
  session_token: ${AWS_SESSION_TOKEN}
  metricsets:
    - ec2

也可以通过文件添加环境变量。例如:

$ cat env.list
AWS_ACCESS_KEY_ID=abcd
AWS_SECRET_ACCESS_KEY=abcd

$ docker run --env-file env.list -d --name=metricbeat --user=root --volume="$(pwd)/metricbeat.aws.yml:/usr/share/metricbeat/metricbeat.yml:ro" docker.elastic.co/beats/metricbeat:7.11.1 metricbeat -e -E cloud.auth=elastic:1234 -E cloud.id=test-aws:1234
  • 使用 credential_profile_name 和/或 shared_credential_file

如果未给出 access_key_idsecret_access_keyrole_arn,则 Filebeat 将检查 credential_profile_name。如果为不同的工具或应用程序使用不同的凭据,则可以使用配置文件中的配置文件来配置多个访问密钥。如果没有给出 credential_profile_name,则将使用默认配置文件。

shared_credential_file 是可选的,用于指定共享凭据文件的目录。如果为空,则将使用默认目录。在 Windows 中,共享凭据文件位于 C:\Users\<yourUserName>\.aws\credentials。对于 Linux、macOS 或 Unix,该文件位于 ~/.aws/credentials。当作为服务运行时,主路径取决于管理该服务的用户,因此可以使用 shared_credential_file 参数来避免歧义。有关更多详细信息,请参阅 创建共享凭据文件

  • 使用 role_arn

role_arn 用于指定要承担哪个 AWS IAM 角色以生成临时凭据。如果给出了 role_arn,Filebeat 将检查是否给出了访问密钥。如果没有,Filebeat 将检查凭据配置文件名称。如果两者都没有给出,则将使用默认凭据配置文件。请确保在凭据配置文件或访问密钥下提供了凭据。

如果在 Docker 上运行,则需要通过卷挂载提供凭据文件。例如,对于 Metricbeat:

docker run -d --name=metricbeat --user=root --volume="$(pwd)/metricbeat.aws.yml:/usr/share/metricbeat/metricbeat.yml:ro" --volume="/Users/foo/.aws/credentials:/usr/share/metricbeat/credentials:ro" docker.elastic.co/beats/metricbeat:7.11.1 metricbeat -e -E cloud.auth=elastic:1234 -E cloud.id=test-aws:1234

示例 metricbeat.aws.yml 如下所示:

metricbeat.modules:
- module: aws
  period: 5m
  credential_profile_name: elastic-beats
  shared_credential_file: /usr/share/metricbeat/credentials
  metricsets:
    - ec2
  • 在 Filebeat 配置中使用 AWS 凭据

    filebeat.inputs:
    - type: aws-s3
      queue_url: https://sqs.us-east-1.amazonaws.com/123/test-queue
      access_key_id: '<access_key_id>'
      secret_access_key: '<secret_access_key>'
      session_token: '<session_token>'

    filebeat.inputs:
    - type: aws-s3
      queue_url: https://sqs.us-east-1.amazonaws.com/123/test-queue
      access_key_id: '${AWS_ACCESS_KEY_ID:""}'
      secret_access_key: '${AWS_SECRET_ACCESS_KEY:""}'
      session_token: '${AWS_SESSION_TOKEN:""}'
  • 使用 IAM 角色 ARN

    filebeat.inputs:
    - type: aws-s3
      queue_url: https://sqs.us-east-1.amazonaws.com/123/test-queue
      role_arn: arn:aws:iam::123456789012:role/test-mb
  • 使用共享的 AWS 凭据文件

    filebeat.inputs:
    - type: aws-s3
      queue_url: https://sqs.us-east-1.amazonaws.com/123/test-queue
      credential_profile_name: test-fb

AWS 凭据类型

编辑

可以使用两种不同的 AWS 凭据类型:访问密钥和临时安全凭据。

  • 访问密钥

AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY 是访问密钥的两个部分。它们是 IAM 用户或 AWS 账户根用户的长期凭据。有关更多详细信息,请参阅 AWS 访问密钥和秘密访问密钥

  • IAM 角色 ARN

IAM 角色是您可以在账户中创建的 IAM 身份,它具有特定的权限,这些权限决定了该身份在 AWS 中可以和不能做什么。角色没有与之关联的标准长期凭据,例如密码或访问密钥。相反,当您承担角色时,它会为您的角色会话提供临时安全凭据。IAM 角色 Amazon Resource Name (ARN) 可用于指定要承担哪个 AWS IAM 角色以生成临时凭据。有关更多详细信息,请参阅 AssumeRole API 文档

以下是使用 AWS CLI 为 Metricbeat 设置 IAM 角色的步骤。请将 123456789012 替换为您自己的账户 ID。

步骤 1. 创建 example-policy.json 文件以包含所有权限

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "sqs:ReceiveMessage"
            ],
            "Resource": "*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "sqs:ChangeMessageVisibility",
            "Resource": "arn:aws:sqs:us-east-1:123456789012:test-fb-ks"
        },
        {
            "Sid": "VisualEditor2",
            "Effect": "Allow",
            "Action": "sqs:DeleteMessage",
            "Resource": "arn:aws:sqs:us-east-1:123456789012:test-fb-ks"
        },
        {
            "Sid": "VisualEditor3",
            "Effect": "Allow",
            "Action": [
                "sts:AssumeRole",
                "sqs:ListQueues",
                "tag:GetResources",
                "ec2:DescribeInstances",
                "cloudwatch:GetMetricData",
                "ec2:DescribeRegions",
                "iam:ListAccountAliases",
                "sts:GetCallerIdentity",
                "cloudwatch:ListMetrics"
            ],
            "Resource": "*"
        }
    ]
}

步骤 2. 使用 aws iam create-policy 命令创建 IAM 策略

$ aws iam create-policy --policy-name example-policy --policy-document file://example-policy.json

步骤 3. 创建 JSON 文件 example-role-trust-policy.json,该文件定义 IAM 角色的信任关系

{
    "Version": "2012-10-17",
    "Statement": {
        "Effect": "Allow",
        "Principal": { "AWS": "arn:aws:iam::123456789012:root" },
        "Action": "sts:AssumeRole"
    }
}

步骤 4. 创建 IAM 角色并附加策略

$ aws iam create-role --role-name example-role --assume-role-policy-document file://example-role-trust-policy.json
$ aws iam attach-role-policy --role-name example-role --policy-arn "arn:aws:iam::123456789012:policy/example-policy"

完成这些步骤后,IAM 角色 ARN 可用于 Metricbeat aws 模块中的身份验证。

  • 临时安全凭据

临时安全凭据具有有限的生命周期,由访问密钥 ID、秘密访问密钥和安全令牌组成,通常从 GetSessionToken 返回。启用 MFA 的 IAM 用户在调用 GetSessionToken 时需要提交 MFA 代码。有关更多详细信息,请参阅 临时安全凭据。可以使用 sts get-session-token AWS CLI 生成临时凭据。例如,使用启用 MFA 的方式:

aws> sts get-session-token --serial-number arn:aws:iam::1234:mfa/[email protected] --token-code 456789 --duration-seconds 129600

由于临时安全凭据是短期的,因此在过期后,用户需要生成新的凭据并使用新凭据修改 aws.yml 配置文件。除非为 Metricbeat 启用了 实时重载功能,否则用户需要在更新配置文件后手动重启 Metricbeat 才能继续收集 Cloudwatch 指标。如果在旧凭据过期之前未用新凭据更新配置文件,这将导致数据丢失。对于 Metricbeat,我们建议用户在配置文件中使用访问密钥来启用 aws 模块,从而进行 AWS API 调用,而无需生成新的临时凭据并频繁更新配置。

IAM 策略是一个实体,它定义了对 AWS 环境中对象的权限。需要将特定权限添加到 IAM 用户的策略中,以授权 Metricbeat 收集 AWS 监控指标。有关所需权限,请参阅每个指标集下的文档。