使用运行时字段探索您的数据
假设您有一组大型日志数据,您想从中提取字段。为数据建立索引既耗时又占用大量磁盘空间,而您只想在不预先确定模式的情况下探索数据结构。
您知道您的日志数据包含您想要提取的特定字段。在本例中,我们想关注 @timestamp
和 message
字段。通过使用运行时字段,您可以定义脚本来在搜索时计算这些字段的值。
将索引字段定义为起点
您可以从一个简单的例子开始,将 @timestamp
和 message
字段作为索引字段添加到 my-index-000001
的映射中。为了保持灵活性,请使用 wildcard
作为 message
的字段类型。
resp = client.indices.create( index="my-index-000001", mappings={ "properties": { "@timestamp": { "format": "strict_date_optional_time||epoch_second", "type": "date" }, "message": { "type": "wildcard" } } }, ) print(resp)
response = client.indices.create( index: 'my-index-000001', body: { mappings: { properties: { "@timestamp": { format: 'strict_date_optional_time||epoch_second', type: 'date' }, message: { type: 'wildcard' } } } } ) puts response
const response = await client.indices.create({ index: "my-index-000001", mappings: { properties: { "@timestamp": { format: "strict_date_optional_time||epoch_second", type: "date", }, message: { type: "wildcard", }, }, }, }); console.log(response);
PUT /my-index-000001/ { "mappings": { "properties": { "@timestamp": { "format": "strict_date_optional_time||epoch_second", "type": "date" }, "message": { "type": "wildcard" } } } }
摄取一些数据
在映射您想要检索的字段后,将日志数据中的一些记录索引到 Elasticsearch 中。以下请求使用批量 API 将原始日志数据索引到 my-index-000001
中。您可以不索引所有日志数据,而是使用一个小样本来试用运行时字段。
最后一个文档不是有效的 Apache 日志格式,但我们可以在脚本中处理这种情况。
resp = client.bulk( index="my-index-000001", refresh=True, operations=[ { "index": {} }, { "timestamp": "2020-04-30T14:30:17-05:00", "message": "40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736" }, { "index": {} }, { "timestamp": "2020-04-30T14:30:53-05:00", "message": "232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736" }, { "index": {} }, { "timestamp": "2020-04-30T14:31:12-05:00", "message": "26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736" }, { "index": {} }, { "timestamp": "2020-04-30T14:31:19-05:00", "message": "247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] \"GET /french/splash_inet.html HTTP/1.0\" 200 3781" }, { "index": {} }, { "timestamp": "2020-04-30T14:31:22-05:00", "message": "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0" }, { "index": {} }, { "timestamp": "2020-04-30T14:31:27-05:00", "message": "252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736" }, { "index": {} }, { "timestamp": "2020-04-30T14:31:28-05:00", "message": "not a valid apache log" } ], ) print(resp)
response = client.bulk( index: 'my-index-000001', refresh: true, body: [ { index: {} }, { timestamp: '2020-04-30T14:30:17-05:00', message: '40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736' }, { index: {} }, { timestamp: '2020-04-30T14:30:53-05:00', message: '232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736' }, { index: {} }, { timestamp: '2020-04-30T14:31:12-05:00', message: '26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736' }, { index: {} }, { timestamp: '2020-04-30T14:31:19-05:00', message: '247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] "GET /french/splash_inet.html HTTP/1.0" 200 3781' }, { index: {} }, { timestamp: '2020-04-30T14:31:22-05:00', message: '247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] "GET /images/hm_nbg.jpg HTTP/1.0" 304 0' }, { index: {} }, { timestamp: '2020-04-30T14:31:27-05:00', message: '252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736' }, { index: {} }, { timestamp: '2020-04-30T14:31:28-05:00', message: 'not a valid apache log' } ] ) puts response
const response = await client.bulk({ index: "my-index-000001", refresh: "true", operations: [ { index: {}, }, { timestamp: "2020-04-30T14:30:17-05:00", message: '40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736', }, { index: {}, }, { timestamp: "2020-04-30T14:30:53-05:00", message: '232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736', }, { index: {}, }, { timestamp: "2020-04-30T14:31:12-05:00", message: '26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736', }, { index: {}, }, { timestamp: "2020-04-30T14:31:19-05:00", message: '247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] "GET /french/splash_inet.html HTTP/1.0" 200 3781', }, { index: {}, }, { timestamp: "2020-04-30T14:31:22-05:00", message: '247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] "GET /images/hm_nbg.jpg HTTP/1.0" 304 0', }, { index: {}, }, { timestamp: "2020-04-30T14:31:27-05:00", message: '252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736', }, { index: {}, }, { timestamp: "2020-04-30T14:31:28-05:00", message: "not a valid apache log", }, ], }); console.log(response);
POST /my-index-000001/_bulk?refresh {"index":{}} {"timestamp":"2020-04-30T14:30:17-05:00","message":"40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"} {"index":{}} {"timestamp":"2020-04-30T14:30:53-05:00","message":"232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"} {"index":{}} {"timestamp":"2020-04-30T14:31:12-05:00","message":"26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"} {"index":{}} {"timestamp":"2020-04-30T14:31:19-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] \"GET /french/splash_inet.html HTTP/1.0\" 200 3781"} {"index":{}} {"timestamp":"2020-04-30T14:31:22-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"} {"index":{}} {"timestamp":"2020-04-30T14:31:27-05:00","message":"252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"} {"index":{}} {"timestamp":"2020-04-30T14:31:28-05:00","message":"not a valid apache log"}
此时,您可以查看 Elasticsearch 如何存储您的原始数据。
resp = client.indices.get( index="my-index-000001", ) print(resp)
response = client.indices.get( index: 'my-index-000001' ) puts response
const response = await client.indices.get({ index: "my-index-000001", }); console.log(response);
GET /my-index-000001
映射包含您显式定义的两个字段:@timestamp
和 message
,以及在批量索引文档时由动态映射自动添加的 timestamp 字段。
{ "my-index-000001" : { "aliases" : { }, "mappings" : { "properties" : { "@timestamp" : { "type" : "date", "format" : "strict_date_optional_time||epoch_second" }, "message" : { "type" : "wildcard" }, "timestamp" : { "type" : "date" } } }, ... } }
使用 grok 模式定义运行时字段
如果您想检索包含 clientip
的结果,您可以在映射中添加该字段作为运行时字段。以下运行时脚本定义了一个 grok 模式,该模式从文档中的单个文本字段中提取结构化字段。grok 模式类似于正则表达式,它支持可以重复使用的别名表达式。
该脚本匹配 %{COMMONAPACHELOG}
日志模式,该模式理解 Apache 日志的结构。如果模式匹配 (clientip != null
),则脚本会发出匹配的 IP 地址的值。如果模式不匹配,则脚本不会发出任何值,而是直接结束,不会崩溃。
resp = client.indices.put_mapping( index="my-index-000001", runtime={ "http.client_ip": { "type": "ip", "script": "\n String clientip=grok('%{COMMONAPACHELOG}').extract(doc[\"message\"].value)?.clientip;\n if (clientip != null) emit(clientip); \n " } }, ) print(resp)
const response = await client.indices.putMapping({ index: "my-index-000001", runtime: { "http.client_ip": { type: "ip", script: "\n String clientip=grok('%{COMMONAPACHELOG}').extract(doc[\"message\"].value)?.clientip;\n if (clientip != null) emit(clientip); \n ", }, }, }); console.log(response);
PUT my-index-000001/_mappings { "runtime": { "http.client_ip": { "type": "ip", "script": """ String clientip=grok('%{COMMONAPACHELOG}').extract(doc["message"].value)?.clientip; if (clientip != null) emit(clientip); """ } } }
或者,您可以在搜索请求的上下文中定义相同的运行时字段。运行时定义和脚本与之前在索引映射中定义的完全相同。只需将该定义复制到搜索请求的 runtime_mappings
部分,并包含一个与运行时字段匹配的查询。此查询返回的结果与您在索引映射中为 http.clientip
运行时字段定义搜索查询的结果相同,但仅限于此特定搜索的上下文:
resp = client.search( index="my-index-000001", runtime_mappings={ "http.clientip": { "type": "ip", "script": "\n String clientip=grok('%{COMMONAPACHELOG}').extract(doc[\"message\"].value)?.clientip;\n if (clientip != null) emit(clientip);\n " } }, query={ "match": { "http.clientip": "40.135.0.0" } }, fields=[ "http.clientip" ], ) print(resp)
const response = await client.search({ index: "my-index-000001", runtime_mappings: { "http.clientip": { type: "ip", script: "\n String clientip=grok('%{COMMONAPACHELOG}').extract(doc[\"message\"].value)?.clientip;\n if (clientip != null) emit(clientip);\n ", }, }, query: { match: { "http.clientip": "40.135.0.0", }, }, fields: ["http.clientip"], }); console.log(response);
GET my-index-000001/_search { "runtime_mappings": { "http.clientip": { "type": "ip", "script": """ String clientip=grok('%{COMMONAPACHELOG}').extract(doc["message"].value)?.clientip; if (clientip != null) emit(clientip); """ } }, "query": { "match": { "http.clientip": "40.135.0.0" } }, "fields" : ["http.clientip"] }
定义复合运行时字段
您还可以定义一个复合运行时字段,以从单个脚本发出多个字段。您可以定义一组类型化的子字段并发出一个值映射。在搜索时,每个子字段都会检索映射中与其名称关联的值。这意味着您只需要指定一次 grok 模式,就可以返回多个值:
resp = client.indices.put_mapping( index="my-index-000001", runtime={ "http": { "type": "composite", "script": "emit(grok(\"%{COMMONAPACHELOG}\").extract(doc[\"message\"].value))", "fields": { "clientip": { "type": "ip" }, "verb": { "type": "keyword" }, "response": { "type": "long" } } } }, ) print(resp)
response = client.indices.put_mapping( index: 'my-index-000001', body: { runtime: { http: { type: 'composite', script: 'emit(grok("%{COMMONAPACHELOG}").extract(doc["message"].value))', fields: { clientip: { type: 'ip' }, verb: { type: 'keyword' }, response: { type: 'long' } } } } } ) puts response
const response = await client.indices.putMapping({ index: "my-index-000001", runtime: { http: { type: "composite", script: 'emit(grok("%{COMMONAPACHELOG}").extract(doc["message"].value))', fields: { clientip: { type: "ip", }, verb: { type: "keyword", }, response: { type: "long", }, }, }, }, }); console.log(response);
PUT my-index-000001/_mappings { "runtime": { "http": { "type": "composite", "script": "emit(grok(\"%{COMMONAPACHELOG}\").extract(doc[\"message\"].value))", "fields": { "clientip": { "type": "ip" }, "verb": { "type": "keyword" }, "response": { "type": "long" } } } } }
搜索特定的 IP 地址
使用 http.clientip
运行时字段,您可以定义一个简单的查询来搜索特定的 IP 地址并返回所有相关字段。
resp = client.search( index="my-index-000001", query={ "match": { "http.clientip": "40.135.0.0" } }, fields=[ "*" ], ) print(resp)
const response = await client.search({ index: "my-index-000001", query: { match: { "http.clientip": "40.135.0.0", }, }, fields: ["*"], }); console.log(response);
GET my-index-000001/_search { "query": { "match": { "http.clientip": "40.135.0.0" } }, "fields" : ["*"] }
API 返回以下结果。由于 http
是一个 composite
运行时字段,因此响应在 fields
下包含每个子字段,包括任何与查询匹配的关联值。无需预先构建数据结构,您就可以通过有意义的方式搜索和探索您的数据,以试验并确定要索引的字段。
{ ... "hits" : { "total" : { "value" : 1, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "my-index-000001", "_id" : "sRVHBnwBB-qjgFni7h_O", "_score" : 1.0, "_source" : { "timestamp" : "2020-04-30T14:30:17-05:00", "message" : "40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736" }, "fields" : { "http.verb" : [ "GET" ], "http.clientip" : [ "40.135.0.0" ], "http.response" : [ 200 ], "message" : [ "40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736" ], "http.client_ip" : [ "40.135.0.0" ], "timestamp" : [ "2020-04-30T19:30:17.000Z" ] } } ] } }
另外,还记得脚本中的 if
语句吗?
if (clientip != null) emit(clientip);
如果脚本不包含此条件,则查询将在任何与模式不匹配的分片上失败。通过包含此条件,查询会跳过与 grok 模式不匹配的数据。
搜索特定范围内的文档
您还可以运行一个范围查询,该查询在 timestamp
字段上运行。以下查询返回 timestamp
大于或等于 2020-04-30T14:31:27-05:00
的所有文档:
resp = client.search( index="my-index-000001", query={ "range": { "timestamp": { "gte": "2020-04-30T14:31:27-05:00" } } }, ) print(resp)
const response = await client.search({ index: "my-index-000001", query: { range: { timestamp: { gte: "2020-04-30T14:31:27-05:00", }, }, }, }); console.log(response);
GET my-index-000001/_search { "query": { "range": { "timestamp": { "gte": "2020-04-30T14:31:27-05:00" } } } }
响应包括日志格式不匹配但时间戳在定义范围内的文档。
{ ... "hits" : { "total" : { "value" : 2, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "my-index-000001", "_id" : "hdEhyncBRSB6iD-PoBqe", "_score" : 1.0, "_source" : { "timestamp" : "2020-04-30T14:31:27-05:00", "message" : "252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736" } }, { "_index" : "my-index-000001", "_id" : "htEhyncBRSB6iD-PoBqe", "_score" : 1.0, "_source" : { "timestamp" : "2020-04-30T14:31:28-05:00", "message" : "not a valid apache log" } } ] } }
使用 dissect 模式定义运行时字段
如果您不需要正则表达式的功能,可以使用 dissect 模式而不是 grok 模式。Dissect 模式只匹配固定分隔符,但通常比 grok 更快。
您可以使用 dissect 来实现与使用 grok 模式解析 Apache 日志相同的结果。您无需匹配日志模式,而是在模式中写出您想要丢弃的字符串部分。特别留意这些要丢弃的部分,有助于构建出有效的 dissect 模式。
resp = client.indices.put_mapping( index="my-index-000001", runtime={ "http.client.ip": { "type": "ip", "script": "\n String clientip=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}').extract(doc[\"message\"].value)?.clientip;\n if (clientip != null) emit(clientip);\n " } }, ) print(resp)
const response = await client.indices.putMapping({ index: "my-index-000001", runtime: { "http.client.ip": { type: "ip", script: '\n String clientip=dissect(\'%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{status} %{size}\').extract(doc["message"].value)?.clientip;\n if (clientip != null) emit(clientip);\n ', }, }, }); console.log(response);
PUT my-index-000001/_mappings { "runtime": { "http.client.ip": { "type": "ip", "script": """ String clientip=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{status} %{size}').extract(doc["message"].value)?.clientip; if (clientip != null) emit(clientip); """ } } }
类似地,您可以定义一个 dissect 模式来提取 HTTP 响应代码:
resp = client.indices.put_mapping( index="my-index-000001", runtime={ "http.responses": { "type": "long", "script": "\n String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{response} %{size}').extract(doc[\"message\"].value)?.response;\n if (response != null) emit(Integer.parseInt(response));\n " } }, ) print(resp)
const response = await client.indices.putMapping({ index: "my-index-000001", runtime: { "http.responses": { type: "long", script: '\n String response=dissect(\'%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}\').extract(doc["message"].value)?.response;\n if (response != null) emit(Integer.parseInt(response));\n ', }, }, }); console.log(response);
PUT my-index-000001/_mappings { "runtime": { "http.responses": { "type": "long", "script": """ String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}').extract(doc["message"].value)?.response; if (response != null) emit(Integer.parseInt(response)); """ } } }
然后,您可以运行查询以使用 http.responses
运行时字段检索特定的 HTTP 响应。使用 _search
请求的 fields
参数来指示您想要检索的字段:
resp = client.search( index="my-index-000001", query={ "match": { "http.responses": "304" } }, fields=[ "http.client_ip", "timestamp", "http.verb" ], ) print(resp)
response = client.search( index: 'my-index-000001', body: { query: { match: { 'http.responses' => '304' } }, fields: [ 'http.client_ip', 'timestamp', 'http.verb' ] } ) puts response
const response = await client.search({ index: "my-index-000001", query: { match: { "http.responses": "304", }, }, fields: ["http.client_ip", "timestamp", "http.verb"], }); console.log(response);
GET my-index-000001/_search { "query": { "match": { "http.responses": "304" } }, "fields" : ["http.client_ip","timestamp","http.verb"] }
响应包含一个 HTTP 响应为 304
的文档:
{ ... "hits" : { "total" : { "value" : 1, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "my-index-000001", "_id" : "A2qDy3cBWRMvVAuI7F8M", "_score" : 1.0, "_source" : { "timestamp" : "2020-04-30T14:31:22-05:00", "message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0" }, "fields" : { "http.verb" : [ "GET" ], "http.client_ip" : [ "247.37.0.0" ], "timestamp" : [ "2020-04-30T19:31:22.000Z" ] } } ] } }