创建 Metricset编辑

Elastic 对用于生成 metricset 的代码不提供任何担保或支持。生成器主要供希望创建自己的数据传送器的开发人员参考。

Metricset 是 Metricbeat 模块的一部分,用于从远程服务中获取数据并构建数据结构。每个模块可以有多个 metricset。在本指南中,您将学习如何创建自己的 metricset。

首次创建 metricset 时,查看现有 metricset 的实现通常会有所帮助。

要创建新的 metricset

  1. 在 metricbeat beat 目录中运行以下命令

    make create-metricset

    您需要使用 Python 运行此命令,然后系统会提示您输入模块和 metricset 名称。请记住,模块表示您要从中检索指标的服务(如 Redis),而 metricset 是一组特定的分组指标(如 Redis 上的 info)。只能使用字符 [a-z],如果需要,可以使用下划线 (_)。不允许使用其他字符。

    当您运行 make create-metricset 时,它会为您的 metricset 创建所有基本文件,如果该模块尚不存在,还会创建所需的模块文件。有关模块文件的更多详细信息,请参阅创建 Metricbeat 模块

    在本指南中,我们使用 {metricset}{module}{beat} 作为占位符。您需要将它们替换为您的 metricset、模块和 beat 的实际名称。

    您创建的 metricset 已经是一个功能正常的 metricset,可以编译。

  2. 通过运行以下命令编译新的 metricset

    mage update
    mage build

    第一个命令 mage update 使用 metricset 中的最新文件、数据和元信息更新所有生成的文件。第二个命令 mage build 编译您的源代码,并在同一文件夹中提供一个名为 metricbeat 的二进制文件。您可以使用以下命令在调试模式下运行该二进制文件

    ./metricbeat -e -d "*"

运行 mage 命令后,您将在 module/{module}/{metricset} 下找到 metricset 及其生成的文件。此目录包含以下文件

  • \{metricset}.go
  • _meta/docs.asciidoc
  • _meta/data.json
  • _meta/fields.yml

接下来,我们将更详细地介绍这些文件。

{metricset}.go 文件编辑

第一个文件是 {metricset}.go。它包含有关如何从服务中获取数据并将其转换为可发送到输出的格式的逻辑。

生成的文件如下所示

https://github.com/elastic/beats/blob/main/metricbeat/scripts/module/metricset/metricset.go.tmpl

package {metricset}

import (
    "github.com/elastic/elastic-agent-libs/mapstr"
	"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
	"github.com/elastic/beats/v7/metricbeat/mb"
)

// init registers the MetricSet with the central registry as soon as the program
// starts. The New function will be called later to instantiate an instance of
// the MetricSet for each host is defined in the module's configuration. After the
// MetricSet has been created then Fetch will begin to be called periodically.
func init() {
	mb.Registry.MustAddMetricSet("{module}", "{metricset}", New)
}

// MetricSet holds any configuration or state information. It must implement
// the mb.MetricSet interface. And this is best achieved by embedding
// mb.BaseMetricSet because it implements all of the required mb.MetricSet
// interface methods except for Fetch.
type MetricSet struct {
	mb.BaseMetricSet
	counter int
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
	cfgwarn.Beta("The {module} {metricset} metricset is beta.")

	config := struct{}{}
	if err := base.Module().UnpackConfig(&config); err != nil {
		return nil, err
	}

	return &MetricSet{
		BaseMetricSet: base,
		counter:       1,
	}, nil
}

// Fetch method implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
	report.Event(mb.Event{
		MetricSetFields: mapstr.M{
			"counter": m.counter,
		},
	})
	m.counter++

	return nil
}

package 子句和 import 声明是每个 Go 文件基本结构的一部分。只有在您的实现需要更多导入时,才应修改文件的这一部分。

初始化编辑

init 方法使用中央注册表注册 metricset。在 Go 中,init() 函数在执行所有其他代码之前调用。这意味着模块将自动注册到全局注册表。

New 方法(传递给 MustAddMetricSet)将在模块设置完成后、开始获取数据之前调用。您通常不需要更改文件的这一部分。

func init() {
	mb.Registry.MustAddMetricSet("{module}", "{metricset}", New)
}
定义编辑

MetricSet 类型定义了 metricset 的所有字段。它至少必须由 mb.BaseMetricSet 字段组成,但可以使用其他条目进行扩展。这些变量可用于在多个获取调用之间持久保存数据或配置。

您可以向 MetricSet 类型添加更多字段,如以下示例所示,其中添加了 usernamepassword 字符串字段

type MetricSet struct {
	mb.BaseMetricSet
	username    string
	password    string
}
创建编辑

New 函数创建 MetricSet 的新实例。MetricSet 的设置过程也是 New 的一部分。此方法将在第一次调用 Fetch 之前调用。

New 函数还通过处理其他配置条目(如果需要)来设置配置。

func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

	config := struct{}{}

	if err := base.Module().UnpackConfig(&config); err != nil {
		return nil, err
	}

	return &MetricSet{
		BaseMetricSet: base,
	}, nil
}
获取编辑

Fetch 方法是 metricset 的核心部分。Fetch 在每次检索新数据时调用。如果定义了多个主机,则为每个主机调用一次 Fetch。调用 Fetch 的频率基于配置文件中定义的 period

Fetch 必须使用 mb.ReporterV2.Event 方法发布事件。如果发生错误,Fetch 可以返回错误,或者如果在循环中调用 Event,则使用 mb.ReporterV2.Error 方法发布。这意味着即使失败,Metricbeat 也会始终发送事件。您必须确保错误消息有助于识别实际错误。

以下示例显示了一个 metricset Fetch 方法,其中包含一个计数器,该计数器在每次调用 Fetch 时递增

func (m *MetricSet) Fetch(report mb.ReporterV2) error {

	report.Event(mb.Event{
		MetricSetFields: common.MapStr{
			"counter": m.counter,
		}
	})
	m.counter++

	return nil
}

从报告的事件派生的 JSON 输出将与您在 common.MapStr 中使用的命名和结构相同。有关 MapStr 及其函数的更多详细信息,请参阅MapStr API 文档

多重获取编辑

对于可能会公开多个事件的 metricset,可以在 Fetch 方法中多次调用 EventEvent 返回一个布尔值,指示 metricset 是否已关闭且无法处理更多事件,在这种情况下,Fetch 应立即返回。如果在处理多个事件之一时出错,可以使用 mb.ReporterV2.Error 方法发布该错误,而不是返回错误值。

解析和规范化字段编辑

在 Metricbeat 中,我们的目标是规范化所有 metricset 中的指标名称,以遵循一组通用的约定。这使用户可以轻松地查找和解释指标。为了简化从受监控系统读取的对象到 Metricbeat 格式的解析、转换、重命名和重构,我们创建了schema 包,允许您声明性地定义转换。

例如,假设有以下输入对象

input := map[string]interface{}{
	"testString":     "hello",
	"testInt":        "42",
	"testBool":       "true",
	"testFloat":      "42.1",
	"testObjString":  "hello, object",
}

并且需要将其转换为以下对象

common.MapStr{
	"test_string": "hello",
	"test_int":    int64(42),
	"test_bool":   true,
	"test_float":  42.1,
	"test_obj": common.MapStr{
		"test_obj_string": "hello, object",
	},
}

您可以使用 schema 包转换数据,并可选择在架构中将某些字段标记为必需或非必需。例如

import (
	s "github.com/elastic/beats/libbeat/common/schema"
	c "github.com/elastic/beats/libbeat/common/schema/mapstrstr"
)

var (
	schema = s.Schema{
		"test_string": c.Str("testString", s.Required), 
		"test_int":    c.Int("testInt"), 
		"test_bool":   c.Bool("testBool", s.Optional), 
		"test_float":  c.Float("testFloat"),
		"test_obj": s.Object{
			"test_obj_string": c.Str("testObjString", s.IgnoreAllErrors), 
		},
	}
)

func eventMapping(input map[string]interface{}) common.MapStr {
	return schema.Apply(input) 
}

将字段标记为必需。

如果未设置字段的 schema 选项,则等效于 Required

将字段标记为可选。

忽略任何值转换错误

默认情况下,如果缺少任何必需字段,Apply 将失败并返回错误。使用可选的第二个参数,您可以指定 Apply 如何处理架构的不同字段。可能的值有

  • AllRequired 是默认行为。如果缺少任何必需字段(包括由于未设置 schema 选项而必需的字段),则返回错误。
  • 如果缺少显式标记为 required 的字段,则 FailOnRequired 将失败。
  • NotFoundKeys(cb func([]string)) 接受一个回调函数,该函数将使用缺少的键列表调用,从而实现更精细的错误处理。

在上面的示例中,请注意,可以创建一次 schema 对象并将其应用于所有事件。您还可以使用 ApplyTo 将其他数据添加到现有的 MapStr 对象中

var (
	schema = s.Schema{
		"test_string": c.Str("testString"),
		"test_int":    c.Int("testInt"),
		"test_bool":   c.Bool("testBool"),
		"test_float":  c.Float("testFloat"),
		"test_obj": s.Object{
			"test_obj_string": c.Str("testObjString"),
		},
	}

	additionalSchema = s.Schema{
		"second_string": c.Str("secondString"),
		"second_int": c.Int("secondInt"),
	}
)

	data, err := schema.Apply(input)
	if err != nil {
		return err
	}

	if m.parseMoreData{
		_, err := additionalSchema.ApplyTo(data, input)
		if len(err) > 0 { 
			return err.Err()
		}
	}

ApplyTo 返回原始的 MultiError 对象,使其适用于更精细的错误处理。

配置文件编辑

metricset 的配置文件由模块处理。如果一个模块中有多个 metricset,请确保将所有 metricset 添加到配置中。例如

metricbeat:
  modules:
    - module: {module-name}
      metricsets: ["{metricset1}", "{metricset2}"]

请确保在更新配置文件后运行 make collect,以便将您的更改也应用于全局配置文件和文档。

有关 Metricbeat 配置文件的更多详细信息,请参阅 Metricbeat 文档中有关模块的主题。

下一步做什么编辑

本主题提供了创建 metricset 的基本步骤。有关 metricset 以及如何进一步扩展 metricset 的更多详细信息,请参阅Metricset 详细信息