Azure 事件中心插件
编辑Azure 事件中心插件编辑
- 插件版本:v1.4.7
- 发布时间:2024-06-07
- 更新日志
有关其他版本,请参阅版本化插件文档。
获取帮助编辑
如果您对插件有任何疑问,请在论坛中打开一个主题。对于错误或功能请求,请在Github中打开一个问题。有关 Elastic 支持的插件列表,请参阅Elastic 支持矩阵。
描述编辑
此插件从Azure 事件中心(一个高度可扩展的数据流平台和事件提取服务)中使用事件。事件生产者将事件发送到 Azure 事件中心,此插件使用这些事件供 Logstash 使用。
许多 Azure 服务与 Azure 事件中心集成。例如,Azure Monitor 与 Azure 事件中心集成以提供基础结构指标。
如Microsoft 事件中心文档中所述,此插件需要到端口 tcp/443
、tcp/9093
、tcp/5671
和 tcp/5672
的出站连接。
事件中心连接字符串编辑
该插件使用连接字符串访问 Azure 事件中心。在此处查找连接字符串:Azure 门户-> 事件中心 -> 共享访问策略
。event_hub_connections 选项传递基本配置的事件中心连接字符串。
示例连接字符串
Endpoint=sb://logstash.servicebus.windows.net/;SharedAccessKeyName=activity-log-read-only;SharedAccessKey=mm6AbDcEfj8lk7sjsbzoTJ10qAkiSaG663YykEAG2eg=;EntityPath=insights-operational-logs
Blob 存储和连接字符串编辑
Azure Blob 存储帐户是 Azure 到 Logstash 配置的重要组成部分。Blob 存储帐户是一个中心位置,允许多个 Logstash 实例协同工作以处理事件。它记录已处理事件的偏移量(位置)。重新启动后,Logstash 会从上次停止的位置继续处理。
配置说明
- 强烈建议将 Blob 存储帐户与此插件一起使用,并且生产服务器可能需要它。
storage_connection
选项传递 blob 存储连接字符串。- 将所有 Logstash 实例配置为使用相同的
storage_connection
以获得共享处理的优势。
示例 Blob 存储连接字符串
DefaultEndpointsProtocol=https;AccountName=logstash;AccountKey=ETOPnkd/hDAWidkEpPZDiXffQPku/SZdXhPSLnfqdRTalssdEuPkZwIcouzXjCLb/xPZjzhmHfwRCGo0SBSw==;EndpointSuffix=core.windows.net
在此处查找 Blob 存储的连接字符串:Azure 门户-> Blob 存储帐户 -> 访问密钥
。
最佳实践编辑
以下是一些指南,可帮助您避免可能导致事件丢失的数据冲突。
创建 Logstash 消费者组编辑
专门为 Logstash 创建一个新的消费者组。不要使用 $default 或任何其他可能已在使用的消费者组。在不相关的消费者之间重复使用消费者组可能会导致意外行为,并可能导致事件丢失。所有 Logstash 实例都应使用相同的消费者组,以便它们可以协同工作以处理事件。
避免使用多个事件中心覆盖偏移量编辑
事件中心的偏移量(位置)存储在配置的 Azure Blob 存储中。Azure Blob 存储使用类似于文件系统的路径来存储偏移量。如果多个事件中心之间的路径重叠,则偏移量可能会存储不正确。
为避免重复的文件路径,请使用高级配置模型并确保每个事件中心至少有一个选项不同
- storage_connection
- storage_container(如果未定义,则默认为事件中心名称)
- consumer_group
正确设置线程数编辑
默认情况下,用于为所有事件中心提供服务的线程数为 16
。虽然这对于大多数用例来说可能足够了,但可以通过优化此数字来提高吞吐量。当为一个或多个事件中心中的大量分区提供服务时,设置更高的值可能会提高性能。最大线程数不受所服务分区总数的严格限制,但将该值设置得远高于该值可能意味着某些线程处于空闲状态。
线程数必须大于或等于事件中心数加一。
线程当前仅作为单个 azure_event_hubs
输入定义中所有事件中心的全局设置可用。但是,如果您的配置包含多个 azure_event_hubs
输入,则线程设置将独立应用于每个输入。
示例:单个事件中心
如果您要从一个事件中心实例收集活动日志,则只需要 2 个线程。
- 事件中心 = 1
- 最小线程数 = 2(1 个事件中心 + 1)
示例:多个事件中心
如果您要从多个事件中心实例收集活动日志,则每个事件中心至少需要 1 个线程。
- 事件中心 = 4
- 最小线程数 = 5(4 个事件中心 + 1)
当您每个事件中心使用多个分区时,您可能希望分配更多线程。一个良好的基准是(1 + 事件中心数 * 分区数
)。也就是说,所有事件中心中每个分区一个线程。
配置模型编辑
此插件支持两种配置模型:基本和高级。对于大多数用例,建议使用基本配置,并在本主题的示例中进行了说明。
基本配置(默认)编辑
基本配置是默认配置,支持从多个事件中心使用。除连接字符串外,所有事件中心共享相同的配置。
您提供一个事件中心连接字符串列表,其中包含定义事件中心名称的事件中心 EntityPath。所有其他配置设置都是共享的。
input { azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...EntityPath=insights-logs-errors", "Endpoint=sb://example2...EntityPath=insights-metrics-pt1m"] threads => 8 decorate_events => true consumer_group => "logstash" storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...." } }
高级配置编辑
高级配置模型适用于不同事件中心需要不同配置的部署。可以为每个事件中心配置选项。您可以通过 event_hubs
选项提供事件中心名称列表。在每个名称下,指定该事件中心的配置。选项可以在全局定义,也可以为每个事件中心单独表示。
如果相同的配置选项同时出现在全局和 event_hub
部分中,则更具体的 (event_hub) 设置优先。
对于大多数用例,不需要或建议使用高级配置。
input { azure_event_hubs { config_mode => "advanced" threads => 8 decorate_events => true storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...." event_hubs => [ {"insights-operational-logs" => { event_hub_connection => "Endpoint=sb://example1..." initial_position => "beginning" consumer_group => "iam_team" }}, {"insights-metrics-pt1m" => { event_hub_connection => "Endpoint=sb://example2..." initial_position => "end" consumer_group => "db_team" }} ] } }
在此示例中,storage_connection
和 decorate_events
全局应用。两个事件中心分别具有自己的 consumer_groups
和 initial_position
设置。
Azure 事件中心配置选项编辑
此插件支持以下配置选项以及稍后描述的通用选项。
设置 | 输入类型 | 必需 |
---|---|---|
字符串,( |
否 |
|
是,当 |
||
是,当 |
||
是,当 |
||
否 |
||
否 |
||
否 |
||
字符串,( |
否 |
|
否,除非 |
||
否 |
||
否 |
||
否 |
||
否 |
另请参阅通用选项,了解所有输入插件支持的选项列表。
所有事件中心选项在基本和高级配置中都是通用的,但以下情况除外。基本配置使用 event_hub_connections
支持多个连接。高级配置使用 event_hubs
和 event_hub_connection
(单数)。
config_mode
编辑
- 值类型为字符串
- 有效条目为
basic
或advanced
- 默认值为
basic
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1" , "Endpoint=sb://example2...;EntityPath=event_hub_name2" ] }
event_hubs
编辑
- 值类型为数组
- 无默认值
- 基本配置忽略
- 高级配置必需
定义要读取的事件中心。一个哈希数组,其中每个条目都是事件中心名称及其配置选项的哈希。
azure_event_hubs { config_mode => "advanced" event_hubs => [ { "event_hub_name1" => { event_hub_connection => "Endpoint=sb://example1..." }}, { "event_hub_name2" => { event_hub_connection => "Endpoint=sb://example2..." storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...." storage_container => "my_container" }} ] consumer_group => "logstash" # shared across all Event Hubs }
event_hub_connections
编辑
- 值类型为数组
- 无默认值
- 基本配置必需
标识要读取的事件中心的连接字符串列表。连接字符串包括事件中心的 EntityPath。
event_hub_connections
选项是针对每个事件中心定义的。所有其他配置选项在事件中心之间共享。
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1" , "Endpoint=sb://example2...;EntityPath=event_hub_name2" ] }
event_hub_connection
编辑
- 值类型为字符串
- 无默认值
- 仅对高级配置有效
标识要读取的事件中心的连接字符串。可以为每个事件中心设置高级配置选项。此选项修改 event_hub_name
,并且应该嵌套在其下。(请参阅示例。)此选项仅接受一个连接字符串。
azure_event_hubs { config_mode => "advanced" event_hubs => [ { "event_hub_name1" => { event_hub_connection => "Endpoint=sb://example1...;EntityPath=event_hub_name1" }} ] }
checkpoint_interval
编辑
- 值类型为 数字
- 默认值为
5
秒 - 设置为
0
可禁用。
在批处理期间写入检查点的间隔(以秒为单位)。检查点告诉 Logstash 在重新启动后从哪里恢复处理。无论此设置如何,都会在每批处理结束时自动写入检查点。
过于频繁地写入检查点可能会不必要地降低处理速度。
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] checkpoint_interval => 5 }
consumer_group
编辑
- 值类型为字符串
- 默认值为
$Default
用于读取事件中心的使用者组。为 Logstash 创建一个专门的使用者组。然后确保 Logstash 的所有实例都使用该使用者组,以便它们可以正常协同工作。
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] consumer_group => "logstash" }
decorate_events
编辑
- 值类型为 布尔值
- 默认值为
false
添加有关事件中心的元数据,包括事件中心名称、使用者组、处理器主机、分区、偏移量、序列号、时间戳和事件大小。
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] decorate_events => true }
initial_position
编辑
- 值类型为字符串
- 有效参数为
beginning
、end
、look_back
- 默认值为
beginning
首次从事件中心读取时,从此位置开始
-
beginning
读取事件中心中所有预先存在的事件 -
end
不读取事件中心中任何预先存在的事件 -
look_back
读取end
减去若干秒的预先存在的事件。您可以使用initial_position_look_back
选项控制秒数。
注意:如果设置了 storage_connection
,则 initial_position
值仅在 Logstash 首次从事件中心读取时使用。
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] initial_position => "beginning" }
initial_position_look_back
编辑
- 值类型为 数字
- 默认值为
86400
- 仅在
initial_position
设置为look-back
时使用
查找预先存在的事件的初始位置时要回溯的秒数。仅当 initial_position
设置为 look_back
时,才会使用此选项。如果设置了 storage_connection
,则此配置仅在 Logstash 首次从事件中心读取时适用。
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] initial_position => "look_back" initial_position_look_back => 86400 }
max_batch_size
编辑
- 值类型为 数字
- 默认值为
125
一起检索和处理的最大事件数。每批处理后都会创建一个检查点。增加此值可能有助于提高性能,但需要更多内存。
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] max_batch_size => 125 }
storage_connection
编辑
- 值类型为字符串
- 无默认值
用于 Blob 帐户存储的连接字符串。Blob 帐户存储在重新启动之间保留偏移量,并确保 Logstash 的多个实例处理不同的分区。设置此值后,重新启动将从处理停止的地方继续。如果未设置此值,则每次重新启动时都会使用 initial_position
值。
强烈建议您为生产环境定义此值。
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...." }
storage_container
编辑
- 值类型为字符串
- 如果未定义,则默认为事件中心名称
用于保留偏移量并允许多个 Logstash 实例协同工作的存储容器的名称。
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...." storage_container => "my_container" }
为避免覆盖偏移量,您可以使用不同的存储容器。如果您正在监视两个名称相同的事件中心,这一点尤其重要。您可以使用高级配置模型来配置不同的存储容器。
azure_event_hubs { config_mode => "advanced" consumer_group => "logstash" storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...." event_hubs => [ {"insights-operational-logs" => { event_hub_connection => "Endpoint=sb://example1..." storage_container => "insights-operational-logs-1" }}, {"insights-operational-logs" => { event_hub_connection => "Endpoint=sb://example2..." storage_container => "insights-operational-logs-2" }} ] }
通用选项编辑
所有输入插件都支持以下配置选项
详细信息编辑
codec
编辑
- 值类型为 编解码器
- 默认值为
"plain"
用于输入数据的编解码器。输入编解码器是一种方便的方法,可以在数据进入输入之前对其进行解码,而无需在 Logstash 管道中使用单独的过滤器。