本博客探讨了使用 Red Hat OpenShift Logging Operator 收集和格式化 OpenShift 容器平台日志和审计日志的一种可能方法。我们建议使用 Elastic® Agent 以获得最佳体验!我们还将展示如何将日志格式化为 Elastic Common Schema (ECS),以获得最佳的日志查看、搜索和可视化体验。本博客中的所有示例均基于 OpenShift 4.14。
为什么使用 OpenShift Logging Operator?
许多企业客户使用 OpenShift 作为他们的编排解决方案。这种方法的优点是
-
它由 Red Hat 开发和支持
-
它可以自动更新 OpenShift 集群以及操作系统,以确保它们兼容并保持兼容
-
它可以使用诸如源到镜像之类的功能来加速开发生命周期
-
它使用增强的安全性
在我们的咨询经验中,当我们尝试安装 Elastic Agent 来收集 pod 的日志时,后一方面给 OpenShift 管理员带来了挑战和摩擦。实际上,Elastic Agent 需要将主机的文件挂载到 pod 中,并且还需要在特权模式下运行。(在官方 Elasticsearch® 文档中阅读有关 Elastic Agent 所需权限的更多信息)。虽然我们在本文中探讨的解决方案在底层需要类似的权限,但它由 OpenShift Logging Operator 管理,该 Operator 由 Red Hat 开发和支持。
我们将收集哪些日志?
在 OpenShift 容器平台中,我们区分三大类日志:审计日志、应用程序日志和基础架构日志
-
审计日志描述了用户、管理员和其他组件影响系统的活动列表。
-
应用程序日志由在非保留命名空间中运行的 pod 的容器日志组成。
-
基础架构日志由在保留命名空间(如 openshift*、kube* 和 default)中运行的 pod 的容器日志以及来自节点的 journald 消息组成。
在下文中,为了简单起见,我们仅考虑审计日志和应用程序日志。在本文中,我们将描述如何以 Kubernetes 集成期望的格式格式化审计日志和应用程序日志,以充分利用 Elastic 可观测性。
开始使用
要从 OpenShift 收集日志,我们必须在 Elasticsearch 和 OpenShift 中执行一些准备步骤。
在 Elasticsearch 内部
我们首先安装 Kubernetes 集成资产。我们主要对 logs-kubernetes.container_logs 和 logs-kubernetes.audit_logs 的索引模板和摄取管道感兴趣。
要将从 ClusterLogForwarder 收到的日志格式化为 ECS 格式,我们将定义一个管道来规范化容器日志。OpenShift 使用的字段命名约定与 ECS 使用的约定略有不同。要获取从 OpenShift 导出的字段列表,请参阅导出的字段 | 日志 | OpenShift 容器平台 4.14。要获取 Kubernetes 集成导出的字段列表,您可以参考 Kubernetes 字段 | Filebeat 参考 [8.11] | Elastic 和 日志应用程序字段 | Elastic 可观测性 [8.11]。此外,必须通过将点替换为下划线来规范化诸如 kubernetes.annotations 之类的特定字段。此操作通常由 Elastic Agent 自动完成。
PUT _ingest/pipeline/openshift-2-ecs
{
"processors": [
{
"rename": {
"field": "kubernetes.pod_id",
"target_field": "kubernetes.pod.uid",
"ignore_missing": true
}
},
{
"rename": {
"field": "kubernetes.pod_ip",
"target_field": "kubernetes.pod.ip",
"ignore_missing": true
}
},
{
"rename": {
"field": "kubernetes.pod_name",
"target_field": "kubernetes.pod.name",
"ignore_missing": true
}
},
{
"rename": {
"field": "kubernetes.namespace_name",
"target_field": "kubernetes.namespace",
"ignore_missing": true
}
},
{
"rename": {
"field": "kubernetes.namespace_id",
"target_field": "kubernetes.namespace_uid",
"ignore_missing": true
}
},
{
"rename": {
"field": "kubernetes.container_id",
"target_field": "container.id",
"ignore_missing": true
}
},
{
"dissect": {
"field": "container.id",
"pattern": "%{container.runtime}://%{container.id}",
"ignore_failure": true
}
},
{
"rename": {
"field": "kubernetes.container_image",
"target_field": "container.image.name",
"ignore_missing": true
}
},
{
"set": {
"field": "kubernetes.container.image",
"copy_from": "container.image.name",
"ignore_failure": true
}
},
{
"set": {
"copy_from": "kubernetes.container_name",
"field": "container.name",
"ignore_failure": true
}
},
{
"rename": {
"field": "kubernetes.container_name",
"target_field": "kubernetes.container.name",
"ignore_missing": true
}
},
{
"set": {
"field": "kubernetes.node.name",
"copy_from": "hostname",
"ignore_failure": true
}
},
{
"rename": {
"field": "hostname",
"target_field": "host.name",
"ignore_missing": true
}
},
{
"rename": {
"field": "level",
"target_field": "log.level",
"ignore_missing": true
}
},
{
"rename": {
"field": "file",
"target_field": "log.file.path",
"ignore_missing": true
}
},
{
"set": {
"copy_from": "openshift.cluster_id",
"field": "orchestrator.cluster.name",
"ignore_failure": true
}
},
{
"dissect": {
"field": "kubernetes.pod_owner",
"pattern": "%{_tmp.parent_type}/%{_tmp.parent_name}",
"ignore_missing": true
}
},
{
"lowercase": {
"field": "_tmp.parent_type",
"ignore_missing": true
}
},
{
"set": {
"field": "kubernetes.pod.{{_tmp.parent_type}}.name",
"value": "{{_tmp.parent_name}}",
"if": "ctx?._tmp?.parent_type != null",
"ignore_failure": true
}
},
{
"remove": {
"field": [
"_tmp",
"kubernetes.pod_owner"
],
"ignore_missing": true
}
},
{
"script": {
"description": "Normalize kubernetes annotations",
"if": "ctx?.kubernetes?.annotations != null",
"source": """
def keys = new ArrayList(ctx.kubernetes.annotations.keySet());
for(k in keys) {
if (k.indexOf(".") >= 0) {
def sanitizedKey = k.replace(".", "_");
ctx.kubernetes.annotations[sanitizedKey] = ctx.kubernetes.annotations[k];
ctx.kubernetes.annotations.remove(k);
}
}
"""
}
},
{
"script": {
"description": "Normalize kubernetes namespace_labels",
"if": "ctx?.kubernetes?.namespace_labels != null",
"source": """
def keys = new ArrayList(ctx.kubernetes.namespace_labels.keySet());
for(k in keys) {
if (k.indexOf(".") >= 0) {
def sanitizedKey = k.replace(".", "_");
ctx.kubernetes.namespace_labels[sanitizedKey] = ctx.kubernetes.namespace_labels[k];
ctx.kubernetes.namespace_labels.remove(k);
}
}
"""
}
},
{
"script": {
"description": "Normalize special Kubernetes Labels used in logs-kubernetes.container_logs to determine service.name and service.version",
"if": "ctx?.kubernetes?.labels != null",
"source": """
def keys = new ArrayList(ctx.kubernetes.labels.keySet());
for(k in keys) {
if (k.startsWith("app_kubernetes_io_component_")) {
def sanitizedKey = k.replace("app_kubernetes_io_component_", "app_kubernetes_io_component/");
ctx.kubernetes.labels[sanitizedKey] = ctx.kubernetes.labels[k];
ctx.kubernetes.labels.remove(k);
}
}
"""
}
}
]
}
类似地,要处理像 Kubernetes 收集的审计日志,我们定义一个摄取管道
PUT _ingest/pipeline/openshift-audit-2-ecs
{
"processors": [
{
"script": {
"source": """
def audit = [:];
def keyToRemove = [];
for(k in ctx.keySet()) {
if (k.indexOf('_') != 0 && !['@timestamp', 'data_stream', 'openshift', 'event', 'hostname'].contains(k)) {
audit[k] = ctx[k];
keyToRemove.add(k);
}
}
for(k in keyToRemove) {
ctx.remove(k);
}
ctx.kubernetes=["audit":audit];
""",
"description": "Move all the 'kubernetes.audit' fields under 'kubernetes.audit' object"
}
},
{
"set": {
"copy_from": "openshift.cluster_id",
"field": "orchestrator.cluster.name",
"ignore_failure": true
}
},
{
"set": {
"field": "kubernetes.node.name",
"copy_from": "hostname",
"ignore_failure": true
}
},
{
"rename": {
"field": "hostname",
"target_field": "host.name",
"ignore_missing": true
}
},
{
"script": {
"if": "ctx?.kubernetes?.audit?.annotations != null",
"source": """
def keys = new ArrayList(ctx.kubernetes.audit.annotations.keySet());
for(k in keys) {
if (k.indexOf(".") >= 0) {
def sanitizedKey = k.replace(".", "_");
ctx.kubernetes.audit.annotations[sanitizedKey] = ctx.kubernetes.audit.annotations[k];
ctx.kubernetes.audit.annotations.remove(k);
}
}
""",
"description": "Normalize kubernetes audit annotations field as expected by the Integration"
}
}
]
}
管道的主要目标是模仿 Elastic Agent 所做的事情:将所有审计字段存储在 kubernetes.audit 对象下。
我们不打算使用传统的 @custom 管道方法,因为必须在调用 logs-kubernetes.container_logs 集成管道之前规范化字段,该管道使用诸如 kubernetes.container.name 和 kubernetes.labels 之类的字段来确定字段 service.name 和 service.version。在 教程:使用自定义摄取管道转换数据 | Fleet 和 Elastic Agent 指南 [8.11] 中阅读有关自定义管道的更多信息。
OpenShift 集群日志转发器默认将数据写入 app-write 和 audit-write 索引。可以更改此行为,但它仍然尝试预先添加前缀“app”和后缀“write”,因此我们选择将数据发送到默认目标,并使用重新路由处理器将其发送到正确的数据流。在我们的博客 简化日志数据管理:利用 Elastic 的灵活路由功能 和我们的文档 重新路由处理器 | Elasticsearch 指南 [8.11] | Elastic 中阅读有关重新路由处理器的更多信息。
在这种情况下,我们希望将容器日志(app-write 索引)重定向到 logs-kubernetes.container_logs,并将审计日志(audit-write)重定向到 logs-kubernetes.audit_logs
PUT _ingest/pipeline/app-write-reroute-pipeline
{
"processors": [
{
"pipeline": {
"name": "openshift-2-ecs",
"description": "Format the Openshift data in ECS"
}
},
{
"set": {
"field": "event.dataset",
"value": "kubernetes.container_logs"
}
},
{
"reroute": {
"destination": "logs-kubernetes.container_logs-openshift"
}
}
]
}
PUT _ingest/pipeline/audit-write-reroute-pipeline
{
"processors": [
{
"pipeline": {
"name": "openshift-audit-2-ecs",
"description": "Format the Openshift data in ECS"
}
},
{
"set": {
"field": "event.dataset",
"value": "kubernetes.audit_logs"
}
},
{
"reroute": {
"destination": "logs-kubernetes.audit_logs-openshift"
}
}
]
}
请注意,鉴于 app-write 和 audit-write 不遵循数据流命名约定,我们被迫在重新路由处理器中添加目标字段。重新路由处理器还将为我们填充 data_stream 字段。请注意,此步骤由 Elastic Agent 在源头自动完成。
此外,我们使用我们创建的默认管道创建索引,以根据我们的需要重新路由日志。
PUT app-write
{
"settings": {
"index.default_pipeline": "app-write-reroute-pipeline"
}
}
PUT audit-write
{
"settings": {
"index.default_pipeline": "audit-write-reroute-pipeline"
}
}
基本上,我们所做的事情可以概括在这张图中
让我们以容器日志为例。当操作员尝试写入 app-write 索引时,它将调用默认管道“app-write-reroute-pipeline”,该管道将日志格式化为 ECS 格式,并将日志重新路由到 logs-kubernetes.container_logs-openshift 数据流。这将调用集成管道,该管道会调用(如果存在)logs-kubernetes.container_logs@custom 管道。最后,logs-kubernetes_container_logs 管道可以使用 Kubernetes 集成文档中描述的 elastic.co/dataset 和 elastic.co/namespace 注释,将日志重新路由到另一个数据集和命名空间,这反过来可能导致另一个集成管道的执行。
创建用于发送日志的用户
我们将使用基本身份验证,因为在撰写本文时,它是 OpenShift 日志中 Elasticsearch 唯一支持的身份验证方法。因此,我们需要一个角色,该角色允许用户写入和读取 app-write 和 audit-write 日志(OpenShift 代理需要)以及对 logs-*-* 的 auto_configure 访问权限,以允许自定义 Kubernetes 重新路由
PUT _security/role/YOURROLE
{
"cluster": [
"monitor"
],
"indices": [
{
"names": [
"logs-*-*"
],
"privileges": [
"auto_configure",
"create_doc"
],
"allow_restricted_indices": false
},
{
"names": [
"app-write",
"audit-write",
],
"privileges": [
"create_doc",
"read"
],
"allow_restricted_indices": false
}
],
"applications": [],
"run_as": [],
"metadata": {},
"transient_metadata": {
"enabled": true
}
}
PUT _security/user/YOUR_USERNAME
{
"password": "YOUR_PASSWORD",
"roles": ["YOURROLE"]
}
在 OpenShift 上
在 OpenShift 集群上,我们需要遵循 Red Hat 的官方文档,了解如何安装 Red Hat OpenShift Logging 并配置集群日志记录和集群日志转发器。
我们需要安装 Red Hat OpenShift Logging Operator,该 Operator 定义了 ClusterLogging 和 ClusterLogForwarder 资源。之后,我们可以定义集群日志记录资源
apiVersion: logging.openshift.io/v1
kind: ClusterLogging
metadata:
name: instance
namespace: openshift-logging
spec:
collection:
logs:
type: vector
vector: {}
集群日志转发器是负责定义一个守护进程集的资源,该守护进程集会将日志转发到远程 Elasticsearch。在创建它之前,我们需要在与 ClusterLogForwarder 相同的命名空间中创建一个 secret,其中包含先前在该命名空间中创建的用户的 Elasticsearch 凭据,ClusterLogForwarder 将部署在该命名空间中。
apiVersion: v1
kind: Secret
metadata:
name: elasticsearch-password
namespace: openshift-logging
type: Opaque
stringData:
username: YOUR_USERNAME
password: YOUR_PASSWORD
最后,我们创建 ClusterLogForwarder 资源。
kind: ClusterLogForwarder
apiVersion: logging.openshift.io/v1
metadata:
name: instance
namespace: openshift-logging
spec:
outputs:
- name: remote-elasticsearch
secret:
name: elasticsearch-password
type: elasticsearch
url: "https://YOUR_ELASTICSEARCH_URL:443"
elasticsearch:
version: 8 # The default is version 6 with the _type field
pipelines:
- inputRefs:
- application
- audit
name: enable-default-log-store
outputRefs:
- remote-elasticsearch
请注意,我们显式地定义了 Elasticsearch 的版本为 8,否则 ClusterLogForwarder 将发送 _type 字段,该字段与 Elasticsearch 8 不兼容,并且我们仅收集应用程序和审计日志。
结果
一旦日志被收集并通过所有管道,结果非常接近开箱即用的 Kubernetes 集成。存在重要的差异,例如缺少主机和云元数据信息,这些信息似乎没有被收集(至少在没有额外配置的情况下)。我们可以在日志浏览器中查看 Kubernetes 容器日志。
在这篇文章中,我们介绍了如何使用 OpenShift Logging Operator 来收集容器日志和审计日志。我们仍然建议利用 Elastic Agent 来收集所有日志。这是你能获得的最佳用户体验。无需自己维护或转换日志为 ECS 格式。此外,Elastic Agent 使用 API 密钥作为身份验证方法,并收集云信息等元数据,这使您从长远来看可以做更多事情。
了解有关使用 Elastic Stack 进行日志监控的更多信息.
对这篇博客有反馈吗? 在此处分享。
本文中描述的任何特性或功能的发布和时间安排仍由 Elastic 自行决定。任何当前不可用的特性或功能可能无法按时交付,甚至根本不会交付。