创建 Metricset

编辑

Elastic 对用于生成 metricsets 的代码不提供任何担保或支持。该生成器主要作为想要创建自己数据发送器的开发人员的指导。

Metricset 是 Metricbeat 模块的一部分,用于从远程服务获取和构造数据。每个模块可以有多个 metricset。在本指南中,您将学习如何创建自己的 metricset。

第一次创建 metricset 时,通常查看现有 metricset 的实现以获取灵感会很有帮助。

要创建一个新的 metricset

  1. 在 Metricbeat 的 beat 目录中运行以下命令:

    make create-metricset

    您需要 Python 来运行此命令,然后,系统会提示您输入模块和 metricset 名称。请记住,模块代表您要从中检索指标的服务(例如 Redis),而 metricset 是一组特定的分组指标(例如 Redis 上的 info)。仅使用字符 [a-z],并在需要时使用下划线 (_)。不允许使用其他字符。

    运行 make create-metricset 时,它会创建 metricset 的所有基本文件,如果模块尚不存在,则还会创建所需的模块文件。有关模块文件的更多详细信息,请参阅 创建 Metricbeat 模块

    在本指南中,我们使用 {metricset}{module}{beat} 作为占位符。您需要将这些替换为您 metricset、模块和 beat 的实际名称。

    您创建的 metricset 已经是可运行的 metricset,可以进行编译。

  2. 运行以下命令来编译您的新 metricset:

    mage update
    mage build

    第一个命令 mage update 使用 metricset 中最新的文件、数据和元信息更新所有生成的文。第二个命令 mage build 编译您的源代码,并在同一文件夹中提供名为 metricbeat 的二进制文件。您可以使用以下命令在调试模式下运行该二进制文件:

    ./metricbeat -e -d "*"

运行 mage 命令后,您会在 module/{module}/{metricset} 下找到 metricset 及其生成的文。此目录包含以下文件:

  • \{metricset}.go
  • _meta/docs.asciidoc
  • _meta/data.json
  • _meta/fields.yml

接下来,让我们更详细地了解这些文件。

{metricset}.go 文件

编辑

第一个文件是 {metricset}.go。它包含有关如何从服务获取数据并将其转换为发送到输出的数据的逻辑。

生成的文如下所示:

https://github.com/elastic/beats/blob/main/metricbeat/scripts/module/metricset/metricset.go.tmpl

package {metricset}

import (
    "github.com/elastic/elastic-agent-libs/mapstr"
	"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
	"github.com/elastic/beats/v7/metricbeat/mb"
)

// init registers the MetricSet with the central registry as soon as the program
// starts. The New function will be called later to instantiate an instance of
// the MetricSet for each host is defined in the module's configuration. After the
// MetricSet has been created then Fetch will begin to be called periodically.
func init() {
	mb.Registry.MustAddMetricSet("{module}", "{metricset}", New)
}

// MetricSet holds any configuration or state information. It must implement
// the mb.MetricSet interface. And this is best achieved by embedding
// mb.BaseMetricSet because it implements all of the required mb.MetricSet
// interface methods except for Fetch.
type MetricSet struct {
	mb.BaseMetricSet
	counter int
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
	cfgwarn.Beta("The {module} {metricset} metricset is beta.")

	config := struct{}{}
	if err := base.Module().UnpackConfig(&config); err != nil {
		return nil, err
	}

	return &MetricSet{
		BaseMetricSet: base,
		counter:       1,
	}, nil
}

// Fetch method implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
	report.Event(mb.Event{
		MetricSetFields: mapstr.M{
			"counter": m.counter,
		},
	})
	m.counter++

	return nil
}

package 语句和 import 声明是每个 Go 文件基本结构的一部分。只有在您的实现需要更多导入时,才应修改文件的这一部分。

初始化
编辑

init 方法将 metricset 注册到中央注册表中。在 Go 中,init() 函数在所有其他代码执行之前调用。这意味着模块将自动注册到全局注册表中。

传递给 MustAddMetricSetNew 方法将在模块设置之后以及开始获取数据之前调用。通常不需要更改文件的这一部分。

func init() {
	mb.Registry.MustAddMetricSet("{module}", "{metricset}", New)
}
定义
编辑

MetricSet 类型定义了 metricset 的所有字段。至少必须由 mb.BaseMetricSet 字段组成,但可以使用其他条目进行扩展。这些变量可用于在多个获取调用之间持久保存数据或配置。

您可以向 MetricSet 类型添加更多字段,如下例所示,其中添加了 usernamepassword 字符串字段:

type MetricSet struct {
	mb.BaseMetricSet
	username    string
	password    string
}
创建
编辑

New 函数创建 MetricSet 的新实例。MetricSet 的设置过程也是 New 的一部分。此方法将在第一次调用 Fetch 之前调用。

New 函数还通过处理其他配置条目(如果需要)来设置配置。

func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

	config := struct{}{}

	if err := base.Module().UnpackConfig(&config); err != nil {
		return nil, err
	}

	return &MetricSet{
		BaseMetricSet: base,
	}, nil
}
获取
编辑

Fetch 方法是 metricset 的核心部分。Fetch 在每次检索新数据时都会被调用。如果定义了多个主机,则会为每个主机调用一次 Fetch。调用 Fetch 的频率基于配置文件中定义的 period

Fetch 必须使用 mb.ReporterV2.Event 方法发布事件。如果发生错误,Fetch 可以返回错误,或者如果在循环中调用 Event,则可以使用 mb.ReporterV2.Error 方法发布。这意味着 Metricbeat 始终发送事件,即使在失败时也是如此。您必须确保错误消息有助于识别实际错误。

以下示例显示了一个 metricset Fetch 方法,该方法带有一个计数器,该计数器在每次 Fetch 调用时都会递增:

func (m *MetricSet) Fetch(report mb.ReporterV2) error {

	report.Event(mb.Event{
		MetricSetFields: common.MapStr{
			"counter": m.counter,
		}
	})
	m.counter++

	return nil
}

从报告的事件派生的 JSON 输出将与您在 common.MapStr 中使用的命名和结构相同。有关 MapStr 及其功能的更多详细信息,请参阅 MapStr API 文档

多重获取
编辑

对于可能公开多个事件的 metricset,可以在 Fetch 方法内部多次调用 EventEvent 返回一个布尔值,指示 metricset 是否已关闭并且无法处理更多事件,在这种情况下,Fetch 应立即返回。如果在处理多个事件之一时发生错误,可以使用 mb.ReporterV2.Error 方法发布该错误,而不是返回错误值。

解析和规范化字段
编辑

在 Metricbeat 中,我们的目标是规范化所有 metricset 的指标名称,以遵循常见的 约定集。这使用户可以轻松查找和解释指标。为了简化从受监控系统读取的对象到 Metricbeat 格式的对象的解析、转换、重命名和重构,我们创建了 schema 包,允许您声明式地定义转换。

例如,假设此输入对象:

input := map[string]interface{}{
	"testString":     "hello",
	"testInt":        "42",
	"testBool":       "true",
	"testFloat":      "42.1",
	"testObjString":  "hello, object",
}

以及将其转换为以下对象的需要:

common.MapStr{
	"test_string": "hello",
	"test_int":    int64(42),
	"test_bool":   true,
	"test_float":  42.1,
	"test_obj": common.MapStr{
		"test_obj_string": "hello, object",
	},
}

您可以使用 schema 包来转换数据,并可以选择将 schema 中的某些字段标记为必需或非必需。例如:

import (
	s "github.com/elastic/beats/libbeat/common/schema"
	c "github.com/elastic/beats/libbeat/common/schema/mapstrstr"
)

var (
	schema = s.Schema{
		"test_string": c.Str("testString", s.Required), 
		"test_int":    c.Int("testInt"), 
		"test_bool":   c.Bool("testBool", s.Optional), 
		"test_float":  c.Float("testFloat"),
		"test_obj": s.Object{
			"test_obj_string": c.Str("testObjString", s.IgnoreAllErrors), 
		},
	}
)

func eventMapping(input map[string]interface{}) common.MapStr {
	return schema.Apply(input) 
}

将字段标记为必需。

如果字段没有设置 schema 选项,则等效于 Required

将字段标记为可选。

忽略任何值转换错误。

默认情况下,如果任何必需字段丢失,Apply 将失败并返回错误。使用可选的第二个参数,您可以指定 Apply 如何处理 schema 的不同字段。可能的值为:

  • AllRequired 是默认行为。如果任何必需字段丢失,则返回错误,包括由于未设置 schema 选项而必需的字段。
  • FailOnRequired 如果明确标记为 required 的字段丢失,则将失败。
  • NotFoundKeys(cb func([]string)) 获取一个回调函数,该函数将使用丢失键的列表调用,从而允许更细致的错误处理。

在上例中,请注意,可以一次创建 schema 对象并将其应用于所有事件。您还可以使用 ApplyTo 将其他数据添加到现有的 MapStr 对象:

var (
	schema = s.Schema{
		"test_string": c.Str("testString"),
		"test_int":    c.Int("testInt"),
		"test_bool":   c.Bool("testBool"),
		"test_float":  c.Float("testFloat"),
		"test_obj": s.Object{
			"test_obj_string": c.Str("testObjString"),
		},
	}

	additionalSchema = s.Schema{
		"second_string": c.Str("secondString"),
		"second_int": c.Int("secondInt"),
	}
)

	data, err := schema.Apply(input)
	if err != nil {
		return err
	}

	if m.parseMoreData{
		_, err := additionalSchema.ApplyTo(data, input)
		if len(err) > 0 { 
			return err.Err()
		}
	}

ApplyTo 返回一个原始 MultiError 对象,使其适合更细致的错误处理。

配置文件

编辑

metricset 的配置文件由模块处理。如果一个模块中有多个 metricset,请确保将所有 metricset 添加到配置中。例如:

metricbeat:
  modules:
    - module: {module-name}
      metricsets: ["{metricset1}", "{metricset2}"]

更新配置文件后,请确保运行 make collect,以便您的更改也应用于全局配置文件和文档。

有关 Metricbeat 配置文件的更多详细信息,请参阅 Metricbeat 文档中有关 模块 的主题。

下一步做什么

编辑

本主题提供了创建 metricset 的基本步骤。有关 metricset 以及如何进一步扩展 metricset 的更多详细信息,请参阅 Metricset 详情