转换的 Painless 示例
使用 scripted_metric 聚合的示例在 Elasticsearch Serverless 上不受支持。
以下示例演示如何在转换中使用 Painless。您可以在 Painless 指南中了解有关 Painless 脚本语言的更多信息。
- 虽然以下示例的上下文是转换用例,但以下代码片段中的 Painless 脚本也可以用于其他 Elasticsearch 搜索聚合。
- 以下所有示例都使用脚本,当字段由脚本创建时,转换无法推断输出字段的映射。转换不会在目标索引中为这些字段创建任何映射,这意味着它们会被动态映射。如果您需要显式映射,请在启动转换之前创建目标索引。
使用脚本指标聚合获取热门命中 (top hits)
此代码段演示如何查找最新的文档,换句话说,就是时间戳最新的文档。从技术角度来看,它通过在转换中使用脚本指标聚合来实现 top_hits (热门命中) 聚合的功能,并提供指标输出。
此示例使用 Elasticsearch Serverless 上不支持的 scripted_metric 聚合。
"aggregations": {
  "latest_doc": {
    "scripted_metric": {
      "init_script": "state.timestamp_latest = 0L; state.last_doc = ''",
      "map_script": """ def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli(); if (current_date > state.timestamp_latest) {state.timestamp_latest = current_date; state.last_doc = new HashMap(params['_source']);} """,
      "combine_script": "return state",
      "reduce_script": """ def last_doc = ''; def timestamp_latest = 0L; for (s in states) {if (s.timestamp_latest > (timestamp_latest)) {timestamp_latest = s.timestamp_latest; last_doc = s.last_doc;}} return last_doc """
    }
  }
}
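- init_script 在 state 对象中创建 long 类型的 timestamp_latest 和字符串类型的 last_doc。
- map_script 根据文档的 @timestamp 计算 current_date (纪元毫秒),并与 state.timestamp_latest 比较。如果当前文档更新,就更新 state.timestamp_latest,并通过 new HashMap(params['_source']) 复制源文档,保存到 state.last_doc 中。
- combine_script 返回每个分片的 state。
- reduce_script 遍历各分片返回的 state,比较其中的 timestamp_latest,返回时间戳最新的文档 (last_doc)。在响应中,该文档位于 latest_doc 字段下。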
有关各个脚本的详细说明,请查看脚本的作用域。
您可以采用类似的方式检索最后一个值 (即时间戳最新的文档中 value 字段的值):
"aggregations": {
  "latest_value": {
    "scripted_metric": {
      "init_script": "state.timestamp_latest = 0L; state.last_value = ''",
      "map_script": """ def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli(); if (current_date > state.timestamp_latest) {state.timestamp_latest = current_date; state.last_value = params['_source']['value'];} """,
      "combine_script": "return state",
      "reduce_script": """ def last_value = ''; def timestamp_latest = 0L; for (s in states) {if (s.timestamp_latest > (timestamp_latest)) {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}} return last_value """
    }
  }
}
使用存储脚本获取热门命中 (top hits)
您还可以借助存储脚本来获取最新的值。存储脚本可以减少编译时间,加快搜索速度,并且可以更新。
1. 创建存储脚本:
# init_script: start every shard with timestamp 0 and an empty value.
POST _scripts/last-value-map-init
{
  "script": {
    "lang": "painless",
    "source": """ state.timestamp_latest = 0L; state.last_value = '' """
  }
}

# map_script: remember the value of the field named by params['key'] from the
# document with the newest @timestamp seen so far.
POST _scripts/last-value-map
{
  "script": {
    "lang": "painless",
    "source": """ def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli(); if (current_date > state.timestamp_latest) {state.timestamp_latest = current_date; state.last_value = doc[params['key']].value;} """
  }
}

# combine_script: return the shard state unchanged.
POST _scripts/last-value-combine
{
  "script": {
    "lang": "painless",
    "source": """ return state """
  }
}

# reduce_script: across all shard states, return the value with the newest timestamp.
POST _scripts/last-value-reduce
{
  "script": {
    "lang": "painless",
    "source": """ def last_value = ''; def timestamp_latest = 0L; for (s in states) {if (s.timestamp_latest > (timestamp_latest)) {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}} return last_value """
  }
}
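2. 在脚本指标聚合中使用存储脚本:在 scripted_metric 的 init_script、map_script、combine_script 和 reduce_script 中通过 id 引用上述存储脚本。由于 last-value-map 通过 params['key'] 读取字段名,还需要为它传入 key 参数,例如 "map_script": { "id": "last-value-map", "params": { "key": "<字段名>" } }。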
使用聚合获取时间特征
此代码段演示如何在转换中使用 Painless 提取基于时间的特征。此代码段使用的索引中,@timestamp 被定义为 date 类型字段。
"aggregations": {
  "avg_hour_of_day": {
    "avg": {
      "script": {
        "source": """ ZonedDateTime date = doc['@timestamp'].value; return date.getHour(); """
      }
    }
  },
  "avg_month_of_year": {
    "avg": {
      "script": {
        "source": """ ZonedDateTime date = doc['@timestamp'].value; return date.getMonthValue(); """
      }
    }
  },
  ...
}
使用存储桶脚本获取持续时间
此示例演示如何使用存储桶脚本 (bucket_script),从数据日志中按客户端 IP 获取会话的持续时间。此示例使用 Kibana 示例 Web 日志数据集。
# Pivot the Kibana sample web logs by OS and client IP. time_length is a
# bucket_script that subtracts the earliest timestamp (time_frame.gte) from
# the latest one (time_frame.lte) of each group.
data_log_pivot = {
    "group_by": {
        "machine.os": {"terms": {"field": "machine.os.keyword"}},
        "machine.ip": {"terms": {"field": "clientip"}},
    },
    "aggregations": {
        "time_frame.lte": {"max": {"field": "timestamp"}},
        "time_frame.gte": {"min": {"field": "timestamp"}},
        "time_length": {
            "bucket_script": {
                "buckets_path": {
                    "min": "time_frame.gte.value",
                    "max": "time_frame.lte.value",
                },
                "script": "params.max - params.min",
            }
        },
    },
}

resp = client.transform.put_transform(
    transform_id="data_log",
    source={"index": "kibana_sample_data_logs"},
    dest={"index": "data-logs-by-client"},
    pivot=data_log_pivot,
)
print(resp)
// Pivot the Kibana sample web logs by OS and client IP. time_length is a
// bucket_script that subtracts the earliest timestamp (time_frame.gte) from
// the latest one (time_frame.lte) of each group.
const dataLogTransform = {
  transform_id: "data_log",
  source: { index: "kibana_sample_data_logs" },
  dest: { index: "data-logs-by-client" },
  pivot: {
    group_by: {
      "machine.os": { terms: { field: "machine.os.keyword" } },
      "machine.ip": { terms: { field: "clientip" } },
    },
    aggregations: {
      "time_frame.lte": { max: { field: "timestamp" } },
      "time_frame.gte": { min: { field: "timestamp" } },
      time_length: {
        bucket_script: {
          buckets_path: {
            min: "time_frame.gte.value",
            max: "time_frame.lte.value",
          },
          script: "params.max - params.min",
        },
      },
    },
  },
};
const response = await client.transform.putTransform(dataLogTransform);
console.log(response);
# Pivot the sample web logs by OS and client IP; time_length is the difference
# between the latest and the earliest timestamp of each group.
PUT _transform/data_log
{
  "source": { "index": "kibana_sample_data_logs" },
  "dest": { "index": "data-logs-by-client" },
  "pivot": {
    "group_by": {
      "machine.os": { "terms": { "field": "machine.os.keyword" } },
      "machine.ip": { "terms": { "field": "clientip" } }
    },
    "aggregations": {
      "time_frame.lte": { "max": { "field": "timestamp" } },
      "time_frame.gte": { "min": { "field": "timestamp" } },
      "time_length": {
        "bucket_script": {
          "buckets_path": {
            "min": "time_frame.gte.value",
            "max": "time_frame.lte.value"
          },
          "script": "params.max - params.min"
        }
      }
    }
  }
}
使用脚本指标聚合计算 HTTP 响应
您可以在转换中使用脚本指标聚合,统计 Web 日志数据集中不同类型的 HTTP 响应的数量。使用筛选器 (filter) 聚合也可以实现类似的功能,详细信息请查看“查找可疑客户端 IP”示例。
以下示例假定 HTTP 响应代码以关键字 (keyword) 形式存储在文档的 response.keyword 字段中。
此示例使用 Elasticsearch Serverless 上不支持的 scripted_metric 聚合。
"aggregations": {
  "responses.counts": {
    "scripted_metric": {
      "init_script": "state.responses = ['error':0L,'success':0L,'other':0L]",
      "map_script": """ def code = doc['response.keyword'].value; if (code.startsWith('5') || code.startsWith('4')) { state.responses.error += 1 ; } else if(code.startsWith('2')) { state.responses.success += 1; } else { state.responses.other += 1; } """,
      "combine_script": "state.responses",
      "reduce_script": """ def counts = ['error': 0L, 'success': 0L, 'other': 0L]; for (responses in states) { counts.error += responses['error']; counts.success += responses['success']; counts.other += responses['other']; } return counts; """
    }
  },
  ...
}
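- aggregations 是转换中包含所有聚合的对象。
- responses.counts 是一个 scripted_metric 聚合,它对 Web 日志数据执行分布式计算,统计 error、success 和 other 三类 HTTP 响应的数量。
- init_script 在 state 对象中创建 responses 映射,包含 error、success 和 other 三个 long 类型的计数,初始值均为 0。
- map_script 将文档的 response.keyword 值赋给 code,然后按首位数字计数:以 5 或 4 开头的计为 error,以 2 开头的计为 success,其余计为 other。
- combine_script 返回每个分片的 state.responses。
- reduce_script 创建包含 error、success 和 other 属性的 counts 映射,遍历各分片返回的 responses,将各类响应的数量累加到 counts 的对应属性中,最后返回 counts。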
使用脚本指标聚合比较索引
此示例演示如何通过使用脚本指标聚合的转换来比较两个索引的内容。
此示例使用 Elasticsearch Serverless 上不支持的 scripted_metric 聚合。
# Preview a transform that compares index1 and index2 document by document:
# documents are grouped by a unique id field, and the scripted_metric reports
# "match", "mismatch" or "count_mismatch" for each id.
# No `id` argument is passed: preview_transform() has no such parameter. The
# only id it takes is the optional `transform_id` path parameter, which
# previews an existing transform instead of the inline configuration below.
resp = client.transform.preview_transform(
    source={
        "index": ["index1", "index2"],
        "query": {"match_all": {}},
    },
    dest={"index": "compare"},
    pivot={
        "group_by": {
            "unique-id": {"terms": {"field": "<unique-id-field>"}},
        },
        "aggregations": {
            "compare": {
                "scripted_metric": {
                    "map_script": "state.doc = new HashMap(params['_source'])",
                    "combine_script": "return state",
                    "reduce_script": " \n if (states.size() != 2) {\n return \"count_mismatch\"\n }\n if (states.get(0).equals(states.get(1))) {\n return \"match\"\n } else {\n return \"mismatch\"\n }\n ",
                }
            }
        },
    },
)
print(resp)
// Preview a transform that compares index1 and index2 document by document:
// documents are grouped by a unique id field, and the scripted_metric reports
// "match", "mismatch" or "count_mismatch" for each id.
// No `id` key is passed: the preview API has no `id` parameter. The only id it
// takes is the optional `transform_id` path parameter, which previews an
// existing transform instead of the inline configuration below.
const response = await client.transform.previewTransform({
  source: {
    index: ["index1", "index2"],
    query: {
      match_all: {},
    },
  },
  dest: {
    index: "compare",
  },
  pivot: {
    group_by: {
      "unique-id": {
        terms: {
          field: "<unique-id-field>",
        },
      },
    },
    aggregations: {
      compare: {
        scripted_metric: {
          map_script: "state.doc = new HashMap(params['_source'])",
          combine_script: "return state",
          reduce_script:
            ' \n if (states.size() != 2) {\n return "count_mismatch"\n }\n if (states.get(0).equals(states.get(1))) {\n return "match"\n } else {\n return "mismatch"\n }\n ',
        },
      },
    },
  },
});
console.log(response);
# Preview a comparison of index1 and index2, one bucket per unique document id.
POST _transform/_preview
{
  "id": "index_compare",
  "source": {
    "index": ["index1", "index2"],
    "query": { "match_all": {} }
  },
  "dest": { "index": "compare" },
  "pivot": {
    "group_by": {
      "unique-id": { "terms": { "field": "<unique-id-field>" } }
    },
    "aggregations": {
      "compare": {
        "scripted_metric": {
          "map_script": "state.doc = new HashMap(params['_source'])",
          "combine_script": "return state",
          "reduce_script": """ if (states.size() != 2) { return "count_mismatch" } if (states.get(0).equals(states.get(1))) { return "match" } else { return "mismatch" } """
        }
      }
    }
  }
}
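- source 中引用的两个索引 (index1 和 index2) 的内容会相互比较。
- dest 指定用于存放比较结果的目标索引。
- group_by 字段 (示例中的占位符 <unique-id-field>) 必须能唯一标识每个文档。
- compare 是 scripted_metric 聚合对象。
- map_script 在 state 对象中定义 doc,并通过 new HashMap(params['_source']) 复制源文档。
- combine_script 返回每个分片的 state。
- reduce_script 首先检查 states 的数量是否为 2。如果不是 (例如某个 ID 只存在于其中一个索引中),则返回 count_mismatch;否则比较两个 state,相等时返回 match,不相等时返回 mismatch。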
使用脚本指标聚合获取 Web 会话详细信息
此示例演示如何从单个事务中派生多个特征。我们先来看一下数据中的一个源文档示例:
源文档
{ "_index":"apache-sessions", "_type":"_doc", "_id":"KvzSeGoB4bgw0KGbE3wP", "_score":1.0, "_source":{ "@timestamp":1484053499256, "apache":{ "access":{ "sessionid":"571604f2b2b0c7b346dc685eeb0e2306774a63c2", "url":"http://www.leroymerlin.fr/v3/search/search.do?keyword=Carrelage%20salle%20de%20bain", "path":"/v3/search/search.do", "query":"keyword=Carrelage%20salle%20de%20bain", "referrer":"http://www.leroymerlin.fr/v3/p/produits/carrelage-parquet-sol-souple/carrelage-sol-et-mur/decor-listel-et-accessoires-carrelage-mural-l1308217717?resultOffset=0&resultLimit=51&resultListShape=MOSAIC&priceStyle=SALEUNIT_PRICE", "user_agent":{ "original":"Mobile Safari 10.0 Mac OS X (iPad) Apple Inc.", "os_name":"Mac OS X (iPad)" }, "remote_ip":"0337b1fa-5ed4-af81-9ef4-0ec53be0f45d", "geoip":{ "country_iso_code":"FR", "location":{ "lat":48.86, "lon":2.35 } }, "response_code":200, "method":"GET" } } } } ...
通过将 sessionid 用作分组依据字段,您可以枚举会话中的事件,并使用脚本指标聚合获取有关该会话的更多详细信息。
此示例使用 Elasticsearch Serverless 上不支持的 scripted_metric 聚合。
# Preview per-session details from apache access events: the number of distinct
# paths, the number of pages viewed, and (via scripted_metric) the first/last
# time, duration, entry page, exit page and first referrer of each session.
POST _transform/_preview
{
  "source": { "index": "apache-sessions" },
  "pivot": {
    "group_by": {
      "sessionid": { "terms": { "field": "apache.access.sessionid" } }
    },
    "aggregations": {
      "distinct_paths": { "cardinality": { "field": "apache.access.path" } },
      "num_pages_viewed": { "value_count": { "field": "apache.access.url" } },
      "session_details": {
        "scripted_metric": {
          "init_script": "state.docs = []",
          "map_script": """
            Map span = [
              '@timestamp':doc['@timestamp'].value,
              'url':doc['apache.access.url'].value,
              'referrer':doc['apache.access.referrer'].value
            ];
            state.docs.add(span)
          """,
          "combine_script": "return state.docs;",
          "reduce_script": """
            def all_docs = [];
            for (s in states) {
              for (span in s) {
                all_docs.add(span);
              }
            }
            // Assumes @timestamp is mapped as date, so each value is a
            // ZonedDateTime. ZonedDateTime has no toEpochMilli(), so convert
            // through toInstant() before taking epoch milliseconds.
            all_docs.sort((HashMap o1, HashMap o2) -> o1['@timestamp'].toInstant().toEpochMilli().compareTo(o2['@timestamp'].toInstant().toEpochMilli()));
            def size = all_docs.size();
            def min_time = all_docs[0]['@timestamp'];
            def max_time = all_docs[size-1]['@timestamp'];
            def duration = max_time.toInstant().toEpochMilli() - min_time.toInstant().toEpochMilli();
            def entry_page = all_docs[0]['url'];
            def exit_path = all_docs[size-1]['url'];
            def first_referrer = all_docs[0]['referrer'];
            def ret = new HashMap();
            ret['first_time'] = min_time;
            ret['last_time'] = max_time;
            ret['duration'] = duration;
            ret['entry_page'] = entry_page;
            ret['exit_path'] = exit_path;
            ret['first_referrer'] = first_referrer;
            return ret;
          """
        }
      }
    }
  }
}
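- 数据按 sessionid 分组。
- distinct_paths 和 num_pages_viewed 聚合分别计算会话期间的不同路径数和已查看的页面数。
- init_script 在 state 对象中创建数组 docs。
- map_script 使用文档中对应字段的值创建包含 @timestamp、url 和 referrer 的 span 映射,然后将 span 添加到 state.docs 中。
- combine_script 返回每个分片的 state.docs。
- reduce_script 将所有分片的 span 合并到 all_docs 中并按 @timestamp 排序,据此计算 min_time、max_time、duration、entry_page、exit_path 和 first_referrer。然后它创建 ret 对象,以 first_time、last_time、duration、entry_page、exit_path 和 first_referrer 为键存入这些值,最后返回 ret。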
该 API 调用会返回类似如下的响应:
{ "num_pages_viewed" : 2.0, "session_details" : { "duration" : 100300001, "first_referrer" : "https://www.bing.com/", "entry_page" : "http://www.leroymerlin.fr/v3/p/produits/materiaux-menuiserie/porte-coulissante-porte-interieure-escalier-et-rambarde/barriere-de-securite-l1308218463", "first_time" : "2017-01-10T21:22:52.982Z", "last_time" : "2017-01-10T21:25:04.356Z", "exit_path" : "http://www.leroymerlin.fr/v3/p/produits/materiaux-menuiserie/porte-coulissante-porte-interieure-escalier-et-rambarde/barriere-de-securite-l1308218463?__result-wrapper?pageTemplate=Famille%2FMat%C3%A9riaux+et+menuiserie&resultOffset=0&resultLimit=50&resultListShape=PLAIN&nomenclatureId=17942&priceStyle=SALEUNIT_PRICE&fcr=1&*4294718806=4294718806&*14072=14072&*4294718593=4294718593&*17942=17942" }, "distinct_paths" : 1.0, "sessionid" : "000046f8154a80fd89849369c984b8cc9d795814" }, { "num_pages_viewed" : 10.0, "session_details" : { "duration" : 343100405, "first_referrer" : "https://www.google.fr/", "entry_page" : "http://www.leroymerlin.fr/", "first_time" : "2017-01-10T16:57:39.937Z", "last_time" : "2017-01-10T17:03:23.049Z", "exit_path" : "http://www.leroymerlin.fr/v3/p/produits/porte-de-douche-coulissante-adena-e168578" }, "distinct_paths" : 8.0, "sessionid" : "000087e825da1d87a332b8f15fa76116c7467da6" } ...