字段提取
Elastic Stack Serverless
字段提取的目标很简单:您的数据中的字段包含大量信息,但您只想提取部分内容。
您有两种选择
- Grok 是一种正则表达式方言,它支持您可以重复使用的别名表达式。 由于 Grok 位于正则表达式 (regex) 之上,因此任何正则表达式在 grok 中也有效。
- Dissect 从文本中提取结构化字段,使用分隔符来定义匹配模式。 与 grok 不同,dissect 不使用正则表达式。
让我们从一个简单的示例开始,将 @timestamp
和 message
字段添加到 my-index
映射作为索引字段。 为了保持灵活性,请使用 wildcard
作为 message
的字段类型
PUT /my-index/
{
"mappings": {
"properties": {
"@timestamp": {
"format": "strict_date_optional_time||epoch_second",
"type": "date"
},
"message": {
"type": "wildcard"
}
}
}
}
映射您要检索的字段后,将日志数据中的一些记录索引到 Elasticsearch 中。 以下请求使用 bulk API 将原始日志数据索引到 my-index
中。 您可以使用一个小的样本来试验运行时字段,而不是索引所有日志数据。
POST /my-index/_bulk?refresh
{"index":{}}
{"timestamp":"2020-04-30T14:30:17-05:00","message":"40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:30:53-05:00","message":"232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:12-05:00","message":"26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:19-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] \"GET /french/splash_inet.html HTTP/1.0\" 200 3781"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:22-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:27-05:00","message":"252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:28-05:00","message":"not a valid apache log"}
如果您想检索包含 clientip
的结果,您可以在映射中将该字段添加为运行时字段。 以下运行时脚本定义了一个 grok 模式,该模式从 message
字段中提取结构化字段。
该脚本匹配 %{{COMMONAPACHELOG}}
日志模式,该模式了解 Apache 日志的结构。 如果模式匹配 (clientip != null
),则脚本会发出匹配的 IP 地址的值。 如果模式不匹配,脚本只会返回字段值而不会崩溃。
PUT my-index/_mappings
{
"runtime": {
"http.clientip": {
"type": "ip",
"script": """
String clientip=grok('%{COMMONAPACHELOG}').extract(doc["message"].value)?.clientip;
if (clientip != null) emit(clientip);
"""
}
}
}
- 此条件确保即使消息的模式不匹配,脚本也不会发出任何内容。
您可以定义一个简单的查询来搜索特定的 IP 地址并返回所有相关字段。 使用搜索 API 的 fields
参数来检索 http.clientip
运行时字段。
GET my-index/_search
{
"query": {
"match": {
"http.clientip": "40.135.0.0"
}
},
"fields" : ["http.clientip"]
}
响应包括 http.clientip
的值与 40.135.0.0
匹配的文档。
{
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "my-index",
"_id" : "Rq-ex3gBA_A0V6dYGLQ7",
"_score" : 1.0,
"_source" : {
"timestamp" : "2020-04-30T14:30:17-05:00",
"message" : "40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"
},
"fields" : {
"http.clientip" : [
"40.135.0.0"
]
}
}
]
}
}
您可以只定义一个 dissect 模式来包含您想要丢弃的字符串部分,而不是像 上一个示例 中那样匹配日志模式。
例如,本节开头的日志数据包括一个 message
字段。 此字段包含多个数据片段
"message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"
您可以在运行时字段中定义一个 dissect 模式来提取 HTTP 响应代码,在上一个示例中为 304
。
PUT my-index/_mappings
{
"runtime": {
"http.response": {
"type": "long",
"script": """
String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}').extract(doc["message"].value)?.response;
if (response != null) emit(Integer.parseInt(response));
"""
}
}
}
然后,您可以使用 http.response
运行时字段运行查询来检索特定的 HTTP 响应
GET my-index/_search
{
"query": {
"match": {
"http.response": "304"
}
},
"fields" : ["http.response"]
}
响应包括一个 HTTP 响应为 304
的文档
{
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "my-index",
"_id" : "Sq-ex3gBA_A0V6dYGLQ7",
"_score" : 1.0,
"_source" : {
"timestamp" : "2020-04-30T14:31:22-05:00",
"message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"
},
"fields" : {
"http.response" : [
304
]
}
}
]
}
}
假设您想要像上一个示例中那样提取字段的一部分,但是您想要按特定值进行拆分。 您可以使用 dissect 模式仅提取您想要的信息,并将该数据以特定格式返回。
例如,假设您有一些来自 Elasticsearch 的垃圾回收 (gc) 日志数据,格式如下
[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 384K, committed 384K, reserved 1048576K
您只想提取 used
、capacity
和 committed
数据,以及关联的值。 让我们索引一些包含日志数据的文档作为示例
POST /my-index/_bulk?refresh
{"index":{}}
{"gc": "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 384K, committed 384K, reserved 1048576K"}
{"index":{}}
{"gc": "[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit] class space used 15255K, capacity 16726K, committed 16844K, reserved 1048576K"}
{"index":{}}
{"gc": "[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit] Metaspace used 115409K, capacity 119541K, committed 120248K, reserved 1153024K"}
{"index":{}}
{"gc": "[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit] class space used 14503K, capacity 15894K, committed 15948K, reserved 1048576K"}
{"index":{}}
{"gc": "[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit] Metaspace used 107719K, capacity 111775K, committed 112724K, reserved 1146880K"}
{"index":{}}
{"gc": "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 367K, committed 384K, reserved 1048576K"}
再次查看数据,有一个时间戳、一些您不感兴趣的其他数据,然后是 used
、capacity
和 committed
数据
[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 384K, committed 384K, reserved 1048576K
您可以将变量分配给 gc
字段中的每个数据部分,然后仅返回您想要的部分。 花括号 {}
中的任何内容都被视为变量。 例如,变量 [%{@timestamp}][%{{code}}][%{{desc}}]
将匹配前三个数据块,它们都在方括号 []
中。
[%{@timestamp}][%{code}][%{desc}] %{ident} used %{usize}, capacity %{csize}, committed %{comsize}, reserved %{rsize}
您的 dissect 模式可以包含术语 used
、capacity
和 committed
,而不是使用变量,因为您想要完全返回这些术语。 您还将变量分配给您想要返回的值,例如 %{{usize}}
、%{{csize}}
和 %{{comsize}}
。 日志数据中的分隔符是逗号,因此您的 dissect 模式也需要使用该分隔符。
现在您有了一个 dissect 模式,您可以将其包含在 Painless 脚本中作为运行时字段的一部分。 该脚本使用您的 dissect 模式拆分 gc
字段,然后完全返回您想要的信息,如 emit
方法定义的那样。 由于 dissect 使用简单的语法,因此您只需告诉它您想要什么。
以下模式告诉 dissect 返回术语 used
、一个空格、gc.usize
中的值和一个逗号。 此模式会为其他您想要检索的数据重复。 虽然此模式在生产环境中可能不太有用,但它提供了很大的灵活性来试验和操作您的数据。 在生产环境中,您可能只想使用 emit(gc.usize)
,然后对该值进行聚合或在计算中使用它。
emit("used" + ' ' + gc.usize + ', ' + "capacity" + ' ' + gc.csize + ', ' + "committed" + ' ' + gc.comsize)
总而言之,您可以在搜索请求中创建一个名为 gc_size
的运行时字段。 使用 fields
选项,您可以检索 gc_size
运行时字段的所有值。 此查询还包括一个 bucket 聚合来对您的数据进行分组。
GET my-index/_search
{
"runtime_mappings": {
"gc_size": {
"type": "keyword",
"script": """
Map gc=dissect('[%{@timestamp}][%{code}][%{desc}] %{ident} used %{usize}, capacity %{csize}, committed %{comsize}, reserved %{rsize}').extract(doc["gc.keyword"].value);
if (gc != null) emit("used" + ' ' + gc.usize + ', ' + "capacity" + ' ' + gc.csize + ', ' + "committed" + ' ' + gc.comsize);
"""
}
},
"size": 1,
"aggs": {
"sizes": {
"terms": {
"field": "gc_size",
"size": 10
}
}
},
"fields" : ["gc_size"]
}
响应包括来自 gc_size
字段的数据,其格式完全按照您在 dissect 模式中定义的那样!
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 6,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "my-index",
"_id" : "GXx3H3kBKGE42WRNlddJ",
"_score" : 1.0,
"_source" : {
"gc" : "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 384K, committed 384K, reserved 1048576K"
},
"fields" : {
"gc_size" : [
"used 266K, capacity 384K, committed 384K"
]
}
}
]
},
"aggregations" : {
"sizes" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "used 107719K, capacity 111775K, committed 112724K",
"doc_count" : 1
},
{
"key" : "used 115409K, capacity 119541K, committed 120248K",
"doc_count" : 1
},
{
"key" : "used 14503K, capacity 15894K, committed 15948K",
"doc_count" : 1
},
{
"key" : "used 15255K, capacity 16726K, committed 16844K",
"doc_count" : 1
},
{
"key" : "used 266K, capacity 367K, committed 384K",
"doc_count" : 1
},
{
"key" : "used 266K, capacity 384K, committed 384K",
"doc_count" : 1
}
]
}
}
}