Converting Logstash pipelines to OpenTelemetry Collector pipelines
Introduction
Elastic's observability strategy is increasingly aligned with OpenTelemetry. With the recent release of the Elastic Distributions of OpenTelemetry, we are expanding our offering to make OpenTelemetry easier to adopt: Elastic Agent now provides an "otel" mode that lets it run a custom distribution of the OpenTelemetry Collector, seamlessly enhancing your onboarding and overall experience with Elastic Observability.
This post aims to help users familiar with Logstash transition to OpenTelemetry by demonstrating how to convert some standard Logstash pipelines into the corresponding OpenTelemetry Collector configurations.
What is the OpenTelemetry Collector, and why should I care?
OpenTelemetry is an open-source framework that ensures vendor-agnostic data collection and provides a standardized approach to collecting, processing, and ingesting observability data. Elastic is fully committed to this principle, aiming to make observability truly vendor-agnostic and to remove the need for users to re-instrument their systems when switching platforms.
By adopting OpenTelemetry, you gain the following benefits:
- Unified observability: with the OpenTelemetry Collector, you can collect and manage logs, metrics, and traces from a single tool, giving you a comprehensive view of your systems' performance and behavior. This simplifies monitoring and debugging in complex distributed environments such as microservices.
- Flexibility and scalability: whether you run a small service or a large distributed system, the OpenTelemetry Collector scales to handle the volume of data generated, and can be deployed flexibly as an agent (running alongside your applications) or as a gateway (a centralized hub).
- Open standards: as an open-source project under the Cloud Native Computing Foundation (CNCF), OpenTelemetry ensures that you build on widely accepted standards, contributing to the long-term sustainability and compatibility of your observability stack.
- Simplified telemetry pipelines: building pipelines from receivers, processors, and exporters simplifies telemetry management by centralizing data flows and minimizing the need for multiple agents.
In the following sections, we explain the structure of OTel Collector and Logstash pipelines and walk through the steps for working with each option.
OTel Collector configuration
An OpenTelemetry Collector configuration has several sections:
- Receivers: collect data from different sources.
- Processors: transform the data collected by receivers.
- Exporters: send the data to different destinations.
- Connectors: link two pipelines together.
- Service: defines which components are active.
- Pipelines: combine the defined receivers, processors, exporters, and connectors to process data.
- Extensions: optional components that expand the Collector's capabilities for tasks not directly related to processing telemetry data (e.g., health monitoring).
- Telemetry: where you configure the observability of the Collector itself (e.g., logging and monitoring).
We can visualize this schematically; the minimal skeleton below illustrates how these sections fit together.
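This skeleton is only a sketch: the filelog path and the batch, debug, and health_check components are illustrative placeholder choices, not components used in the examples that follow.

receivers:
  filelog:
    include:
      - /var/log/example.log   # placeholder path

processors:
  batch: {}                    # groups data before export

exporters:
  debug:                       # prints received data to the Collector's own log
    verbosity: basic

extensions:
  health_check: {}             # liveness endpoint, unrelated to telemetry processing

service:
  extensions: [health_check]
  pipelines:
    logs:
      receivers: [filelog]
      processors: [batch]
      exporters: [debug]
  telemetry:
    logs:
      level: info              # observability of the Collector itself

Note that a named component only becomes active once it is referenced under service.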
For an in-depth introduction to the components, please refer to the official documentation: Configuration | OpenTelemetry.
Logstash pipeline definition
A Logstash pipeline consists of three main components:
- Input plugins: let us read data from different sources.
- Filter plugins: let us transform and filter the data.
- Output plugins: let us send the data onward.
Logstash also has a special input and a special output that enable pipeline-to-pipeline communication; we can think of this as a concept similar to OpenTelemetry connectors.
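Schematically, a minimal pipeline definition looks like the following sketch (the plugins and tags are placeholders); the commented pipeline input/output at the end shows the pipeline-to-pipeline mechanism mentioned above.

input {
  file {
    path => "/var/log/example.log"   # placeholder source
  }
}

filter {
  mutate {
    add_tag => ["example"]           # placeholder transformation
  }
}

output {
  stdout { codec => rubydebug }      # print events for inspection
}

# Pipeline-to-pipeline communication (roughly analogous to an OTel connector):
# one pipeline sends events to a virtual address, another reads from it.
# output { pipeline { send_to => ["downstream"] } }
# input  { pipeline { address => "downstream" } }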
Comparing Logstash pipelines with OTel Collector components
We can schematically relate the components of a Logstash pipeline to those of an OTel Collector pipeline as follows: input plugins correspond to receivers, filter plugins to processors, output plugins to exporters, and the pipeline-to-pipeline input/output pair to connectors.
Enough theory! Let's dive into some examples.
Converting Logstash pipelines to OpenTelemetry Collector pipelines
Example 1: Parsing and transforming a log line
Consider the following log line:
2024-09-20T08:33:27: user frank accessed from 89.66.167.22:10592 path /blog with error 404
We will apply the following steps:
- Read lines from the file /tmp/demo-line.log.
- Define the output as the Elasticsearch data stream logs-access_log-default.
- Extract @timestamp, user.name, client.ip, client.port, url.path, and http.status.code.
- Drop log messages related to the SYSTEM user.
- Parse the date timestamp with the relevant date format and store it in @timestamp.
- Add an http.status.code_description based on the descriptions of known codes.
- Send the data to Elasticsearch.
Logstash pipeline
input {
  file {
    path => "/tmp/demo-line.log" #[1]
    start_position => "beginning"
    add_field => { #[2]
      "[data_stream][type]" => "logs"
      "[data_stream][dataset]" => "access_log"
      "[data_stream][namespace]" => "default"
    }
  }
}
filter {
  grok { #[3]
    match => {
      "message" => "%{TIMESTAMP_ISO8601:[date]}: user %{WORD:[user][name]} accessed from %{IP:[client][ip]}:%{NUMBER:[client][port]:int} path %{URIPATH:[url][path]} with error %{NUMBER:[http][status][code]}"
    }
  }
  if "_grokparsefailure" not in [tags] {
    if [user][name] == "SYSTEM" { #[4]
      drop {}
    }
    date { #[5]
      match => ["[date]", "ISO8601"]
      target => "[@timestamp]"
      timezone => "UTC"
      remove_field => [ "date" ]
    }
    translate { #[6]
      source => "[http][status][code]"
      target => "[http][status][code_description]"
      dictionary => {
        "200" => "OK"
        "403" => "Permission denied"
        "404" => "Not Found"
        "500" => "Server Error"
      }
      fallback => "Unknown error"
    }
  }
}
output {
  elasticsearch { #[7]
    hosts => "elasticsearch-endpoint:443"
    api_key => "${ES_API_KEY}"
  }
}
OpenTelemetry Collector configuration
receivers:
  filelog: #[1]
    start_at: beginning
    include:
      - /tmp/demo-line.log
    include_file_name: false
    include_file_path: true
    storage: file_storage
    operators:
      # Copy the raw message into event.original (this is done OOTB by Logstash in ECS mode)
      - type: copy
        from: body
        to: attributes['event.original']
      - type: add #[2]
        field: attributes["data_stream.type"]
        value: "logs"
      - type: add #[2]
        field: attributes["data_stream.dataset"]
        value: "access_log_otel"
      - type: add #[2]
        field: attributes["data_stream.namespace"]
        value: "default"

extensions:
  file_storage:
    directory: /var/lib/otelcol/file_storage

processors:
  # Adding host.name (this is done OOTB by Logstash)
  resourcedetection/system:
    detectors: ["system"]
    system:
      hostname_sources: ["os"]
      resource_attributes:
        os.type:
          enabled: false
  transform/grok: #[3]
    log_statements:
      - context: log
        statements:
          - 'merge_maps(attributes, ExtractGrokPatterns(attributes["event.original"], "%{TIMESTAMP_ISO8601:date}: user %{WORD:user.name} accessed from %{IP:client.ip}:%{NUMBER:client.port:int} path %{URIPATH:url.path} with error %{NUMBER:http.status.code}", true), "insert")'
  filter/exclude_system_user: #[4]
    error_mode: ignore
    logs:
      log_record:
        - attributes["user.name"] == "SYSTEM"
  transform/parse_date: #[5]
    log_statements:
      - context: log
        statements:
          - set(time, Time(attributes["date"], "%Y-%m-%dT%H:%M:%S"))
          - delete_key(attributes, "date")
        conditions:
          - attributes["date"] != nil
  transform/translate_status_code: #[6]
    log_statements:
      - context: log
        conditions:
          - attributes["http.status.code"] != nil
        statements:
          - set(attributes["http.status.code_description"], "OK") where attributes["http.status.code"] == "200"
          - set(attributes["http.status.code_description"], "Permission Denied") where attributes["http.status.code"] == "403"
          - set(attributes["http.status.code_description"], "Not Found") where attributes["http.status.code"] == "404"
          - set(attributes["http.status.code_description"], "Server Error") where attributes["http.status.code"] == "500"
          - set(attributes["http.status.code_description"], "Unknown Error") where attributes["http.status.code_description"] == nil

exporters:
  elasticsearch: #[7]
    endpoints: ["elasticsearch-endpoint:443"]
    api_key: ${env:ES_API_KEY}
    tls:
    logs_dynamic_index:
      enabled: true
    mapping:
      mode: ecs

service:
  extensions: [file_storage]
  pipelines:
    logs:
      receivers:
        - filelog
      processors:
        - resourcedetection/system
        - transform/grok
        - filter/exclude_system_user
        - transform/parse_date
        - transform/translate_status_code
      exporters:
        - elasticsearch
These will generate the following document in Elasticsearch:
{
  "@timestamp": "2024-09-20T08:33:27.000Z",
  "client": {
    "ip": "89.66.167.22",
    "port": 10592
  },
  "data_stream": {
    "dataset": "access_log",
    "namespace": "default",
    "type": "logs"
  },
  "event": {
    "original": "2024-09-20T08:33:27: user frank accessed from 89.66.167.22:10592 path /blog with error 404"
  },
  "host": {
    "hostname": "my-laptop",
    "name": "my-laptop"
  },
  "http": {
    "status": {
      "code": "404",
      "code_description": "Not Found"
    }
  },
  "log": {
    "file": {
      "path": "/tmp/demo-line.log"
    }
  },
  "message": "2024-09-20T08:33:27: user frank accessed from 89.66.167.22:10592 path /blog with error 404",
  "url": {
    "path": "/blog"
  },
  "user": {
    "name": "frank"
  }
}
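A practical note while iterating on such a conversion: it helps to inspect exactly what the processors emit before the data reaches Elasticsearch. One way (a sketch using the debug exporter, which ships with the Collector) is to fan the pipeline out to a second exporter:

exporters:
  debug:
    verbosity: detailed   # dump each log record and its attributes to the Collector's stdout

service:
  pipelines:
    logs:
      receivers: [filelog]
      processors: [resourcedetection/system, transform/grok, filter/exclude_system_user, transform/parse_date, transform/translate_status_code]
      exporters: [debug, elasticsearch]   # send every record to both exporters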
Example 2: Parsing and transforming an NDJSON-formatted log file
Consider the following JSON line:
{"log_level":"INFO","message":"User login successful","service":"auth-service","timestamp":"2024-10-11 12:34:56.123 +0100","user":{"id":"A1230","name":"john_doe"}}
We will apply the following steps:
- Read lines from the file /tmp/demo.ndjson.
- Define the output as the Elasticsearch data stream logs-json-default.
- Parse the JSON and assign the relevant keys and values.
- Parse the date.
- Override the message field.
- Rename fields to follow ECS conventions.
- Send the data to Elasticsearch.
Logstash pipeline
input {
  file {
    path => "/tmp/demo.ndjson" #[1]
    start_position => "beginning"
    add_field => { #[2]
      "[data_stream][type]" => "logs"
      "[data_stream][dataset]" => "json"
      "[data_stream][namespace]" => "default"
    }
  }
}
filter {
  if [message] =~ /^\{.*/ {
    json { #[3] & #[5]
      source => "message"
    }
  }
  date { #[4]
    match => ["[timestamp]", "yyyy-MM-dd HH:mm:ss.SSS Z"]
    remove_field => "[timestamp]"
  }
  mutate {
    rename => { #[6]
      "service" => "[service][name]"
      "log_level" => "[log][level]"
    }
  }
}
output {
  elasticsearch { #[7]
    hosts => "elasticsearch-endpoint:443"
    api_key => "${ES_API_KEY}"
  }
}
OpenTelemetry Collector configuration
receivers:
  filelog/json: #[1]
    include:
      - /tmp/demo.ndjson
    retry_on_failure:
      enabled: true
    start_at: beginning
    storage: file_storage
    operators:
      # Copy the raw message into event.original (this is done OOTB by Logstash in ECS mode)
      - type: copy
        from: body
        to: attributes['event.original']
      - type: add #[2]
        field: attributes["data_stream.type"]
        value: "logs"
      - type: add #[2]
        field: attributes["data_stream.dataset"]
        value: "otel"
      - type: add #[2]
        field: attributes["data_stream.namespace"]
        value: "default"

extensions:
  file_storage:
    directory: /var/lib/otelcol/file_storage

processors:
  # Adding host.name (this is done OOTB by Logstash)
  resourcedetection/system:
    detectors: ["system"]
    system:
      hostname_sources: ["os"]
      resource_attributes:
        os.type:
          enabled: false
  transform/json_parse: #[3]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - merge_maps(attributes, ParseJSON(body), "upsert")
        conditions:
          - IsMatch(body, "^\\{")
  transform/parse_date: #[4]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - set(time, Time(attributes["timestamp"], "%Y-%m-%d %H:%M:%S.%L %z"))
          - delete_key(attributes, "timestamp")
        conditions:
          - attributes["timestamp"] != nil
  transform/override_message_field: #[5]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - set(body, attributes["message"])
          - delete_key(attributes, "message")
  transform/set_log_severity: #[6]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - set(severity_text, attributes["log_level"])
  attributes/rename_attributes: #[6]
    actions:
      - key: service.name
        from_attribute: service
        action: insert
      - key: service
        action: delete
      - key: log_level
        action: delete

exporters:
  elasticsearch: #[7]
    endpoints: ["elasticsearch-endpoint:443"]
    api_key: ${env:ES_API_KEY}
    tls:
    logs_dynamic_index:
      enabled: true
    mapping:
      mode: ecs

service:
  extensions: [file_storage]
  pipelines:
    logs/json:
      receivers:
        - filelog/json
      processors:
        - resourcedetection/system
        - transform/json_parse
        - transform/parse_date
        - transform/override_message_field
        - transform/set_log_severity
        - attributes/rename_attributes
      exporters:
        - elasticsearch
These will generate the following document in Elasticsearch:
{
  "@timestamp": "2024-10-11T11:34:56.123000000Z",
  "data_stream": {
    "dataset": "otel",
    "namespace": "default",
    "type": "logs"
  },
  "event": {
    "original": "{\"log_level\":\"INFO\",\"message\":\"User login successful\",\"service\":\"auth-service\",\"timestamp\":\"2024-10-11 12:34:56.123 +0100\",\"user\":{\"id\":\"A1230\",\"name\":\"john_doe\"}}"
  },
  "host": {
    "hostname": "my-laptop",
    "name": "my-laptop"
  },
  "log": {
    "file": {
      "name": "demo.ndjson"
    },
    "level": "INFO"
  },
  "message": "User login successful",
  "service": {
    "name": "auth-service"
  },
  "user": {
    "id": "A1230",
    "name": "john_doe"
  }
}
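It is worth noting that both examples could run in a single Collector instance, since service.pipelines accepts any number of named pipelines sharing the same component definitions. A sketch of a combined service section follows; because the two examples each define a transform/parse_date processor with different statements, one of them would need a distinct name (the transform/parse_date_json below is a hypothetical rename):

service:
  extensions: [file_storage]
  pipelines:
    logs:        # example 1
      receivers: [filelog]
      processors: [resourcedetection/system, transform/grok, filter/exclude_system_user, transform/parse_date, transform/translate_status_code]
      exporters: [elasticsearch]
    logs/json:   # example 2, with its date parser renamed to avoid the clash
      receivers: [filelog/json]
      processors: [resourcedetection/system, transform/json_parse, transform/parse_date_json, transform/override_message_field, transform/set_log_severity, attributes/rename_attributes]
      exporters: [elasticsearch]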
Conclusion
In this post, we showed how to convert typical Logstash pipelines into OpenTelemetry Collector log pipelines. While OpenTelemetry provides powerful tools for collecting and exporting logs, Logstash remains the better option if your pipelines rely on complex transformations or scripting, because it offers a broader set of built-in capabilities and more flexible ways of handling advanced data-manipulation tasks.
What's next?
Now that you have seen basic (but realistic) examples of converting Logstash pipelines to OpenTelemetry, it's your turn to dive in. Depending on your needs, you can explore further and find more detailed resources in the following repositories:
- OpenTelemetry Collector: learn about the core OpenTelemetry components, from receivers to exporters.
- OpenTelemetry Collector Contrib: find community-contributed components for broader integrations and functionality.
- Elastic's opentelemetry-collector-components: take a closer look at Elastic's OpenTelemetry Collector extensions, which offer additional customization for Elastic Stack users.
If you face specific challenges or need to cover more advanced use cases, these repositories are excellent resources for discovering additional components and integrations that can enhance your pipelines. All of these repositories share a similar structure, with folders named after the component types (receiver, processor, exporter, connector), where you can browse the available components.