这篇博文探讨了一种使用 Red Hat OpenShift Logging Operator 收集和格式化 OpenShift Container Platform 日志和审计日志的可能方法。我们建议使用 Elastic® Agent 以获得最佳体验!我们还将展示如何将日志格式化为 Elastic 通用架构(ECS),以便更好地查看、搜索和可视化您的日志。本博文中的所有示例都基于 OpenShift 4.14。
为什么要使用 OpenShift Logging Operator?
许多企业客户使用 OpenShift 作为他们的编排解决方案。这种方法的优点是:
-
它由 Red Hat 开发和支持
-
它可以自动更新 OpenShift 集群和操作系统,以确保它们兼容并保持兼容
-
它可以使用诸如 source-to-image 之类的功能来加快开发周期
-
它使用了增强的安全性
在我们咨询的经验中,当我们尝试安装 Elastic Agent 来收集 Pod 的日志时,后一个方面给 OpenShift 管理员带来了挑战和摩擦。事实上,Elastic Agent 需要将宿主的文件挂载到 Pod 中,并且还需要以特权模式运行。(有关 Elastic Agent 需要哪些权限的更多信息,请阅读Elasticsearch® 官方文档)。虽然我们在本文中探讨的解决方案在幕后需要类似的权限,但它由 Red Hat 开发和支持的 OpenShift Logging Operator 管理。
我们将收集哪些日志?
在 OpenShift Container Platform 中,我们将日志分为三大类:审计日志、应用程序日志和基础设施日志
-
**审计日志** 描述了用户、管理员和其他组件影响系统的一系列活动。
-
**应用程序日志** 由在非保留命名空间中运行的 Pod 的容器日志组成。
-
**基础设施日志** 由在保留命名空间(如 openshift*、kube* 和 default)中运行的 Pod 的容器日志以及节点的 journald 消息组成。
为了简单起见,我们将在下面只考虑审计日志和应用程序日志。在这篇文章中,我们将描述如何将审计日志和应用程序日志格式化为 Kubernetes 集成预期的格式,以便充分利用 Elastic 可观测性。
入门
要从 OpenShift 收集日志,我们必须在 Elasticsearch 和 OpenShift 中执行一些准备步骤。
在 Elasticsearch 中
我们首先安装 Kubernetes 集成资源。我们主要关注 logs-kubernetes.container_logs 和 logs-kubernetes.audit_logs 的索引模板和摄取管道。
为了将从 ClusterLogForwarder 收到的日志格式化为ECS格式,我们将定义一个管道来规范化容器日志。OpenShift 使用的字段命名约定与 ECS 使用的略有不同。要获取 OpenShift 导出的字段列表,请参阅导出的字段 | 日志记录 | OpenShift Container Platform 4.14。要获取 Kubernetes 集成导出的字段列表,您可以参阅Kubernetes 字段 | Filebeat 参考 [8.11] | Elastic和Logs app fields | Elastic 可观测性 [8.11]。此外,必须通过将点替换为下划线来规范化诸如 kubernetes.annotations 之类的特定字段。此操作通常由 Elastic Agent 自动完成。
PUT _ingest/pipeline/openshift-2-ecs
{
"processors": [
{
"rename": {
"field": "kubernetes.pod_id",
"target_field": "kubernetes.pod.uid",
"ignore_missing": true
}
},
{
"rename": {
"field": "kubernetes.pod_ip",
"target_field": "kubernetes.pod.ip",
"ignore_missing": true
}
},
{
"rename": {
"field": "kubernetes.pod_name",
"target_field": "kubernetes.pod.name",
"ignore_missing": true
}
},
{
"rename": {
"field": "kubernetes.namespace_name",
"target_field": "kubernetes.namespace",
"ignore_missing": true
}
},
{
"rename": {
"field": "kubernetes.namespace_id",
"target_field": "kubernetes.namespace_uid",
"ignore_missing": true
}
},
{
"rename": {
"field": "kubernetes.container_id",
"target_field": "container.id",
"ignore_missing": true
}
},
{
"dissect": {
"field": "container.id",
"pattern": "%{container.runtime}://%{container.id}",
"ignore_failure": true
}
},
{
"rename": {
"field": "kubernetes.container_image",
"target_field": "container.image.name",
"ignore_missing": true
}
},
{
"set": {
"field": "kubernetes.container.image",
"copy_from": "container.image.name",
"ignore_failure": true
}
},
{
"set": {
"copy_from": "kubernetes.container_name",
"field": "container.name",
"ignore_failure": true
}
},
{
"rename": {
"field": "kubernetes.container_name",
"target_field": "kubernetes.container.name",
"ignore_missing": true
}
},
{
"set": {
"field": "kubernetes.node.name",
"copy_from": "hostname",
"ignore_failure": true
}
},
{
"rename": {
"field": "hostname",
"target_field": "host.name",
"ignore_missing": true
}
},
{
"rename": {
"field": "level",
"target_field": "log.level",
"ignore_missing": true
}
},
{
"rename": {
"field": "file",
"target_field": "log.file.path",
"ignore_missing": true
}
},
{
"set": {
"copy_from": "openshift.cluster_id",
"field": "orchestrator.cluster.name",
"ignore_failure": true
}
},
{
"dissect": {
"field": "kubernetes.pod_owner",
"pattern": "%{_tmp.parent_type}/%{_tmp.parent_name}",
"ignore_missing": true
}
},
{
"lowercase": {
"field": "_tmp.parent_type",
"ignore_missing": true
}
},
{
"set": {
"field": "kubernetes.pod.{{_tmp.parent_type}}.name",
"value": "{{_tmp.parent_name}}",
"if": "ctx?._tmp?.parent_type != null",
"ignore_failure": true
}
},
{
"remove": {
"field": [
"_tmp",
"kubernetes.pod_owner"
],
"ignore_missing": true
}
},
{
"script": {
"description": "Normalize kubernetes annotations",
"if": "ctx?.kubernetes?.annotations != null",
"source": """
def keys = new ArrayList(ctx.kubernetes.annotations.keySet());
for(k in keys) {
if (k.indexOf(".") >= 0) {
def sanitizedKey = k.replace(".", "_");
ctx.kubernetes.annotations[sanitizedKey] = ctx.kubernetes.annotations[k];
ctx.kubernetes.annotations.remove(k);
}
}
"""
}
},
{
"script": {
"description": "Normalize kubernetes namespace_labels",
"if": "ctx?.kubernetes?.namespace_labels != null",
"source": """
def keys = new ArrayList(ctx.kubernetes.namespace_labels.keySet());
for(k in keys) {
if (k.indexOf(".") >= 0) {
def sanitizedKey = k.replace(".", "_");
ctx.kubernetes.namespace_labels[sanitizedKey] = ctx.kubernetes.namespace_labels[k];
ctx.kubernetes.namespace_labels.remove(k);
}
}
"""
}
},
{
"script": {
"description": "Normalize special Kubernetes Labels used in logs-kubernetes.container_logs to determine service.name and service.version",
"if": "ctx?.kubernetes?.labels != null",
"source": """
def keys = new ArrayList(ctx.kubernetes.labels.keySet());
for(k in keys) {
if (k.startsWith("app_kubernetes_io_component_")) {
def sanitizedKey = k.replace("app_kubernetes_io_component_", "app_kubernetes_io_component/");
ctx.kubernetes.labels[sanitizedKey] = ctx.kubernetes.labels[k];
ctx.kubernetes.labels.remove(k);
}
}
"""
}
}
]
}
类似地,为了处理 Kubernetes 收集的审计日志,我们定义了一个摄取管道
PUT _ingest/pipeline/openshift-audit-2-ecs
{
"processors": [
{
"script": {
"source": """
def audit = [:];
def keyToRemove = [];
for(k in ctx.keySet()) {
if (k.indexOf('_') != 0 && !['@timestamp', 'data_stream', 'openshift', 'event', 'hostname'].contains(k)) {
audit[k] = ctx[k];
keyToRemove.add(k);
}
}
for(k in keyToRemove) {
ctx.remove(k);
}
ctx.kubernetes=["audit":audit];
""",
"description": "Move all the 'kubernetes.audit' fields under 'kubernetes.audit' object"
}
},
{
"set": {
"copy_from": "openshift.cluster_id",
"field": "orchestrator.cluster.name",
"ignore_failure": true
}
},
{
"set": {
"field": "kubernetes.node.name",
"copy_from": "hostname",
"ignore_failure": true
}
},
{
"rename": {
"field": "hostname",
"target_field": "host.name",
"ignore_missing": true
}
},
{
"script": {
"if": "ctx?.kubernetes?.audit?.annotations != null",
"source": """
def keys = new ArrayList(ctx.kubernetes.audit.annotations.keySet());
for(k in keys) {
if (k.indexOf(".") >= 0) {
def sanitizedKey = k.replace(".", "_");
ctx.kubernetes.audit.annotations[sanitizedKey] = ctx.kubernetes.audit.annotations[k];
ctx.kubernetes.audit.annotations.remove(k);
}
}
""",
"description": "Normalize kubernetes audit annotations field as expected by the Integration"
}
}
]
}
管道的主要目标是模拟 Elastic Agent 的功能:将所有审计字段存储在 kubernetes.audit 对象下。
我们不会使用传统的 @custom 管道方法,因为在调用使用 kubernetes.container.name 和 kubernetes.labels 字段来确定字段 service.name 和 service.version 的 logs-kubernetes.container_logs 集成管道之前,必须规范化这些字段。有关自定义管道的更多信息,请阅读教程:使用自定义摄取管道转换数据 | Fleet 和 Elastic Agent 指南 [8.11]。
OpenShift Cluster Log Forwarder 默认情况下将数据写入 app-write 和 audit-write 索引中。可以更改此行为,但它仍然尝试添加前缀“app”和后缀“write”,因此我们选择将数据发送到默认目标并使用 reroute 处理器将其发送到正确的 Data Stream。有关 Reroute 处理器的更多信息,请阅读我们的博文简化日志数据管理:利用 Elastic 的灵活路由功能和我们的文档Reroute 处理器 | Elasticsearch 指南 [8.11] | Elastic。
在这种情况下,我们希望将容器日志(app-write 索引)重定向到 logs-kubernetes.container_logs,将审计日志(audit-write)重定向到 logs-kubernetes.audit_logs。
PUT _ingest/pipeline/app-write-reroute-pipeline
{
"processors": [
{
"pipeline": {
"name": "openshift-2-ecs",
"description": "Format the Openshift data in ECS"
}
},
{
"set": {
"field": "event.dataset",
"value": "kubernetes.container_logs"
}
},
{
"reroute": {
"destination": "logs-kubernetes.container_logs-openshift"
}
}
]
}
PUT _ingest/pipeline/audit-write-reroute-pipeline
{
"processors": [
{
"pipeline": {
"name": "openshift-audit-2-ecs",
"description": "Format the Openshift data in ECS"
}
},
{
"set": {
"field": "event.dataset",
"value": "kubernetes.audit_logs"
}
},
{
"reroute": {
"destination": "logs-kubernetes.audit_logs-openshift"
}
}
]
}
请注意,鉴于 app-write 和 audit-write 不遵循 Data Stream 命名约定,我们被迫在 reroute 处理器中添加 destination 字段。reroute 处理器还将为我们填充data_stream 字段。请注意,此步骤由 Elastic Agent 在源端自动完成。
此外,我们使用创建的默认管道创建索引,以根据我们的需要重新路由日志。
PUT app-write
{
"settings": {
"index.default_pipeline": "app-write-reroute-pipeline"
}
}
PUT audit-write
{
"settings": {
"index.default_pipeline": "audit-write-reroute-pipeline"
}
}
基本上,我们所做的可以在此图片中总结
让我们以容器日志为例。当 operator 尝试写入 app-write 索引时,它将调用默认管道“app-write-reroute-pipeline”,该管道将日志格式化为 ECS 格式并将日志重新路由到 logs-kubernetes.container_logs-openshift 数据流。这将调用集成管道,该管道(如果存在)将调用 logs-kubernetes.container_logs@custom 管道。最后,logs-kubernetes_container_logs 管道可能会使用 elastic.co/dataset 和 elastic.co/namespace 注释将日志重新路由到另一个数据集和命名空间,如 Kubernetes 集成文档中所述,这反过来又可能导致执行另一个集成管道。
创建用于发送日志的用户
我们将使用基本身份验证,因为在撰写本文时,这是 OpenShift 日志记录中唯一支持的 Elasticsearch 身份验证方法。因此,我们需要一个角色,允许用户写入和读取 app-write 和 audit-write 日志(OpenShift 代理需要),并允许对 logs-*-* 进行自动配置访问,以允许自定义 Kubernetes 重新路由。
PUT _security/role/YOURROLE
{
"cluster": [
"monitor"
],
"indices": [
{
"names": [
"logs-*-*"
],
"privileges": [
"auto_configure",
"create_doc"
],
"allow_restricted_indices": false
},
{
"names": [
"app-write",
"audit-write",
],
"privileges": [
"create_doc",
"read"
],
"allow_restricted_indices": false
}
],
"applications": [],
"run_as": [],
"metadata": {},
"transient_metadata": {
"enabled": true
}
}
PUT _security/user/YOUR_USERNAME
{
"password": "YOUR_PASSWORD",
"roles": ["YOURROLE"]
}
在 OpenShift 上
在 OpenShift 集群上,我们需要按照 Red Hat 关于如何安装 Red Hat OpenShift Logging 并配置集群日志记录和集群日志转发器的官方文档进行操作。
我们需要安装 Red Hat OpenShift Logging Operator,它定义了 ClusterLogging 和 ClusterLogForwarder 资源。之后,我们可以定义 Cluster Logging 资源。
apiVersion: logging.openshift.io/v1
kind: ClusterLogging
metadata:
name: instance
namespace: openshift-logging
spec:
collection:
logs:
type: vector
vector: {}
Cluster Log Forwarder 是负责定义守护进程集的资源,该守护进程集将日志转发到远程 Elasticsearch。在创建它之前,我们需要在与 ClusterLogForwarder 相同的命名空间中创建一个 Secret,其中包含我们之前在 ClusterLogForwarder 将要部署的命名空间中创建的用户 Elasticsearch 凭据。
apiVersion: v1
kind: Secret
metadata:
name: elasticsearch-password
namespace: openshift-logging
type: Opaque
stringData:
username: YOUR_USERNAME
password: YOUR_PASSWORD
最后,我们创建 ClusterLogForwarder 资源。
kind: ClusterLogForwarder
apiVersion: logging.openshift.io/v1
metadata:
name: instance
namespace: openshift-logging
spec:
outputs:
- name: remote-elasticsearch
secret:
name: elasticsearch-password
type: elasticsearch
url: "https://YOUR_ELASTICSEARCH_URL:443"
elasticsearch:
version: 8 # The default is version 6 with the _type field
pipelines:
- inputRefs:
- application
- audit
name: enable-default-log-store
outputRefs:
- remote-elasticsearch
请注意,我们显式地将 Elasticsearch 版本定义为 8,否则 ClusterLogForwarder 将发送 `_type` 字段,这与 Elasticsearch 8 不兼容,并且我们只收集应用程序和审计日志。
结果
一旦日志收集并通过所有管道传递,结果就非常接近开箱即用的 Kubernetes 集成。但也有一些重要的区别,例如缺少主机和云元数据信息(至少在没有额外配置的情况下)似乎没有被收集。我们可以在日志资源管理器中查看 Kubernetes 容器日志。
在这篇文章中,我们描述了如何使用 OpenShift Logging Operator 收集容器日志和审计日志。我们仍然建议利用 Elastic Agent 收集所有日志。这是您可以获得的最佳用户体验。无需自行维护或转换日志到 ECS 格式。此外,Elastic Agent 使用 API 密钥作为身份验证方法,并收集诸如云信息之类的元数据,从长远来看,这使您可以了解更多。
了解更多关于使用 Elastic Stack 进行日志监控的信息.
对这篇博文有反馈? 在此处分享。
本文中描述的任何功能或功能的发布和时间安排完全由 Elastic 自行决定。任何目前不可用的功能或功能可能无法按时或根本无法交付。