Azure 事件中心插件

编辑
  • 插件版本: v1.5.0
  • 发布日期: 2024-10-25
  • 更新日志

有关其他版本,请参阅 版本化插件文档

获取帮助

编辑

有关插件的问题,请在 Discuss 论坛中开一个主题。对于错误或功能请求,请在 Github 中打开一个 issue。有关 Elastic 支持的插件列表,请查阅 Elastic 支持矩阵

描述

编辑

此插件从 Azure 事件中心(一个高度可扩展的数据流平台和事件摄取服务)消费事件。事件生产者将事件发送到 Azure 事件中心,此插件消费这些事件以供 Logstash 使用。

许多 Azure 服务与 Azure 事件中心集成。例如,Azure 监控与 Azure 事件中心集成以提供基础设施指标。

Microsoft 事件中心文档中所述,此插件需要与端口 tcp/443tcp/9093tcp/5671tcp/5672 的出站连接。

事件中心连接字符串

编辑

该插件使用连接字符串来访问 Azure 事件中心。在此处查找连接字符串:Azure 门户-> 事件中心 -> 共享访问策略。event_hub_connections 选项传递基本配置的事件中心连接字符串。

连接字符串示例

Endpoint=sb://logstash.servicebus.windows.net/;SharedAccessKeyName=activity-log-read-only;SharedAccessKey=mm6AbDcEfj8lk7sjsbzoTJ10qAkiSaG663YykEAG2eg=;EntityPath=insights-operational-logs

Blob 存储和连接字符串

编辑

Azure Blob 存储帐户是 Azure 到 Logstash 配置的重要组成部分。Blob 存储帐户是一个中心位置,使 Logstash 的多个实例可以协同工作以处理事件。它记录已处理事件的偏移量(位置)。重新启动时,Logstash 会从中断的地方恢复处理。

配置注意事项

  • 强烈建议将 Blob 存储帐户与此插件一起使用,并且很可能生产服务器需要使用它。
  • storage_connection 选项传递 Blob 存储连接字符串。
  • 配置所有 Logstash 实例以使用相同的 storage_connection,以获得共享处理的好处。

Blob 存储连接字符串示例

DefaultEndpointsProtocol=https;AccountName=logstash;AccountKey=ETOPnkd/hDAWidkEpPZDiXffQPku/SZdXhPSLnfqdRTalssdEuPkZwIcouzXjCLb/xPZjzhmHfwRCGo0SBSw==;EndpointSuffix=core.windows.net

在此处查找 Blob 存储的连接字符串:Azure 门户-> Blob 存储帐户 -> 访问密钥

最佳实践

编辑

以下是一些准则,可帮助您避免可能导致事件丢失的数据冲突。

创建 Logstash 消费者组
编辑

专门为 Logstash 创建一个新的消费者组。不要使用 $default 或任何其他可能已经在使用的消费者组。在不相关的消费者之间重用消费者组可能会导致意外行为,并可能导致事件丢失。所有 Logstash 实例应使用相同的消费者组,以便它们可以协同工作来处理事件。

避免使用多个事件中心覆盖偏移量
编辑

事件中心的偏移量(位置)存储在配置的 Azure Blob 存储中。Azure Blob 存储使用类似于文件系统的路径来存储偏移量。如果多个事件中心之间的路径重叠,则偏移量可能会存储不正确。

为了避免重复的文件路径,请使用高级配置模型,并确保每个事件中心至少有一个选项不同

  • storage_connection
  • storage_container(如果未定义,则默认为事件中心名称)
  • consumer_group
正确设置线程数
编辑

默认情况下,用于服务所有事件中心的线程数为 16。虽然这可能足以满足大多数用例,但可以通过改进此数字来提高吞吐量。当为一个或多个事件中心的大量分区提供服务时,设置较高的值可能会提高性能。最大线程数并不严格受限于正在服务的总分区数,但将该值设置得远高于该值可能意味着某些线程处于空闲状态。

线程数必须大于或等于事件中心数加一。

目前,线程仅作为单个 azure_event_hubs 输入定义中所有事件中心的全局设置可用。但是,如果您的配置包含多个 azure_event_hubs 输入,则线程设置将独立应用于每个输入。

示例:单个事件中心

如果您正在从一个事件中心实例收集活动日志,则只需要 2 个线程。

  • 事件中心 = 1
  • 最小线程数 = 2 (1 个事件中心 + 1)

示例:多个事件中心

如果您正在从多个事件中心实例收集活动日志,则每个事件中心至少需要 1 个线程。

  • 事件中心 = 4
  • 最小线程数 = 5 (4 个事件中心 + 1)

当您每个事件中心使用多个分区时,您可能需要分配更多线程。一个良好的基本级别是 (1 + 事件中心数 * 分区数)。也就是说,每个事件中心的所有分区都需要一个线程。

配置模型

编辑

此插件支持两种配置模型:基本和高级。建议大多数用例使用基本配置,本主题中的示例说明了这一点。

基本配置(默认)

编辑

基本配置是默认配置,支持从多个事件中心进行消费。除了连接字符串之外,所有事件中心都共享相同的配置。

您提供一个事件中心连接字符串列表,其中包含定义事件中心名称的事件中心 EntityPath。所有其他配置设置都是共享的。

input {
   azure_event_hubs {
      event_hub_connections => ["Endpoint=sb://example1...EntityPath=insights-logs-errors", "Endpoint=sb://example2...EntityPath=insights-metrics-pt1m"]
      threads => 8
      decorate_events => true
      consumer_group => "logstash"
      storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
   }
}

高级配置

编辑

高级配置模型适用于需要不同配置的不同事件中心的部署。可以为每个事件中心配置选项。您通过 event_hubs 选项提供事件中心名称列表。在每个名称下,指定该事件中心的配置。可以在全局定义选项,也可以按事件中心表达选项。

如果相同的配置选项同时出现在全局和 event_hub 部分中,则更具体的(event_hub)设置优先。

对于大多数用例,高级配置不是必需的或推荐的。

input {
   azure_event_hubs {
     config_mode => "advanced"
     threads => 8
     decorate_events => true
     storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
     event_hubs => [
        {"insights-operational-logs" => {
         event_hub_connection => "Endpoint=sb://example1..."
         initial_position => "beginning"
         consumer_group => "iam_team"
        }},
      {"insights-metrics-pt1m" => {
         event_hub_connection => "Endpoint=sb://example2..."
         initial_position => "end"
         consumer_group => "db_team"
       }}
     ]
   }
}

在此示例中,storage_connectiondecorate_events 全局应用。两个事件中心分别具有自己的 consumer_groupsinitial_position 设置。

Azure 事件中心配置选项

编辑

此插件支持以下配置选项以及稍后描述的 通用选项

设置 输入类型 必需

config_mode

字符串,(basicadvanced

event_hubs

数组

是,当 config_mode => advanced

event_hub_connections

数组

是,当 config_mode => basic

event_hub_connection

字符串

是,当 config_mode => advanced

checkpoint_interval

数字

consumer_group

字符串

decorate_events

布尔值

initial_position

字符串,(beginningendlook_back)

initial_position_look_back

数字

否,除非 initial_position => look_back

max_batch_size

数字

storage_connection

字符串

storage_container

字符串

threads

数字

另请参阅 通用选项,其中列出了所有输入插件支持的选项。

所有事件中心选项在基本配置和高级配置中都是通用的,但以下情况除外。基本配置使用 event_hub_connections 来支持多个连接。高级配置使用 event_hubsevent_hub_connection(单数)。

config_mode

编辑
  • 值类型为 字符串
  • 有效条目为 basicadvanced
  • 默认值为 basic

将配置设置为 基本配置(默认)高级配置

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"  , "Endpoint=sb://example2...;EntityPath=event_hub_name2"  ]
}

event_hubs

编辑
  • 值类型为 数组
  • 无默认值
  • 基本配置忽略
  • 高级配置必需

定义要读取的事件中心。一个哈希数组,其中每个条目都是事件中心名称及其配置选项的哈希。

azure_event_hubs {
  config_mode => "advanced"
  event_hubs => [
      { "event_hub_name1" => {
          event_hub_connection => "Endpoint=sb://example1..."
      }},
      { "event_hub_name2" => {
          event_hub_connection => "Endpoint=sb://example2..."
          storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
          storage_container => "my_container"
     }}
   ]
   consumer_group => "logstash" # shared across all Event Hubs
}

event_hub_connections

编辑
  • 值类型为 数组
  • 无默认值
  • 基本配置必需

标识要读取的事件中心的连接字符串列表。连接字符串包含事件中心的 EntityPath。

event_hub_connections 选项是针对每个事件中心定义的。所有其他配置选项在事件中心之间共享。

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"  , "Endpoint=sb://example2...;EntityPath=event_hub_name2"  ]
}

event_hub_connection

编辑
  • 值类型为 字符串
  • 无默认值
  • 仅对高级配置有效

用于标识要读取的事件中心的连接字符串。可以为每个事件中心设置高级配置选项。此选项会修改 event_hub_name,并且应嵌套在其下。(请参阅示例。)此选项仅接受一个连接字符串。

azure_event_hubs {
   config_mode => "advanced"
   event_hubs => [
     { "event_hub_name1" => {
        event_hub_connection => "Endpoint=sb://example1...;EntityPath=event_hub_name1"
     }}
   ]
}

checkpoint_interval

编辑
  • 值类型为 数字
  • 默认值为 5
  • 设置为 0 以禁用。

以秒为单位的时间间隔,用于在批处理期间写入检查点。检查点告诉 Logstash 在重启后从哪里恢复处理。无论此设置如何,检查点都会在每个批处理结束时自动写入。

过于频繁地写入检查点可能会不必要地降低处理速度。

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   checkpoint_interval => 5
}

consumer_group

编辑
  • 值类型为 字符串
  • 默认值为 $Default

用于读取事件中心的使用者组。专门为 Logstash 创建一个使用者组。然后确保 Logstash 的所有实例都使用该使用者组,以便它们可以正确协同工作。

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   consumer_group => "logstash"
}

decorate_events

编辑

添加有关事件中心的元数据,包括事件中心名称、consumer_group、processor_host、分区、偏移量、序列、时间戳和 event_size。

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   decorate_events => true
}

initial_position

编辑
  • 值类型为 字符串
  • 有效参数为 beginningendlook_back
  • 默认值为 beginning

首次从事件中心读取时,从此位置开始

  • beginning 读取事件中心中所有预先存在的事件
  • end 不读取事件中心中任何预先存在的事件
  • look_back 读取 end 减去以秒为单位的预先存在事件的值。您可以使用 initial_position_look_back 选项控制秒数。

注意:如果设置了 storage_connection,则 initial_position 值仅在 Logstash 首次从事件中心读取时使用。

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   initial_position => "beginning"
}

initial_position_look_back

编辑
  • 值类型为 数字
  • 默认值为 86400
  • 仅当 initial_position 设置为 look-back 时使用

回溯以查找预先存在事件的初始位置的秒数。仅当 initial_position 设置为 look_back 时才使用此选项。如果设置了 storage_connection,则此配置仅在 Logstash 首次从事件中心读取时应用。

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   initial_position => "look_back"
   initial_position_look_back => 86400
}

max_batch_size

编辑
  • 值类型为 数字
  • 默认值为 125

一起检索和处理的最大事件数。每个批次后都会创建一个检查点。增加此值可能有助于提高性能,但需要更多内存。

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   max_batch_size => 125
}

storage_connection

编辑

用于 blob 帐户存储的连接字符串。Blob 帐户存储会在重启之间持久保存偏移量,并确保 Logstash 的多个实例处理不同的分区。设置此值后,重启会从上次停止处理的位置恢复。如果未设置此值,则每次重启时都会使用 initial_position 值。

我们强烈建议您为生产环境定义此值。

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
}

storage_container

编辑
  • 值类型为 字符串
  • 如果未定义,则默认为事件中心名称

用于持久化偏移量并允许多个 Logstash 实例协同工作的存储容器的名称。

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
   storage_container => "my_container"
}

为了避免覆盖偏移量,您可以使用不同的存储容器。如果您要监视两个名称相同的事件中心,这一点尤其重要。您可以使用高级配置模型来配置不同的存储容器。

azure_event_hubs {
     config_mode => "advanced"
     consumer_group => "logstash"
     storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
     event_hubs => [
        {"insights-operational-logs" => {
         event_hub_connection => "Endpoint=sb://example1..."
         storage_container => "insights-operational-logs-1"
        }},
        {"insights-operational-logs" => {
         event_hub_connection => "Endpoint=sb://example2..."
         storage_container => "insights-operational-logs-2"
        }}
     ]
   }

threads

编辑
  • 值类型为 数字
  • 最小值是 2
  • 默认值为 16

用于处理事件的线程总数。您在此处设置的值适用于所有事件中心。即使使用高级配置,此值也是全局设置,不能为每个事件中心设置。

azure_event_hubs {
   threads => 16
}

线程数应为事件中心数加一或多个。有关更多信息,请参阅最佳实践

通用选项

编辑

所有输入插件都支持这些配置选项

设置 输入类型 必需

add_field

hash

codec

codec

enable_metric

布尔值

id

字符串

tags

数组

type

字符串

add_field

编辑
  • 值类型为 hash
  • 默认值为 {}

向事件添加一个字段

codec

编辑
  • 值类型为 codec
  • 默认值为 "plain"

用于输入数据的编解码器。输入编解码器是一种方便的方法,可以在数据进入输入之前对其进行解码,而无需在 Logstash 管道中单独设置过滤器。

enable_metric

编辑

默认情况下,禁用或启用此特定插件实例的指标日志记录,我们会记录所有可以记录的指标,但您可以禁用特定插件的指标收集。

  • 值类型为 字符串
  • 此设置没有默认值。

向插件配置添加唯一的 ID。如果未指定 ID,Logstash 将生成一个。强烈建议您在配置中设置此 ID。当您有两个或多个相同类型的插件时,这尤其有用,例如,如果您有 2 个 azure_event_hubs 输入。在这种情况下添加命名 ID 将有助于在使用监视 API 时监视 Logstash。

input {
  azure_event_hubs {
    id => "my_plugin_id"
  }
}

id 字段中的变量替换仅支持环境变量,不支持使用来自密钥存储的值。

tags

编辑
  • 值类型为 数组
  • 此设置没有默认值。

向您的事件添加任意数量的标签。

这有助于以后的处理。

type

编辑
  • 值类型为 字符串
  • 此设置没有默认值。

向此输入处理的所有事件添加一个 type 字段。

类型主要用于筛选器激活。

该类型作为事件本身的一部分存储,因此您也可以使用该类型在 Kibana 中进行搜索。

如果您尝试在已经具有类型的事件上设置类型(例如,当您将事件从发货人发送到索引器时),则新的输入不会覆盖现有类型。在发货人处设置的类型会与该事件的整个生命周期保持不变,即使将其发送到另一个 Logstash 服务器也是如此。