Transform 的 Painless 示例

编辑

使用 scripted_metric 聚合的示例在 Elasticsearch Serverless 上不受支持。

这些示例演示了如何在 Transform 中使用 Painless。您可以在 Painless 指南 中了解有关 Painless 脚本语言的更多信息。

  • 虽然以下示例的上下文是 Transform 使用案例,但以下代码段中的 Painless 脚本也可以用于其他 Elasticsearch 搜索聚合。
  • 以下所有示例都使用脚本,当字段由脚本创建时,Transform 无法推断输出字段的映射。Transform 不会为这些字段在目标索引中创建任何映射,这意味着它们将被动态映射。如果您希望使用显式映射,请在启动 Transform 之前创建目标索引。

使用脚本化指标聚合获取热门命中

编辑

此代码段显示了如何查找最新的文档,换句话说,就是具有最新时间戳的文档。从技术角度来看,它有助于通过在 Transform 中使用脚本化指标聚合实现 热门命中 的功能,从而提供指标输出。

此示例使用 scripted_metric 聚合,该聚合在 Elasticsearch Serverless 上不受支持。

"aggregations": {
  "latest_doc": {
    "scripted_metric": {
      "init_script": "state.timestamp_latest = 0L; state.last_doc = ''", 
      "map_script": """ 
        def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
        if (current_date > state.timestamp_latest)
        {state.timestamp_latest = current_date;
        state.last_doc = new HashMap(params['_source']);}
      """,
      "combine_script": "return state", 
      "reduce_script": """ 
        def last_doc = '';
        def timestamp_latest = 0L;
        for (s in states) {if (s.timestamp_latest > (timestamp_latest))
        {timestamp_latest = s.timestamp_latest; last_doc = s.last_doc;}}
        return last_doc
      """
    }
  }
}

init_scriptstate 对象中创建长类型 timestamp_latest 和字符串类型 last_doc

map_script 根据文档的时间戳定义 current_date,然后将 current_datestate.timestamp_latest 进行比较,最后从分片返回 state.last_doc。通过使用 new HashMap(...),您可以复制源文档,当您希望将完整源对象从一个阶段传递到下一个阶段时,这一点非常重要。

combine_script 从每个分片返回 state

reduce_script 遍历每个分片返回的 s.timestamp_latest 的值,并返回具有最新时间戳的文档(last_doc)。在响应中,热门命中(换句话说,latest_doc)嵌套在 latest_doc 字段下方。

有关各个脚本的详细说明,请查看 脚本范围

您可以以类似的方式检索最后一个值

"aggregations": {
  "latest_value": {
    "scripted_metric": {
      "init_script": "state.timestamp_latest = 0L; state.last_value = ''",
      "map_script": """
        def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
        if (current_date > state.timestamp_latest)
        {state.timestamp_latest = current_date;
        state.last_value = params['_source']['value'];}
      """,
      "combine_script": "return state",
      "reduce_script": """
        def last_value = '';
        def timestamp_latest = 0L;
        for (s in states) {if (s.timestamp_latest > (timestamp_latest))
        {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}}
        return last_value
      """
    }
  }
}
使用存储脚本获取热门命中
编辑

您还可以利用 存储脚本 的强大功能来获取最新值。存储脚本减少了编译时间,使搜索更快,并且可更新。

  1. 创建存储脚本

    POST _scripts/last-value-map-init
    {
      "script": {
        "lang": "painless",
        "source": """
            state.timestamp_latest = 0L; state.last_value = ''
        """
      }
    }
    
    POST _scripts/last-value-map
    {
      "script": {
        "lang": "painless",
        "source": """
          def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
            if (current_date > state.timestamp_latest)
            {state.timestamp_latest = current_date;
            state.last_value = doc[params['key']].value;}
        """
      }
    }
    
    POST _scripts/last-value-combine
    {
      "script": {
        "lang": "painless",
        "source": """
            return state
        """
      }
    }
    
    POST _scripts/last-value-reduce
    {
      "script": {
        "lang": "painless",
        "source": """
            def last_value = '';
            def timestamp_latest = 0L;
            for (s in states) {if (s.timestamp_latest > (timestamp_latest))
            {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}}
            return last_value
        """
      }
    }
  2. 在脚本化指标聚合中使用存储脚本。

    "aggregations":{
       "latest_value":{
          "scripted_metric":{
             "init_script":{
                "id":"last-value-map-init"
             },
             "map_script":{
                "id":"last-value-map",
                "params":{
                   "key":"field_with_last_value" 
                }
             },
             "combine_script":{
                "id":"last-value-combine"
             },
             "reduce_script":{
                "id":"last-value-reduce"
             }

    参数 field_with_last_value 可以设置为任何您希望获取最新值的字段。

使用聚合获取时间特征

编辑

此代码段显示了如何在 Transform 中使用 Painless 提取基于时间的功能。此代码段使用一个索引,其中 @timestamp 定义为 date 类型字段。

"aggregations": {
  "avg_hour_of_day": { 
    "avg":{
      "script": { 
        "source": """
          ZonedDateTime date =  doc['@timestamp'].value; 
          return date.getHour(); 
        """
      }
    }
  },
  "avg_month_of_year": { 
    "avg":{
      "script": { 
        "source": """
          ZonedDateTime date =  doc['@timestamp'].value; 
          return date.getMonthValue(); 
        """
      }
    }
  },
 ...
}

聚合的名称。

包含返回一天中的小时的 Painless 脚本。

根据文档的时间戳设置 date

date 返回小时值。

聚合的名称。

包含返回一年中的月份的 Painless 脚本。

根据文档的时间戳设置 date

date 返回月份值。

使用桶脚本获取持续时间

编辑

此示例向您展示了如何通过使用 桶脚本 从数据日志中获取客户端 IP 的会话持续时间。此示例使用 Kibana 示例 Web 日志数据集。

resp = client.transform.put_transform(
    transform_id="data_log",
    source={
        "index": "kibana_sample_data_logs"
    },
    dest={
        "index": "data-logs-by-client"
    },
    pivot={
        "group_by": {
            "machine.os": {
                "terms": {
                    "field": "machine.os.keyword"
                }
            },
            "machine.ip": {
                "terms": {
                    "field": "clientip"
                }
            }
        },
        "aggregations": {
            "time_frame.lte": {
                "max": {
                    "field": "timestamp"
                }
            },
            "time_frame.gte": {
                "min": {
                    "field": "timestamp"
                }
            },
            "time_length": {
                "bucket_script": {
                    "buckets_path": {
                        "min": "time_frame.gte.value",
                        "max": "time_frame.lte.value"
                    },
                    "script": "params.max - params.min"
                }
            }
        }
    },
)
print(resp)
const response = await client.transform.putTransform({
  transform_id: "data_log",
  source: {
    index: "kibana_sample_data_logs",
  },
  dest: {
    index: "data-logs-by-client",
  },
  pivot: {
    group_by: {
      "machine.os": {
        terms: {
          field: "machine.os.keyword",
        },
      },
      "machine.ip": {
        terms: {
          field: "clientip",
        },
      },
    },
    aggregations: {
      "time_frame.lte": {
        max: {
          field: "timestamp",
        },
      },
      "time_frame.gte": {
        min: {
          field: "timestamp",
        },
      },
      time_length: {
        bucket_script: {
          buckets_path: {
            min: "time_frame.gte.value",
            max: "time_frame.lte.value",
          },
          script: "params.max - params.min",
        },
      },
    },
  },
});
console.log(response);
PUT _transform/data_log
{
  "source": {
    "index": "kibana_sample_data_logs"
  },
  "dest": {
    "index": "data-logs-by-client"
  },
  "pivot": {
    "group_by": {
      "machine.os": {"terms": {"field": "machine.os.keyword"}},
      "machine.ip": {"terms": {"field": "clientip"}}
    },
    "aggregations": {
      "time_frame.lte": {
        "max": {
          "field": "timestamp"
        }
      },
      "time_frame.gte": {
        "min": {
          "field": "timestamp"
        }
      },
      "time_length": { 
        "bucket_script": {
          "buckets_path": { 
            "min": "time_frame.gte.value",
            "max": "time_frame.lte.value"
          },
          "script": "params.max - params.min" 
        }
      }
    }
  }
}

为了定义会话的长度,我们使用桶脚本。

桶路径是脚本变量及其关联路径到您要用于变量的桶的映射。在本例中,minmax 是分别映射到 time_frame.gte.valuetime_frame.lte.value 的变量。

最后,脚本从结束日期减去会话的开始日期,从而得到会话的持续时间。

使用脚本化指标聚合统计 HTTP 响应

编辑

您可以通过将脚本化指标聚合用作 Transform 的一部分来统计 Web 日志数据集中的不同 HTTP 响应类型。您可以使用过滤器聚合实现类似的功能,请查看 查找可疑客户端 IP 示例以获取详细信息。

以下示例假设 HTTP 响应代码存储在文档的 response 字段中作为关键字。

此示例使用 scripted_metric 聚合,该聚合在 Elasticsearch Serverless 上不受支持。

"aggregations": { 
  "responses.counts": { 
    "scripted_metric": { 
      "init_script": "state.responses = ['error':0L,'success':0L,'other':0L]", 
      "map_script": """ 
        def code = doc['response.keyword'].value;
        if (code.startsWith('5') || code.startsWith('4')) {
          state.responses.error += 1 ;
        } else if(code.startsWith('2')) {
          state.responses.success += 1;
        } else {
          state.responses.other += 1;
        }
        """,
      "combine_script": "state.responses", 
      "reduce_script": """ 
        def counts = ['error': 0L, 'success': 0L, 'other': 0L];
        for (responses in states) {
          counts.error += responses['error'];
          counts.success += responses['success'];
          counts.other += responses['other'];
        }
        return counts;
        """
      }
    },
  ...
}

包含所有聚合的 Transform 的 aggregations 对象。

scripted_metric 聚合的对象。

scripted_metric 对 Web 日志数据执行分布式操作以统计特定类型的 HTTP 响应(错误、成功和其他)。

init_scriptstate 对象中创建一个名为 responses 的数组,该数组具有三个属性(errorsuccessother),数据类型为长整型。

map_script 根据文档的 response.keyword 值定义 code,然后根据响应的第一位数字统计错误、成功和其他响应。

combine_script 从每个分片返回 state.responses

reduce_script 创建一个名为 counts 的数组,该数组具有 errorsuccessother 属性,然后遍历每个分片返回的 responses 值,并将不同的响应类型分配给 counts 对象的相应属性;错误响应分配给错误计数,成功响应分配给成功计数,其他响应分配给其他计数。最后,返回包含响应计数的 counts 数组。

使用脚本化指标聚合比较索引

编辑

此示例显示了如何通过使用脚本化指标聚合的 Transform 来比较两个索引的内容。

此示例使用 scripted_metric 聚合,该聚合在 Elasticsearch Serverless 上不受支持。

resp = client.transform.preview_transform(
    id="index_compare",
    source={
        "index": [
            "index1",
            "index2"
        ],
        "query": {
            "match_all": {}
        }
    },
    dest={
        "index": "compare"
    },
    pivot={
        "group_by": {
            "unique-id": {
                "terms": {
                    "field": "<unique-id-field>"
                }
            }
        },
        "aggregations": {
            "compare": {
                "scripted_metric": {
                    "map_script": "state.doc = new HashMap(params['_source'])",
                    "combine_script": "return state",
                    "reduce_script": " \n            if (states.size() != 2) {\n              return \"count_mismatch\"\n            }\n            if (states.get(0).equals(states.get(1))) {\n              return \"match\"\n            } else {\n              return \"mismatch\"\n            }\n            "
                }
            }
        }
    },
)
print(resp)
const response = await client.transform.previewTransform({
  id: "index_compare",
  source: {
    index: ["index1", "index2"],
    query: {
      match_all: {},
    },
  },
  dest: {
    index: "compare",
  },
  pivot: {
    group_by: {
      "unique-id": {
        terms: {
          field: "<unique-id-field>",
        },
      },
    },
    aggregations: {
      compare: {
        scripted_metric: {
          map_script: "state.doc = new HashMap(params['_source'])",
          combine_script: "return state",
          reduce_script:
            ' \n            if (states.size() != 2) {\n              return "count_mismatch"\n            }\n            if (states.get(0).equals(states.get(1))) {\n              return "match"\n            } else {\n              return "mismatch"\n            }\n            ',
        },
      },
    },
  },
});
console.log(response);
POST _transform/_preview
{
  "id" : "index_compare",
  "source" : { 
    "index" : [
      "index1",
      "index2"
    ],
    "query" : {
      "match_all" : { }
    }
  },
  "dest" : { 
    "index" : "compare"
  },
  "pivot" : {
    "group_by" : {
      "unique-id" : {
        "terms" : {
          "field" : "<unique-id-field>" 
        }
      }
    },
    "aggregations" : {
      "compare" : { 
        "scripted_metric" : {
          "map_script" : "state.doc = new HashMap(params['_source'])", 
          "combine_script" : "return state", 
          "reduce_script" : """ 
            if (states.size() != 2) {
              return "count_mismatch"
            }
            if (states.get(0).equals(states.get(1))) {
              return "match"
            } else {
              return "mismatch"
            }
            """
        }
      }
    }
  }
}

source 对象中引用的索引彼此进行比较。

dest 索引包含比较的结果。

group_by 字段需要是每个文档的唯一标识符。

scripted_metric 聚合的对象。

map_script 在 state 对象中定义 doc。通过使用 new HashMap(...),您可以复制源文档,当您希望将完整源对象从一个阶段传递到下一个阶段时,这一点非常重要。

combine_script 从每个分片返回 state

reduce_script 检查索引的大小是否相等。如果不相等,则报告 count_mismatch。然后它遍历两个索引的所有值并进行比较。如果值相等,则返回 match,否则返回 mismatch

使用脚本化指标聚合获取 Web 会话详细信息

编辑

此示例显示了如何从单个事务中派生多个功能。让我们看一下数据中示例源文档

源文档
{
  "_index":"apache-sessions",
  "_type":"_doc",
  "_id":"KvzSeGoB4bgw0KGbE3wP",
  "_score":1.0,
  "_source":{
    "@timestamp":1484053499256,
    "apache":{
      "access":{
        "sessionid":"571604f2b2b0c7b346dc685eeb0e2306774a63c2",
        "url":"http://www.leroymerlin.fr/v3/search/search.do?keyword=Carrelage%20salle%20de%20bain",
        "path":"/v3/search/search.do",
        "query":"keyword=Carrelage%20salle%20de%20bain",
        "referrer":"http://www.leroymerlin.fr/v3/p/produits/carrelage-parquet-sol-souple/carrelage-sol-et-mur/decor-listel-et-accessoires-carrelage-mural-l1308217717?resultOffset=0&resultLimit=51&resultListShape=MOSAIC&priceStyle=SALEUNIT_PRICE",
        "user_agent":{
          "original":"Mobile Safari 10.0 Mac OS X (iPad) Apple Inc.",
          "os_name":"Mac OS X (iPad)"
        },
        "remote_ip":"0337b1fa-5ed4-af81-9ef4-0ec53be0f45d",
        "geoip":{
          "country_iso_code":"FR",
          "location":{
            "lat":48.86,
            "lon":2.35
          }
        },
        "response_code":200,
        "method":"GET"
      }
    }
  }
}
...

通过使用 sessionid 作为 group-by 字段,您可以枚举会话中的事件,并通过使用脚本化指标聚合获取有关会话的更多详细信息。

此示例使用 scripted_metric 聚合,该聚合在 Elasticsearch Serverless 上不受支持。

POST _transform/_preview
{
  "source": {
    "index": "apache-sessions"
  },
  "pivot": {
    "group_by": {
      "sessionid": { 
        "terms": {
          "field": "apache.access.sessionid"
        }
      }
    },
    "aggregations": { 
      "distinct_paths": {
        "cardinality": {
          "field": "apache.access.path"
        }
      },
      "num_pages_viewed": {
        "value_count": {
          "field": "apache.access.url"
        }
      },
      "session_details": {
        "scripted_metric": {
          "init_script": "state.docs = []", 
          "map_script": """ 
            Map span = [
              '@timestamp':doc['@timestamp'].value,
              'url':doc['apache.access.url'].value,
              'referrer':doc['apache.access.referrer'].value
            ];
            state.docs.add(span)
          """,
          "combine_script": "return state.docs;", 
          "reduce_script": """ 
            def all_docs = [];
            for (s in states) {
              for (span in s) {
                all_docs.add(span);
              }
            }
            all_docs.sort((HashMap o1, HashMap o2)->o1['@timestamp'].toEpochMilli().compareTo(o2['@timestamp'].toEpochMilli()));
            def size = all_docs.size();
            def min_time = all_docs[0]['@timestamp'];
            def max_time = all_docs[size-1]['@timestamp'];
            def duration = max_time.toEpochMilli() - min_time.toEpochMilli();
            def entry_page = all_docs[0]['url'];
            def exit_path = all_docs[size-1]['url'];
            def first_referrer = all_docs[0]['referrer'];
            def ret = new HashMap();
            ret['first_time'] = min_time;
            ret['last_time'] = max_time;
            ret['duration'] = duration;
            ret['entry_page'] = entry_page;
            ret['exit_path'] = exit_path;
            ret['first_referrer'] = first_referrer;
            return ret;
          """
        }
      }
    }
  }
}

数据按 sessionid 分组。

聚合统计路径的数量并枚举会话期间查看的页面。

init_scriptstate 对象中创建一个数组类型 doc

map_script 定义一个名为 span 的数组,该数组包含时间戳、URL 和引用程序值,这些值基于文档的对应值,然后将 span 数组的值添加到 doc 对象。

combine_script 从每个分片返回 state.docs

reduce_script 根据文档字段定义各种对象,如 min_timemax_timeduration,然后声明一个 ret 对象,并通过使用 new HashMap () 复制源文档。接下来,脚本根据前面定义的对应对象在 ret 对象内部定义 first_timelast_timeduration 和其他字段,最后返回 ret

API 调用会产生类似的响应

{
  "num_pages_viewed" : 2.0,
  "session_details" : {
    "duration" : 100300001,
    "first_referrer" : "https://www.bing.com/",
    "entry_page" : "http://www.leroymerlin.fr/v3/p/produits/materiaux-menuiserie/porte-coulissante-porte-interieure-escalier-et-rambarde/barriere-de-securite-l1308218463",
    "first_time" : "2017-01-10T21:22:52.982Z",
    "last_time" : "2017-01-10T21:25:04.356Z",
    "exit_path" : "http://www.leroymerlin.fr/v3/p/produits/materiaux-menuiserie/porte-coulissante-porte-interieure-escalier-et-rambarde/barriere-de-securite-l1308218463?__result-wrapper?pageTemplate=Famille%2FMat%C3%A9riaux+et+menuiserie&resultOffset=0&resultLimit=50&resultListShape=PLAIN&nomenclatureId=17942&priceStyle=SALEUNIT_PRICE&fcr=1&*4294718806=4294718806&*14072=14072&*4294718593=4294718593&*17942=17942"
  },
  "distinct_paths" : 1.0,
  "sessionid" : "000046f8154a80fd89849369c984b8cc9d795814"
},
{
  "num_pages_viewed" : 10.0,
  "session_details" : {
    "duration" : 343100405,
    "first_referrer" : "https://www.google.fr/",
    "entry_page" : "http://www.leroymerlin.fr/",
    "first_time" : "2017-01-10T16:57:39.937Z",
    "last_time" : "2017-01-10T17:03:23.049Z",
    "exit_path" : "http://www.leroymerlin.fr/v3/p/produits/porte-de-douche-coulissante-adena-e168578"
  },
  "distinct_paths" : 8.0,
  "sessionid" : "000087e825da1d87a332b8f15fa76116c7467da6"
}
...