剖析数据
Elastic Stack Serverless
Dissect 将单个文本字段与定义的模式进行匹配。剖析模式由您要丢弃的字符串部分定义。特别注意字符串的每个部分有助于构建成功的剖析模式。
如果您不需要正则表达式的强大功能,请使用剖析模式而不是 grok。Dissect 使用比 grok 简单得多的语法,并且通常速度更快。dissect 的语法是透明的:告诉 dissect 你想要什么,它会将这些结果返回给你。
剖析模式由变量和分隔符组成。由百分号和花括号 `%{}` 定义的任何内容都被视为变量,例如 `%{clientip}`。您可以将变量分配给字段中数据的任何部分,然后仅返回您想要的部分。分隔符是变量之间的任何值,可以是空格、破折号或其他分隔符。
例如,假设您有包含类似如下内容的 `message` 字段的日志数据
"message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"
您可以将变量分配给数据的每个部分,以构建成功的剖析模式。请记住,准确地告诉 dissect 您想要匹配的内容。
数据的第一部分看起来像 IP 地址,因此您可以分配一个类似 `%{clientip}` 的变量。接下来的两个字符是破折号,两侧都有一个空格。您可以为每个破折号分配一个变量,也可以分配一个变量来表示破折号和空格。接下来是一组包含时间戳的括号。括号是分隔符,因此您将其包含在剖析模式中。到目前为止,数据和匹配的剖析模式如下所示
247.37.0.0 - - [30/Apr/2020:14:31:22 -0500]
%{clientip} %{ident} %{auth} [%{@timestamp}]
- `message` 字段中的第一个数据块
- 用于匹配选定数据块的剖析模式
使用相同的逻辑,您可以为剩余的数据块创建变量。双引号是分隔符,因此请将其包含在剖析模式中。该模式用 `%{verb}` 变量替换 `GET`,但保留 `HTTP` 作为模式的一部分。
\"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0
"%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}
组合这两个模式会产生如下所示的剖析模式
%{clientip} %{ident} %{auth} [%{@timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}
现在您已经有了剖析模式,如何测试和使用它呢?
您可以将剖析模式合并到 Painless 脚本中以提取数据。要测试您的脚本,请使用 Painless 执行 API 的字段上下文,或者创建一个包含脚本的运行时字段。运行时字段提供更大的灵活性并接受多个文档,但如果您没有对集群的写入权限来测试脚本,Painless 执行 API 是一个不错的选择。
例如,通过包含您的 Painless 脚本和单个与您的数据匹配的文档,使用 Painless 执行 API 测试您的剖析模式。首先将 `message` 字段索引为 `wildcard` 数据类型
PUT my-index
{
"mappings": {
"properties": {
"message": {
"type": "wildcard"
}
}
}
}
如果您想检索 HTTP 响应代码,请将您的剖析模式添加到提取 `response` 值的 Painless 脚本中。要从字段中提取值,请使用此函数
`.extract(doc["<field_name>"].value)?.<field_value>`
在本例中,`message` 是 `<field_name>`,`response` 是 `<field_value>`
POST /_scripts/painless/_execute
{
"script": {
"source": """
String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}').extract(doc["message"].value)?.response;
if (response != null) emit(Integer.parseInt(response));
"""
},
"context": "long_field",
"context_setup": {
"index": "my-index",
"document": {
"message": """247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] "GET /images/hm_nbg.jpg HTTP/1.0" 304 0"""
}
}
}
- 运行时字段需要 `emit` 方法才能返回值。
- 由于响应代码是整数,因此请使用 `long_field` 上下文。
- 包含一个与您的数据匹配的示例文档。
结果包括 HTTP 响应代码
{
"result" : [
304
]
}
如果您有一个功能正常的剖析模式,您可以将其添加到运行时字段以操作数据。由于运行时字段不需要您索引字段,因此您可以非常灵活地修改脚本及其功能。如果您已经使用 Painless 执行 API 测试了您的剖析模式,您可以在运行时字段中使用 *完全相同的* Painless 脚本。
首先,像上一节一样将 `message` 字段添加为 `wildcard` 类型,但也要将 `@timestamp` 添加为 `date` 类型,以防您想要对该字段进行操作以用于其他用例
PUT /my-index/
{
"mappings": {
"properties": {
"@timestamp": {
"format": "strict_date_optional_time||epoch_second",
"type": "date"
},
"message": {
"type": "wildcard"
}
}
}
}
如果您想使用您的剖析模式提取 HTTP 响应代码,您可以创建一个类似 `http.response` 的运行时字段
PUT my-index/_mappings
{
"runtime": {
"http.response": {
"type": "long",
"script": """
String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}').extract(doc["message"].value)?.response;
if (response != null) emit(Integer.parseInt(response));
"""
}
}
}
映射您要检索的字段后,将一些来自您的日志数据的记录索引到 Elasticsearch 中。以下请求使用 批量 API 将原始日志数据索引到 `my-index` 中
POST /my-index/_bulk?refresh=true
{"index":{}}
{"timestamp":"2020-04-30T14:30:17-05:00","message":"40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:30:53-05:00","message":"232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:12-05:00","message":"26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:19-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] \"GET /french/splash_inet.html HTTP/1.0\" 200 3781"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:22-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:27-05:00","message":"252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:28-05:00","message":"not a valid apache log"}
您可以定义一个简单的查询来搜索特定的 HTTP 响应并返回所有相关字段。使用搜索 API 的 `fields` 参数来检索 `http.response` 运行时字段。
GET my-index/_search
{
"query": {
"match": {
"http.response": "304"
}
},
"fields" : ["http.response"]
}
或者,您可以在搜索请求的上下文中定义相同的运行时字段。运行时定义和脚本与之前在索引映射中定义的完全相同。只需将该定义复制到搜索请求的 `runtime_mappings` 部分下,并包含一个与运行时字段匹配的查询。此查询返回与之前为索引映射中的 `http.response` 运行时字段定义的搜索查询相同的结果,但仅在此特定搜索的上下文中
GET my-index/_search
{
"runtime_mappings": {
"http.response": {
"type": "long",
"script": """
String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}').extract(doc["message"].value)?.response;
if (response != null) emit(Integer.parseInt(response));
"""
}
},
"query": {
"match": {
"http.response": "304"
}
},
"fields" : ["http.response"]
}
{
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "my-index",
"_id" : "D47UqXkBByC8cgZrkbOm",
"_score" : 1.0,
"_source" : {
"timestamp" : "2020-04-30T14:31:22-05:00",
"message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"
},
"fields" : {
"http.response" : [
304
]
}
}
]
}
}