Mirko BezDavid RicordelPhilipp Kahr

使用 Red Hat 的 OpenShift Logging Operator 收集 OpenShift 容器日志

了解如何优化使用 Red Hat OpenShift Logging Operator 收集的 OpenShift 日志,以及如何在 Elasticsearch 中高效地格式化和路由它们。

23 分钟阅读
Collecting OpenShift container logs using Red Hat’s OpenShift Logging Operator

本博客探讨了使用 Red Hat OpenShift Logging Operator 收集和格式化 OpenShift 容器平台日志和审计日志的一种可能方法。我们建议使用 Elastic® Agent 以获得最佳体验!我们还将展示如何将日志格式化为 Elastic Common Schema (ECS),以获得最佳的日志查看、搜索和可视化体验。本博客中的所有示例均基于 OpenShift 4.14。

为什么使用 OpenShift Logging Operator?

许多企业客户使用 OpenShift 作为他们的编排解决方案。这种方法的优点是

  • 它由 Red Hat 开发和支持

  • 它可以自动更新 OpenShift 集群以及操作系统,以确保它们兼容并保持兼容

  • 它可以使用诸如源到镜像之类的功能来加速开发生命周期

  • 它使用增强的安全性

在我们的咨询经验中,当我们尝试安装 Elastic Agent 来收集 pod 的日志时,后一方面给 OpenShift 管理员带来了挑战和摩擦。实际上,Elastic Agent 需要将主机的文件挂载到 pod 中,并且还需要在特权模式下运行。(在官方 Elasticsearch® 文档中阅读有关 Elastic Agent 所需权限的更多信息)。虽然我们在本文中探讨的解决方案在底层需要类似的权限,但它由 OpenShift Logging Operator 管理,该 Operator 由 Red Hat 开发和支持。

我们将收集哪些日志?

在 OpenShift 容器平台中,我们区分三大类日志:审计日志、应用程序日志和基础架构日志

  • 审计日志描述了用户、管理员和其他组件影响系统的活动列表。

  • 应用程序日志由在非保留命名空间中运行的 pod 的容器日志组成。

  • 基础架构日志由在保留命名空间(如 openshift*、kube* 和 default)中运行的 pod 的容器日志以及来自节点的 journald 消息组成。

在下文中,为了简单起见,我们仅考虑审计日志和应用程序日志。在本文中,我们将描述如何以 Kubernetes 集成期望的格式格式化审计日志和应用程序日志,以充分利用 Elastic 可观测性。

开始使用

要从 OpenShift 收集日志,我们必须在 Elasticsearch 和 OpenShift 中执行一些准备步骤。

在 Elasticsearch 内部

我们首先安装 Kubernetes 集成资产。我们主要对 logs-kubernetes.container_logs 和 logs-kubernetes.audit_logs 的索引模板和摄取管道感兴趣。

要将从 ClusterLogForwarder 收到的日志格式化为 ECS 格式,我们将定义一个管道来规范化容器日志。OpenShift 使用的字段命名约定与 ECS 使用的约定略有不同。要获取从 OpenShift 导出的字段列表,请参阅导出的字段 | 日志 | OpenShift 容器平台 4.14。要获取 Kubernetes 集成导出的字段列表,您可以参考 Kubernetes 字段 | Filebeat 参考 [8.11] | Elastic日志应用程序字段 | Elastic 可观测性 [8.11]。此外,必须通过将点替换为下划线来规范化诸如 kubernetes.annotations 之类的特定字段。此操作通常由 Elastic Agent 自动完成。

PUT _ingest/pipeline/openshift-2-ecs
{
  "processors": [
    {
      "rename": {
        "field": "kubernetes.pod_id",
        "target_field": "kubernetes.pod.uid",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "kubernetes.pod_ip",
        "target_field": "kubernetes.pod.ip",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "kubernetes.pod_name",
        "target_field": "kubernetes.pod.name",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "kubernetes.namespace_name",
        "target_field": "kubernetes.namespace",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "kubernetes.namespace_id",
        "target_field": "kubernetes.namespace_uid",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "kubernetes.container_id",
        "target_field": "container.id",
        "ignore_missing": true
      }
    },
    {
      "dissect": {
        "field": "container.id",
        "pattern": "%{container.runtime}://%{container.id}",
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "kubernetes.container_image",
        "target_field": "container.image.name",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "field": "kubernetes.container.image",
        "copy_from": "container.image.name",
        "ignore_failure": true
      }
    },
    {
      "set": {
        "copy_from": "kubernetes.container_name",
        "field": "container.name",
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "kubernetes.container_name",
        "target_field": "kubernetes.container.name",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "field": "kubernetes.node.name",
        "copy_from": "hostname",
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "hostname",
        "target_field": "host.name",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "level",
        "target_field": "log.level",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "file",
        "target_field": "log.file.path",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "copy_from": "openshift.cluster_id",
        "field": "orchestrator.cluster.name",
        "ignore_failure": true
      }
    },
    {
      "dissect": {
        "field": "kubernetes.pod_owner",
        "pattern": "%{_tmp.parent_type}/%{_tmp.parent_name}",
        "ignore_missing": true
      }
    },
    {
      "lowercase": {
        "field": "_tmp.parent_type",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "field": "kubernetes.pod.{{_tmp.parent_type}}.name",
        "value": "{{_tmp.parent_name}}",
        "if": "ctx?._tmp?.parent_type != null",
        "ignore_failure": true
      }
    },
    {
      "remove": {
        "field": [
          "_tmp",
          "kubernetes.pod_owner"
          ],
          "ignore_missing": true
      }
    },
    {
      "script": {
        "description": "Normalize kubernetes annotations",
        "if": "ctx?.kubernetes?.annotations != null",
        "source": """
        def keys = new ArrayList(ctx.kubernetes.annotations.keySet());
        for(k in keys) {
          if (k.indexOf(".") >= 0) {
            def sanitizedKey = k.replace(".", "_");
            ctx.kubernetes.annotations[sanitizedKey] = ctx.kubernetes.annotations[k];
            ctx.kubernetes.annotations.remove(k);
          }
        }
        """
      }
    },
    {
      "script": {
        "description": "Normalize kubernetes namespace_labels",
        "if": "ctx?.kubernetes?.namespace_labels != null",
        "source": """
        def keys = new ArrayList(ctx.kubernetes.namespace_labels.keySet());
        for(k in keys) {
          if (k.indexOf(".") >= 0) {
            def sanitizedKey = k.replace(".", "_");
            ctx.kubernetes.namespace_labels[sanitizedKey] = ctx.kubernetes.namespace_labels[k];
            ctx.kubernetes.namespace_labels.remove(k);
          }
        }
        """
      }
    },
    {
      "script": {
        "description": "Normalize special Kubernetes Labels used in logs-kubernetes.container_logs to determine service.name and service.version",
        "if": "ctx?.kubernetes?.labels != null",
        "source": """
        def keys = new ArrayList(ctx.kubernetes.labels.keySet());
        for(k in keys) {
          if (k.startsWith("app_kubernetes_io_component_")) {
            def sanitizedKey = k.replace("app_kubernetes_io_component_", "app_kubernetes_io_component/");
            ctx.kubernetes.labels[sanitizedKey] = ctx.kubernetes.labels[k];
            ctx.kubernetes.labels.remove(k);
          }
        }
        """
      }
    }
    ]
}

类似地,要处理像 Kubernetes 收集的审计日志,我们定义一个摄取管道

PUT _ingest/pipeline/openshift-audit-2-ecs
{
  "processors": [
    {
      "script": {
        "source": """
        def audit = [:];
        def keyToRemove = [];
        for(k in ctx.keySet()) {
          if (k.indexOf('_') != 0 && !['@timestamp', 'data_stream', 'openshift', 'event', 'hostname'].contains(k)) {
            audit[k] = ctx[k];
            keyToRemove.add(k);
          }
        }
        for(k in keyToRemove) {
          ctx.remove(k);
        }
        ctx.kubernetes=["audit":audit];
        """,
        "description": "Move all the 'kubernetes.audit' fields under 'kubernetes.audit' object"
      }
    },
    {
      "set": {
        "copy_from": "openshift.cluster_id",
        "field": "orchestrator.cluster.name",
        "ignore_failure": true
      }
    },
    {
      "set": {
        "field": "kubernetes.node.name",
        "copy_from": "hostname",
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "hostname",
        "target_field": "host.name",
        "ignore_missing": true
      }
    },
    {
      "script": {
        "if": "ctx?.kubernetes?.audit?.annotations != null",
        "source": """
          def keys = new ArrayList(ctx.kubernetes.audit.annotations.keySet());
          for(k in keys) {
            if (k.indexOf(".") >= 0) {
              def sanitizedKey = k.replace(".", "_");
              ctx.kubernetes.audit.annotations[sanitizedKey] = ctx.kubernetes.audit.annotations[k];
              ctx.kubernetes.audit.annotations.remove(k);
            }
          }
          """,
        "description": "Normalize kubernetes audit annotations field as expected by the Integration"
      }
    }
  ]
}

管道的主要目标是模仿 Elastic Agent 所做的事情:将所有审计字段存储在 kubernetes.audit 对象下。

我们不打算使用传统的 @custom 管道方法,因为必须在调用 logs-kubernetes.container_logs 集成管道之前规范化字段,该管道使用诸如 kubernetes.container.name 和 kubernetes.labels 之类的字段来确定字段 service.name 和 service.version。在 教程:使用自定义摄取管道转换数据 | Fleet 和 Elastic Agent 指南 [8.11] 中阅读有关自定义管道的更多信息。

OpenShift 集群日志转发器默认将数据写入 app-write 和 audit-write 索引。可以更改此行为,但它仍然尝试预先添加前缀“app”和后缀“write”,因此我们选择将数据发送到默认目标,并使用重新路由处理器将其发送到正确的数据流。在我们的博客 简化日志数据管理:利用 Elastic 的灵活路由功能 和我们的文档 重新路由处理器 | Elasticsearch 指南 [8.11] | Elastic 中阅读有关重新路由处理器的更多信息。

在这种情况下,我们希望将容器日志(app-write 索引)重定向到 logs-kubernetes.container_logs,并将审计日志(audit-write)重定向到 logs-kubernetes.audit_logs

PUT _ingest/pipeline/app-write-reroute-pipeline
{
  "processors": [
    {
      "pipeline": {
        "name": "openshift-2-ecs",
        "description": "Format the Openshift data in ECS"
      }
    },
    {
      "set": {
        "field": "event.dataset",
        "value": "kubernetes.container_logs"
      }
    },
    {
      "reroute": {
        "destination": "logs-kubernetes.container_logs-openshift"
      }
    }
  ]
}



PUT _ingest/pipeline/audit-write-reroute-pipeline
{
  "processors": [
    {
      "pipeline": {
        "name": "openshift-audit-2-ecs",
        "description": "Format the Openshift data in ECS"
      }
    },
    {
      "set": {
        "field": "event.dataset",
        "value": "kubernetes.audit_logs"
      }
    },
    {
      "reroute": {
        "destination": "logs-kubernetes.audit_logs-openshift"
      }
    }
  ]
}

请注意,鉴于 app-write 和 audit-write 不遵循数据流命名约定,我们被迫在重新路由处理器中添加目标字段。重新路由处理器还将为我们填充 data_stream 字段。请注意,此步骤由 Elastic Agent 在源头自动完成。

此外,我们使用我们创建的默认管道创建索引,以根据我们的需要重新路由日志。

PUT app-write
{
  "settings": {
      "index.default_pipeline": "app-write-reroute-pipeline"
   }
}


PUT audit-write
{
  "settings": {
    "index.default_pipeline": "audit-write-reroute-pipeline"
  }
}

基本上,我们所做的事情可以概括在这张图中

让我们以容器日志为例。当操作员尝试写入 app-write 索引时,它将调用默认管道“app-write-reroute-pipeline”,该管道将日志格式化为 ECS 格式,并将日志重新路由到 logs-kubernetes.container_logs-openshift 数据流。这将调用集成管道,该管道会调用(如果存在)logs-kubernetes.container_logs@custom 管道。最后,logs-kubernetes_container_logs 管道可以使用 Kubernetes 集成文档中描述的 elastic.co/dataset 和 elastic.co/namespace 注释,将日志重新路由到另一个数据集和命名空间,这反过来可能导致另一个集成管道的执行。

创建用于发送日志的用户

我们将使用基本身份验证,因为在撰写本文时,它是 OpenShift 日志中 Elasticsearch 唯一支持的身份验证方法。因此,我们需要一个角色,该角色允许用户写入和读取 app-write 和 audit-write 日志(OpenShift 代理需要)以及对 logs-*-* 的 auto_configure 访问权限,以允许自定义 Kubernetes 重新路由

PUT _security/role/YOURROLE
{
    "cluster": [
      "monitor"
    ],
    "indices": [
      {
        "names": [
          "logs-*-*"
        ],
        "privileges": [
          "auto_configure",
          "create_doc"
        ],
        "allow_restricted_indices": false
      },
      {
        "names": [
          "app-write",
          "audit-write",
        ],
        "privileges": [
          "create_doc",
          "read"
        ],
        "allow_restricted_indices": false
      }
    ],
    "applications": [],
    "run_as": [],
    "metadata": {},
    "transient_metadata": {
      "enabled": true
    }

}



PUT _security/user/YOUR_USERNAME
{
  "password": "YOUR_PASSWORD",
  "roles": ["YOURROLE"]
}

在 OpenShift 上

在 OpenShift 集群上,我们需要遵循 Red Hat 的官方文档,了解如何安装 Red Hat OpenShift Logging 并配置集群日志记录和集群日志转发器。

我们需要安装 Red Hat OpenShift Logging Operator,该 Operator 定义了 ClusterLogging 和 ClusterLogForwarder 资源。之后,我们可以定义集群日志记录资源

apiVersion: logging.openshift.io/v1
kind: ClusterLogging
metadata:
  name: instance
  namespace: openshift-logging
spec:
  collection:
    logs:
      type: vector
      vector: {}

集群日志转发器是负责定义一个守护进程集的资源,该守护进程集会将日志转发到远程 Elasticsearch。在创建它之前,我们需要在与 ClusterLogForwarder 相同的命名空间中创建一个 secret,其中包含先前在该命名空间中创建的用户的 Elasticsearch 凭据,ClusterLogForwarder 将部署在该命名空间中。

apiVersion: v1
kind: Secret
metadata:
  name: elasticsearch-password
  namespace: openshift-logging
type: Opaque
stringData:
  username: YOUR_USERNAME
  password: YOUR_PASSWORD

最后,我们创建 ClusterLogForwarder 资源。

kind: ClusterLogForwarder
apiVersion: logging.openshift.io/v1
metadata:
  name: instance
  namespace: openshift-logging
spec:
  outputs:
    - name: remote-elasticsearch
      secret:
        name: elasticsearch-password
      type: elasticsearch
      url: "https://YOUR_ELASTICSEARCH_URL:443"
      elasticsearch:
        version: 8 # The default is version 6 with the _type field
  pipelines:
    - inputRefs:
        - application
        - audit
      name: enable-default-log-store
      outputRefs:
        - remote-elasticsearch

请注意,我们显式地定义了 Elasticsearch 的版本为 8,否则 ClusterLogForwarder 将发送 _type 字段,该字段与 Elasticsearch 8 不兼容,并且我们仅收集应用程序和审计日志。

结果

一旦日志被收集并通过所有管道,结果非常接近开箱即用的 Kubernetes 集成。存在重要的差异,例如缺少主机和云元数据信息,这些信息似乎没有被收集(至少在没有额外配置的情况下)。我们可以在日志浏览器中查看 Kubernetes 容器日志。

在这篇文章中,我们介绍了如何使用 OpenShift Logging Operator 来收集容器日志和审计日志。我们仍然建议利用 Elastic Agent 来收集所有日志。这是你能获得的最佳用户体验。无需自己维护或转换日志为 ECS 格式。此外,Elastic Agent 使用 API 密钥作为身份验证方法,并收集云信息等元数据,这使您从长远来看可以做更多事情

了解有关使用 Elastic Stack 进行日志监控的更多信息.

对这篇博客有反馈吗? 在此处分享

本文中描述的任何特性或功能的发布和时间安排仍由 Elastic 自行决定。任何当前不可用的特性或功能可能无法按时交付,甚至根本不会交付。

分享这篇文章