Mirko BezDavid RicordelPhilipp Kahr

使用 Red Hat 的 OpenShift Logging Operator 收集 OpenShift 容器日志

学习如何优化使用 Red Hat OpenShift Logging Operator 收集的 OpenShift 日志,以及如何在 Elasticsearch 中高效地格式化和路由这些日志。

阅读时间:23分钟
Collecting OpenShift container logs using Red Hat’s OpenShift Logging Operator

这篇博文探讨了一种使用 Red Hat OpenShift Logging Operator 收集和格式化 OpenShift Container Platform 日志和审计日志的可能方法。我们建议使用 Elastic® Agent 以获得最佳体验!我们还将展示如何将日志格式化为 Elastic 通用架构(ECS),以便更好地查看、搜索和可视化您的日志。本博文中的所有示例都基于 OpenShift 4.14。

为什么要使用 OpenShift Logging Operator?

许多企业客户使用 OpenShift 作为他们的编排解决方案。这种方法的优点是:

  • 它由 Red Hat 开发和支持

  • 它可以自动更新 OpenShift 集群和操作系统,以确保它们兼容并保持兼容

  • 它可以使用诸如 source-to-image 之类的功能来加快开发周期

  • 它使用了增强的安全性

在我们咨询的经验中,当我们尝试安装 Elastic Agent 来收集 Pod 的日志时,后一个方面给 OpenShift 管理员带来了挑战和摩擦。事实上,Elastic Agent 需要将宿主的文件挂载到 Pod 中,并且还需要以特权模式运行。(有关 Elastic Agent 需要哪些权限的更多信息,请阅读Elasticsearch® 官方文档)。虽然我们在本文中探讨的解决方案在幕后需要类似的权限,但它由 Red Hat 开发和支持的 OpenShift Logging Operator 管理。

我们将收集哪些日志?

在 OpenShift Container Platform 中,我们将日志分为三大类:审计日志、应用程序日志和基础设施日志

  • **审计日志** 描述了用户、管理员和其他组件影响系统的一系列活动。

  • **应用程序日志** 由在非保留命名空间中运行的 Pod 的容器日志组成。

  • **基础设施日志** 由在保留命名空间(如 openshift*、kube* 和 default)中运行的 Pod 的容器日志以及节点的 journald 消息组成。

为了简单起见,我们将在下面只考虑审计日志和应用程序日志。在这篇文章中,我们将描述如何将审计日志和应用程序日志格式化为 Kubernetes 集成预期的格式,以便充分利用 Elastic 可观测性。

入门

要从 OpenShift 收集日志,我们必须在 Elasticsearch 和 OpenShift 中执行一些准备步骤。

在 Elasticsearch 中

我们首先安装 Kubernetes 集成资源。我们主要关注 logs-kubernetes.container_logs 和 logs-kubernetes.audit_logs 的索引模板和摄取管道。

为了将从 ClusterLogForwarder 收到的日志格式化为ECS格式,我们将定义一个管道来规范化容器日志。OpenShift 使用的字段命名约定与 ECS 使用的略有不同。要获取 OpenShift 导出的字段列表,请参阅导出的字段 | 日志记录 | OpenShift Container Platform 4.14。要获取 Kubernetes 集成导出的字段列表,您可以参阅Kubernetes 字段 | Filebeat 参考 [8.11] | ElasticLogs app fields | Elastic 可观测性 [8.11]。此外,必须通过将点替换为下划线来规范化诸如 kubernetes.annotations 之类的特定字段。此操作通常由 Elastic Agent 自动完成。

PUT _ingest/pipeline/openshift-2-ecs
{
  "processors": [
    {
      "rename": {
        "field": "kubernetes.pod_id",
        "target_field": "kubernetes.pod.uid",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "kubernetes.pod_ip",
        "target_field": "kubernetes.pod.ip",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "kubernetes.pod_name",
        "target_field": "kubernetes.pod.name",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "kubernetes.namespace_name",
        "target_field": "kubernetes.namespace",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "kubernetes.namespace_id",
        "target_field": "kubernetes.namespace_uid",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "kubernetes.container_id",
        "target_field": "container.id",
        "ignore_missing": true
      }
    },
    {
      "dissect": {
        "field": "container.id",
        "pattern": "%{container.runtime}://%{container.id}",
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "kubernetes.container_image",
        "target_field": "container.image.name",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "field": "kubernetes.container.image",
        "copy_from": "container.image.name",
        "ignore_failure": true
      }
    },
    {
      "set": {
        "copy_from": "kubernetes.container_name",
        "field": "container.name",
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "kubernetes.container_name",
        "target_field": "kubernetes.container.name",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "field": "kubernetes.node.name",
        "copy_from": "hostname",
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "hostname",
        "target_field": "host.name",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "level",
        "target_field": "log.level",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "file",
        "target_field": "log.file.path",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "copy_from": "openshift.cluster_id",
        "field": "orchestrator.cluster.name",
        "ignore_failure": true
      }
    },
    {
      "dissect": {
        "field": "kubernetes.pod_owner",
        "pattern": "%{_tmp.parent_type}/%{_tmp.parent_name}",
        "ignore_missing": true
      }
    },
    {
      "lowercase": {
        "field": "_tmp.parent_type",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "field": "kubernetes.pod.{{_tmp.parent_type}}.name",
        "value": "{{_tmp.parent_name}}",
        "if": "ctx?._tmp?.parent_type != null",
        "ignore_failure": true
      }
    },
    {
      "remove": {
        "field": [
          "_tmp",
          "kubernetes.pod_owner"
          ],
          "ignore_missing": true
      }
    },
    {
      "script": {
        "description": "Normalize kubernetes annotations",
        "if": "ctx?.kubernetes?.annotations != null",
        "source": """
        def keys = new ArrayList(ctx.kubernetes.annotations.keySet());
        for(k in keys) {
          if (k.indexOf(".") >= 0) {
            def sanitizedKey = k.replace(".", "_");
            ctx.kubernetes.annotations[sanitizedKey] = ctx.kubernetes.annotations[k];
            ctx.kubernetes.annotations.remove(k);
          }
        }
        """
      }
    },
    {
      "script": {
        "description": "Normalize kubernetes namespace_labels",
        "if": "ctx?.kubernetes?.namespace_labels != null",
        "source": """
        def keys = new ArrayList(ctx.kubernetes.namespace_labels.keySet());
        for(k in keys) {
          if (k.indexOf(".") >= 0) {
            def sanitizedKey = k.replace(".", "_");
            ctx.kubernetes.namespace_labels[sanitizedKey] = ctx.kubernetes.namespace_labels[k];
            ctx.kubernetes.namespace_labels.remove(k);
          }
        }
        """
      }
    },
    {
      "script": {
        "description": "Normalize special Kubernetes Labels used in logs-kubernetes.container_logs to determine service.name and service.version",
        "if": "ctx?.kubernetes?.labels != null",
        "source": """
        def keys = new ArrayList(ctx.kubernetes.labels.keySet());
        for(k in keys) {
          if (k.startsWith("app_kubernetes_io_component_")) {
            def sanitizedKey = k.replace("app_kubernetes_io_component_", "app_kubernetes_io_component/");
            ctx.kubernetes.labels[sanitizedKey] = ctx.kubernetes.labels[k];
            ctx.kubernetes.labels.remove(k);
          }
        }
        """
      }
    }
    ]
}

类似地,为了处理 Kubernetes 收集的审计日志,我们定义了一个摄取管道

PUT _ingest/pipeline/openshift-audit-2-ecs
{
  "processors": [
    {
      "script": {
        "source": """
        def audit = [:];
        def keyToRemove = [];
        for(k in ctx.keySet()) {
          if (k.indexOf('_') != 0 && !['@timestamp', 'data_stream', 'openshift', 'event', 'hostname'].contains(k)) {
            audit[k] = ctx[k];
            keyToRemove.add(k);
          }
        }
        for(k in keyToRemove) {
          ctx.remove(k);
        }
        ctx.kubernetes=["audit":audit];
        """,
        "description": "Move all the 'kubernetes.audit' fields under 'kubernetes.audit' object"
      }
    },
    {
      "set": {
        "copy_from": "openshift.cluster_id",
        "field": "orchestrator.cluster.name",
        "ignore_failure": true
      }
    },
    {
      "set": {
        "field": "kubernetes.node.name",
        "copy_from": "hostname",
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "hostname",
        "target_field": "host.name",
        "ignore_missing": true
      }
    },
    {
      "script": {
        "if": "ctx?.kubernetes?.audit?.annotations != null",
        "source": """
          def keys = new ArrayList(ctx.kubernetes.audit.annotations.keySet());
          for(k in keys) {
            if (k.indexOf(".") >= 0) {
              def sanitizedKey = k.replace(".", "_");
              ctx.kubernetes.audit.annotations[sanitizedKey] = ctx.kubernetes.audit.annotations[k];
              ctx.kubernetes.audit.annotations.remove(k);
            }
          }
          """,
        "description": "Normalize kubernetes audit annotations field as expected by the Integration"
      }
    }
  ]
}

管道的主要目标是模拟 Elastic Agent 的功能:将所有审计字段存储在 kubernetes.audit 对象下。

我们不会使用传统的 @custom 管道方法,因为在调用使用 kubernetes.container.name 和 kubernetes.labels 字段来确定字段 service.name 和 service.version 的 logs-kubernetes.container_logs 集成管道之前,必须规范化这些字段。有关自定义管道的更多信息,请阅读教程:使用自定义摄取管道转换数据 | Fleet 和 Elastic Agent 指南 [8.11]

OpenShift Cluster Log Forwarder 默认情况下将数据写入 app-write 和 audit-write 索引中。可以更改此行为,但它仍然尝试添加前缀“app”和后缀“write”,因此我们选择将数据发送到默认目标并使用 reroute 处理器将其发送到正确的 Data Stream。有关 Reroute 处理器的更多信息,请阅读我们的博文简化日志数据管理:利用 Elastic 的灵活路由功能和我们的文档Reroute 处理器 | Elasticsearch 指南 [8.11] | Elastic

在这种情况下,我们希望将容器日志(app-write 索引)重定向到 logs-kubernetes.container_logs,将审计日志(audit-write)重定向到 logs-kubernetes.audit_logs。

PUT _ingest/pipeline/app-write-reroute-pipeline
{
  "processors": [
    {
      "pipeline": {
        "name": "openshift-2-ecs",
        "description": "Format the Openshift data in ECS"
      }
    },
    {
      "set": {
        "field": "event.dataset",
        "value": "kubernetes.container_logs"
      }
    },
    {
      "reroute": {
        "destination": "logs-kubernetes.container_logs-openshift"
      }
    }
  ]
}



PUT _ingest/pipeline/audit-write-reroute-pipeline
{
  "processors": [
    {
      "pipeline": {
        "name": "openshift-audit-2-ecs",
        "description": "Format the Openshift data in ECS"
      }
    },
    {
      "set": {
        "field": "event.dataset",
        "value": "kubernetes.audit_logs"
      }
    },
    {
      "reroute": {
        "destination": "logs-kubernetes.audit_logs-openshift"
      }
    }
  ]
}

请注意,鉴于 app-write 和 audit-write 不遵循 Data Stream 命名约定,我们被迫在 reroute 处理器中添加 destination 字段。reroute 处理器还将为我们填充data_stream 字段。请注意,此步骤由 Elastic Agent 在源端自动完成。

此外,我们使用创建的默认管道创建索引,以根据我们的需要重新路由日志。

PUT app-write
{
  "settings": {
      "index.default_pipeline": "app-write-reroute-pipeline"
   }
}


PUT audit-write
{
  "settings": {
    "index.default_pipeline": "audit-write-reroute-pipeline"
  }
}

基本上,我们所做的可以在此图片中总结

让我们以容器日志为例。当 operator 尝试写入 app-write 索引时,它将调用默认管道“app-write-reroute-pipeline”,该管道将日志格式化为 ECS 格式并将日志重新路由到 logs-kubernetes.container_logs-openshift 数据流。这将调用集成管道,该管道(如果存在)将调用 logs-kubernetes.container_logs@custom 管道。最后,logs-kubernetes_container_logs 管道可能会使用 elastic.co/dataset 和 elastic.co/namespace 注释将日志重新路由到另一个数据集和命名空间,如 Kubernetes 集成文档中所述,这反过来又可能导致执行另一个集成管道。

创建用于发送日志的用户

我们将使用基本身份验证,因为在撰写本文时,这是 OpenShift 日志记录中唯一支持的 Elasticsearch 身份验证方法。因此,我们需要一个角色,允许用户写入和读取 app-write 和 audit-write 日志(OpenShift 代理需要),并允许对 logs-*-* 进行自动配置访问,以允许自定义 Kubernetes 重新路由。

PUT _security/role/YOURROLE
{
    "cluster": [
      "monitor"
    ],
    "indices": [
      {
        "names": [
          "logs-*-*"
        ],
        "privileges": [
          "auto_configure",
          "create_doc"
        ],
        "allow_restricted_indices": false
      },
      {
        "names": [
          "app-write",
          "audit-write",
        ],
        "privileges": [
          "create_doc",
          "read"
        ],
        "allow_restricted_indices": false
      }
    ],
    "applications": [],
    "run_as": [],
    "metadata": {},
    "transient_metadata": {
      "enabled": true
    }

}



PUT _security/user/YOUR_USERNAME
{
  "password": "YOUR_PASSWORD",
  "roles": ["YOURROLE"]
}

在 OpenShift 上

在 OpenShift 集群上,我们需要按照 Red Hat 关于如何安装 Red Hat OpenShift Logging 并配置集群日志记录和集群日志转发器的官方文档进行操作。

我们需要安装 Red Hat OpenShift Logging Operator,它定义了 ClusterLogging 和 ClusterLogForwarder 资源。之后,我们可以定义 Cluster Logging 资源。

apiVersion: logging.openshift.io/v1
kind: ClusterLogging
metadata:
  name: instance
  namespace: openshift-logging
spec:
  collection:
    logs:
      type: vector
      vector: {}

Cluster Log Forwarder 是负责定义守护进程集的资源,该守护进程集将日志转发到远程 Elasticsearch。在创建它之前,我们需要在与 ClusterLogForwarder 相同的命名空间中创建一个 Secret,其中包含我们之前在 ClusterLogForwarder 将要部署的命名空间中创建的用户 Elasticsearch 凭据。

apiVersion: v1
kind: Secret
metadata:
  name: elasticsearch-password
  namespace: openshift-logging
type: Opaque
stringData:
  username: YOUR_USERNAME
  password: YOUR_PASSWORD

最后,我们创建 ClusterLogForwarder 资源。

kind: ClusterLogForwarder
apiVersion: logging.openshift.io/v1
metadata:
  name: instance
  namespace: openshift-logging
spec:
  outputs:
    - name: remote-elasticsearch
      secret:
        name: elasticsearch-password
      type: elasticsearch
      url: "https://YOUR_ELASTICSEARCH_URL:443"
      elasticsearch:
        version: 8 # The default is version 6 with the _type field
  pipelines:
    - inputRefs:
        - application
        - audit
      name: enable-default-log-store
      outputRefs:
        - remote-elasticsearch

请注意,我们显式地将 Elasticsearch 版本定义为 8,否则 ClusterLogForwarder 将发送 `_type` 字段,这与 Elasticsearch 8 不兼容,并且我们只收集应用程序和审计日志。

结果

一旦日志收集并通过所有管道传递,结果就非常接近开箱即用的 Kubernetes 集成。但也有一些重要的区别,例如缺少主机和云元数据信息(至少在没有额外配置的情况下)似乎没有被收集。我们可以在日志资源管理器中查看 Kubernetes 容器日志。

在这篇文章中,我们描述了如何使用 OpenShift Logging Operator 收集容器日志和审计日志。我们仍然建议利用 Elastic Agent 收集所有日志。这是您可以获得的最佳用户体验。无需自行维护或转换日志到 ECS 格式。此外,Elastic Agent 使用 API 密钥作为身份验证方法,并收集诸如云信息之类的元数据,从长远来看,这使您可以了解更多

了解更多关于使用 Elastic Stack 进行日志监控的信息.

对这篇博文有反馈? 在此处分享

本文中描述的任何功能或功能的发布和时间安排完全由 Elastic 自行决定。任何目前不可用的功能或功能可能无法按时或根本无法交付。