ES|QL 查询入门

编辑

本指南介绍如何使用 ES|QL 查询和聚合数据。

此入门指南还提供 交互式 Python 笔记本,位于 elasticsearch-labs GitHub 存储库中。

前提条件

编辑

要按照本指南中的查询进行操作,您可以设置自己的部署,或使用 Elastic 的公共 ES|QL 演示环境。

首先摄入一些示例数据。在 Kibana 中,打开主菜单并选择 Dev Tools。运行以下两个请求

resp = client.indices.create(
    index="sample_data",
    mappings={
        "properties": {
            "client_ip": {
                "type": "ip"
            },
            "message": {
                "type": "keyword"
            }
        }
    },
)
print(resp)

resp1 = client.bulk(
    index="sample_data",
    operations=[
        {
            "index": {}
        },
        {
            "@timestamp": "2023-10-23T12:15:03.360Z",
            "client_ip": "172.21.2.162",
            "message": "Connected to 10.1.0.3",
            "event_duration": 3450233
        },
        {
            "index": {}
        },
        {
            "@timestamp": "2023-10-23T12:27:28.948Z",
            "client_ip": "172.21.2.113",
            "message": "Connected to 10.1.0.2",
            "event_duration": 2764889
        },
        {
            "index": {}
        },
        {
            "@timestamp": "2023-10-23T13:33:34.937Z",
            "client_ip": "172.21.0.5",
            "message": "Disconnected",
            "event_duration": 1232382
        },
        {
            "index": {}
        },
        {
            "@timestamp": "2023-10-23T13:51:54.732Z",
            "client_ip": "172.21.3.15",
            "message": "Connection error",
            "event_duration": 725448
        },
        {
            "index": {}
        },
        {
            "@timestamp": "2023-10-23T13:52:55.015Z",
            "client_ip": "172.21.3.15",
            "message": "Connection error",
            "event_duration": 8268153
        },
        {
            "index": {}
        },
        {
            "@timestamp": "2023-10-23T13:53:55.832Z",
            "client_ip": "172.21.3.15",
            "message": "Connection error",
            "event_duration": 5033755
        },
        {
            "index": {}
        },
        {
            "@timestamp": "2023-10-23T13:55:01.543Z",
            "client_ip": "172.21.3.15",
            "message": "Connected to 10.1.0.1",
            "event_duration": 1756467
        }
    ],
)
print(resp1)
response = client.indices.create(
  index: 'sample_data',
  body: {
    mappings: {
      properties: {
        client_ip: {
          type: 'ip'
        },
        message: {
          type: 'keyword'
        }
      }
    }
  }
)
puts response

response = client.bulk(
  index: 'sample_data',
  body: [
    {
      index: {}
    },
    {
      "@timestamp": '2023-10-23T12:15:03.360Z',
      client_ip: '172.21.2.162',
      message: 'Connected to 10.1.0.3',
      event_duration: 3_450_233
    },
    {
      index: {}
    },
    {
      "@timestamp": '2023-10-23T12:27:28.948Z',
      client_ip: '172.21.2.113',
      message: 'Connected to 10.1.0.2',
      event_duration: 2_764_889
    },
    {
      index: {}
    },
    {
      "@timestamp": '2023-10-23T13:33:34.937Z',
      client_ip: '172.21.0.5',
      message: 'Disconnected',
      event_duration: 1_232_382
    },
    {
      index: {}
    },
    {
      "@timestamp": '2023-10-23T13:51:54.732Z',
      client_ip: '172.21.3.15',
      message: 'Connection error',
      event_duration: 725_448
    },
    {
      index: {}
    },
    {
      "@timestamp": '2023-10-23T13:52:55.015Z',
      client_ip: '172.21.3.15',
      message: 'Connection error',
      event_duration: 8_268_153
    },
    {
      index: {}
    },
    {
      "@timestamp": '2023-10-23T13:53:55.832Z',
      client_ip: '172.21.3.15',
      message: 'Connection error',
      event_duration: 5_033_755
    },
    {
      index: {}
    },
    {
      "@timestamp": '2023-10-23T13:55:01.543Z',
      client_ip: '172.21.3.15',
      message: 'Connected to 10.1.0.1',
      event_duration: 1_756_467
    }
  ]
)
puts response
const response = await client.indices.create({
  index: "sample_data",
  mappings: {
    properties: {
      client_ip: {
        type: "ip",
      },
      message: {
        type: "keyword",
      },
    },
  },
});
console.log(response);

const response1 = await client.bulk({
  index: "sample_data",
  operations: [
    {
      index: {},
    },
    {
      "@timestamp": "2023-10-23T12:15:03.360Z",
      client_ip: "172.21.2.162",
      message: "Connected to 10.1.0.3",
      event_duration: 3450233,
    },
    {
      index: {},
    },
    {
      "@timestamp": "2023-10-23T12:27:28.948Z",
      client_ip: "172.21.2.113",
      message: "Connected to 10.1.0.2",
      event_duration: 2764889,
    },
    {
      index: {},
    },
    {
      "@timestamp": "2023-10-23T13:33:34.937Z",
      client_ip: "172.21.0.5",
      message: "Disconnected",
      event_duration: 1232382,
    },
    {
      index: {},
    },
    {
      "@timestamp": "2023-10-23T13:51:54.732Z",
      client_ip: "172.21.3.15",
      message: "Connection error",
      event_duration: 725448,
    },
    {
      index: {},
    },
    {
      "@timestamp": "2023-10-23T13:52:55.015Z",
      client_ip: "172.21.3.15",
      message: "Connection error",
      event_duration: 8268153,
    },
    {
      index: {},
    },
    {
      "@timestamp": "2023-10-23T13:53:55.832Z",
      client_ip: "172.21.3.15",
      message: "Connection error",
      event_duration: 5033755,
    },
    {
      index: {},
    },
    {
      "@timestamp": "2023-10-23T13:55:01.543Z",
      client_ip: "172.21.3.15",
      message: "Connected to 10.1.0.1",
      event_duration: 1756467,
    },
  ],
});
console.log(response1);
PUT sample_data
{
  "mappings": {
    "properties": {
      "client_ip": {
        "type": "ip"
      },
      "message": {
        "type": "keyword"
      }
    }
  }
}

PUT sample_data/_bulk
{"index": {}}
{"@timestamp": "2023-10-23T12:15:03.360Z", "client_ip": "172.21.2.162", "message": "Connected to 10.1.0.3", "event_duration": 3450233}
{"index": {}}
{"@timestamp": "2023-10-23T12:27:28.948Z", "client_ip": "172.21.2.113", "message": "Connected to 10.1.0.2", "event_duration": 2764889}
{"index": {}}
{"@timestamp": "2023-10-23T13:33:34.937Z", "client_ip": "172.21.0.5", "message": "Disconnected", "event_duration": 1232382}
{"index": {}}
{"@timestamp": "2023-10-23T13:51:54.732Z", "client_ip": "172.21.3.15", "message": "Connection error", "event_duration": 725448}
{"index": {}}
{"@timestamp": "2023-10-23T13:52:55.015Z", "client_ip": "172.21.3.15", "message": "Connection error", "event_duration": 8268153}
{"index": {}}
{"@timestamp": "2023-10-23T13:53:55.832Z", "client_ip": "172.21.3.15", "message": "Connection error", "event_duration": 5033755}
{"index": {}}
{"@timestamp": "2023-10-23T13:55:01.543Z", "client_ip": "172.21.3.15", "message": "Connected to 10.1.0.1", "event_duration": 1756467}

运行 ES|QL 查询

编辑

在 Kibana 中,您可以使用控制台或 Discover 来运行 ES|QL 查询

要开始在控制台中使用 ES|QL,请打开主菜单并选择 Dev Tools

ES|QL 查询 API 请求的一般结构如下

POST /_query?format=txt
{
  "query": """

  """
}

在两组三引号之间输入实际的 ES|QL 查询。例如

POST /_query?format=txt
{
  "query": """
FROM sample_data
  """
}

您的第一个 ES|QL 查询

编辑

每个 ES|QL 查询都以源命令开头。源命令生成一个表,通常包含来自 Elasticsearch 的数据。

A source command producing a table from Elasticsearch

FROM 源命令返回一个表,其中包含来自数据流、索引或别名的文档。结果表中的每一行代表一个文档。此查询从 sample_data 索引返回最多 1000 个文档

FROM sample_data

每列对应一个字段,可以通过该字段的名称进行访问。

ES|QL 关键字不区分大小写。以下查询与前一个查询相同

from sample_data

处理命令

编辑

一个源命令后面可以跟一个或多个处理命令,用管道符分隔:|。处理命令通过添加、删除或更改行和列来更改输入表。处理命令可以执行过滤、投影、聚合等操作。

A processing command changing an input table

例如,您可以使用LIMIT命令来限制返回的行数,最多 10,000 行

FROM sample_data
| LIMIT 3

为了方便阅读,您可以将每个命令放在单独的一行上。但是,您不必这样做。以下查询与前一个查询相同

FROM sample_data | LIMIT 3

排序表

编辑
A processing command sorting an input table

另一个处理命令是SORT命令。默认情况下,FROM 返回的行没有定义的排序顺序。使用 SORT 命令根据一个或多个列对行进行排序

FROM sample_data
| SORT @timestamp DESC

查询数据

编辑

使用WHERE命令来查询数据。例如,要查找所有持续时间超过 5 毫秒的事件

FROM sample_data
| WHERE event_duration > 5000000

WHERE 支持几个运算符。例如,您可以使用LIKEmessage 列运行通配符查询

FROM sample_data
| WHERE message LIKE "Connected*"

更多处理命令

编辑

还有许多其他处理命令,例如 KEEPDROP 用于保留或删除列,ENRICH 用于使用 Elasticsearch 中索引的数据丰富表,以及 DISSECTGROK 用于处理数据。有关所有处理命令的概述,请参阅处理命令

链式处理命令

编辑

您可以链式处理命令,用管道符分隔:|。每个处理命令都对前一个命令的输出表进行操作。查询的结果是最终处理命令生成的表。

Processing commands can be chained

以下示例首先按 @timestamp 对表进行排序,然后将结果集限制为 3 行

FROM sample_data
| SORT @timestamp DESC
| LIMIT 3

处理命令的顺序很重要。在排序之前先将结果集限制为 3 行,很可能会返回与此示例不同的结果,在此示例中,排序在限制之前。

计算值

编辑

使用EVAL命令向表中附加具有计算值的新列。例如,以下查询附加一个 duration_ms 列。列中的值通过将 event_duration 除以 1,000,000 来计算。换句话说:event_duration 从纳秒转换为毫秒。

FROM sample_data
| EVAL duration_ms = event_duration/1000000.0

EVAL 支持多个函数。例如,要将数字四舍五入到具有指定位数的最接近数字,请使用ROUND函数

FROM sample_data
| EVAL duration_ms = ROUND(event_duration/1000000.0, 1)

计算统计信息

编辑

ES|QL 不仅可以用于查询数据,还可以用于聚合数据。使用STATS命令来计算统计信息。例如,中位数持续时间

FROM sample_data
| STATS median_duration = MEDIAN(event_duration)

您可以使用一个命令计算多个统计信息

FROM sample_data
| STATS median_duration = MEDIAN(event_duration), max_duration = MAX(event_duration)

使用 BY 按一个或多个列对计算的统计信息进行分组。例如,要计算每个客户端 IP 的中位数持续时间

FROM sample_data
| STATS median_duration = MEDIAN(event_duration) BY client_ip

访问列

编辑

您可以通过列的名称访问列。如果名称包含特殊字符,则需要用反引号 (`) 引起来。

EVALSTATS 创建的列分配显式名称是可选的。如果您不提供名称,则新列名称等于函数表达式。例如

FROM sample_data
| EVAL event_duration/1000000.0

在此查询中,EVAL 添加了一个名为 event_duration/1000000.0 的新列。由于其名称包含特殊字符,要访问此列,请用反引号将其引起来

FROM sample_data
| EVAL event_duration/1000000.0
| STATS MEDIAN(`event_duration/1000000.0`)

创建直方图

编辑

为了跟踪一段时间内的统计信息,ES|QL 使您能够使用 BUCKET 函数创建直方图。BUCKET 创建用户友好的存储桶大小,并为每一行返回一个值,该值对应于该行落入的存储桶。

BUCKETSTATS 结合使用以创建直方图。例如,要计算每小时的事件数

FROM sample_data
| STATS c = COUNT(*) BY bucket = BUCKET(@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z")

或者每小时的中位数持续时间

FROM sample_data
| KEEP @timestamp, event_duration
| STATS median_duration = MEDIAN(event_duration) BY bucket = BUCKET(@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z")

丰富数据

编辑

ES|QL 使您能够使用 ENRICH 命令使用 Elasticsearch 中索引的数据丰富表。

esql enrich

在您可以使用 ENRICH 之前,您首先需要创建执行丰富策略

以下请求创建并执行一个名为 clientip_policy 的策略。该策略将 IP 地址链接到环境(“开发”、“QA”或“生产”)

resp = client.indices.create(
    index="clientips",
    mappings={
        "properties": {
            "client_ip": {
                "type": "keyword"
            },
            "env": {
                "type": "keyword"
            }
        }
    },
)
print(resp)

resp1 = client.bulk(
    index="clientips",
    operations=[
        {
            "index": {}
        },
        {
            "client_ip": "172.21.0.5",
            "env": "Development"
        },
        {
            "index": {}
        },
        {
            "client_ip": "172.21.2.113",
            "env": "QA"
        },
        {
            "index": {}
        },
        {
            "client_ip": "172.21.2.162",
            "env": "QA"
        },
        {
            "index": {}
        },
        {
            "client_ip": "172.21.3.15",
            "env": "Production"
        },
        {
            "index": {}
        },
        {
            "client_ip": "172.21.3.16",
            "env": "Production"
        }
    ],
)
print(resp1)

resp2 = client.enrich.put_policy(
    name="clientip_policy",
    match={
        "indices": "clientips",
        "match_field": "client_ip",
        "enrich_fields": [
            "env"
        ]
    },
)
print(resp2)

resp3 = client.enrich.execute_policy(
    name="clientip_policy",
    wait_for_completion=False,
)
print(resp3)
response = client.indices.create(
  index: 'clientips',
  body: {
    mappings: {
      properties: {
        client_ip: {
          type: 'keyword'
        },
        env: {
          type: 'keyword'
        }
      }
    }
  }
)
puts response

response = client.bulk(
  index: 'clientips',
  body: [
    {
      index: {}
    },
    {
      client_ip: '172.21.0.5',
      env: 'Development'
    },
    {
      index: {}
    },
    {
      client_ip: '172.21.2.113',
      env: 'QA'
    },
    {
      index: {}
    },
    {
      client_ip: '172.21.2.162',
      env: 'QA'
    },
    {
      index: {}
    },
    {
      client_ip: '172.21.3.15',
      env: 'Production'
    },
    {
      index: {}
    },
    {
      client_ip: '172.21.3.16',
      env: 'Production'
    }
  ]
)
puts response

response = client.enrich.put_policy(
  name: 'clientip_policy',
  body: {
    match: {
      indices: 'clientips',
      match_field: 'client_ip',
      enrich_fields: [
        'env'
      ]
    }
  }
)
puts response

response = client.enrich.execute_policy(
  name: 'clientip_policy',
  wait_for_completion: false
)
puts response
const response = await client.indices.create({
  index: "clientips",
  mappings: {
    properties: {
      client_ip: {
        type: "keyword",
      },
      env: {
        type: "keyword",
      },
    },
  },
});
console.log(response);

const response1 = await client.bulk({
  index: "clientips",
  operations: [
    {
      index: {},
    },
    {
      client_ip: "172.21.0.5",
      env: "Development",
    },
    {
      index: {},
    },
    {
      client_ip: "172.21.2.113",
      env: "QA",
    },
    {
      index: {},
    },
    {
      client_ip: "172.21.2.162",
      env: "QA",
    },
    {
      index: {},
    },
    {
      client_ip: "172.21.3.15",
      env: "Production",
    },
    {
      index: {},
    },
    {
      client_ip: "172.21.3.16",
      env: "Production",
    },
  ],
});
console.log(response1);

const response2 = await client.enrich.putPolicy({
  name: "clientip_policy",
  match: {
    indices: "clientips",
    match_field: "client_ip",
    enrich_fields: ["env"],
  },
});
console.log(response2);

const response3 = await client.enrich.executePolicy({
  name: "clientip_policy",
  wait_for_completion: "false",
});
console.log(response3);
PUT clientips
{
  "mappings": {
    "properties": {
      "client_ip": {
        "type": "keyword"
      },
      "env": {
        "type": "keyword"
      }
    }
  }
}

PUT clientips/_bulk
{ "index" : {}}
{ "client_ip": "172.21.0.5", "env": "Development" }
{ "index" : {}}
{ "client_ip": "172.21.2.113", "env": "QA" }
{ "index" : {}}
{ "client_ip": "172.21.2.162", "env": "QA" }
{ "index" : {}}
{ "client_ip": "172.21.3.15", "env": "Production" }
{ "index" : {}}
{ "client_ip": "172.21.3.16", "env": "Production" }

PUT /_enrich/policy/clientip_policy
{
  "match": {
    "indices": "clientips",
    "match_field": "client_ip",
    "enrich_fields": ["env"]
  }
}

PUT /_enrich/policy/clientip_policy/_execute?wait_for_completion=false

在创建和执行策略后,您可以将其与 ENRICH 命令一起使用

FROM sample_data
| KEEP @timestamp, client_ip, event_duration
| EVAL client_ip = TO_STRING(client_ip)
| ENRICH clientip_policy ON client_ip WITH env

您可以使用 ENRICH 命令添加的新 env 列在后续命令中。例如,要计算每个环境的中位数持续时间

FROM sample_data
| KEEP @timestamp, client_ip, event_duration
| EVAL client_ip = TO_STRING(client_ip)
| ENRICH clientip_policy ON client_ip WITH env
| STATS median_duration = MEDIAN(event_duration) BY env

有关使用 ES|QL 进行数据丰富的更多信息,请参阅数据丰富

处理数据

编辑

您的数据可能包含您想要结构化的非结构化字符串,以便更容易分析数据。例如,示例数据包含如下日志消息

"Connected to 10.1.0.3"

通过从这些消息中提取 IP 地址,您可以确定哪个 IP 已接受最多的客户端连接。

要在查询时结构化非结构化字符串,您可以使用 ES|QL DISSECTGROK 命令。DISSECT 通过使用基于分隔符的模式来分解字符串。 GROK 的工作方式类似,但使用正则表达式。这使得 GROK 更强大,但通常也更慢。

在这种情况下,不需要使用正则表达式,因为 message 非常直接: “Connected to ”,后跟服务器 IP。 要匹配此字符串,可以使用以下 DISSECT 命令

FROM sample_data
| DISSECT message "Connected to %{server_ip}"

这将为那些 message 与此模式匹配的行添加一个 server_ip 列。 对于其他行,server_ip 的值是 null

您可以在后续命令中使用 DISSECT 命令添加的新 server_ip 列。 例如,要确定每个服务器已接受多少个连接

FROM sample_data
| WHERE STARTS_WITH(message, "Connected to")
| DISSECT message "Connected to %{server_ip}"
| STATS COUNT(*) BY server_ip

有关使用 ES|QL 进行数据处理的更多信息,请参阅 使用 DISSECT 和 GROK 进行数据处理

了解更多

编辑

要了解有关 ES|QL 的更多信息,请参阅 ES|QL 参考使用 ES|QL