Transform 的 Painless 示例
Transform 的 Painless 示例
使用 scripted_metric 聚合的示例在 Elasticsearch Serverless 上不受支持。
这些示例演示了如何在 Transform 中使用 Painless。您可以在 Painless 指南 中了解有关 Painless 脚本语言的更多信息。
- 虽然以下示例的上下文是 Transform 使用案例,但以下代码段中的 Painless 脚本也可以用于其他 Elasticsearch 搜索聚合。
- 以下所有示例都使用了脚本。当字段由脚本创建时,Transform 无法推断输出字段的映射,也不会在目标索引中为这些字段创建任何映射,因此它们将被动态映射。如果您希望使用显式映射,请在启动 Transform 之前创建目标索引。
使用脚本化指标聚合获取热门命中
此代码段演示如何查找最新的文档,也就是时间戳最新的文档。从技术角度来看,它通过在 Transform 中使用能够提供指标输出的脚本化指标聚合,实现了与热门命中(Top hits)聚合相同的功能。
此示例使用 scripted_metric 聚合,该聚合在 Elasticsearch Serverless 上不受支持。
"aggregations": { "latest_doc": { "scripted_metric": { "init_script": "state.timestamp_latest = 0L; state.last_doc = ''", "map_script": """ def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli(); if (current_date > state.timestamp_latest) {state.timestamp_latest = current_date; state.last_doc = new HashMap(params['_source']);} """, "combine_script": "return state", "reduce_script": """ def last_doc = ''; def timestamp_latest = 0L; for (s in states) {if (s.timestamp_latest > (timestamp_latest)) {timestamp_latest = s.timestamp_latest; last_doc = s.last_doc;}} return last_doc """ } } }
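上述代码段中各脚本的作用如下:
- init_script 在 state 对象中创建 long 类型的 timestamp_latest(初始值为 0L)和字符串类型的 last_doc(初始值为空字符串)。
- map_script 根据文档的 @timestamp 计算 current_date,并将其与 state.timestamp_latest 比较;如果当前文档更新,则更新 state.timestamp_latest,并通过 new HashMap(params['_source']) 把源文档的副本保存到 state.last_doc。
- combine_script 返回每个分片的 state。
- reduce_script 遍历各分片返回的 s.timestamp_latest,并返回时间戳最新的文档(last_doc)。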
有关各个脚本的详细说明,请查看 脚本范围。
您可以用类似的方式检索最新的值:
"aggregations": { "latest_value": { "scripted_metric": { "init_script": "state.timestamp_latest = 0L; state.last_value = ''", "map_script": """ def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli(); if (current_date > state.timestamp_latest) {state.timestamp_latest = current_date; state.last_value = params['_source']['value'];} """, "combine_script": "return state", "reduce_script": """ def last_value = ''; def timestamp_latest = 0L; for (s in states) {if (s.timestamp_latest > (timestamp_latest)) {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}} return last_value """ } } }
使用存储脚本获取热门命中
您还可以利用存储脚本的强大功能来获取最新值。存储脚本可以减少编译时间,使搜索更快,并且可以更新。
- 创建存储脚本:
POST _scripts/last-value-map-init { "script": { "lang": "painless", "source": """ state.timestamp_latest = 0L; state.last_value = '' """ } } POST _scripts/last-value-map { "script": { "lang": "painless", "source": """ def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli(); if (current_date > state.timestamp_latest) {state.timestamp_latest = current_date; state.last_value = doc[params['key']].value;} """ } } POST _scripts/last-value-combine { "script": { "lang": "painless", "source": """ return state """ } } POST _scripts/last-value-reduce { "script": { "lang": "painless", "source": """ def last_value = ''; def timestamp_latest = 0L; for (s in states) {if (s.timestamp_latest > (timestamp_latest)) {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}} return last_value """ } }
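- 在脚本化指标聚合中使用存储脚本:在聚合的 init_script、map_script、combine_script 和 reduce_script 中分别通过 id 引用上面创建的存储脚本,并通过 map_script 的 params 中的 key 参数传入要获取最新值的字段名。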
使用聚合获取时间特征
此代码段演示如何在 Transform 中使用 Painless 提取基于时间的特征。此代码段使用的索引中,@timestamp 被定义为 date 类型的字段。
"aggregations": { "avg_hour_of_day": { "avg":{ "script": { "source": """ ZonedDateTime date = doc['@timestamp'].value; return date.getHour(); """ } } }, "avg_month_of_year": { "avg":{ "script": { "source": """ ZonedDateTime date = doc['@timestamp'].value; return date.getMonthValue(); """ } } }, ... }
使用桶脚本获取持续时间
此示例演示如何使用桶脚本从数据日志中获取各客户端 IP 的会话持续时间。此示例使用 Kibana 示例 Web 日志数据集。
resp = client.transform.put_transform( transform_id="data_log", source={ "index": "kibana_sample_data_logs" }, dest={ "index": "data-logs-by-client" }, pivot={ "group_by": { "machine.os": { "terms": { "field": "machine.os.keyword" } }, "machine.ip": { "terms": { "field": "clientip" } } }, "aggregations": { "time_frame.lte": { "max": { "field": "timestamp" } }, "time_frame.gte": { "min": { "field": "timestamp" } }, "time_length": { "bucket_script": { "buckets_path": { "min": "time_frame.gte.value", "max": "time_frame.lte.value" }, "script": "params.max - params.min" } } } }, ) print(resp)
const response = await client.transform.putTransform({ transform_id: "data_log", source: { index: "kibana_sample_data_logs", }, dest: { index: "data-logs-by-client", }, pivot: { group_by: { "machine.os": { terms: { field: "machine.os.keyword", }, }, "machine.ip": { terms: { field: "clientip", }, }, }, aggregations: { "time_frame.lte": { max: { field: "timestamp", }, }, "time_frame.gte": { min: { field: "timestamp", }, }, time_length: { bucket_script: { buckets_path: { min: "time_frame.gte.value", max: "time_frame.lte.value", }, script: "params.max - params.min", }, }, }, }, }); console.log(response);
PUT _transform/data_log { "source": { "index": "kibana_sample_data_logs" }, "dest": { "index": "data-logs-by-client" }, "pivot": { "group_by": { "machine.os": {"terms": {"field": "machine.os.keyword"}}, "machine.ip": {"terms": {"field": "clientip"}} }, "aggregations": { "time_frame.lte": { "max": { "field": "timestamp" } }, "time_frame.gte": { "min": { "field": "timestamp" } }, "time_length": { "bucket_script": { "buckets_path": { "min": "time_frame.gte.value", "max": "time_frame.lte.value" }, "script": "params.max - params.min" } } } } }
使用脚本化指标聚合统计 HTTP 响应
您可以在 Transform 中使用脚本化指标聚合来统计 Web 日志数据集中不同类型的 HTTP 响应。使用过滤器聚合也可以实现类似的功能,详细信息请查看“查找可疑客户端 IP”示例。
以下示例假设 HTTP 响应代码以关键字(keyword)形式存储在文档的 response 字段中。
此示例使用 scripted_metric 聚合,该聚合在 Elasticsearch Serverless 上不受支持。
"aggregations": { "responses.counts": { "scripted_metric": { "init_script": "state.responses = ['error':0L,'success':0L,'other':0L]", "map_script": """ def code = doc['response.keyword'].value; if (code.startsWith('5') || code.startsWith('4')) { state.responses.error += 1 ; } else if(code.startsWith('2')) { state.responses.success += 1; } else { state.responses.other += 1; } """, "combine_script": "state.responses", "reduce_script": """ def counts = ['error': 0L, 'success': 0L, 'other': 0L]; for (responses in states) { counts.error += responses['error']; counts.success += responses['success']; counts.other += responses['other']; } return counts; """ } }, ... }
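上述代码段各部分的说明如下:
- 包含所有聚合的 Transform 的 aggregations 对象。
- responses.counts 是 scripted_metric 聚合的对象。
- 此 scripted_metric 统计 Web 日志数据中特定类型的 HTTP 响应(error、success 和 other)。
- init_script 在 state 对象中创建 responses 映射,其中包含 error、success 和 other 三个 long 类型的属性。
- map_script 根据文档的 response.keyword 值定义 code,然后按响应代码的首位数字分别统计错误、成功和其他响应。
- combine_script 返回每个分片的 state.responses。
- reduce_script 创建包含 error、success 和 other 属性的 counts 映射,然后遍历各分片返回的 responses,把各类响应的数量累加到 counts 的对应属性上,最后返回 counts。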
使用脚本化指标聚合比较索引
此示例演示如何通过使用脚本化指标聚合的 Transform 来比较两个索引的内容。
此示例使用 scripted_metric 聚合,该聚合在 Elasticsearch Serverless 上不受支持。
resp = client.transform.preview_transform( id="index_compare", source={ "index": [ "index1", "index2" ], "query": { "match_all": {} } }, dest={ "index": "compare" }, pivot={ "group_by": { "unique-id": { "terms": { "field": "<unique-id-field>" } } }, "aggregations": { "compare": { "scripted_metric": { "map_script": "state.doc = new HashMap(params['_source'])", "combine_script": "return state", "reduce_script": " \n if (states.size() != 2) {\n return \"count_mismatch\"\n }\n if (states.get(0).equals(states.get(1))) {\n return \"match\"\n } else {\n return \"mismatch\"\n }\n " } } } }, ) print(resp)
const response = await client.transform.previewTransform({ id: "index_compare", source: { index: ["index1", "index2"], query: { match_all: {}, }, }, dest: { index: "compare", }, pivot: { group_by: { "unique-id": { terms: { field: "<unique-id-field>", }, }, }, aggregations: { compare: { scripted_metric: { map_script: "state.doc = new HashMap(params['_source'])", combine_script: "return state", reduce_script: ' \n if (states.size() != 2) {\n return "count_mismatch"\n }\n if (states.get(0).equals(states.get(1))) {\n return "match"\n } else {\n return "mismatch"\n }\n ', }, }, }, }, }); console.log(response);
POST _transform/_preview { "id" : "index_compare", "source" : { "index" : [ "index1", "index2" ], "query" : { "match_all" : { } } }, "dest" : { "index" : "compare" }, "pivot" : { "group_by" : { "unique-id" : { "terms" : { "field" : "<unique-id-field>" } } }, "aggregations" : { "compare" : { "scripted_metric" : { "map_script" : "state.doc = new HashMap(params['_source'])", "combine_script" : "return state", "reduce_script" : """ if (states.size() != 2) { return "count_mismatch" } if (states.get(0).equals(states.get(1))) { return "match" } else { return "mismatch" } """ } } } } }
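上述请求各部分的说明如下:
- 在 source 对象中引用的索引将相互比较。
- dest 中指定的索引用于存放比较结果。
- group_by 字段必须是每个文档的唯一标识符。
- compare 是 scripted_metric 聚合的对象。
- map_script 在 state 对象中定义 doc,并通过 new HashMap(params['_source']) 复制源文档。
- combine_script 返回每个分片的 state。
- reduce_script 先检查 states 的数量是否为 2,如果不是,则返回 count_mismatch;否则比较两个 state,相等时返回 match,不相等时返回 mismatch。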
使用脚本化指标聚合获取 Web 会话详细信息
此示例演示如何从单个事务中派生多个特征。我们先来看数据中的一个示例源文档:
源文档
{ "_index":"apache-sessions", "_type":"_doc", "_id":"KvzSeGoB4bgw0KGbE3wP", "_score":1.0, "_source":{ "@timestamp":1484053499256, "apache":{ "access":{ "sessionid":"571604f2b2b0c7b346dc685eeb0e2306774a63c2", "url":"http://www.leroymerlin.fr/v3/search/search.do?keyword=Carrelage%20salle%20de%20bain", "path":"/v3/search/search.do", "query":"keyword=Carrelage%20salle%20de%20bain", "referrer":"http://www.leroymerlin.fr/v3/p/produits/carrelage-parquet-sol-souple/carrelage-sol-et-mur/decor-listel-et-accessoires-carrelage-mural-l1308217717?resultOffset=0&resultLimit=51&resultListShape=MOSAIC&priceStyle=SALEUNIT_PRICE", "user_agent":{ "original":"Mobile Safari 10.0 Mac OS X (iPad) Apple Inc.", "os_name":"Mac OS X (iPad)" }, "remote_ip":"0337b1fa-5ed4-af81-9ef4-0ec53be0f45d", "geoip":{ "country_iso_code":"FR", "location":{ "lat":48.86, "lon":2.35 } }, "response_code":200, "method":"GET" } } } } ...
通过使用 sessionid 作为 group-by 字段,您可以枚举会话中的事件,并使用脚本化指标聚合获取有关会话的更多详细信息。
此示例使用 scripted_metric 聚合,该聚合在 Elasticsearch Serverless 上不受支持。
POST _transform/_preview { "source": { "index": "apache-sessions" }, "pivot": { "group_by": { "sessionid": { "terms": { "field": "apache.access.sessionid" } } }, "aggregations": { "distinct_paths": { "cardinality": { "field": "apache.access.path" } }, "num_pages_viewed": { "value_count": { "field": "apache.access.url" } }, "session_details": { "scripted_metric": { "init_script": "state.docs = []", "map_script": """ Map span = [ '@timestamp':doc['@timestamp'].value, 'url':doc['apache.access.url'].value, 'referrer':doc['apache.access.referrer'].value ]; state.docs.add(span) """, "combine_script": "return state.docs;", "reduce_script": """ def all_docs = []; for (s in states) { for (span in s) { all_docs.add(span); } } all_docs.sort((HashMap o1, HashMap o2)->o1['@timestamp'].toEpochMilli().compareTo(o2['@timestamp'].toEpochMilli())); def size = all_docs.size(); def min_time = all_docs[0]['@timestamp']; def max_time = all_docs[size-1]['@timestamp']; def duration = max_time.toEpochMilli() - min_time.toEpochMilli(); def entry_page = all_docs[0]['url']; def exit_path = all_docs[size-1]['url']; def first_referrer = all_docs[0]['referrer']; def ret = new HashMap(); ret['first_time'] = min_time; ret['last_time'] = max_time; ret['duration'] = duration; ret['entry_page'] = entry_page; ret['exit_path'] = exit_path; ret['first_referrer'] = first_referrer; return ret; """ } } } } }
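上述请求各部分的说明如下:
- 数据按 sessionid 分组。
- 聚合统计路径的数量并枚举会话期间查看的页面。
- init_script 在 state 对象中创建数组类型的 docs。
- map_script 定义一个 span 映射,其中包含取自文档相应字段的时间戳、URL 和 referrer 值,然后将 span 添加到 state.docs。
- combine_script 返回每个分片的 state.docs。
- reduce_script 合并所有分片返回的文档并按 @timestamp 排序,据此定义 min_time、max_time、duration 等变量,然后创建一个 HashMap 类型的 ret 对象,把 first_time、last_time、duration 等字段写入其中,最后返回 ret。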
该 API 调用会返回类似如下的响应:
{ "num_pages_viewed" : 2.0, "session_details" : { "duration" : 100300001, "first_referrer" : "https://www.bing.com/", "entry_page" : "http://www.leroymerlin.fr/v3/p/produits/materiaux-menuiserie/porte-coulissante-porte-interieure-escalier-et-rambarde/barriere-de-securite-l1308218463", "first_time" : "2017-01-10T21:22:52.982Z", "last_time" : "2017-01-10T21:25:04.356Z", "exit_path" : "http://www.leroymerlin.fr/v3/p/produits/materiaux-menuiserie/porte-coulissante-porte-interieure-escalier-et-rambarde/barriere-de-securite-l1308218463?__result-wrapper?pageTemplate=Famille%2FMat%C3%A9riaux+et+menuiserie&resultOffset=0&resultLimit=50&resultListShape=PLAIN&nomenclatureId=17942&priceStyle=SALEUNIT_PRICE&fcr=1&*4294718806=4294718806&*14072=14072&*4294718593=4294718593&*17942=17942" }, "distinct_paths" : 1.0, "sessionid" : "000046f8154a80fd89849369c984b8cc9d795814" }, { "num_pages_viewed" : 10.0, "session_details" : { "duration" : 343100405, "first_referrer" : "https://www.google.fr/", "entry_page" : "http://www.leroymerlin.fr/", "first_time" : "2017-01-10T16:57:39.937Z", "last_time" : "2017-01-10T17:03:23.049Z", "exit_path" : "http://www.leroymerlin.fr/v3/p/produits/porte-de-douche-coulissante-adena-e168578" }, "distinct_paths" : 8.0, "sessionid" : "000087e825da1d87a332b8f15fa76116c7467da6" } ...