转换的 Painless 示例

编辑

在 Elasticsearch Serverless 上不支持使用 scripted_metric 聚合的示例。

以下示例演示如何在转换中使用 Painless。您可以在 Painless 指南中了解有关 Painless 脚本语言的更多信息。

  • 虽然以下示例的上下文是转换用例,但以下代码片段中的 Painless 脚本也可以用于其他 Elasticsearch 搜索聚合。
  • 以下所有示例都使用脚本,当字段由脚本创建时,转换无法推断输出字段的映射。转换不会在目标索引中为这些字段创建任何映射,这意味着它们会被动态映射。如果您需要显式映射,请在启动转换之前创建目标索引。

使用脚本指标聚合获取热门点击

编辑

此代码段显示如何查找最新的文档,换句话说,就是时间戳最新的文档。从技术角度来看,它有助于通过在转换中使用脚本指标聚合来实现 热门点击 的功能,从而提供指标输出。

此示例使用 Elasticsearch Serverless 上不支持的 scripted_metric 聚合。

"aggregations": {
  "latest_doc": {
    "scripted_metric": {
      "init_script": "state.timestamp_latest = 0L; state.last_doc = ''", 
      "map_script": """ 
        def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
        if (current_date > state.timestamp_latest)
        {state.timestamp_latest = current_date;
        state.last_doc = new HashMap(params['_source']);}
      """,
      "combine_script": "return state", 
      "reduce_script": """ 
        def last_doc = '';
        def timestamp_latest = 0L;
        for (s in states) {if (s.timestamp_latest > (timestamp_latest))
        {timestamp_latest = s.timestamp_latest; last_doc = s.last_doc;}}
        return last_doc
      """
    }
  }
}

init_scriptstate 对象中创建一个 long 类型的 timestamp_latest 和一个 string 类型的 last_doc

map_script 基于文档的时间戳定义 current_date,然后将 current_datestate.timestamp_latest 进行比较,最后从分片返回 state.last_doc。通过使用 new HashMap(...),您可以复制源文档,这在您想将完整的源对象从一个阶段传递到下一个阶段时非常重要。

combine_script 返回每个分片的 state

reduce_script 遍历每个分片返回的 s.timestamp_latest 的值,并返回时间戳最新的文档 (last_doc)。在响应中,热门点击(换句话说,latest_doc)嵌套在 latest_doc 字段下。

有关各个脚本的详细说明,请查看脚本的作用域

您可以采用类似的方式检索最后一个值

"aggregations": {
  "latest_value": {
    "scripted_metric": {
      "init_script": "state.timestamp_latest = 0L; state.last_value = ''",
      "map_script": """
        def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
        if (current_date > state.timestamp_latest)
        {state.timestamp_latest = current_date;
        state.last_value = params['_source']['value'];}
      """,
      "combine_script": "return state",
      "reduce_script": """
        def last_value = '';
        def timestamp_latest = 0L;
        for (s in states) {if (s.timestamp_latest > (timestamp_latest))
        {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}}
        return last_value
      """
    }
  }
}
使用存储脚本获取热门点击
编辑

您还可以使用 存储脚本 的强大功能来获取最新的值。存储脚本可以减少编译时间,加快搜索速度,并且可以更新。

  1. 创建存储脚本

    POST _scripts/last-value-map-init
    {
      "script": {
        "lang": "painless",
        "source": """
            state.timestamp_latest = 0L; state.last_value = ''
        """
      }
    }
    
    POST _scripts/last-value-map
    {
      "script": {
        "lang": "painless",
        "source": """
          def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
            if (current_date > state.timestamp_latest)
            {state.timestamp_latest = current_date;
            state.last_value = doc[params['key']].value;}
        """
      }
    }
    
    POST _scripts/last-value-combine
    {
      "script": {
        "lang": "painless",
        "source": """
            return state
        """
      }
    }
    
    POST _scripts/last-value-reduce
    {
      "script": {
        "lang": "painless",
        "source": """
            def last_value = '';
            def timestamp_latest = 0L;
            for (s in states) {if (s.timestamp_latest > (timestamp_latest))
            {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}}
            return last_value
        """
      }
    }
  2. 在脚本指标聚合中使用存储脚本。

    "aggregations":{
       "latest_value":{
          "scripted_metric":{
             "init_script":{
                "id":"last-value-map-init"
             },
             "map_script":{
                "id":"last-value-map",
                "params":{
                   "key":"field_with_last_value" 
                }
             },
             "combine_script":{
                "id":"last-value-combine"
             },
             "reduce_script":{
                "id":"last-value-reduce"
             }

    参数 field_with_last_value 可以设置为您想要获取最新值的任何字段。

使用聚合获取时间特征

编辑

此代码段显示如何通过在转换中使用 Painless 来提取基于时间的特征。此代码段使用一个索引,其中 @timestamp 被定义为 date 类型字段。

"aggregations": {
  "avg_hour_of_day": { 
    "avg":{
      "script": { 
        "source": """
          ZonedDateTime date =  doc['@timestamp'].value; 
          return date.getHour(); 
        """
      }
    }
  },
  "avg_month_of_year": { 
    "avg":{
      "script": { 
        "source": """
          ZonedDateTime date =  doc['@timestamp'].value; 
          return date.getMonthValue(); 
        """
      }
    }
  },
 ...
}

聚合的名称。

包含返回一天中小时数的 Painless 脚本。

基于文档的时间戳设置 date

date 返回小时值。

聚合的名称。

包含返回一年中月份的 Painless 脚本。

基于文档的时间戳设置 date

date 返回月份值。

使用存储桶脚本获取持续时间

编辑

此示例向您展示如何通过使用存储桶脚本,从数据日志中按客户端 IP 获取会话的持续时间。此示例使用 Kibana 示例 Web 日志数据集。

resp = client.transform.put_transform(
    transform_id="data_log",
    source={
        "index": "kibana_sample_data_logs"
    },
    dest={
        "index": "data-logs-by-client"
    },
    pivot={
        "group_by": {
            "machine.os": {
                "terms": {
                    "field": "machine.os.keyword"
                }
            },
            "machine.ip": {
                "terms": {
                    "field": "clientip"
                }
            }
        },
        "aggregations": {
            "time_frame.lte": {
                "max": {
                    "field": "timestamp"
                }
            },
            "time_frame.gte": {
                "min": {
                    "field": "timestamp"
                }
            },
            "time_length": {
                "bucket_script": {
                    "buckets_path": {
                        "min": "time_frame.gte.value",
                        "max": "time_frame.lte.value"
                    },
                    "script": "params.max - params.min"
                }
            }
        }
    },
)
print(resp)
const response = await client.transform.putTransform({
  transform_id: "data_log",
  source: {
    index: "kibana_sample_data_logs",
  },
  dest: {
    index: "data-logs-by-client",
  },
  pivot: {
    group_by: {
      "machine.os": {
        terms: {
          field: "machine.os.keyword",
        },
      },
      "machine.ip": {
        terms: {
          field: "clientip",
        },
      },
    },
    aggregations: {
      "time_frame.lte": {
        max: {
          field: "timestamp",
        },
      },
      "time_frame.gte": {
        min: {
          field: "timestamp",
        },
      },
      time_length: {
        bucket_script: {
          buckets_path: {
            min: "time_frame.gte.value",
            max: "time_frame.lte.value",
          },
          script: "params.max - params.min",
        },
      },
    },
  },
});
console.log(response);
PUT _transform/data_log
{
  "source": {
    "index": "kibana_sample_data_logs"
  },
  "dest": {
    "index": "data-logs-by-client"
  },
  "pivot": {
    "group_by": {
      "machine.os": {"terms": {"field": "machine.os.keyword"}},
      "machine.ip": {"terms": {"field": "clientip"}}
    },
    "aggregations": {
      "time_frame.lte": {
        "max": {
          "field": "timestamp"
        }
      },
      "time_frame.gte": {
        "min": {
          "field": "timestamp"
        }
      },
      "time_length": { 
        "bucket_script": {
          "buckets_path": { 
            "min": "time_frame.gte.value",
            "max": "time_frame.lte.value"
          },
          "script": "params.max - params.min" 
        }
      }
    }
  }
}

为了定义会话的长度,我们使用存储桶脚本。

存储桶路径是脚本变量及其关联的存储桶路径的映射,您希望将这些存储桶用于变量。在此特定示例中,minmax 是映射到 time_frame.gte.valuetime_frame.lte.value 的变量。

最后,该脚本从结束日期中减去会话的开始日期,从而得出该会话的持续时间。

使用脚本指标聚合计算 HTTP 响应

编辑

您可以通过使用脚本指标聚合作为转换的一部分,来计算 Web 日志数据集中不同的 HTTP 响应类型。您可以使用筛选器聚合来实现类似的功能,请查看 查找可疑客户端 IP 示例了解详细信息。

以下示例假定 HTTP 响应代码以关键字的形式存储在文档的 response 字段中。

此示例使用 Elasticsearch Serverless 上不支持的 scripted_metric 聚合。

"aggregations": { 
  "responses.counts": { 
    "scripted_metric": { 
      "init_script": "state.responses = ['error':0L,'success':0L,'other':0L]", 
      "map_script": """ 
        def code = doc['response.keyword'].value;
        if (code.startsWith('5') || code.startsWith('4')) {
          state.responses.error += 1 ;
        } else if(code.startsWith('2')) {
          state.responses.success += 1;
        } else {
          state.responses.other += 1;
        }
        """,
      "combine_script": "state.responses", 
      "reduce_script": """ 
        def counts = ['error': 0L, 'success': 0L, 'other': 0L];
        for (responses in states) {
          counts.error += responses['error'];
          counts.success += responses['success'];
          counts.other += responses['other'];
        }
        return counts;
        """
      }
    },
  ...
}

转换的 aggregations 对象,其中包含所有聚合。

scripted_metric 聚合的对象。

scripted_metric 对 Web 日志数据执行分布式操作,以计算特定类型的 HTTP 响应(错误、成功和其他)。

init_scriptstate 对象中创建一个 responses 数组,该数组具有三个属性(errorsuccessother),并且均为 long 数据类型。

map_script 基于文档的 response.keyword 值定义 code,然后根据响应的第一位数字计算错误、成功和其他响应。

combine_script 返回每个分片的 state.responses

reduce_script 创建一个具有 errorsuccessother 属性的 counts 数组,然后遍历每个分片返回的 responses 的值,并将不同的响应类型分配给 counts 对象的相应属性;错误响应分配给错误计数,成功响应分配给成功计数,其他响应分配给其他计数。最后,返回具有响应计数的 counts 数组。

使用脚本指标聚合比较索引

编辑

此示例展示了如何通过使用脚本指标聚合的转换来比较两个索引的内容。

此示例使用 Elasticsearch Serverless 上不支持的 scripted_metric 聚合。

resp = client.transform.preview_transform(
    id="index_compare",
    source={
        "index": [
            "index1",
            "index2"
        ],
        "query": {
            "match_all": {}
        }
    },
    dest={
        "index": "compare"
    },
    pivot={
        "group_by": {
            "unique-id": {
                "terms": {
                    "field": "<unique-id-field>"
                }
            }
        },
        "aggregations": {
            "compare": {
                "scripted_metric": {
                    "map_script": "state.doc = new HashMap(params['_source'])",
                    "combine_script": "return state",
                    "reduce_script": " \n            if (states.size() != 2) {\n              return \"count_mismatch\"\n            }\n            if (states.get(0).equals(states.get(1))) {\n              return \"match\"\n            } else {\n              return \"mismatch\"\n            }\n            "
                }
            }
        }
    },
)
print(resp)
const response = await client.transform.previewTransform({
  id: "index_compare",
  source: {
    index: ["index1", "index2"],
    query: {
      match_all: {},
    },
  },
  dest: {
    index: "compare",
  },
  pivot: {
    group_by: {
      "unique-id": {
        terms: {
          field: "<unique-id-field>",
        },
      },
    },
    aggregations: {
      compare: {
        scripted_metric: {
          map_script: "state.doc = new HashMap(params['_source'])",
          combine_script: "return state",
          reduce_script:
            ' \n            if (states.size() != 2) {\n              return "count_mismatch"\n            }\n            if (states.get(0).equals(states.get(1))) {\n              return "match"\n            } else {\n              return "mismatch"\n            }\n            ',
        },
      },
    },
  },
});
console.log(response);
POST _transform/_preview
{
  "id" : "index_compare",
  "source" : { 
    "index" : [
      "index1",
      "index2"
    ],
    "query" : {
      "match_all" : { }
    }
  },
  "dest" : { 
    "index" : "compare"
  },
  "pivot" : {
    "group_by" : {
      "unique-id" : {
        "terms" : {
          "field" : "<unique-id-field>" 
        }
      }
    },
    "aggregations" : {
      "compare" : { 
        "scripted_metric" : {
          "map_script" : "state.doc = new HashMap(params['_source'])", 
          "combine_script" : "return state", 
          "reduce_script" : """ 
            if (states.size() != 2) {
              return "count_mismatch"
            }
            if (states.get(0).equals(states.get(1))) {
              return "match"
            } else {
              return "mismatch"
            }
            """
        }
      }
    }
  }
}

source 对象中引用的索引会相互比较。

dest 索引包含比较的结果。

group_by 字段需要是每个文档的唯一标识符。

scripted_metric 聚合的对象。

map_script 在 state 对象中定义 doc。通过使用 new HashMap(...),您可以复制源文档,这在您想将完整的源对象从一个阶段传递到下一个阶段时非常重要。

combine_script 返回每个分片的 state

reduce_script 检查索引的大小是否相等。如果它们不相等,则报告 count_mismatch。然后,它遍历两个索引的所有值并进行比较。如果值相等,则返回 match,否则返回 mismatch

使用脚本指标聚合获取 Web 会话详细信息

编辑

此示例展示了如何从单个事务中派生多个特征。让我们看一下数据中的示例源文档

源文档
{
  "_index":"apache-sessions",
  "_type":"_doc",
  "_id":"KvzSeGoB4bgw0KGbE3wP",
  "_score":1.0,
  "_source":{
    "@timestamp":1484053499256,
    "apache":{
      "access":{
        "sessionid":"571604f2b2b0c7b346dc685eeb0e2306774a63c2",
        "url":"http://www.leroymerlin.fr/v3/search/search.do?keyword=Carrelage%20salle%20de%20bain",
        "path":"/v3/search/search.do",
        "query":"keyword=Carrelage%20salle%20de%20bain",
        "referrer":"http://www.leroymerlin.fr/v3/p/produits/carrelage-parquet-sol-souple/carrelage-sol-et-mur/decor-listel-et-accessoires-carrelage-mural-l1308217717?resultOffset=0&resultLimit=51&resultListShape=MOSAIC&priceStyle=SALEUNIT_PRICE",
        "user_agent":{
          "original":"Mobile Safari 10.0 Mac OS X (iPad) Apple Inc.",
          "os_name":"Mac OS X (iPad)"
        },
        "remote_ip":"0337b1fa-5ed4-af81-9ef4-0ec53be0f45d",
        "geoip":{
          "country_iso_code":"FR",
          "location":{
            "lat":48.86,
            "lon":2.35
          }
        },
        "response_code":200,
        "method":"GET"
      }
    }
  }
}
...

通过使用 sessionid 作为分组依据字段,您可以枚举会话中的事件,并通过使用脚本指标聚合来获取有关会话的更多详细信息。

此示例使用 Elasticsearch Serverless 上不支持的 scripted_metric 聚合。

POST _transform/_preview
{
  "source": {
    "index": "apache-sessions"
  },
  "pivot": {
    "group_by": {
      "sessionid": { 
        "terms": {
          "field": "apache.access.sessionid"
        }
      }
    },
    "aggregations": { 
      "distinct_paths": {
        "cardinality": {
          "field": "apache.access.path"
        }
      },
      "num_pages_viewed": {
        "value_count": {
          "field": "apache.access.url"
        }
      },
      "session_details": {
        "scripted_metric": {
          "init_script": "state.docs = []", 
          "map_script": """ 
            Map span = [
              '@timestamp':doc['@timestamp'].value,
              'url':doc['apache.access.url'].value,
              'referrer':doc['apache.access.referrer'].value
            ];
            state.docs.add(span)
          """,
          "combine_script": "return state.docs;", 
          "reduce_script": """ 
            def all_docs = [];
            for (s in states) {
              for (span in s) {
                all_docs.add(span);
              }
            }
            all_docs.sort((HashMap o1, HashMap o2)->o1['@timestamp'].toEpochMilli().compareTo(o2['@timestamp'].toEpochMilli()));
            def size = all_docs.size();
            def min_time = all_docs[0]['@timestamp'];
            def max_time = all_docs[size-1]['@timestamp'];
            def duration = max_time.toEpochMilli() - min_time.toEpochMilli();
            def entry_page = all_docs[0]['url'];
            def exit_path = all_docs[size-1]['url'];
            def first_referrer = all_docs[0]['referrer'];
            def ret = new HashMap();
            ret['first_time'] = min_time;
            ret['last_time'] = max_time;
            ret['duration'] = duration;
            ret['entry_page'] = entry_page;
            ret['exit_path'] = exit_path;
            ret['first_referrer'] = first_referrer;
            return ret;
          """
        }
      }
    }
  }
}

数据按 sessionid 分组。

聚合计算会话期间的路径数并枚举已查看的页面。

init_scriptstate 对象中创建一个数组类型的 doc

map_script 定义一个 span 数组,其中包含时间戳、URL 和引用值,这些值基于文档的相应值,然后将 span 数组的值添加到 doc 对象。

combine_script 返回每个分片的 state.docs

reduce_script 基于文档字段定义各种对象,例如 min_timemax_timeduration,然后声明一个 ret 对象,并通过使用 new HashMap() 来复制源文档。接下来,该脚本基于先前定义的相应对象在 ret 对象中定义 first_timelast_timeduration 和其他字段,最后返回 ret

API 调用会产生类似的响应

{
  "num_pages_viewed" : 2.0,
  "session_details" : {
    "duration" : 100300001,
    "first_referrer" : "https://www.bing.com/",
    "entry_page" : "http://www.leroymerlin.fr/v3/p/produits/materiaux-menuiserie/porte-coulissante-porte-interieure-escalier-et-rambarde/barriere-de-securite-l1308218463",
    "first_time" : "2017-01-10T21:22:52.982Z",
    "last_time" : "2017-01-10T21:25:04.356Z",
    "exit_path" : "http://www.leroymerlin.fr/v3/p/produits/materiaux-menuiserie/porte-coulissante-porte-interieure-escalier-et-rambarde/barriere-de-securite-l1308218463?__result-wrapper?pageTemplate=Famille%2FMat%C3%A9riaux+et+menuiserie&resultOffset=0&resultLimit=50&resultListShape=PLAIN&nomenclatureId=17942&priceStyle=SALEUNIT_PRICE&fcr=1&*4294718806=4294718806&*14072=14072&*4294718593=4294718593&*17942=17942"
  },
  "distinct_paths" : 1.0,
  "sessionid" : "000046f8154a80fd89849369c984b8cc9d795814"
},
{
  "num_pages_viewed" : 10.0,
  "session_details" : {
    "duration" : 343100405,
    "first_referrer" : "https://www.google.fr/",
    "entry_page" : "http://www.leroymerlin.fr/",
    "first_time" : "2017-01-10T16:57:39.937Z",
    "last_time" : "2017-01-10T17:03:23.049Z",
    "exit_path" : "http://www.leroymerlin.fr/v3/p/produits/porte-de-douche-coulissante-adena-e168578"
  },
  "distinct_paths" : 8.0,
  "sessionid" : "000087e825da1d87a332b8f15fa76116c7467da6"
}
...