字段提取
编辑字段提取
编辑字段提取的目标很简单;你的数据中有一些字段包含大量信息,但你只想提取其中的一部分。
你有两个选项可供选择
让我们从一个简单的示例开始,将 @timestamp
和 message
字段添加到 my-index
映射中作为索引字段。为了保持灵活性,请对 message
使用 wildcard
作为字段类型
resp = client.indices.create( index="my-index", mappings={ "properties": { "@timestamp": { "format": "strict_date_optional_time||epoch_second", "type": "date" }, "message": { "type": "wildcard" } } }, ) print(resp)
response = client.indices.create( index: 'my-index', body: { mappings: { properties: { "@timestamp": { format: 'strict_date_optional_time||epoch_second', type: 'date' }, message: { type: 'wildcard' } } } } ) puts response
const response = await client.indices.create({ index: "my-index", mappings: { properties: { "@timestamp": { format: "strict_date_optional_time||epoch_second", type: "date", }, message: { type: "wildcard", }, }, }, }); console.log(response);
PUT /my-index/ { "mappings": { "properties": { "@timestamp": { "format": "strict_date_optional_time||epoch_second", "type": "date" }, "message": { "type": "wildcard" } } } }
映射完要检索的字段后,将一些日志数据记录索引到 Elasticsearch 中。以下请求使用 批量 API 将原始日志数据索引到 my-index
中。你可以使用一个小样本来试验运行时字段,而不是索引所有日志数据。
resp = client.bulk( index="my-index", refresh=True, operations=[ { "index": {} }, { "timestamp": "2020-04-30T14:30:17-05:00", "message": "40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736" }, { "index": {} }, { "timestamp": "2020-04-30T14:30:53-05:00", "message": "232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736" }, { "index": {} }, { "timestamp": "2020-04-30T14:31:12-05:00", "message": "26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736" }, { "index": {} }, { "timestamp": "2020-04-30T14:31:19-05:00", "message": "247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] \"GET /french/splash_inet.html HTTP/1.0\" 200 3781" }, { "index": {} }, { "timestamp": "2020-04-30T14:31:22-05:00", "message": "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0" }, { "index": {} }, { "timestamp": "2020-04-30T14:31:27-05:00", "message": "252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736" }, { "index": {} }, { "timestamp": "2020-04-30T14:31:28-05:00", "message": "not a valid apache log" } ], ) print(resp)
response = client.bulk( index: 'my-index', refresh: true, body: [ { index: {} }, { timestamp: '2020-04-30T14:30:17-05:00', message: '40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736' }, { index: {} }, { timestamp: '2020-04-30T14:30:53-05:00', message: '232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736' }, { index: {} }, { timestamp: '2020-04-30T14:31:12-05:00', message: '26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736' }, { index: {} }, { timestamp: '2020-04-30T14:31:19-05:00', message: '247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] "GET /french/splash_inet.html HTTP/1.0" 200 3781' }, { index: {} }, { timestamp: '2020-04-30T14:31:22-05:00', message: '247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] "GET /images/hm_nbg.jpg HTTP/1.0" 304 0' }, { index: {} }, { timestamp: '2020-04-30T14:31:27-05:00', message: '252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736' }, { index: {} }, { timestamp: '2020-04-30T14:31:28-05:00', message: 'not a valid apache log' } ] ) puts response
const response = await client.bulk({ index: "my-index", refresh: "true", operations: [ { index: {}, }, { timestamp: "2020-04-30T14:30:17-05:00", message: '40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736', }, { index: {}, }, { timestamp: "2020-04-30T14:30:53-05:00", message: '232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736', }, { index: {}, }, { timestamp: "2020-04-30T14:31:12-05:00", message: '26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736', }, { index: {}, }, { timestamp: "2020-04-30T14:31:19-05:00", message: '247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] "GET /french/splash_inet.html HTTP/1.0" 200 3781', }, { index: {}, }, { timestamp: "2020-04-30T14:31:22-05:00", message: '247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] "GET /images/hm_nbg.jpg HTTP/1.0" 304 0', }, { index: {}, }, { timestamp: "2020-04-30T14:31:27-05:00", message: '252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736', }, { index: {}, }, { timestamp: "2020-04-30T14:31:28-05:00", message: "not a valid apache log", }, ], }); console.log(response);
POST /my-index/_bulk?refresh {"index":{}} {"timestamp":"2020-04-30T14:30:17-05:00","message":"40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"} {"index":{}} {"timestamp":"2020-04-30T14:30:53-05:00","message":"232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"} {"index":{}} {"timestamp":"2020-04-30T14:31:12-05:00","message":"26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"} {"index":{}} {"timestamp":"2020-04-30T14:31:19-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] \"GET /french/splash_inet.html HTTP/1.0\" 200 3781"} {"index":{}} {"timestamp":"2020-04-30T14:31:22-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"} {"index":{}} {"timestamp":"2020-04-30T14:31:27-05:00","message":"252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"} {"index":{}} {"timestamp":"2020-04-30T14:31:28-05:00","message":"not a valid apache log"}
从日志消息中提取 IP 地址 (Grok)
编辑如果想要检索包含 clientip
的结果,可以将该字段作为运行时字段添加到映射中。以下运行时脚本定义了一个 grok 模式,该模式从 message
字段中提取结构化字段。
该脚本匹配 %{COMMONAPACHELOG}
日志模式,该模式了解 Apache 日志的结构。如果模式匹配 (clientip != null
),则脚本会发出匹配的 IP 地址的值。如果模式不匹配,脚本只会返回字段值而不会崩溃。
resp = client.indices.put_mapping( index="my-index", runtime={ "http.clientip": { "type": "ip", "script": "\n String clientip=grok('%{COMMONAPACHELOG}').extract(doc[\"message\"].value)?.clientip;\n if (clientip != null) emit(clientip); \n " } }, ) print(resp)
const response = await client.indices.putMapping({ index: "my-index", runtime: { "http.clientip": { type: "ip", script: "\n String clientip=grok('%{COMMONAPACHELOG}').extract(doc[\"message\"].value)?.clientip;\n if (clientip != null) emit(clientip); \n ", }, }, }); console.log(response);
PUT my-index/_mappings { "runtime": { "http.clientip": { "type": "ip", "script": """ String clientip=grok('%{COMMONAPACHELOG}').extract(doc["message"].value)?.clientip; if (clientip != null) emit(clientip); """ } } }
你可以定义一个简单的查询来搜索特定的 IP 地址并返回所有相关字段。使用搜索 API 的 fields
参数来检索 http.clientip
运行时字段。
resp = client.search( index="my-index", query={ "match": { "http.clientip": "40.135.0.0" } }, fields=[ "http.clientip" ], ) print(resp)
response = client.search( index: 'my-index', body: { query: { match: { 'http.clientip' => '40.135.0.0' } }, fields: [ 'http.clientip' ] } ) puts response
const response = await client.search({ index: "my-index", query: { match: { "http.clientip": "40.135.0.0", }, }, fields: ["http.clientip"], }); console.log(response);
GET my-index/_search { "query": { "match": { "http.clientip": "40.135.0.0" } }, "fields" : ["http.clientip"] }
响应包括 http.clientip
的值与 40.135.0.0
匹配的文档。
{ "hits" : { "total" : { "value" : 1, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "my-index", "_id" : "Rq-ex3gBA_A0V6dYGLQ7", "_score" : 1.0, "_source" : { "timestamp" : "2020-04-30T14:30:17-05:00", "message" : "40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736" }, "fields" : { "http.clientip" : [ "40.135.0.0" ] } } ] } }
解析字符串以提取字段的一部分 (Dissect)
编辑你可以只定义一个 dissect 模式来包含要丢弃的字符串部分,而不是像前面的示例中那样匹配日志模式。
例如,本节开头的日志数据包含一个 message
字段。此字段包含多个数据片段
"message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"
你可以在运行时字段中定义一个 dissect 模式,以提取 HTTP 响应代码,在上一个示例中为 304
。
resp = client.indices.put_mapping( index="my-index", runtime={ "http.response": { "type": "long", "script": "\n String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{response} %{size}').extract(doc[\"message\"].value)?.response;\n if (response != null) emit(Integer.parseInt(response));\n " } }, ) print(resp)
const response = await client.indices.putMapping({ index: "my-index", runtime: { "http.response": { type: "long", script: '\n String response=dissect(\'%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}\').extract(doc["message"].value)?.response;\n if (response != null) emit(Integer.parseInt(response));\n ', }, }, }); console.log(response);
PUT my-index/_mappings { "runtime": { "http.response": { "type": "long", "script": """ String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}').extract(doc["message"].value)?.response; if (response != null) emit(Integer.parseInt(response)); """ } } }
然后你可以运行查询,使用 http.response
运行时字段来检索特定的 HTTP 响应
resp = client.search( index="my-index", query={ "match": { "http.response": "304" } }, fields=[ "http.response" ], ) print(resp)
response = client.search( index: 'my-index', body: { query: { match: { 'http.response' => '304' } }, fields: [ 'http.response' ] } ) puts response
const response = await client.search({ index: "my-index", query: { match: { "http.response": "304", }, }, fields: ["http.response"], }); console.log(response);
GET my-index/_search { "query": { "match": { "http.response": "304" } }, "fields" : ["http.response"] }
响应包括一个 HTTP 响应为 304
的文档
{ "hits" : { "total" : { "value" : 1, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "my-index", "_id" : "Sq-ex3gBA_A0V6dYGLQ7", "_score" : 1.0, "_source" : { "timestamp" : "2020-04-30T14:31:22-05:00", "message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0" }, "fields" : { "http.response" : [ 304 ] } } ] } }
按分隔符拆分字段中的值 (Dissect)
编辑假设你想提取字段的一部分,就像前面的示例中一样,但是你想要按特定值拆分。你可以使用 dissect 模式仅提取你想要的信息,并以特定格式返回该数据。
例如,假设你有一些来自 Elasticsearch 的垃圾回收 (gc) 日志数据,格式如下
[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 384K, committed 384K, reserved 1048576K
你只想提取 used
、capacity
和 committed
数据以及关联的值。让我们索引一些包含日志数据的文档以用作示例
resp = client.bulk( index="my-index", refresh=True, operations=[ { "index": {} }, { "gc": "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 384K, committed 384K, reserved 1048576K" }, { "index": {} }, { "gc": "[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit] class space used 15255K, capacity 16726K, committed 16844K, reserved 1048576K" }, { "index": {} }, { "gc": "[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit] Metaspace used 115409K, capacity 119541K, committed 120248K, reserved 1153024K" }, { "index": {} }, { "gc": "[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit] class space used 14503K, capacity 15894K, committed 15948K, reserved 1048576K" }, { "index": {} }, { "gc": "[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit] Metaspace used 107719K, capacity 111775K, committed 112724K, reserved 1146880K" }, { "index": {} }, { "gc": "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 367K, committed 384K, reserved 1048576K" } ], ) print(resp)
response = client.bulk( index: 'my-index', refresh: true, body: [ { index: {} }, { gc: '[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 384K, committed 384K, reserved 1048576K' }, { index: {} }, { gc: '[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit] class space used 15255K, capacity 16726K, committed 16844K, reserved 1048576K' }, { index: {} }, { gc: '[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit] Metaspace used 115409K, capacity 119541K, committed 120248K, reserved 1153024K' }, { index: {} }, { gc: '[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit] class space used 14503K, capacity 15894K, committed 15948K, reserved 1048576K' }, { index: {} }, { gc: '[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit] Metaspace used 107719K, capacity 111775K, committed 112724K, reserved 1146880K' }, { index: {} }, { gc: '[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 367K, committed 384K, reserved 1048576K' } ] ) puts response
const response = await client.bulk({ index: "my-index", refresh: "true", operations: [ { index: {}, }, { gc: "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 384K, committed 384K, reserved 1048576K", }, { index: {}, }, { gc: "[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit] class space used 15255K, capacity 16726K, committed 16844K, reserved 1048576K", }, { index: {}, }, { gc: "[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit] Metaspace used 115409K, capacity 119541K, committed 120248K, reserved 1153024K", }, { index: {}, }, { gc: "[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit] class space used 14503K, capacity 15894K, committed 15948K, reserved 1048576K", }, { index: {}, }, { gc: "[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit] Metaspace used 107719K, capacity 111775K, committed 112724K, reserved 1146880K", }, { index: {}, }, { gc: "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 367K, committed 384K, reserved 1048576K", }, ], }); console.log(response);
POST /my-index/_bulk?refresh {"index":{}} {"gc": "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 384K, committed 384K, reserved 1048576K"} {"index":{}} {"gc": "[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit] class space used 15255K, capacity 16726K, committed 16844K, reserved 1048576K"} {"index":{}} {"gc": "[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit] Metaspace used 115409K, capacity 119541K, committed 120248K, reserved 1153024K"} {"index":{}} {"gc": "[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit] class space used 14503K, capacity 15894K, committed 15948K, reserved 1048576K"} {"index":{}} {"gc": "[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit] Metaspace used 107719K, capacity 111775K, committed 112724K, reserved 1146880K"} {"index":{}} {"gc": "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 367K, committed 384K, reserved 1048576K"}
再次查看数据,有一个时间戳,一些你不太感兴趣的其他数据,然后是 used
、capacity
和 committed
数据
[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 384K, committed 384K, reserved 1048576K
你可以将变量分配给 gc
字段中数据的每个部分,然后仅返回你想要的部分。花括号 {}
中的任何内容都被视为变量。例如,变量 [%{@timestamp}][%{code}][%{desc}]
将匹配前三个数据块,所有这些数据块都在方括号 []
中。
[%{@timestamp}][%{code}][%{desc}] %{ident} used %{usize}, capacity %{csize}, committed %{comsize}, reserved %{rsize}
你的 dissect 模式可以包含术语 used
、capacity
和 committed
,而不是使用变量,因为你想要完全返回这些术语。你还可以将变量分配给你想要返回的值,例如 %{usize}
、%{csize}
和 %{comsize}
。日志数据中的分隔符是逗号,因此你的 dissect 模式也需要使用该分隔符。
现在你有了 dissect 模式,你可以将其作为运行时字段的一部分包含在 Painless 脚本中。该脚本使用你的 dissect 模式拆分 gc
字段,然后返回你想要的信息,这由 emit
方法定义。由于 dissect 使用简单的语法,你只需要告诉它你想要什么即可。
以下模式告诉 dissect 返回术语 used
、一个空格、gc.usize
中的值和一个逗号。此模式会为你想要检索的其他数据重复。虽然此模式在生产环境中可能不是很有用,但它提供了很大的灵活性来试验和操作你的数据。在生产环境中,你可能只想使用 emit(gc.usize)
,然后聚合该值或在计算中使用它。
emit("used" + ' ' + gc.usize + ', ' + "capacity" + ' ' + gc.csize + ', ' + "committed" + ' ' + gc.comsize)
将所有内容放在一起,你可以在搜索请求中创建一个名为 gc_size
的运行时字段。使用 fields
选项,你可以检索 gc_size
运行时字段的所有值。此查询还包括一个桶聚合来对你的数据进行分组。
resp = client.search( index="my-index", runtime_mappings={ "gc_size": { "type": "keyword", "script": "\n Map gc=dissect('[%{@timestamp}][%{code}][%{desc}] %{ident} used %{usize}, capacity %{csize}, committed %{comsize}, reserved %{rsize}').extract(doc[\"gc.keyword\"].value);\n if (gc != null) emit(\"used\" + ' ' + gc.usize + ', ' + \"capacity\" + ' ' + gc.csize + ', ' + \"committed\" + ' ' + gc.comsize);\n " } }, size=1, aggs={ "sizes": { "terms": { "field": "gc_size", "size": 10 } } }, fields=[ "gc_size" ], ) print(resp)
const response = await client.search({ index: "my-index", runtime_mappings: { gc_size: { type: "keyword", script: "\n Map gc=dissect('[%{@timestamp}][%{code}][%{desc}] %{ident} used %{usize}, capacity %{csize}, committed %{comsize}, reserved %{rsize}').extract(doc[\"gc.keyword\"].value);\n if (gc != null) emit(\"used\" + ' ' + gc.usize + ', ' + \"capacity\" + ' ' + gc.csize + ', ' + \"committed\" + ' ' + gc.comsize);\n ", }, }, size: 1, aggs: { sizes: { terms: { field: "gc_size", size: 10, }, }, }, fields: ["gc_size"], }); console.log(response);
GET my-index/_search { "runtime_mappings": { "gc_size": { "type": "keyword", "script": """ Map gc=dissect('[%{@timestamp}][%{code}][%{desc}] %{ident} used %{usize}, capacity %{csize}, committed %{comsize}, reserved %{rsize}').extract(doc["gc.keyword"].value); if (gc != null) emit("used" + ' ' + gc.usize + ', ' + "capacity" + ' ' + gc.csize + ', ' + "committed" + ' ' + gc.comsize); """ } }, "size": 1, "aggs": { "sizes": { "terms": { "field": "gc_size", "size": 10 } } }, "fields" : ["gc_size"] }
响应包括 gc_size
字段中的数据,其格式与你在 dissect 模式中定义的格式完全相同!
{ "took" : 2, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 6, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "my-index", "_id" : "GXx3H3kBKGE42WRNlddJ", "_score" : 1.0, "_source" : { "gc" : "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 384K, committed 384K, reserved 1048576K" }, "fields" : { "gc_size" : [ "used 266K, capacity 384K, committed 384K" ] } } ] }, "aggregations" : { "sizes" : { "doc_count_error_upper_bound" : 0, "sum_other_doc_count" : 0, "buckets" : [ { "key" : "used 107719K, capacity 111775K, committed 112724K", "doc_count" : 1 }, { "key" : "used 115409K, capacity 119541K, committed 120248K", "doc_count" : 1 }, { "key" : "used 14503K, capacity 15894K, committed 15948K", "doc_count" : 1 }, { "key" : "used 15255K, capacity 16726K, committed 16844K", "doc_count" : 1 }, { "key" : "used 266K, capacity 367K, committed 384K", "doc_count" : 1 }, { "key" : "used 266K, capacity 384K, committed 384K", "doc_count" : 1 } ] } } }