转换的 Painless 示例编辑

这些示例演示了如何在转换中使用 Painless。您可以在Painless 指南中了解有关 Painless 脚本语言的更多信息。

  • 虽然以下示例的上下文是转换用例,但以下代码段中的 Painless 脚本也可以在其他 Elasticsearch 搜索聚合中使用。
  • 以下所有示例都使用脚本,当字段由脚本创建时,转换无法推断输出字段的映射。转换不会为这些字段在目标索引中创建任何映射,这意味着它们将被动态映射。如果您希望使用显式映射,请在开始转换之前创建目标索引。

使用脚本化指标聚合获取热门结果编辑

此代码段显示了如何查找最新文档,换句话说,就是查找具有最新时间戳的文档。从技术角度来看,它有助于通过在转换中使用脚本化指标聚合来实现热门结果的功能,该聚合提供指标输出。

"aggregations": {
  "latest_doc": {
    "scripted_metric": {
      "init_script": "state.timestamp_latest = 0L; state.last_doc = ''", 
      "map_script": """ 
        def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
        if (current_date > state.timestamp_latest)
        {state.timestamp_latest = current_date;
        state.last_doc = new HashMap(params['_source']);}
      """,
      "combine_script": "return state", 
      "reduce_script": """ 
        def last_doc = '';
        def timestamp_latest = 0L;
        for (s in states) {if (s.timestamp_latest > (timestamp_latest))
        {timestamp_latest = s.timestamp_latest; last_doc = s.last_doc;}}
        return last_doc
      """
    }
  }
}

init_scriptstate 对象中创建了一个长类型的 timestamp_latest 和一个字符串类型的 last_doc

map_script 根据文档的时间戳定义 current_date,然后将 current_datestate.timestamp_latest 进行比较,最后从分片返回 state.last_doc。通过使用 new HashMap(...),您可以复制源文档,当您希望将完整的源对象从一个阶段传递到下一个阶段时,这一点非常重要。

combine_script 从每个分片返回 state

reduce_script 迭代每个分片返回的 s.timestamp_latest 的值,并返回具有最新时间戳的文档 (last_doc)。在响应中,热门结果(换句话说,latest_doc)嵌套在 latest_doc 字段下。

有关各个脚本的详细说明,请查看脚本范围

您可以用类似的方式检索最后一个值

"aggregations": {
  "latest_value": {
    "scripted_metric": {
      "init_script": "state.timestamp_latest = 0L; state.last_value = ''",
      "map_script": """
        def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
        if (current_date > state.timestamp_latest)
        {state.timestamp_latest = current_date;
        state.last_value = params['_source']['value'];}
      """,
      "combine_script": "return state",
      "reduce_script": """
        def last_value = '';
        def timestamp_latest = 0L;
        for (s in states) {if (s.timestamp_latest > (timestamp_latest))
        {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}}
        return last_value
      """
    }
  }
}
使用存储的脚本获取热门结果编辑

您还可以使用存储的脚本的功能来获取最新值。存储的脚本可以减少编译时间,加快搜索速度,并且可以更新。

  1. 创建存储的脚本

    POST _scripts/last-value-map-init
    {
      "script": {
        "lang": "painless",
        "source": """
            state.timestamp_latest = 0L; state.last_value = ''
        """
      }
    }
    
    POST _scripts/last-value-map
    {
      "script": {
        "lang": "painless",
        "source": """
          def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
            if (current_date > state.timestamp_latest)
            {state.timestamp_latest = current_date;
            state.last_value = doc[params['key']].value;}
        """
      }
    }
    
    POST _scripts/last-value-combine
    {
      "script": {
        "lang": "painless",
        "source": """
            return state
        """
      }
    }
    
    POST _scripts/last-value-reduce
    {
      "script": {
        "lang": "painless",
        "source": """
            def last_value = '';
            def timestamp_latest = 0L;
            for (s in states) {if (s.timestamp_latest > (timestamp_latest))
            {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}}
            return last_value
        """
      }
    }
  2. 在脚本化指标聚合中使用存储的脚本。

    "aggregations":{
       "latest_value":{
          "scripted_metric":{
             "init_script":{
                "id":"last-value-map-init"
             },
             "map_script":{
                "id":"last-value-map",
                "params":{
                   "key":"field_with_last_value" 
                }
             },
             "combine_script":{
                "id":"last-value-combine"
             },
             "reduce_script":{
                "id":"last-value-reduce"
             }

    参数 field_with_last_value 可以设置为您希望获取其最新值的任何字段。

使用聚合获取时间特征编辑

此代码段显示了如何在转换中使用 Painless 提取基于时间的特征。该代码段使用了一个索引,其中 @timestamp 被定义为 date 类型字段。

"aggregations": {
  "avg_hour_of_day": { 
    "avg":{
      "script": { 
        "source": """
          ZonedDateTime date =  doc['@timestamp'].value; 
          return date.getHour(); 
        """
      }
    }
  },
  "avg_month_of_year": { 
    "avg":{
      "script": { 
        "source": """
          ZonedDateTime date =  doc['@timestamp'].value; 
          return date.getMonthValue(); 
        """
      }
    }
  },
 ...
}

聚合的名称。

包含返回一天中小时数的 Painless 脚本。

根据文档的时间戳设置 date

date 返回小时值。

聚合的名称。

包含返回一年中月份的 Painless 脚本。

根据文档的时间戳设置 date

date 返回月份值。

使用桶脚本获取持续时间编辑

此示例展示了如何使用桶脚本从数据日志中获取客户端 IP 的会话持续时间。该示例使用了 Kibana 示例 Web 日志数据集。

PUT _transform/data_log
{
  "source": {
    "index": "kibana_sample_data_logs"
  },
  "dest": {
    "index": "data-logs-by-client"
  },
  "pivot": {
    "group_by": {
      "machine.os": {"terms": {"field": "machine.os.keyword"}},
      "machine.ip": {"terms": {"field": "clientip"}}
    },
    "aggregations": {
      "time_frame.lte": {
        "max": {
          "field": "timestamp"
        }
      },
      "time_frame.gte": {
        "min": {
          "field": "timestamp"
        }
      },
      "time_length": { 
        "bucket_script": {
          "buckets_path": { 
            "min": "time_frame.gte.value",
            "max": "time_frame.lte.value"
          },
          "script": "params.max - params.min" 
        }
      }
    }
  }
}

为了定义会话的长度,我们使用了一个桶脚本。

桶路径是一个脚本变量及其关联路径的映射,指向您希望用于该变量的桶。在此特定情况下,minmax 是映射到 time_frame.gte.valuetime_frame.lte.value 的变量。

最后,脚本从结束日期减去会话的开始日期,从而得到会话的持续时间。

使用脚本化指标聚合统计 HTTP 响应编辑

您可以使用脚本化指标聚合作为转换的一部分来统计 Web 日志数据集中不同的 HTTP 响应类型。您可以使用过滤器聚合实现类似的功能,有关详细信息,请查看查找可疑客户端 IP示例。

下面的示例假设 HTTP 响应代码存储为文档 response 字段中的关键字。

"aggregations": { 
  "responses.counts": { 
    "scripted_metric": { 
      "init_script": "state.responses = ['error':0L,'success':0L,'other':0L]", 
      "map_script": """ 
        def code = doc['response.keyword'].value;
        if (code.startsWith('5') || code.startsWith('4')) {
          state.responses.error += 1 ;
        } else if(code.startsWith('2')) {
          state.responses.success += 1;
        } else {
          state.responses.other += 1;
        }
        """,
      "combine_script": "state.responses", 
      "reduce_script": """ 
        def counts = ['error': 0L, 'success': 0L, 'other': 0L];
        for (responses in states) {
          counts.error += responses['error'];
          counts.success += responses['success'];
          counts.other += responses['other'];
        }
        return counts;
        """
      }
    },
  ...
}

转换的 aggregations 对象,其中包含所有聚合。

scripted_metric 聚合的对象。

scripted_metric 对 Web 日志数据执行分布式操作,以统计特定类型的 HTTP 响应(错误、成功和其他)。

init_scriptstate 对象中创建一个 responses 数组,该数组具有三个属性(errorsuccessother),数据类型为 long。

map_script 根据文档的 response.keyword 值定义 code,然后根据响应的第一个数字统计错误、成功和其他响应。

combine_script 从每个分片返回 state.responses

reduce_script 创建一个 counts 数组,该数组具有 errorsuccessother 属性,然后迭代每个分片返回的 responses 的值,并将不同的响应类型分配给 counts 对象的相应属性;将错误响应分配给错误计数,将成功响应分配给成功计数,将其他响应分配给其他计数。最后,返回带有响应计数的 counts 数组。

使用脚本化指标聚合比较索引编辑

此示例展示了如何通过使用脚本化指标聚合的转换来比较两个索引的内容。

POST _transform/_preview
{
  "id" : "index_compare",
  "source" : { 
    "index" : [
      "index1",
      "index2"
    ],
    "query" : {
      "match_all" : { }
    }
  },
  "dest" : { 
    "index" : "compare"
  },
  "pivot" : {
    "group_by" : {
      "unique-id" : {
        "terms" : {
          "field" : "<unique-id-field>" 
        }
      }
    },
    "aggregations" : {
      "compare" : { 
        "scripted_metric" : {
          "map_script" : "state.doc = new HashMap(params['_source'])", 
          "combine_script" : "return state", 
          "reduce_script" : """ 
            if (states.size() != 2) {
              return "count_mismatch"
            }
            if (states.get(0).equals(states.get(1))) {
              return "match"
            } else {
              return "mismatch"
            }
            """
        }
      }
    }
  }
}

source 对象中引用的索引将相互比较。

dest 索引包含比较的结果。

group_by 字段需要是每个文档的唯一标识符。

scripted_metric 聚合的对象。

map_script 在 state 对象中定义 doc。通过使用 new HashMap(...),您可以复制源文档,当您希望将完整的源对象从一个阶段传递到下一个阶段时,这一点非常重要。

combine_script 从每个分片返回 state

reduce_script 检查索引的大小是否相等。如果它们不相等,则报告 count_mismatch。然后,它迭代两个索引的所有值并进行比较。如果值相等,则返回 match,否则返回 mismatch

使用脚本化指标聚合获取 Web 会话详细信息编辑

此示例展示了如何从单个事务中派生多个特征。让我们看一下数据中的示例源文档

源文档
{
  "_index":"apache-sessions",
  "_type":"_doc",
  "_id":"KvzSeGoB4bgw0KGbE3wP",
  "_score":1.0,
  "_source":{
    "@timestamp":1484053499256,
    "apache":{
      "access":{
        "sessionid":"571604f2b2b0c7b346dc685eeb0e2306774a63c2",
        "url":"http://www.leroymerlin.fr/v3/search/search.do?keyword=Carrelage%20salle%20de%20bain",
        "path":"/v3/search/search.do",
        "query":"keyword=Carrelage%20salle%20de%20bain",
        "referrer":"http://www.leroymerlin.fr/v3/p/produits/carrelage-parquet-sol-souple/carrelage-sol-et-mur/decor-listel-et-accessoires-carrelage-mural-l1308217717?resultOffset=0&resultLimit=51&resultListShape=MOSAIC&priceStyle=SALEUNIT_PRICE",
        "user_agent":{
          "original":"Mobile Safari 10.0 Mac OS X (iPad) Apple Inc.",
          "os_name":"Mac OS X (iPad)"
        },
        "remote_ip":"0337b1fa-5ed4-af81-9ef4-0ec53be0f45d",
        "geoip":{
          "country_iso_code":"FR",
          "location":{
            "lat":48.86,
            "lon":2.35
          }
        },
        "response_code":200,
        "method":"GET"
      }
    }
  }
}
...

通过使用 sessionid 作为分组依据字段,您可以枚举会话中的事件,并使用脚本化指标聚合获取有关会话的更多详细信息。

POST _transform/_preview
{
  "source": {
    "index": "apache-sessions"
  },
  "pivot": {
    "group_by": {
      "sessionid": { 
        "terms": {
          "field": "apache.access.sessionid"
        }
      }
    },
    "aggregations": { 
      "distinct_paths": {
        "cardinality": {
          "field": "apache.access.path"
        }
      },
      "num_pages_viewed": {
        "value_count": {
          "field": "apache.access.url"
        }
      },
      "session_details": {
        "scripted_metric": {
          "init_script": "state.docs = []", 
          "map_script": """ 
            Map span = [
              '@timestamp':doc['@timestamp'].value,
              'url':doc['apache.access.url'].value,
              'referrer':doc['apache.access.referrer'].value
            ];
            state.docs.add(span)
          """,
          "combine_script": "return state.docs;", 
          "reduce_script": """ 
            def all_docs = [];
            for (s in states) {
              for (span in s) {
                all_docs.add(span);
              }
            }
            all_docs.sort((HashMap o1, HashMap o2)->o1['@timestamp'].toEpochMilli().compareTo(o2['@timestamp'].toEpochMilli()));
            def size = all_docs.size();
            def min_time = all_docs[0]['@timestamp'];
            def max_time = all_docs[size-1]['@timestamp'];
            def duration = max_time.toEpochMilli() - min_time.toEpochMilli();
            def entry_page = all_docs[0]['url'];
            def exit_path = all_docs[size-1]['url'];
            def first_referrer = all_docs[0]['referrer'];
            def ret = new HashMap();
            ret['first_time'] = min_time;
            ret['last_time'] = max_time;
            ret['duration'] = duration;
            ret['entry_page'] = entry_page;
            ret['exit_path'] = exit_path;
            ret['first_referrer'] = first_referrer;
            return ret;
          """
        }
      }
    }
  }
}

数据按 sessionid 分组。

聚合统计路径的数量,并枚举会话期间查看的页面。

init_scriptstate 对象中创建一个数组类型的 doc

map_script 定义了一个 span 数组,该数组具有时间戳、URL 和引用值,这些值基于文档的相应值,然后将 span 数组的值添加到 doc 对象中。

combine_script 从每个分片返回 state.docs

reduce_script 根据文档字段定义各种对象,例如 min_timemax_timeduration,然后声明一个 ret 对象,并使用 new HashMap () 复制源文档。接下来,脚本根据之前定义的相应对象在 ret 对象内定义 first_timelast_timeduration 和其他字段,最后返回 ret

API 调用会产生类似的响应

{
  "num_pages_viewed" : 2.0,
  "session_details" : {
    "duration" : 100300001,
    "first_referrer" : "https://www.bing.com/",
    "entry_page" : "http://www.leroymerlin.fr/v3/p/produits/materiaux-menuiserie/porte-coulissante-porte-interieure-escalier-et-rambarde/barriere-de-securite-l1308218463",
    "first_time" : "2017-01-10T21:22:52.982Z",
    "last_time" : "2017-01-10T21:25:04.356Z",
    "exit_path" : "http://www.leroymerlin.fr/v3/p/produits/materiaux-menuiserie/porte-coulissante-porte-interieure-escalier-et-rambarde/barriere-de-securite-l1308218463?__result-wrapper?pageTemplate=Famille%2FMat%C3%A9riaux+et+menuiserie&resultOffset=0&resultLimit=50&resultListShape=PLAIN&nomenclatureId=17942&priceStyle=SALEUNIT_PRICE&fcr=1&*4294718806=4294718806&*14072=14072&*4294718593=4294718593&*17942=17942"
  },
  "distinct_paths" : 1.0,
  "sessionid" : "000046f8154a80fd89849369c984b8cc9d795814"
},
{
  "num_pages_viewed" : 10.0,
  "session_details" : {
    "duration" : 343100405,
    "first_referrer" : "https://www.google.fr/",
    "entry_page" : "http://www.leroymerlin.fr/",
    "first_time" : "2017-01-10T16:57:39.937Z",
    "last_time" : "2017-01-10T17:03:23.049Z",
    "exit_path" : "http://www.leroymerlin.fr/v3/p/produits/porte-de-douche-coulissante-adena-e168578"
  },
  "distinct_paths" : 8.0,
  "sessionid" : "000087e825da1d87a332b8f15fa76116c7467da6"
}
...