创建 Metricset
Elastic 不对用于生成 metricset 的代码提供任何保证或支持。 此生成器主要作为希望创建自己的数据收集器的开发人员的指导。
metricset 是 Metricbeat 模块的一部分,用于从远程服务获取和构建数据。 每个模块可以有多个 metricset。 在本指南中,您将学习如何创建自己的 metricset。
首次创建 metricset 时,通常查阅现有 metricset 的实现以获取灵感会有所帮助。
要创建新的 metricset
在 metricbeat beat 目录中运行以下命令
make create-metricset
您需要 Python 才能运行此命令,然后,系统将提示您输入模块和 metricset 名称。 请记住,模块代表您要从中检索指标的服务(例如 Redis),而 metricset 是一组特定的分组指标(例如 Redis 上的
info
)。 仅使用字符[a-z]
和(如果需要)下划线 (_
)。 不允许使用其他字符。当您运行
make create-metricset
时,它会为您的 metricset 创建所有基本文件,以及所需的模块文件(如果该模块尚不存在)。 有关模块文件的更多详细信息,请参阅 创建 Metricbeat 模块。注意在本指南中,我们使用
{{metricset}}
、{{module}}
和{{beat}}
作为占位符。 您需要将它们替换为您的 metricset、模块和 beat 的实际名称。您创建的 metricset 已经是一个功能正常的 metricset,可以进行编译。
通过运行以下命令来编译您的新 metricset
mage update mage build
第一个命令
mage update
使用来自 metricset 的最新文件、数据和元信息更新所有生成的文件。 第二个命令mage build
编译您的源代码,并在同一文件夹中为您提供一个名为 metricbeat 的二进制文件。 您可以使用以下命令在调试模式下运行该二进制文件./metricbeat -e -d "*"
运行 mage 命令后,您会在 module/{{module}}/{metricset}
下找到 metricset 及其生成的文件。 该目录包含以下文件
\{{metricset}}.go
_meta/docs.asciidoc
_meta/data.json
_meta/fields.yml
接下来让我们更详细地了解这些文件。
第一个文件是 {{metricset}}.go
。 它包含如何从服务获取数据并将其转换为发送到输出的逻辑。
生成的文件如下所示
https://github.com/elastic/beats/blob/main/metricbeat/scripts/module/metricset/metricset.go.tmpl
package {metricset}
import (
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/metricbeat/mb"
)
// init registers the MetricSet with the central registry as soon as the program
// starts. The New function will be called later to instantiate an instance of
// the MetricSet for each host is defined in the module's configuration. After the
// MetricSet has been created then Fetch will begin to be called periodically.
func init() {
mb.Registry.MustAddMetricSet("{module}", "{metricset}", New)
}
// MetricSet holds any configuration or state information. It must implement
// the mb.MetricSet interface. And this is best achieved by embedding
// mb.BaseMetricSet because it implements all of the required mb.MetricSet
// interface methods except for Fetch.
type MetricSet struct {
mb.BaseMetricSet
counter int
}
// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Beta("The {module} {metricset} metricset is beta.")
config := struct{}{}
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}
return &MetricSet{
BaseMetricSet: base,
counter: 1,
}, nil
}
// Fetch method implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
report.Event(mb.Event{
MetricSetFields: mapstr.M{
"counter": m.counter,
},
})
m.counter++
return nil
}
package
子句和 import
声明是每个 Go 文件的基本结构的一部分。 只有在您的实现需要更多导入时,才应修改文件的这一部分。
init 方法向中央注册表注册 metricset。 在 Go 中,init()
函数在所有其他代码执行之前调用。 这意味着该模块将自动注册到全局注册表。
在模块设置之后和开始获取数据之前,将调用传递给 MustAddMetricSet
的 New
方法。 通常不需要更改文件的这一部分。
func init() {
mb.Registry.MustAddMetricSet("{module}", "{metricset}", New)
}
MetricSet 类型定义 metricset 的所有字段。 至少它必须由 mb.BaseMetricSet
字段组成,但可以用其他条目扩展。 这些变量可用于在多个 fetch 调用之间持久化数据或配置。
您可以向 MetricSet 类型添加更多字段,如以下示例所示,其中添加了 username
和 password
字符串字段
type MetricSet struct {
mb.BaseMetricSet
username string
password string
}
New
函数创建 MetricSet 的新实例。 MetricSet 的设置过程也是 New
的一部分。 此方法将在首次调用 Fetch
之前调用。
New
函数还通过处理其他配置条目来设置配置(如果需要)。
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
config := struct{}{}
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}
return &MetricSet{
BaseMetricSet: base,
}, nil
}
Fetch
方法是 metricset 的中心部分。 每次检索新数据时都会调用 Fetch
。 如果定义了多个主机,则为每个主机调用一次 Fetch
。 调用 Fetch
的频率基于配置文件中定义的 period
。
Fetch
必须使用 mb.ReporterV2.Event
方法发布事件。 如果发生错误,Fetch
可以返回错误,或者如果在循环中调用 Event
,则使用 mb.ReporterV2.Error
方法发布。 这意味着 Metricbeat 始终发送事件,即使在发生故障时也是如此。 您必须确保错误消息有助于识别实际错误。
以下示例显示了一个 metricset Fetch
方法,其中包含一个计数器,该计数器针对每个 Fetch
调用递增
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
report.Event(mb.Event{
MetricSetFields: common.MapStr{
"counter": m.counter,
}
})
m.counter++
return nil
}
从报告的事件派生的 JSON 输出将与您在 common.MapStr
中使用的命名和结构相同。 有关 MapStr
及其功能的更多详细信息,请参阅 MapStr API 文档。
对于可能公开多个事件的 metricset,可以在 Fetch
方法中多次调用 Event
。 Event
返回一个布尔值,指示 metricset 是否已关闭且无法再处理任何事件,在这种情况下,Fetch
应立即返回。 如果在处理多个事件中的一个事件时发生错误,则可以使用 mb.ReporterV2.Error
方法发布该错误,而不是返回错误值。
在 Metricbeat 中,我们的目标是将所有 metricset 的指标名称规范化,以遵守通用的 约定集。 这使用户可以轻松查找和解释指标。 为了简化将从受监控系统读取的对象解析、转换、重命名和重组为 Metricbeat 格式,我们创建了 schema 包,允许您声明性地定义转换。
例如,假设有以下输入对象
input := map[string]interface{}{
"testString": "hello",
"testInt": "42",
"testBool": "true",
"testFloat": "42.1",
"testObjString": "hello, object",
}
以及将其转换为以下对象的需要
common.MapStr{
"test_string": "hello",
"test_int": int64(42),
"test_bool": true,
"test_float": 42.1,
"test_obj": common.MapStr{
"test_obj_string": "hello, object",
},
}
您可以使用 schema 包转换数据,并且可以选择将 schema 中的某些字段标记为必需或非必需。 例如
import (
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstrstr"
)
var (
schema = s.Schema{
"test_string": c.Str("testString", s.Required),
"test_int": c.Int("testInt"),
"test_bool": c.Bool("testBool", s.Optional),
"test_float": c.Float("testFloat"),
"test_obj": s.Object{
"test_obj_string": c.Str("testObjString", s.IgnoreAllErrors),
},
}
)
func eventMapping(input map[string]interface{}) common.MapStr {
return schema.Apply(input)
}
- 将字段标记为必需。
- 如果某个字段未设置 schema 选项,则它等效于
Required
。 - 将字段标记为可选。
- 忽略任何值转换错误
- 默认情况下,如果缺少任何必需字段,
Apply
将失败并返回错误。 使用可选的第二个参数,您可以指定Apply
如何处理 schema 的不同字段。 可能的值为
AllRequired
是默认行为。 如果缺少任何必需字段(包括由于未设置 schema 选项而成为必需的字段),则返回错误。- 如果显式标记为
required
的字段缺失,则FailOnRequired
将失败。 NotFoundKeys(cb func([]string))
接受一个回调函数,该函数将使用缺失键的列表进行调用,从而可以进行更细粒度的错误处理。
在上面的示例中,请注意可以创建一个 schema 对象并将其应用于所有事件。 您还可以使用 ApplyTo
将其他数据添加到现有 MapStr
对象
var (
schema = s.Schema{
"test_string": c.Str("testString"),
"test_int": c.Int("testInt"),
"test_bool": c.Bool("testBool"),
"test_float": c.Float("testFloat"),
"test_obj": s.Object{
"test_obj_string": c.Str("testObjString"),
},
}
additionalSchema = s.Schema{
"second_string": c.Str("secondString"),
"second_int": c.Int("secondInt"),
}
)
data, err := schema.Apply(input)
if err != nil {
return err
}
if m.parseMoreData{
_, err := additionalSchema.ApplyTo(data, input)
if len(err) > 0 {
return err.Err()
}
}
ApplyTo
返回原始 MultiError 对象,使其适合于更细粒度的错误处理。
metricset 的配置文件由模块处理。 如果一个模块中有多个 metricset,请确保将所有 metricset 添加到配置中。 例如
metricbeat:
modules:
- module: {module-name}
metricsets: ["{metricset1}", "{metricset2}"]
请确保在更新配置文件后运行 make collect
,以便您的更改也应用于全局配置文件和文档。
有关 Metricbeat 配置文件的更多详细信息,请参阅 Metricbeat 文档中有关 模块的主题。
本主题提供了创建 metricset 的基本步骤。 有关 metricset 以及如何进一步扩展 metricset 的更多详细信息,请参阅 Metricset 详细信息。