字段提取编辑

字段提取的目标很简单:你的数据中有一些字段包含大量信息,但你只想提取其中的一部分。

你有两种选择:

  • Grok 是一种正则表达式方言,支持你可以重复使用的别名表达式。由于 Grok 建立在正则表达式 (regex) 之上,因此任何正则表达式在 Grok 中都是有效的。
  • Dissect 使用分隔符定义匹配模式,从文本中提取结构化字段。与 Grok 不同,Dissect 不使用正则表达式。

让我们从一个简单的例子开始,将 @timestampmessage 字段作为索引字段添加到 my-index 映射中。为了保持灵活性,对 message 使用 wildcard 作为字段类型。

response = client.indices.create(
  index: 'my-index',
  body: {
    mappings: {
      properties: {
        "@timestamp": {
          format: 'strict_date_optional_time||epoch_second',
          type: 'date'
        },
        message: {
          type: 'wildcard'
        }
      }
    }
  }
)
puts response
PUT /my-index/
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "format": "strict_date_optional_time||epoch_second",
        "type": "date"
      },
      "message": {
        "type": "wildcard"
      }
    }
  }
}

映射完要检索的字段后,将一些日志数据记录索引到 Elasticsearch 中。以下请求使用 批量 API 将原始日志数据索引到 my-index 中。你无需索引所有日志数据,可以使用一小部分样本进行运行时字段的实验。

response = client.bulk(
  index: 'my-index',
  refresh: true,
  body: [
    {
      index: {}
    },
    {
      timestamp: '2020-04-30T14:30:17-05:00',
      message: '40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736'
    },
    {
      index: {}
    },
    {
      timestamp: '2020-04-30T14:30:53-05:00',
      message: '232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736'
    },
    {
      index: {}
    },
    {
      timestamp: '2020-04-30T14:31:12-05:00',
      message: '26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736'
    },
    {
      index: {}
    },
    {
      timestamp: '2020-04-30T14:31:19-05:00',
      message: '247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] "GET /french/splash_inet.html HTTP/1.0" 200 3781'
    },
    {
      index: {}
    },
    {
      timestamp: '2020-04-30T14:31:22-05:00',
      message: '247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] "GET /images/hm_nbg.jpg HTTP/1.0" 304 0'
    },
    {
      index: {}
    },
    {
      timestamp: '2020-04-30T14:31:27-05:00',
      message: '252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736'
    },
    {
      index: {}
    },
    {
      timestamp: '2020-04-30T14:31:28-05:00',
      message: 'not a valid apache log'
    }
  ]
)
puts response
POST /my-index/_bulk?refresh
{"index":{}}
{"timestamp":"2020-04-30T14:30:17-05:00","message":"40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:30:53-05:00","message":"232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:12-05:00","message":"26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:19-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] \"GET /french/splash_inet.html HTTP/1.0\" 200 3781"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:22-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:27-05:00","message":"252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:28-05:00","message":"not a valid apache log"}

从日志消息中提取 IP 地址 (Grok)编辑

如果你想检索包含 clientip 的结果,可以在映射中添加该字段作为运行时字段。以下运行时脚本定义了一个 Grok 模式,用于从 message 字段中提取结构化字段。

该脚本匹配 %{COMMONAPACHELOG} 日志模式,该模式理解 Apache 日志的结构。如果模式匹配 (clientip != null),脚本将发出匹配的 IP 地址的值。如果模式不匹配,脚本将只返回字段值,而不会崩溃。

PUT my-index/_mappings
{
  "runtime": {
    "http.clientip": {
      "type": "ip",
      "script": """
        String clientip=grok('%{COMMONAPACHELOG}').extract(doc["message"].value)?.clientip;
        if (clientip != null) emit(clientip); 
      """
    }
  }
}

此条件确保即使消息的模式不匹配,脚本也不会发出任何内容。

你可以定义一个简单的查询来运行特定 IP 地址的搜索并返回所有相关字段。使用搜索 API 的 fields 参数检索 http.clientip 运行时字段。

response = client.search(
  index: 'my-index',
  body: {
    query: {
      match: {
        'http.clientip' => '40.135.0.0'
      }
    },
    fields: [
      'http.clientip'
    ]
  }
)
puts response
GET my-index/_search
{
  "query": {
    "match": {
      "http.clientip": "40.135.0.0"
    }
  },
  "fields" : ["http.clientip"]
}

响应包含 http.clientip 值与 40.135.0.0 匹配的文档。

{
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my-index",
        "_id" : "Rq-ex3gBA_A0V6dYGLQ7",
        "_score" : 1.0,
        "_source" : {
          "timestamp" : "2020-04-30T14:30:17-05:00",
          "message" : "40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"
        },
        "fields" : {
          "http.clientip" : [
            "40.135.0.0"
          ]
        }
      }
    ]
  }
}

解析字符串以提取字段的一部分 (Dissect)编辑

你无需像 上一个示例 中那样匹配日志模式,只需定义一个 Dissect 模式来包含要丢弃的字符串部分。

例如,本节开头处的日志数据包含一个 message 字段。该字段包含多个数据片段。

"message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"

你可以在运行时字段中定义一个 Dissect 模式来提取 HTTP 响应代码,在上一个示例中是 304

PUT my-index/_mappings
{
  "runtime": {
    "http.response": {
      "type": "long",
      "script": """
        String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}').extract(doc["message"].value)?.response;
        if (response != null) emit(Integer.parseInt(response));
      """
    }
  }
}

然后,你可以运行一个查询来检索使用 http.response 运行时字段的特定 HTTP 响应。

response = client.search(
  index: 'my-index',
  body: {
    query: {
      match: {
        'http.response' => '304'
      }
    },
    fields: [
      'http.response'
    ]
  }
)
puts response
GET my-index/_search
{
  "query": {
    "match": {
      "http.response": "304"
    }
  },
  "fields" : ["http.response"]
}

响应包含一个 HTTP 响应为 304 的单个文档。

{
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my-index",
        "_id" : "Sq-ex3gBA_A0V6dYGLQ7",
        "_score" : 1.0,
        "_source" : {
          "timestamp" : "2020-04-30T14:31:22-05:00",
          "message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"
        },
        "fields" : {
          "http.response" : [
            304
          ]
        }
      }
    ]
  }
}

按分隔符拆分字段中的值 (Dissect)编辑

假设你想像上一个示例中那样提取字段的一部分,但你想按特定值拆分。你可以使用 Dissect 模式来提取你想要的信息,并将该数据以特定格式返回。

例如,假设你有一堆来自 Elasticsearch 的垃圾回收 (gc) 日志数据,格式如下:

[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit]   class space    used 266K, capacity 384K, committed 384K, reserved 1048576K

你只想提取 usedcapacitycommitted 数据以及相关值。让我们索引一些包含日志数据的文档作为示例。

response = client.bulk(
  index: 'my-index',
  refresh: true,
  body: [
    {
      index: {}
    },
    {
      gc: '[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit]   class space    used 266K, capacity 384K, committed 384K, reserved 1048576K'
    },
    {
      index: {}
    },
    {
      gc: '[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit]   class space    used 15255K, capacity 16726K, committed 16844K, reserved 1048576K'
    },
    {
      index: {}
    },
    {
      gc: '[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit]  Metaspace       used 115409K, capacity 119541K, committed 120248K, reserved 1153024K'
    },
    {
      index: {}
    },
    {
      gc: '[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit]   class space    used 14503K, capacity 15894K, committed 15948K, reserved 1048576K'
    },
    {
      index: {}
    },
    {
      gc: '[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit]  Metaspace       used 107719K, capacity 111775K, committed 112724K, reserved 1146880K'
    },
    {
      index: {}
    },
    {
      gc: '[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit]  class space  used 266K, capacity 367K, committed 384K, reserved 1048576K'
    }
  ]
)
puts response
POST /my-index/_bulk?refresh
{"index":{}}
{"gc": "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit]   class space    used 266K, capacity 384K, committed 384K, reserved 1048576K"}
{"index":{}}
{"gc": "[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit]   class space    used 15255K, capacity 16726K, committed 16844K, reserved 1048576K"}
{"index":{}}
{"gc": "[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit]  Metaspace       used 115409K, capacity 119541K, committed 120248K, reserved 1153024K"}
{"index":{}}
{"gc": "[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit]   class space    used 14503K, capacity 15894K, committed 15948K, reserved 1048576K"}
{"index":{}}
{"gc": "[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit]  Metaspace       used 107719K, capacity 111775K, committed 112724K, reserved 1146880K"}
{"index":{}}
{"gc": "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit]  class space  used 266K, capacity 367K, committed 384K, reserved 1048576K"}

再次查看数据,你会发现一个时间戳、一些你并不感兴趣的其他数据,以及 usedcapacitycommitted 数据。

[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit]   class space    used 266K, capacity 384K, committed 384K, reserved 1048576K

你可以为 gc 字段中数据的每个部分分配变量,然后只返回你想要的部分。任何在花括号 {} 中的内容都被视为变量。例如,变量 [%{@timestamp}][%{code}][%{desc}] 将匹配前三个数据块,它们都在方括号 [] 中。

[%{@timestamp}][%{code}][%{desc}]  %{ident} used %{usize}, capacity %{csize}, committed %{comsize}, reserved %{rsize}

你的 Dissect 模式可以包含 usedcapacitycommitted 术语,而不是使用变量,因为你想要精确地返回这些术语。你还可以为要返回的值分配变量,例如 %{usize}%{csize}%{comsize}。日志数据中的分隔符是逗号,因此你的 Dissect 模式也需要使用该分隔符。

现在你有了 Dissect 模式,可以将其包含在 Painless 脚本中,作为运行时字段的一部分。该脚本使用你的 Dissect 模式来拆分 gc 字段,然后按照 emit 方法定义的格式精确地返回你想要的信息。由于 Dissect 使用简单的语法,你只需告诉它你想要什么即可。

以下模式告诉 Dissect 返回 used 术语、一个空格、来自 gc.usize 的值以及一个逗号。此模式对要检索的其他数据重复。虽然此模式在生产环境中可能不太有用,但它提供了很大的灵活性,可以让你对数据进行实验和操作。在生产环境中,你可能只想使用 emit(gc.usize),然后对该值进行聚合或在计算中使用它。

emit("used" + ' ' + gc.usize + ', ' + "capacity" + ' ' + gc.csize + ', ' + "committed" + ' ' + gc.comsize)

将所有内容整合在一起,你可以在搜索请求中创建一个名为 gc_size 的运行时字段。使用 fields 选项,你可以检索 gc_size 运行时字段的所有值。此查询还包含一个桶聚合,用于对数据进行分组。

GET my-index/_search
{
  "runtime_mappings": {
    "gc_size": {
      "type": "keyword",
      "script": """
        Map gc=dissect('[%{@timestamp}][%{code}][%{desc}]  %{ident} used %{usize}, capacity %{csize}, committed %{comsize}, reserved %{rsize}').extract(doc["gc.keyword"].value);
        if (gc != null) emit("used" + ' ' + gc.usize + ', ' + "capacity" + ' ' + gc.csize + ', ' + "committed" + ' ' + gc.comsize);
      """
    }
  },
  "size": 1,
  "aggs": {
    "sizes": {
      "terms": {
        "field": "gc_size",
        "size": 10
      }
    }
  },
  "fields" : ["gc_size"]
}

响应包含来自 gc_size 字段的数据,格式与你在 Dissect 模式中定义的格式完全相同!

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 6,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my-index",
        "_id" : "GXx3H3kBKGE42WRNlddJ",
        "_score" : 1.0,
        "_source" : {
          "gc" : "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit]   class space    used 266K, capacity 384K, committed 384K, reserved 1048576K"
        },
        "fields" : {
          "gc_size" : [
            "used 266K, capacity 384K, committed 384K"
          ]
        }
      }
    ]
  },
  "aggregations" : {
    "sizes" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "used 107719K, capacity 111775K, committed 112724K",
          "doc_count" : 1
        },
        {
          "key" : "used 115409K, capacity 119541K, committed 120248K",
          "doc_count" : 1
        },
        {
          "key" : "used 14503K, capacity 15894K, committed 15948K",
          "doc_count" : 1
        },
        {
          "key" : "used 15255K, capacity 16726K, committed 16844K",
          "doc_count" : 1
        },
        {
          "key" : "used 266K, capacity 367K, committed 384K",
          "doc_count" : 1
        },
        {
          "key" : "used 266K, capacity 384K, committed 384K",
          "doc_count" : 1
        }
      ]
    }
  }
}