使用运行时字段探索您的数据

编辑

使用运行时字段探索您的数据

编辑

假设您有一组大型日志数据,您想从中提取字段。为数据建立索引既耗时又占用大量磁盘空间,而您只想在不预先确定模式的情况下探索数据结构。

您知道您的日志数据包含您想要提取的特定字段。在本例中,我们想关注 @timestamp 和 message 字段。通过使用运行时字段,您可以定义脚本,在搜索时计算这些字段的值。

将索引字段定义为起点

编辑

您可以从一个简单的例子开始,将 @timestamp 和 message 字段作为索引字段添加到 my-index-000001 的映射中。为了保持灵活性,请使用 wildcard 作为 message 的字段类型:

resp = client.indices.create(
    index="my-index-000001",
    mappings={
        "properties": {
            "@timestamp": {
                "format": "strict_date_optional_time||epoch_second",
                "type": "date"
            },
            "message": {
                "type": "wildcard"
            }
        }
    },
)
print(resp)
response = client.indices.create(
  index: 'my-index-000001',
  body: {
    mappings: {
      properties: {
        "@timestamp": {
          format: 'strict_date_optional_time||epoch_second',
          type: 'date'
        },
        message: {
          type: 'wildcard'
        }
      }
    }
  }
)
puts response
const response = await client.indices.create({
  index: "my-index-000001",
  mappings: {
    properties: {
      "@timestamp": {
        format: "strict_date_optional_time||epoch_second",
        type: "date",
      },
      message: {
        type: "wildcard",
      },
    },
  },
});
console.log(response);
PUT /my-index-000001/
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "format": "strict_date_optional_time||epoch_second",
        "type": "date"
      },
      "message": {
        "type": "wildcard"
      }
    }
  }
}

摄取一些数据

编辑

在映射您想要检索的字段后,将日志数据中的一些记录索引到 Elasticsearch 中。以下请求使用 批量 API 将原始日志数据索引到 my-index-000001 中。您可以不索引所有日志数据,而是使用一个小样本来试用运行时字段。

最后一个文档不是有效的 Apache 日志格式,但我们可以在脚本中考虑这种情况。

resp = client.bulk(
    index="my-index-000001",
    refresh=True,
    operations=[
        {
            "index": {}
        },
        {
            "timestamp": "2020-04-30T14:30:17-05:00",
            "message": "40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"
        },
        {
            "index": {}
        },
        {
            "timestamp": "2020-04-30T14:30:53-05:00",
            "message": "232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"
        },
        {
            "index": {}
        },
        {
            "timestamp": "2020-04-30T14:31:12-05:00",
            "message": "26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"
        },
        {
            "index": {}
        },
        {
            "timestamp": "2020-04-30T14:31:19-05:00",
            "message": "247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] \"GET /french/splash_inet.html HTTP/1.0\" 200 3781"
        },
        {
            "index": {}
        },
        {
            "timestamp": "2020-04-30T14:31:22-05:00",
            "message": "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"
        },
        {
            "index": {}
        },
        {
            "timestamp": "2020-04-30T14:31:27-05:00",
            "message": "252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"
        },
        {
            "index": {}
        },
        {
            "timestamp": "2020-04-30T14:31:28-05:00",
            "message": "not a valid apache log"
        }
    ],
)
print(resp)
response = client.bulk(
  index: 'my-index-000001',
  refresh: true,
  body: [
    {
      index: {}
    },
    {
      timestamp: '2020-04-30T14:30:17-05:00',
      message: '40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736'
    },
    {
      index: {}
    },
    {
      timestamp: '2020-04-30T14:30:53-05:00',
      message: '232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736'
    },
    {
      index: {}
    },
    {
      timestamp: '2020-04-30T14:31:12-05:00',
      message: '26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736'
    },
    {
      index: {}
    },
    {
      timestamp: '2020-04-30T14:31:19-05:00',
      message: '247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] "GET /french/splash_inet.html HTTP/1.0" 200 3781'
    },
    {
      index: {}
    },
    {
      timestamp: '2020-04-30T14:31:22-05:00',
      message: '247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] "GET /images/hm_nbg.jpg HTTP/1.0" 304 0'
    },
    {
      index: {}
    },
    {
      timestamp: '2020-04-30T14:31:27-05:00',
      message: '252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736'
    },
    {
      index: {}
    },
    {
      timestamp: '2020-04-30T14:31:28-05:00',
      message: 'not a valid apache log'
    }
  ]
)
puts response
const response = await client.bulk({
  index: "my-index-000001",
  refresh: "true",
  operations: [
    {
      index: {},
    },
    {
      timestamp: "2020-04-30T14:30:17-05:00",
      message:
        '40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736',
    },
    {
      index: {},
    },
    {
      timestamp: "2020-04-30T14:30:53-05:00",
      message:
        '232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736',
    },
    {
      index: {},
    },
    {
      timestamp: "2020-04-30T14:31:12-05:00",
      message:
        '26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736',
    },
    {
      index: {},
    },
    {
      timestamp: "2020-04-30T14:31:19-05:00",
      message:
        '247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] "GET /french/splash_inet.html HTTP/1.0" 200 3781',
    },
    {
      index: {},
    },
    {
      timestamp: "2020-04-30T14:31:22-05:00",
      message:
        '247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] "GET /images/hm_nbg.jpg HTTP/1.0" 304 0',
    },
    {
      index: {},
    },
    {
      timestamp: "2020-04-30T14:31:27-05:00",
      message:
        '252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] "GET /images/hm_bg.jpg HTTP/1.0" 200 24736',
    },
    {
      index: {},
    },
    {
      timestamp: "2020-04-30T14:31:28-05:00",
      message: "not a valid apache log",
    },
  ],
});
console.log(response);
POST /my-index-000001/_bulk?refresh
{"index":{}}
{"timestamp":"2020-04-30T14:30:17-05:00","message":"40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:30:53-05:00","message":"232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:12-05:00","message":"26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:19-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] \"GET /french/splash_inet.html HTTP/1.0\" 200 3781"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:22-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:27-05:00","message":"252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:28-05:00","message":"not a valid apache log"}

此时,您可以查看 Elasticsearch 如何存储您的原始数据。

resp = client.indices.get(
    index="my-index-000001",
)
print(resp)
response = client.indices.get(
  index: 'my-index-000001'
)
puts response
const response = await client.indices.get({
  index: "my-index-000001",
});
console.log(response);
GET /my-index-000001

映射包含您显式定义的两个字段:@timestamp 和 message。由于批量请求中的文档使用的是 timestamp 字段,下面的响应中还会出现一个由动态映射添加的 timestamp 字段:

{
  "my-index-000001" : {
    "aliases" : { },
    "mappings" : {
      "properties" : {
        "@timestamp" : {
          "type" : "date",
          "format" : "strict_date_optional_time||epoch_second"
        },
        "message" : {
          "type" : "wildcard"
        },
        "timestamp" : {
          "type" : "date"
        }
      }
    },
    ...
  }
}

使用 grok 模式定义运行时字段

编辑

如果您想检索包含 clientip 的结果,您可以在映射中添加该字段作为运行时字段。以下运行时脚本定义了一个 grok 模式,该模式从文档中的单个文本字段中提取结构化字段。grok 模式类似于正则表达式,它支持可以重复使用的别名表达式。

该脚本匹配 %{COMMONAPACHELOG} 日志模式,该模式理解 Apache 日志的结构。如果模式匹配 (clientip != null),则脚本会发出匹配的 IP 地址的值。如果模式不匹配,则脚本仅返回字段值而不会崩溃。

resp = client.indices.put_mapping(
    index="my-index-000001",
    runtime={
        "http.client_ip": {
            "type": "ip",
            "script": "\n        String clientip=grok('%{COMMONAPACHELOG}').extract(doc[\"message\"].value)?.clientip;\n        if (clientip != null) emit(clientip); \n      "
        }
    },
)
print(resp)
const response = await client.indices.putMapping({
  index: "my-index-000001",
  runtime: {
    "http.client_ip": {
      type: "ip",
      script:
        "\n        String clientip=grok('%{COMMONAPACHELOG}').extract(doc[\"message\"].value)?.clientip;\n        if (clientip != null) emit(clientip); \n      ",
    },
  },
});
console.log(response);
PUT my-index-000001/_mappings
{
  "runtime": {
    "http.client_ip": {
      "type": "ip",
      "script": """
        String clientip=grok('%{COMMONAPACHELOG}').extract(doc["message"].value)?.clientip;
        if (clientip != null) emit(clientip); 
      """
    }
  }
}

此条件确保即使消息的模式不匹配,脚本也不会崩溃。

或者,您可以在搜索请求的上下文中定义相同的运行时字段。运行时定义和脚本与之前在索引映射中定义的完全相同。只需将该定义复制到搜索请求的 runtime_mappings 部分,并包含一个与运行时字段匹配的查询。此查询返回的结果,与您在索引映射中定义 http.clientip 运行时字段后再执行搜索查询的结果相同,但仅在此特定搜索的上下文中有效:

resp = client.search(
    index="my-index-000001",
    runtime_mappings={
        "http.clientip": {
            "type": "ip",
            "script": "\n        String clientip=grok('%{COMMONAPACHELOG}').extract(doc[\"message\"].value)?.clientip;\n        if (clientip != null) emit(clientip);\n      "
        }
    },
    query={
        "match": {
            "http.clientip": "40.135.0.0"
        }
    },
    fields=[
        "http.clientip"
    ],
)
print(resp)
const response = await client.search({
  index: "my-index-000001",
  runtime_mappings: {
    "http.clientip": {
      type: "ip",
      script:
        "\n        String clientip=grok('%{COMMONAPACHELOG}').extract(doc[\"message\"].value)?.clientip;\n        if (clientip != null) emit(clientip);\n      ",
    },
  },
  query: {
    match: {
      "http.clientip": "40.135.0.0",
    },
  },
  fields: ["http.clientip"],
});
console.log(response);
GET my-index-000001/_search
{
  "runtime_mappings": {
    "http.clientip": {
      "type": "ip",
      "script": """
        String clientip=grok('%{COMMONAPACHELOG}').extract(doc["message"].value)?.clientip;
        if (clientip != null) emit(clientip);
      """
    }
  },
  "query": {
    "match": {
      "http.clientip": "40.135.0.0"
    }
  },
  "fields" : ["http.clientip"]
}

定义复合运行时字段

编辑

您还可以定义一个复合运行时字段,以从单个脚本发出多个字段。您可以定义一组类型化的子字段,并发出一个由值组成的映射(map)。在搜索时,每个子字段都会检索映射中与其名称关联的值。这意味着您只需要指定一次 grok 模式,就可以返回多个值:

resp = client.indices.put_mapping(
    index="my-index-000001",
    runtime={
        "http": {
            "type": "composite",
            "script": "emit(grok(\"%{COMMONAPACHELOG}\").extract(doc[\"message\"].value))",
            "fields": {
                "clientip": {
                    "type": "ip"
                },
                "verb": {
                    "type": "keyword"
                },
                "response": {
                    "type": "long"
                }
            }
        }
    },
)
print(resp)
response = client.indices.put_mapping(
  index: 'my-index-000001',
  body: {
    runtime: {
      http: {
        type: 'composite',
        script: 'emit(grok("%{COMMONAPACHELOG}").extract(doc["message"].value))',
        fields: {
          clientip: {
            type: 'ip'
          },
          verb: {
            type: 'keyword'
          },
          response: {
            type: 'long'
          }
        }
      }
    }
  }
)
puts response
const response = await client.indices.putMapping({
  index: "my-index-000001",
  runtime: {
    http: {
      type: "composite",
      script: 'emit(grok("%{COMMONAPACHELOG}").extract(doc["message"].value))',
      fields: {
        clientip: {
          type: "ip",
        },
        verb: {
          type: "keyword",
        },
        response: {
          type: "long",
        },
      },
    },
  },
});
console.log(response);
PUT my-index-000001/_mappings
{
  "runtime": {
    "http": {
      "type": "composite",
      "script": "emit(grok(\"%{COMMONAPACHELOG}\").extract(doc[\"message\"].value))",
      "fields": {
        "clientip": {
          "type": "ip"
        },
        "verb": {
          "type": "keyword"
        },
        "response": {
          "type": "long"
        }
      }
    }
  }
}

搜索特定的 IP 地址

编辑

使用 http.clientip 运行时字段,您可以定义一个简单的查询来搜索特定的 IP 地址并返回所有相关字段。

resp = client.search(
    index="my-index-000001",
    query={
        "match": {
            "http.clientip": "40.135.0.0"
        }
    },
    fields=[
        "*"
    ],
)
print(resp)
const response = await client.search({
  index: "my-index-000001",
  query: {
    match: {
      "http.clientip": "40.135.0.0",
    },
  },
  fields: ["*"],
});
console.log(response);
GET my-index-000001/_search
{
  "query": {
    "match": {
      "http.clientip": "40.135.0.0"
    }
  },
  "fields" : ["*"]
}

API 返回以下结果。由于 http 是一个 composite 运行时字段,因此响应在 fields 下包含每个子字段,包括任何与查询匹配的关联值。无需预先构建数据结构,您就可以通过有意义的方式搜索和探索您的数据,以试验并确定要索引的字段。

{
  ...
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my-index-000001",
        "_id" : "sRVHBnwBB-qjgFni7h_O",
        "_score" : 1.0,
        "_source" : {
          "timestamp" : "2020-04-30T14:30:17-05:00",
          "message" : "40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"
        },
        "fields" : {
          "http.verb" : [
            "GET"
          ],
          "http.clientip" : [
            "40.135.0.0"
          ],
          "http.response" : [
            200
          ],
          "message" : [
            "40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"
          ],
          "http.client_ip" : [
            "40.135.0.0"
          ],
          "timestamp" : [
            "2020-04-30T19:30:17.000Z"
          ]
        }
      }
    ]
  }
}

另外,还记得脚本中的 if 语句吗?

if (clientip != null) emit(clientip);

如果脚本不包含此条件,则查询将在任何与模式不匹配的分片上失败。通过包含此条件,查询会跳过与 grok 模式不匹配的数据。

搜索特定范围内的文档

编辑

您还可以对 timestamp 字段运行范围查询。以下查询返回 timestamp 大于或等于 2020-04-30T14:31:27-05:00 的所有文档:

resp = client.search(
    index="my-index-000001",
    query={
        "range": {
            "timestamp": {
                "gte": "2020-04-30T14:31:27-05:00"
            }
        }
    },
)
print(resp)
const response = await client.search({
  index: "my-index-000001",
  query: {
    range: {
      timestamp: {
        gte: "2020-04-30T14:31:27-05:00",
      },
    },
  },
});
console.log(response);
GET my-index-000001/_search
{
  "query": {
    "range": {
      "timestamp": {
        "gte": "2020-04-30T14:31:27-05:00"
      }
    }
  }
}

响应包括日志格式不匹配但时间戳在定义范围内的文档。

{
  ...
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my-index-000001",
        "_id" : "hdEhyncBRSB6iD-PoBqe",
        "_score" : 1.0,
        "_source" : {
          "timestamp" : "2020-04-30T14:31:27-05:00",
          "message" : "252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"
        }
      },
      {
        "_index" : "my-index-000001",
        "_id" : "htEhyncBRSB6iD-PoBqe",
        "_score" : 1.0,
        "_source" : {
          "timestamp" : "2020-04-30T14:31:28-05:00",
          "message" : "not a valid apache log"
        }
      }
    ]
  }
}

使用 dissect 模式定义运行时字段

编辑

如果您不需要正则表达式的功能,可以使用 dissect 模式 而不是 grok 模式。Dissect 模式匹配固定分隔符,但通常比 grok 更快。

您可以使用 dissect 来实现与使用 grok 模式 解析 Apache 日志相同的结果。您不必匹配日志模式,而是包含您想要丢弃的字符串部分。特别注意您想要丢弃的字符串部分将有助于构建成功的 dissect 模式。

resp = client.indices.put_mapping(
    index="my-index-000001",
    runtime={
        "http.client.ip": {
            "type": "ip",
            "script": "\n        String clientip=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}').extract(doc[\"message\"].value)?.clientip;\n        if (clientip != null) emit(clientip);\n      "
        }
    },
)
print(resp)
const response = await client.indices.putMapping({
  index: "my-index-000001",
  runtime: {
    "http.client.ip": {
      type: "ip",
      script:
        '\n        String clientip=dissect(\'%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{status} %{size}\').extract(doc["message"].value)?.clientip;\n        if (clientip != null) emit(clientip);\n      ',
    },
  },
});
console.log(response);
PUT my-index-000001/_mappings
{
  "runtime": {
    "http.client.ip": {
      "type": "ip",
      "script": """
        String clientip=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{status} %{size}').extract(doc["message"].value)?.clientip;
        if (clientip != null) emit(clientip);
      """
    }
  }
}

类似地,您可以定义一个 dissect 模式来提取 HTTP 响应代码:

resp = client.indices.put_mapping(
    index="my-index-000001",
    runtime={
        "http.responses": {
            "type": "long",
            "script": "\n        String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{response} %{size}').extract(doc[\"message\"].value)?.response;\n        if (response != null) emit(Integer.parseInt(response));\n      "
        }
    },
)
print(resp)
const response = await client.indices.putMapping({
  index: "my-index-000001",
  runtime: {
    "http.responses": {
      type: "long",
      script:
        '\n        String response=dissect(\'%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}\').extract(doc["message"].value)?.response;\n        if (response != null) emit(Integer.parseInt(response));\n      ',
    },
  },
});
console.log(response);
PUT my-index-000001/_mappings
{
  "runtime": {
    "http.responses": {
      "type": "long",
      "script": """
        String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}').extract(doc["message"].value)?.response;
        if (response != null) emit(Integer.parseInt(response));
      """
    }
  }
}

然后,您可以运行查询,使用 http.responses 运行时字段检索特定的 HTTP 响应。使用 _search 请求的 fields 参数来指定您想要检索的字段:

resp = client.search(
    index="my-index-000001",
    query={
        "match": {
            "http.responses": "304"
        }
    },
    fields=[
        "http.client_ip",
        "timestamp",
        "http.verb"
    ],
)
print(resp)
response = client.search(
  index: 'my-index-000001',
  body: {
    query: {
      match: {
        'http.responses' => '304'
      }
    },
    fields: [
      'http.client_ip',
      'timestamp',
      'http.verb'
    ]
  }
)
puts response
const response = await client.search({
  index: "my-index-000001",
  query: {
    match: {
      "http.responses": "304",
    },
  },
  fields: ["http.client_ip", "timestamp", "http.verb"],
});
console.log(response);
GET my-index-000001/_search
{
  "query": {
    "match": {
      "http.responses": "304"
    }
  },
  "fields" : ["http.client_ip","timestamp","http.verb"]
}

响应包含一个 HTTP 响应代码为 304 的文档:

{
  ...
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my-index-000001",
        "_id" : "A2qDy3cBWRMvVAuI7F8M",
        "_score" : 1.0,
        "_source" : {
          "timestamp" : "2020-04-30T14:31:22-05:00",
          "message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"
        },
        "fields" : {
          "http.verb" : [
            "GET"
          ],
          "http.client_ip" : [
            "247.37.0.0"
          ],
          "timestamp" : [
            "2020-04-30T19:31:22.000Z"
          ]
        }
      }
    ]
  }
}