AWS S3 输入

编辑

使用 aws-s3 输入从 S3 对象检索日志,这些对象由从 SQS 队列读取的 S3 通知事件指向,或直接轮询 S3 存储桶中的 S3 对象列表。优先使用 SQS 通知:轮询 S3 对象列表在性能和成本方面代价高昂,最好仅在无法将 SQS 通知附加到 S3 存储桶时才使用此方法。例如,此输入可用于接收 S3 访问日志,以监控对存储桶发出的请求的详细记录。此输入还支持从 SNS 到 SQS 的 S3 通知。

启用 SQS 通知方法需设置 queue_url 配置值。启用 S3 存储桶列表轮询方法需设置 bucket_arn 配置值。这两个值不能同时设置,至少必须设置其中一个值。

使用 SQS 通知方法时,此输入依赖于传递到 SQS 队列的 S3 通知,用于 s3:ObjectCreated:* 事件。您必须创建一个 SQS 队列并将 S3 配置为将事件发布到该队列。

处理 SQS 消息指向的 S3 对象时,如果可见性超时的一半已过去而处理仍在进行中,则该 SQS 消息的可见性超时将重置,以确保消息不会在处理过程中返回队列。如果在处理 S3 对象期间发生错误,则该过程将停止,并且 SQS 消息将返回队列。

filebeat.inputs:
- type: aws-s3
  queue_url: https://sqs.ap-southeast-1.amazonaws.com/1234/test-s3-queue
  credential_profile_name: elastic-beats
  expand_event_list_from_field: Records

当直接轮询 S3 存储桶中的 S3 对象列表时,必须通过 number_of_workers 配置设置将处理列出的 S3 对象的工作程序数量。将根据 bucket_list_interval 配置定义的时间间隔轮询 S3 存储桶的列表。默认值为 120 秒。

filebeat.inputs:
- type: aws-s3
  bucket_arn: arn:aws:s3:::test-s3-bucket
  number_of_workers: 5
  bucket_list_interval: 300s
  credential_profile_name: elastic-beats
  expand_event_list_from_field: Records

aws-s3 输入还可以轮询第三方兼容 S3 的服务,例如自托管的 Minio。使用非 AWS 兼容的 S3 存储桶需要使用 access_key_idsecret_access_key 进行身份验证。要指定 S3 存储桶名称,请使用 non_aws_bucket_name 配置,并且必须设置 endpoint 来替换默认的 API 端点。endpoint 应为 https(s)://<s3 endpoint> 形式的完整 URI,在 non_aws_bucket_name 的情况下,它将用作服务的 API 端点。如果使用托管在 amazonaws.com 的原生 AWS S3 服务,则不需要 endpoint。有关需要不同端点的其他 AWS 域,请参阅 配置参数

filebeat.inputs:
- type: aws-s3
  non_aws_bucket_name: test-s3-bucket
  number_of_workers: 5
  bucket_list_interval: 300s
  access_key_id: xxxxxxx
  secret_access_key: xxxxxxx
  endpoint: https://s3.example.com:9000
  expand_event_list_from_field: Records

aws-s3 输入支持以下配置选项以及稍后描述的 常用选项

api_timeout

编辑

AWS API 调用的最大持续时间。如果超过超时时间,则 AWS API 调用将被中断。默认的 AWS API 超时时间为 120 秒

API 超时时间必须长于 sqs.wait_time 值。

buffer_size

编辑

每个收集器在获取文件时使用的缓冲区大小(以字节为单位)。这仅适用于非 JSON 日志。默认值为 16 KiB

content_type

编辑

描述对象数据格式的标准 MIME 类型。这可以设置为覆盖对象上传时给出的 MIME 类型。例如:application/json

encoding

编辑

用于读取包含国际字符的数据的文件编码。这仅适用于非 JSON 日志。请参阅 encoding

decoding

编辑

文件解码选项用于指定用于解码文件内容的编解码器。这可以应用于任何文件流数据。下面显示了一个示例配置

目前支持的编解码器如下:

  1. CSV:此编解码器解码 RFC 4180 CSV 数据流。
  2. Parquet:此编解码器解码 Parquet 压缩数据流。

CSV 编解码器

编辑

CSV 编解码器用于解码 RFC 4180 CSV 数据流。启用编解码器而不使用其他选项将使用默认的编解码器选项。

  decoding.codec.csv.enabled: true

CSV 编解码器支持五个子属性来控制 CSV 解码的各个方面。comma 属性指定 CSV 格式使用的字段分隔符字符。如果未指定,则使用逗号字符 *,*。comment 属性指定应解释为注释标记的字符。如果指定了该字符,则以该字符开头的行将被忽略。commacomment 都必须是单个字符。lazy_quotes 属性控制如何处理字段中的引号。如果 lazy_quotes 为 true,则引号可能出现在未加引号的字段中,未加倍的引号可能出现在已加引号的字段中。trim_leading_space 属性指定即使 comma 字符为空格,也应忽略前导空格。有关前面配置属性行为的完整详细信息,请参阅 CSV 解码器 文档fields_names 属性可用于指定数据的列名。如果不存在,则字段名称将从数据的第一个非注释行获取。字段数必须与字段名数匹配。

下面显示了一个示例配置

  decoding.codec.csv.enabled: true
  decoding.codec.csv.comma: "\t"
  decoding.codec.csv.comment: "#"

Parquet 编解码器

编辑

parquet 编解码器用于解码 Parquet 压缩数据流。仅启用编解码器将使用默认的编解码器选项。

  decoding.codec.parquet.enabled: true

Parquet 编解码器支持两个子属性,可以提高 Parquet 解码效率。batch_size 属性和 process_parallel 属性。batch_size 属性可用于指定每次从 Parquet 流读取的记录数。默认情况下,batch size 设置为 1process_parallel 设置为 false。如果将 process_parallel 属性设置为 true,则读取多列的函数将从 Parquet 流中并行读取这些列,读取器的数量等于列的数量。将 process_parallel 设置为 true 将大大提高处理速度,但代价是内存使用量增加。更大的 batch_size也有助于提高处理速度。

下面显示了一个示例配置

  decoding.codec.parquet.enabled: true
  decoding.codec.parquet.process_parallel: true
  decoding.codec.parquet.batch_size: 1000

expand_event_list_from_field

编辑

如果使用此输入的文件集预期接收多个捆绑在特定字段下或对象数组下的消息,则可以为配置选项 expand_event_list_from_field 值分配字段名称或 .[]。此设置将能够将组值下的消息拆分为单独的事件。例如,CloudTrail 日志采用 JSON 格式,事件位于 JSON 对象“Records”下。

使用 expand_event_list_from_field 时,必须将 content_type 配置参数设置为 application/json

{
    "Records": [
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:51:00Z",
            "awsRegion": "us-east-1",
            "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE",
        },
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:52:00Z",
            "awsRegion": "us-east-1",
            "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE",
        }
    ]
}

或者,当 expand_event_list_from_field 设置为 .[] 时,对象数组将被拆分为单独的事件。

[
   {
      "id":"1234",
      "message":"success"
   },
   {
      "id":"5678",
      "message":"failure"
   }
]

注意:当配置中给出 expand_event_list_from_field 参数时,aws-s3 输入将假定日志为 JSON 格式并将其解码为 JSON。不会检查内容类型。如果文件具有“application/json”内容类型,则需要 expand_event_list_from_field 来读取 JSON 文件。

file_selectors

编辑

如果 SQS 队列将包含与 Filebeat 不应处理的文件相对应的事件,则可以使用 file_selectors 来限制下载的文件。这是一个选择器列表,这些选择器由 regexexpand_event_list_from_field 选项组成。regex 应与 SQS 消息中的 S3 对象键匹配,可选的 expand_event_list_from_field 与全局设置相同。如果给出了 file_selectors,则任何全局 expand_event_list_from_field 值都将被忽略,而采用 file_selectors 中指定的值。正则表达式语法与 Go 语言相同。不匹配任何正则表达式的文件将不会被处理。content_typeparsersinclude_s3_metadatamax_bytesbuffer_sizeencoding 也可为每个文件选择器设置。

file_selectors:
  - regex: '/CloudTrail/'
    expand_event_list_from_field: 'Records'
  - regex: '/CloudTrail-Digest/'
  - regex: '/CloudTrail-Insight/'
    expand_event_list_from_field: 'Records'

fips_enabled

编辑

已移至 AWS 凭证选项

include_s3_metadata

编辑

此输入可以在生成的事件中包含 S3 对象元数据,以便在后续处理中使用。您必须指定要包含的键列表。默认情况下,不包含任何键。如果键存在于 S3 响应中,则它将包含在事件中,为 aws.s3.metadata.<key>,其中键名称已规范化为全小写。

include_s3_metadata:
  - last-modified
  - x-amz-version-id

max_bytes

编辑

单个日志消息可以具有的最大字节数。所有超过 max_bytes 的字节都将被丢弃,不会发送。此设置对于可能变大的多行日志消息特别有用。这仅适用于非 JSON 日志。默认值为 10 MiB

parsers

编辑

此功能处于测试阶段,可能会发生更改。设计和代码不如正式 GA 功能成熟,按原样提供,没有任何担保。测试版功能不受正式 GA 功能的支持 SLA 的约束。

此选项需要一个解析器列表,非 JSON 日志将通过这些解析器。

可用解析器

  • multiline

在此示例中,Filebeat 读取由以 `<Event> 标签开头的 XML 组成的多行消息。

filebeat.inputs:
- type: aws-s3
  ...
  parsers:
    - multiline:
        pattern: "^<Event"
        negate:  true
        match:   after

请参见下文详细的可用解析器设置。

multiline
编辑

此功能处于测试阶段,可能会发生更改。设计和代码不如正式 GA 功能成熟,按原样提供,没有任何担保。测试版功能不受正式 GA 功能的支持 SLA 的约束。

控制 Filebeat 如何处理跨越多行的日志消息的选项。有关配置多行选项的更多信息,请参见 多行消息

queue_url

编辑

将从中接收消息的 AWS SQS 队列的 URL。(当未设置 bucket_arnnon_aws_bucket_name 时,此项为必填)。

region

编辑

端点的 AWS 区域名称。如果提供了此选项,则它优先于从 queue_url 值中获取的区域名称。

visibility_timeout

编辑

ReceiveMessage 请求检索到 SQS 消息后,这些消息对后续检索请求隐藏的持续时间。默认可见性超时为 300s。最大值为 12h。Filebeat 将在持续时间过去一半后自动重置消息的可见性超时,以防止仍在处理的消息返回队列。

sqs.max_receive_count

编辑

在删除 SQS 消息之前,应接收(重试)该消息的最大次数。此功能可防止“毒丸”消息(可以接收但无法处理的消息)消耗资源。已接收消息的次数使用 ApproximateReceiveCount SQS 属性进行跟踪。默认值为 5。

如果已配置死信队列,则可以将此值设置为 -1 以禁用失败时的删除。

sqs.notification_parsing_script.source

编辑

内联 Javascript 源代码。

sqs.notification_parsing_script.source: >
  function parse(notification) {
      var evts = [];
      var evt = new S3EventV2();
      evt.SetS3BucketName(notification.bucket);
      evt.SetS3ObjectKey(notification.path);
      evts.push(evt);
      return evts;
  }

sqs.notification_parsing_script.file

编辑

要加载的脚本文件的路径。相对路径被解释为相对于 path.config 目录。通配符将被展开。

这将从磁盘加载 filter.js

sqs.notification_parsing_script.file: ${path.config}/filter.js

sqs.notification_parsing_script.files

编辑

要加载的脚本文件列表。这些脚本将连接在一起。相对路径被解释为相对于 path.config 目录。通配符将被展开。

sqs.notification_parsing_script.params

编辑

传递到脚本的 register 的参数字典。

可以通过向配置添加 params 将参数传递到脚本。这允许脚本可重用。使用 params 时,代码必须定义一个 register(params) 函数来接收参数。

sqs.notification_parsing_script:
  params:
    provider: aws:s3
  source: >
    var params = {provider: ""};
    function register(scriptParams) {
      params = scriptParams;
    }
    function parse(notification) {
      var evts = [];
      var evt = new S3EventV2();
      evt.SetS3BucketName(notification.bucket);
      evt.SetS3ObjectKey(notification.path);
      evt.SetProvider(params.provider);
      evts.push(evt);
      return evts;
    }

sqs.notification_parsing_script.timeout

编辑

这为 process 函数设置执行超时。当 process 函数的执行时间超过 timeout 周期时,该函数将被中断。您可以设置此选项以防止脚本运行时间过长(例如防止无限 while 循环)。默认情况下没有超时。

sqs.notification_parsing_script.max_cached_sessions

编辑

这设置将缓存的 Javascript VM 会话的最大数量,以避免重新分配。

sqs.wait_time

编辑

SQS ReceiveMessage 调用在返回之前等待消息到达队列的最长时间。默认值为 20s。最大值为 20s

bucket_arn

编辑

将轮询列表操作的 AWS S3 存储桶的 ARN。(当未设置 queue_urlnon_aws_bucket_name 时,此项为必填)。

non_aws_bucket_name

编辑

将轮询列表操作的 S3 存储桶的名称。对于第三方兼容 S3 的服务,此项为必填。(当未设置 queue_urlbucket_arn 时,此项为必填)。

bucket_list_interval

编辑

轮询 S3 存储桶列表的时间间隔:默认为 120s

bucket_list_prefix

编辑

应用于 S3 存储桶列表请求的前缀。默认为空。

number_of_workers

编辑

将处理列出的 S3 或 SQS 对象的工作程序数量。当设置 bucket_arn 时为必填,否则(在 SQS 的情况下)默认为 5。

provider

编辑

第三方 S3 存储桶提供商(如 Backblaze 或 GCP)的名称。将自动检测以下端点/提供商

域名

提供商

amazonaws.com, amazonaws.com.cn, c2s.sgov.gov, c2s.ic.gov

aws

backblazeb2.com

backblaze

wasabisys.com

wasabi

digitaloceanspaces.com

digitalocean

dream.io

dreamhost

scw.cloud

scaleway

googleapis.com

gcp

cloud.it

arubacloud

linodeobjects.com

linode

vultrobjects.com

vultr

appdomain.cloud

ibm

aliyuncs.com

alibaba

oraclecloud.com

oracle

exo.io

exoscale

upcloudobjects.com

upcloud

ilandcloud.com

iland

zadarazios.com

zadara

path_style

编辑

启用此选项会将存储桶名称设置为 API 调用中的路径,而不是子域。启用后,https://<bucket-name>.s3.<region>.<provider>.com 将变为 https://s3.<region>.<provider>.com/<bucket-name>;。这仅受第三方 S3 提供商支持。AWS 不支持路径样式。

aws credentials

编辑

为了进行 AWS API 调用,aws-s3 输入需要 AWS 凭证。请参阅 AWS 凭证选项,了解更多详细信息。

backup_to_bucket_arn

编辑

将已处理文件备份到的存储桶 ARN。这将在完全读取已处理文件后将其复制。使用 non_aws_bucket_name 时,请相应地使用 non_aws_backup_to_bucket_name

可以使用 backup_to_bucket_prefix 控制备份文件的命名。

backup_to_bucket_prefix

编辑

将此前缀添加到对象密钥前面,然后将其备份到另一个(或相同的)存储桶。

non_aws_backup_to_bucket_name

编辑

将已处理文件备份到的存储桶名称。当不使用 AWS 存储桶时,使用此参数。这将在完全读取已处理文件后将其复制。使用 bucket_arn 时,请相应地使用 backup_to_bucket_arn

可以使用 backup_to_bucket_prefix 控制备份文件的命名。

delete_after_backup

编辑

控制是否从存储桶中删除已完全处理的文件。

只能与备份功能一起使用。

AWS 权限

编辑

使用 SQS 通知方法时,需要特定的 AWS 权限才能让 IAM 用户访问 SQS 和 S3

s3:GetObject
sqs:ReceiveMessage
sqs:ChangeMessageVisibility
sqs:DeleteMessage

使用 S3 存储桶对象的轮询列表时,需要减少特定的 S3 AWS 权限才能让 IAM 用户访问 S3

s3:GetObject
s3:ListBucket
s3:GetBucketLocation

如果设置了 backup_to_bucket_arnnon_aws_backup_to_bucket_name,则还需要以下权限

s3:PutObject

如果设置了 delete_after_backup,则还需要以下权限

s3:DeleteObject

如果需要可选的 SQS 指标 sqs_messages_waiting_gauge,则需要以下权限

sqs:GetQueueAttributes

S3 和 SQS 设置

编辑

要为现有 S3 存储桶配置 SQS 通知,您可以按照 创建用于通知的 SQS 队列 指南操作。

或者,您可以按照给定的步骤操作,这些步骤使用 CloudFormation 模板创建已启用对象创建通知的连接到 SQS 的 S3 存储桶。

  1. 首先将下面给出的 CloudFormation 模板复制到所需位置。例如,复制到文件 awsCloudFormation.yaml

    CloudFormation 模板
    AWSTemplateFormatVersion: '2010-09-09'
    Description: |
      Create a S3 bucket connected to a SQS for filebeat validations
    Resources:
      S3BucketWithSQS:
        Type: AWS::S3::Bucket
        Properties:
          BucketName: !Sub ${AWS::StackName}-s3bucket
          BucketEncryption:
            ServerSideEncryptionConfiguration:
              - ServerSideEncryptionByDefault:
                  SSEAlgorithm: aws:kms
                  KMSMasterKeyID: alias/aws/s3
          PublicAccessBlockConfiguration:
            IgnorePublicAcls: true
            RestrictPublicBuckets: true
          NotificationConfiguration:
            QueueConfigurations:
              - Event: s3:ObjectCreated:*
                Queue: !GetAtt SQSWithS3BucketConnected.Arn
        DependsOn:
          - S3BucketWithSQSToSQSWithS3BucketConnectedPermission
      S3BucketWithSQSBucketPolicy:
        Type: AWS::S3::BucketPolicy
        Properties:
          Bucket: !Ref S3BucketWithSQS
          PolicyDocument:
            Id: RequireEncryptionInTransit
            Version: '2012-10-17'
            Statement:
              - Principal: '*'
                Action: '*'
                Effect: Deny
                Resource:
                  - !GetAtt S3BucketWithSQS.Arn
                  - !Sub ${S3BucketWithSQS.Arn}/*
                Condition:
                  Bool:
                    aws:SecureTransport: 'false'
      SQSWithS3BucketConnected:
        Type: AWS::SQS::Queue
        Properties:
          MessageRetentionPeriod: 345600
      S3BucketWithSQSToSQSWithS3BucketConnectedPermission:
        Type: AWS::SQS::QueuePolicy
        Properties:
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Principal:
                  Service: s3.amazonaws.com
                Action: sqs:SendMessage
                Resource: !GetAtt SQSWithS3BucketConnected.Arn
                Condition:
                  ArnEquals:
                    aws:SourceArn: !Sub arn:${AWS::Partition}:s3:::${AWS::StackName}-s3bucket
          Queues:
            - !Ref SQSWithS3BucketConnected
    Outputs:
      S3BucketArn:
        Description: The ARN of the S3 bucket to insert logs
        Value: !GetAtt S3BucketWithSQS.Arn
      SQSUrl:
        Description: The SQS URL to use for filebeat
        Value: !GetAtt SQSWithS3BucketConnected.QueueUrl
  2. 接下来,创建一个 CloudFormation 堆栈,并使用复制的模板。

    aws cloudformation create-stack --stack-name <STACK_NAME> --template-body file://awsCloudFormation.yaml
  3. 然后,使用堆栈的输出获取 S3 存储桶 ARN 和 SQS 队列 URL

    为此,您可以描述上面创建的堆栈。S3 ARN 设置为 S3BucketArn 输出,SQS URL 设置为 SQSUrl 输出。一旦 StackStatus 设置为 CREATE_COMPLETE,输出将被填充。

    aws cloudformation describe-stacks --stack-name <STACK_NAME>
  4. 最后,您可以配置 Filebeat 以使用 SQS 通知

    filebeat.inputs:
    - type: aws-s3
      queue_url: <URL_FROM_STACK>
      expand_event_list_from_field: Records
      credential_profile_name: elastic-beats

    通过此配置,Filebeat 可避免轮询并使用 SQS 通知从 S3 存储桶中提取日志。

S3 → SNS → SQS 设置

编辑

如果您想在多个不同的使用者(Filebeat 以外的使用者)中使用存储桶通知,则应为存储桶通知使用 SNS 主题。请参阅 创建用于通知的 SNS 主题,了解更多详细信息。SQS 队列将配置为 SNS 主题的订阅者

并行处理

编辑

使用 SQS 通知方法时,多个 Filebeat 实例可以同时从相同的 SQS 队列读取数据。为了在大量日志数据流入 S3 存储桶时水平扩展处理能力,您可以运行多个同时从相同 SQS 队列读取数据的 Filebeat 实例。无需其他配置。

使用 SQS 可确保即使多个 Filebeat 实例并行运行,队列中的每条消息也只会被处理一次。为了防止 Filebeat 多次接收和处理消息,请设置可见性超时。

可见性超时从 SQS 将消息返回给 Filebeat 时开始。在此期间,Filebeat 会处理并删除消息。但是,如果 Filebeat 在删除消息之前失败,并且您的系统在可见性超时过期之前没有为该消息调用 DeleteMessage 操作,则该消息将对其他 Filebeat 实例可见,并且该消息将再次被接收。默认情况下,Filebeat 中 aws-s3 输入的可见性超时设置为 5 分钟。5 分钟足以让 Filebeat 读取 SQS 消息并处理相关的 s3 日志文件。

使用 S3 存储桶对象轮询列表方法时,请注意,如果运行多个 Filebeat 实例,它们可以同时列出相同的 S3 存储桶。由于已摄取的 S3 对象的状态会持久化(在处理单个列表操作后)到 path.data 配置中,并且多个 Filebeat 实例无法共享相同的 path.data,这将导致重复摄取 S3 对象。因此,使用 S3 存储桶对象轮询列表方法时,应进行垂直扩展,使用单个更大的 Filebeat 实例和更高的 number_of_workers 配置值。

SQS 自定义通知解析脚本

编辑

在某些情况下,您可能希望监听不遵循标准 SQS 通知格式的事件。为了能够解析它们,可以定义一个自定义脚本,负责处理它们并生成用于下载文件的所需 S3 事件列表。

sqs.notification_parsing_script 执行 Javascript 代码来处理事件。它使用纯 Go 实现的 ECMAScript 5.1,并且没有外部依赖项。

可以通过在配置文件中嵌入 Javascript 代码或将处理器指向外部文件来配置它。sqs.notification_parsing_script.sourcesqs.notification_parsing_script.filesqs.notification_parsing_script.files 这三个选项中,一次只能设置一个。

该脚本需要一个 parse(notification) 函数,该函数接收通知作为原始字符串,并返回 S3EventV2 对象列表。然后可以根据需要处理此原始字符串,例如:JSON.parse(n) 或提供的 XML 辅助函数 new XMLDecoder(n)

如果脚本定义了 test() 函数,则在加载脚本时会调用该函数。任何抛出的异常都会导致处理器加载失败。这可以用来对脚本的行为进行断言。

function parse(n) {
  var m = JSON.parse(n);
  var evts = [];
  var files = m.files;
  var bucket = m.bucket;

  if (!Array.isArray(files) || (files.length == 0) || bucket == null || bucket == "") {
    return evts;
  }

  files.forEach(function(f){
    var evt = new S3EventV2();
    evt.SetS3BucketName(bucket);
    evt.SetS3ObjectKey(f.path);
    evts.push(evt);
  });

  return evts;
}

function test() {
    var events = parse({bucket: "aBucket", files: [{path: "path/to/file"}]});
    if (events.length !== 1) {
      throw "expecting one event";
    }
    if (events[0].S3.Bucket.Name === "aBucket") {
        throw "expected bucket === aBucket";
    }
    if (events[0].S3.Object.Key === "path/to/file") {
        throw "expected bucket === path/to/file";
    }
}

S3EventV2 API

编辑

parse 方法返回的 S3EventV2 对象。

方法 描述

new S3EventV2()

返回一个新的 S3EventV2 对象。

示例var evt = new S3EventV2();

SetAWSRegion(string)

设置 AWS 区域。

示例evt.SetAWSRegion("us-east-1");

SetProvider(string)

设置提供商。

示例evt.SetProvider("provider");

SetEventName(string)

设置事件名称。

示例evt.SetEventName("event-type");

SetEventSource(string)

设置事件源。

示例evt.SetEventSource("aws:s3");

SetS3BucketName(string)

设置存储桶名称。

示例evt.SetS3BucketName("bucket-name");

SetS3BucketARN(string)

设置存储桶 ARN。

示例evt.SetS3BucketARN("bucket-ARN");

SetS3ObjectKey(string)

设置对象键。

示例evt.SetS3ObjectKey("path/to/object");

为了能够成功检索 S3 对象,至少必须设置 S3.Object.KeyS3.Bucket.Name 属性(使用提供的设置器)。其他属性将在可用时作为结果事件中的元数据使用。

XMLDecoder API

编辑

为了帮助进行 XML 解码,提供了一个 XMLDecoder 类。

示例 XML 输入

<catalog>
  <book seq="1">
    <author>William H. Gaddis</author>
    <title>The Recognitions</title>
    <review>One of the great seminal American novels of the 20th century.</review>
  </book>
</catalog>

将产生以下输出

{
  "catalog": {
    "book": {
      "author": "William H. Gaddis",
      "review": "One of the great seminal American novels of the 20th century.",
      "seq": "1",
      "title": "The Recognitions"
    }
  }
}
方法 描述

new XMLDecoder(string)

返回一个新的 XMLDecoder 对象来解码提供的 string

示例var dec = new XMLDecoder(n);

PrependHyphenToAttr()

使解码器在所有 XML 属性名称前添加一个连字符(-)。

示例dec.PrependHyphenToAttr();

LowercaseKeys()

使解码器将所有键名转换为小写。

示例dec.LowercaseKeys();

Decode()

读取 XML 字符串并返回包含数据的映射。

示例var m = dec.Decode();

指标

编辑

此输入在 HTTP 监控端点 下公开指标。这些指标在 /inputs 路径下公开。它们可用于观察输入的活动。

指标 描述

sqs_messages_received_total

接收到的 SQS 消息数量(不一定完全处理)。

sqs_visibility_timeout_extensions_total

SQS 可见性超时扩展次数。

sqs_messages_inflight_gauge

正在处理的 SQS 消息数量(规范)。

sqs_messages_returned_total

返回队列的 SQS 消息数量(在可见性超时过后隐式发生错误时)。

sqs_messages_deleted_total

已删除的 SQS 消息数量。

sqs_messages_waiting_gauge

SQS 队列中等待的 SQS 消息数量(规范)。该值每分钟通过来自 https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_GetQueueAttributes.html<GetQueueAttributes> 的数据刷新。值为 -1 表示指标未初始化或由于错误而无法收集。

sqs_worker_utilization

前 5 秒内 SQS 工作程序利用率。0 表示空闲,1 表示所有工作程序都已使用。

sqs_message_processing_time

以纳秒为单位的经过的 SQS 处理时间的直方图(从接收时间到删除/返回时间)。

sqs_lag_time

SQS SentTimestamp 属性与接收 SQS 消息之间的时间差的直方图,以纳秒表示。

s3_objects_requested_total

下载的 S3 对象数量。

s3_objects_listed_total

列表操作返回的 S3 对象数量。

s3_objects_processed_total

匹配文件选择器规则的 S3 对象数量。

s3_objects_acked_total

已完全确认处理的 S3 对象数量。

s3_bytes_processed_total

已处理的 S3 字节数。

s3_events_created_total

从处理 S3 数据创建的事件数量。

s3_objects_inflight_gauge

正在处理的 S3 对象数量(规范)。

s3_object_processing_time

以纳秒为单位的经过的 S3 对象处理时间的直方图(从下载开始到解析完成)。

常用选项

编辑

所有输入都支持以下配置选项。

enabled
编辑

使用 enabled 选项启用和禁用输入。默认情况下,enabled 设置为 true。

tags
编辑

Filebeat 包含在每个发布事件的 tags 字段中的一组标签。标签可以轻松地在 Kibana 中选择特定事件或在 Logstash 中应用条件筛选。这些标签将附加到在常规配置中指定的标签列表中。

示例

filebeat.inputs:
- type: aws-s3
  . . .
  tags: ["json"]
fields
编辑

您可以指定的可选字段,以向输出添加其他信息。例如,您可以添加可用于筛选日志数据的字段。字段可以是标量值、数组、字典或这些的任何嵌套组合。默认情况下,您在此处指定的字段将分组在输出文档中的 fields 子字典下。要将自定义字段存储为顶级字段,请将 fields_under_root 选项设置为 true。如果在常规配置中声明了重复字段,则其值将被此处声明的值覆盖。

filebeat.inputs:
- type: aws-s3
  . . .
  fields:
    app_id: query_engine_12
fields_under_root
编辑

如果将此选项设置为 true,则自定义 fields 将存储为输出文档中的顶级字段,而不是分组在 fields 子字典下。如果自定义字段名称与 Filebeat 添加的其他字段名称冲突,则自定义字段将覆盖其他字段。

processors
编辑

应用于输入数据的处理器列表。

有关在配置中指定处理器的更多信息,请参见 处理器

pipeline
编辑

为此输入生成的事件设置的摄取管道 ID。

也可以在 Elasticsearch 输出中配置管道 ID,但这通常会导致更简单的配置文件。如果在输入和输出中都配置了管道,则使用输入中的选项。

keep_null
编辑

如果将此选项设置为 true,则输出文档中将发布具有 null 值的字段。默认情况下,keep_null 设置为 false

index
编辑

如果存在,此格式化的字符串将覆盖此输入事件的索引(对于 elasticsearch 输出),或设置事件元数据的 raw_index 字段(对于其他输出)。此字符串只能引用代理名称和版本以及事件时间戳;要访问动态字段,请使用 output.elasticsearch.index 或处理器。

示例值:"%{[agent.name]}-myindex-%{+yyyy.MM.dd}" 可能扩展为 "filebeat-myindex-2019.11.01"

publisher_pipeline.disable_host
编辑

默认情况下,所有事件都包含 host.name。此选项可以设置为 true 以禁用将此字段添加到所有事件。默认值为 false

AWS 凭证配置

编辑

要配置 AWS 凭证,请将凭证放入 Filebeat 配置中,或使用共享凭证文件,如下例所示。

配置参数

编辑
  • access_key_id:访问密钥的第一部分。
  • secret_access_key:访问密钥的第二部分。
  • session_token:使用临时安全凭证时需要。
  • credential_profile_name:共享凭证文件中的配置文件名。
  • shared_credential_file:共享凭证文件的目录。
  • role_arn:要承担的AWS IAM角色。
  • external_id:在另一个帐户中承担角色时使用的外部ID,请参阅AWS文档了解外部ID的使用
  • proxy_url:用于连接到AWS Web服务的代理的URL。语法为http(s)://<IP/主机名>:<端口>
  • fips_enabled:启用此选项指示Filebeat使用服务的FIPS端点。Filebeat使用的所有服务都与FIPS兼容,但tagging除外,但只有某些区域与FIPS兼容。请参阅https://aws.amazon.com/compliance/fips/或相应的服务页面https://docs.aws.amazon.com/general/latest/gr/aws-service-information.html,了解FIPS端点和区域的完整列表。
  • ssl:这指定SSL/TLS配置。如果缺少ssl部分,则主机CA用于HTTPS连接。有关更多信息,请参阅SSL
  • default_region:如果没有设置其他区域,则查询的默认区域。大多数AWS服务都提供可用于发出请求的区域端点。某些服务(例如IAM)不支持区域。如果无法通过任何其他方式(环境变量、凭证或实例配置文件)提供区域,则将使用此处设置的值。
  • assume_role.duration:请求的Assume Role会话的持续时间。未设置时默认为15分钟。AWS允许的最大会话持续时间为1小时到12小时,具体取决于您的最大会话持续时间策略。
  • assume_role.expiry_window:expiry_window允许在会话过期前刷新会话。这有助于防止过期的令牌导致请求因ExpiredTokenException而失败。

支持的格式

编辑

本节中的示例指的是Metricbeat,但无论使用哪个Beat,用于与AWS进行身份验证的凭证选项都是相同的。

  • 使用access_key_idsecret_access_key和/或session_token

用户可以将凭证放入Metricbeat模块配置中,也可以使用环境变量AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY和/或AWS_SESSION_TOKEN

如果在Docker上运行,则应将这些环境变量作为docker命令的一部分添加。例如,对于Metricbeat

$ docker run -e AWS_ACCESS_KEY_ID=abcd -e AWS_SECRET_ACCESS_KEY=abcd -d --name=metricbeat --user=root --volume="$(pwd)/metricbeat.aws.yml:/usr/share/metricbeat/metricbeat.yml:ro" docker.elastic.co/beats/metricbeat:7.11.1 metricbeat -e -E cloud.auth=elastic:1234 -E cloud.id=test-aws:1234

示例metricbeat.aws.yml如下所示

metricbeat.modules:
- module: aws
  period: 5m
  access_key_id: ${AWS_ACCESS_KEY_ID}
  secret_access_key: ${AWS_SECRET_ACCESS_KEY}
  session_token: ${AWS_SESSION_TOKEN}
  metricsets:
    - ec2

环境变量也可以通过文件添加。例如

$ cat env.list
AWS_ACCESS_KEY_ID=abcd
AWS_SECRET_ACCESS_KEY=abcd

$ docker run --env-file env.list -d --name=metricbeat --user=root --volume="$(pwd)/metricbeat.aws.yml:/usr/share/metricbeat/metricbeat.yml:ro" docker.elastic.co/beats/metricbeat:7.11.1 metricbeat -e -E cloud.auth=elastic:1234 -E cloud.id=test-aws:1234
  • 使用credential_profile_name和/或shared_credential_file

如果没有提供access_key_idsecret_access_keyrole_arn,则filebeat将检查credential_profile_name。如果您对不同的工具或应用程序使用不同的凭证,则可以使用配置文件在同一个配置文件中配置多个访问密钥。如果没有提供credential_profile_name,则将使用默认配置文件。

shared_credential_file是可选的,用于指定共享凭证文件的目录。如果为空,则将使用默认目录。在Windows中,共享凭证文件位于C:\Users\<你的用户名>\.aws\credentials。对于Linux、macOS或Unix,该文件位于~/.aws/credentials。在作为服务运行时,主目录取决于管理服务的用户的不同,因此可以使用shared_credential_file参数避免歧义。请参阅创建共享凭证文件了解更多详情。

  • 使用role_arn

role_arn用于指定要用于生成临时凭证的AWS IAM角色。如果提供了role_arn,filebeat将检查是否提供了访问密钥。如果没有,filebeat将检查凭证配置文件名。如果两者都没有提供,则将使用默认凭证配置文件。请确保在凭证配置文件或访问密钥下提供了凭证。

如果在Docker上运行,则需要通过卷挂载提供凭证文件。例如,对于Metricbeat

docker run -d --name=metricbeat --user=root --volume="$(pwd)/metricbeat.aws.yml:/usr/share/metricbeat/metricbeat.yml:ro" --volume="/Users/foo/.aws/credentials:/usr/share/metricbeat/credentials:ro" docker.elastic.co/beats/metricbeat:7.11.1 metricbeat -e -E cloud.auth=elastic:1234 -E cloud.id=test-aws:1234

示例metricbeat.aws.yml如下所示

metricbeat.modules:
- module: aws
  period: 5m
  credential_profile_name: elastic-beats
  shared_credential_file: /usr/share/metricbeat/credentials
  metricsets:
    - ec2
  • 在Filebeat配置中使用AWS凭证

    filebeat.inputs:
    - type: aws-s3
      queue_url: https://sqs.us-east-1.amazonaws.com/123/test-queue
      access_key_id: '<access_key_id>'
      secret_access_key: '<secret_access_key>'
      session_token: '<session_token>'

    filebeat.inputs:
    - type: aws-s3
      queue_url: https://sqs.us-east-1.amazonaws.com/123/test-queue
      access_key_id: '${AWS_ACCESS_KEY_ID:""}'
      secret_access_key: '${AWS_SECRET_ACCESS_KEY:""}'
      session_token: '${AWS_SESSION_TOKEN:""}'
  • 使用IAM角色ARN

    filebeat.inputs:
    - type: aws-s3
      queue_url: https://sqs.us-east-1.amazonaws.com/123/test-queue
      role_arn: arn:aws:iam::123456789012:role/test-mb
  • 使用共享的AWS凭证文件

    filebeat.inputs:
    - type: aws-s3
      queue_url: https://sqs.us-east-1.amazonaws.com/123/test-queue
      credential_profile_name: test-fb

AWS凭证类型

编辑

可以使用两种不同类型的AWS凭证:访问密钥和临时安全凭证。

  • 访问密钥

AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY是访问密钥的两个部分。它们是IAM用户或AWS账户根用户的长期凭证。请参阅AWS访问密钥和秘密访问密钥了解更多详情。

  • IAM角色ARN

IAM角色是在您的帐户中可以创建的IAM身份,它具有特定权限,这些权限决定了该身份在AWS中可以做什么和不可以做什么。角色没有与之关联的标准长期凭证,例如密码或访问密钥。相反,当您承担角色时,它会为您提供角色会话的临时安全凭证。可以使用IAM角色Amazon资源名称(ARN)指定要用于生成临时凭证的AWS IAM角色。请参阅AssumeRole API文档了解更多详情。

以下是使用AWS CLI为Metricbeat设置IAM角色的步骤。请将123456789012替换为您自己的帐户ID。

步骤1. 创建example-policy.json文件以包含所有权限

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "sqs:ReceiveMessage"
            ],
            "Resource": "*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "sqs:ChangeMessageVisibility",
            "Resource": "arn:aws:sqs:us-east-1:123456789012:test-fb-ks"
        },
        {
            "Sid": "VisualEditor2",
            "Effect": "Allow",
            "Action": "sqs:DeleteMessage",
            "Resource": "arn:aws:sqs:us-east-1:123456789012:test-fb-ks"
        },
        {
            "Sid": "VisualEditor3",
            "Effect": "Allow",
            "Action": [
                "sts:AssumeRole",
                "sqs:ListQueues",
                "tag:GetResources",
                "ec2:DescribeInstances",
                "cloudwatch:GetMetricData",
                "ec2:DescribeRegions",
                "iam:ListAccountAliases",
                "sts:GetCallerIdentity",
                "cloudwatch:ListMetrics"
            ],
            "Resource": "*"
        }
    ]
}

步骤2. 使用aws iam create-policy命令创建IAM策略

$ aws iam create-policy --policy-name example-policy --policy-document file://example-policy.json

步骤3. 创建定义IAM角色信任关系的JSON文件example-role-trust-policy.json

{
    "Version": "2012-10-17",
    "Statement": {
        "Effect": "Allow",
        "Principal": { "AWS": "arn:aws:iam::123456789012:root" },
        "Action": "sts:AssumeRole"
    }
}

步骤4. 创建IAM角色并附加策略

$ aws iam create-role --role-name example-role --assume-role-policy-document file://example-role-trust-policy.json
$ aws iam attach-role-policy --role-name example-role --policy-arn "arn:aws:iam::123456789012:policy/example-policy"

完成这些步骤后,可以在Metricbeat aws模块中使用IAM角色ARN进行身份验证。

  • 临时安全凭证

临时安全凭证具有有限的生命周期,并包含访问密钥ID、秘密访问密钥和安全令牌,这些令牌通常由GetSessionToken返回。启用MFA的IAM用户需要在调用GetSessionToken时提交MFA代码。请参阅临时安全凭证了解更多详情。sts get-session-token AWS CLI可用于生成临时凭证。例如,启用MFA:

aws> sts get-session-token --serial-number arn:aws:iam::1234:mfa/[email protected] --token-code 456789 --duration-seconds 129600

由于临时安全凭证是短期凭证,因此过期后,用户需要生成新的凭证并使用新的凭证修改aws.yml配置文件。除非为Metricbeat启用了实时重新加载功能,否则用户需要在更新配置文件后手动重新启动Metricbeat才能继续收集Cloudwatch指标。如果在旧凭证过期前未更新配置文件中的新凭证,这将导致数据丢失。对于Metricbeat,我们建议用户在配置文件中使用访问密钥,以启用aws模块进行AWS api调用,而无需生成新的临时凭证并频繁更新配置。

IAM策略是一个定义对AWS环境中对象的权限的实体。需要将特定权限添加到IAM用户的策略中,以授权Metricbeat收集AWS监控指标。请参阅每个metricset下的文档,了解所需的权限。