移动函数聚合编辑

给定一个有序的数据序列,移动函数聚合将在数据上滑动一个窗口,并允许用户指定一个自定义脚本,该脚本在每个数据窗口上执行。为了方便起见,预定义了许多常用函数,例如最小值/最大值、移动平均值等。

语法编辑

一个 moving_fn 聚合单独看起来像这样

{
  "moving_fn": {
    "buckets_path": "the_sum",
    "window": 10,
    "script": "MovingFunctions.min(values)"
  }
}

表 64. moving_fn 参数

参数名称 描述 必需 默认值

buckets_path

感兴趣指标的路径(有关详细信息,请参阅 buckets_path 语法

必需

window

在直方图上“滑动”的窗口大小。

必需

script

应该在每个数据窗口上执行的脚本

必需

gap_policy

在数据中发现缺口时要应用的策略。请参阅 处理数据中的缺口

可选

skip

shift

shift 窗口位置的偏移量。

可选

0

moving_fn 聚合必须嵌入到 histogramdate_histogram 聚合中。它们可以像任何其他指标聚合一样嵌入

response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_date_histo: {
        date_histogram: {
          field: 'date',
          calendar_interval: '1M'
        },
        aggregations: {
          the_sum: {
            sum: {
              field: 'price'
            }
          },
          the_movfn: {
            moving_fn: {
              buckets_path: 'the_sum',
              window: 10,
              script: 'MovingFunctions.unweightedAvg(values)'
            }
          }
        }
      }
    }
  }
)
puts response
POST /_search
{
  "size": 0,
  "aggs": {
    "my_date_histo": {                  
      "date_histogram": {
        "field": "date",
        "calendar_interval": "1M"
      },
      "aggs": {
        "the_sum": {
          "sum": { "field": "price" }   
        },
        "the_movfn": {
          "moving_fn": {
            "buckets_path": "the_sum",  
            "window": 10,
            "script": "MovingFunctions.unweightedAvg(values)"
          }
        }
      }
    }
  }
}

一个名为“my_date_histo”的 date_histogram 是在“timestamp”字段上构建的,间隔为一个月

一个 sum 指标用于计算字段的总和。这可以是任何数字指标(总和、最小值、最大值等)

最后,我们指定一个 moving_fn 聚合,它使用“the_sum”指标作为其输入。

移动平均线的构建方法是:首先指定一个字段上的 histogramdate_histogram。然后,您可以选择在该直方图中添加数字指标,例如 sum。最后,将 moving_fn 嵌入到直方图中。然后使用 buckets_path 参数“指向”直方图内的某个兄弟指标(有关 buckets_path 语法的描述,请参阅 buckets_path 语法)。

上述聚合的示例响应可能如下所示

{
   "took": 11,
   "timed_out": false,
   "_shards": ...,
   "hits": ...,
   "aggregations": {
      "my_date_histo": {
         "buckets": [
             {
                 "key_as_string": "2015/01/01 00:00:00",
                 "key": 1420070400000,
                 "doc_count": 3,
                 "the_sum": {
                    "value": 550.0
                 },
                 "the_movfn": {
                    "value": null
                 }
             },
             {
                 "key_as_string": "2015/02/01 00:00:00",
                 "key": 1422748800000,
                 "doc_count": 2,
                 "the_sum": {
                    "value": 60.0
                 },
                 "the_movfn": {
                    "value": 550.0
                 }
             },
             {
                 "key_as_string": "2015/03/01 00:00:00",
                 "key": 1425168000000,
                 "doc_count": 2,
                 "the_sum": {
                    "value": 375.0
                 },
                 "the_movfn": {
                    "value": 305.0
                 }
             }
         ]
      }
   }
}

自定义用户脚本编辑

移动函数聚合允许用户指定任何任意脚本来定义自定义逻辑。每次收集到新的数据窗口时,都会调用该脚本。这些值在 values 变量中提供给脚本。然后,脚本应执行某种计算并发出单个 double 作为结果。不允许发出 null,但允许使用 NaN 和 +/- Inf

例如,此脚本将简单地返回窗口中的第一个值,如果没有任何值可用,则返回 NaN

response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_date_histo: {
        date_histogram: {
          field: 'date',
          calendar_interval: '1M'
        },
        aggregations: {
          the_sum: {
            sum: {
              field: 'price'
            }
          },
          the_movavg: {
            moving_fn: {
              buckets_path: 'the_sum',
              window: 10,
              script: 'return values.length > 0 ? values[0] : Double.NaN'
            }
          }
        }
      }
    }
  }
)
puts response
POST /_search
{
  "size": 0,
  "aggs": {
    "my_date_histo": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "1M"
      },
      "aggs": {
        "the_sum": {
          "sum": { "field": "price" }
        },
        "the_movavg": {
          "moving_fn": {
            "buckets_path": "the_sum",
            "window": 10,
            "script": "return values.length > 0 ? values[0] : Double.NaN"
          }
        }
      }
    }
  }
}

shift 参数编辑

默认情况下(使用 shift = 0),提供用于计算的窗口是最后 n 个值(不包括当前桶)。将 shift 增加 1 会将起始窗口位置向右移动 1

  • 要将当前桶包含在窗口中,请使用 shift = 1
  • 对于中心对齐(当前桶前后 n / 2 个值),请使用 shift = window / 2
  • 对于右对齐(当前桶后 n 个值),请使用 shift = window

如果任一窗口边缘移动到数据序列边界之外,则窗口会缩小以仅包含可用值。

预构建函数编辑

为了方便起见,许多函数已经预先构建,并且可以在 moving_fn 脚本上下文中使用

  • max()
  • min()
  • sum()
  • stdDev()
  • unweightedAvg()
  • linearWeightedAvg()
  • ewma()
  • holt()
  • holtWinters()

这些函数可从 MovingFunctions 命名空间中获得。例如,MovingFunctions.max()

max 函数编辑

此函数接受一个双精度集合,并返回该窗口中的最大值。忽略 nullNaN 值;最大值仅针对实际值计算。如果窗口为空,或者所有值都是 null/NaN,则返回 NaN 作为结果。

表 65. max(double[] values) 参数

参数名称 描述

values

要查找最大值的数值窗口

response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_date_histo: {
        date_histogram: {
          field: 'date',
          calendar_interval: '1M'
        },
        aggregations: {
          the_sum: {
            sum: {
              field: 'price'
            }
          },
          the_moving_max: {
            moving_fn: {
              buckets_path: 'the_sum',
              window: 10,
              script: 'MovingFunctions.max(values)'
            }
          }
        }
      }
    }
  }
)
puts response
POST /_search
{
  "size": 0,
  "aggs": {
    "my_date_histo": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "1M"
      },
      "aggs": {
        "the_sum": {
          "sum": { "field": "price" }
        },
        "the_moving_max": {
          "moving_fn": {
            "buckets_path": "the_sum",
            "window": 10,
            "script": "MovingFunctions.max(values)"
          }
        }
      }
    }
  }
}

min 函数编辑

此函数接受一个双精度集合,并返回该窗口中的最小值。忽略 nullNaN 值;最小值仅针对实际值计算。如果窗口为空,或者所有值都是 null/NaN,则返回 NaN 作为结果。

表 66. min(double[] values) 参数

参数名称 描述

values

要查找最小值的数值窗口

response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_date_histo: {
        date_histogram: {
          field: 'date',
          calendar_interval: '1M'
        },
        aggregations: {
          the_sum: {
            sum: {
              field: 'price'
            }
          },
          the_moving_min: {
            moving_fn: {
              buckets_path: 'the_sum',
              window: 10,
              script: 'MovingFunctions.min(values)'
            }
          }
        }
      }
    }
  }
)
puts response
POST /_search
{
  "size": 0,
  "aggs": {
    "my_date_histo": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "1M"
      },
      "aggs": {
        "the_sum": {
          "sum": { "field": "price" }
        },
        "the_moving_min": {
          "moving_fn": {
            "buckets_path": "the_sum",
            "window": 10,
            "script": "MovingFunctions.min(values)"
          }
        }
      }
    }
  }
}

sum 函数编辑

此函数接受一个双精度集合,并返回该窗口中值的总和。忽略 nullNaN 值;总和仅针对实际值计算。如果窗口为空,或者所有值都是 null/NaN,则返回 0.0 作为结果。

表 67. sum(double[] values) 参数

参数名称 描述

values

要查找总和的数值窗口

response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_date_histo: {
        date_histogram: {
          field: 'date',
          calendar_interval: '1M'
        },
        aggregations: {
          the_sum: {
            sum: {
              field: 'price'
            }
          },
          the_moving_sum: {
            moving_fn: {
              buckets_path: 'the_sum',
              window: 10,
              script: 'MovingFunctions.sum(values)'
            }
          }
        }
      }
    }
  }
)
puts response
POST /_search
{
  "size": 0,
  "aggs": {
    "my_date_histo": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "1M"
      },
      "aggs": {
        "the_sum": {
          "sum": { "field": "price" }
        },
        "the_moving_sum": {
          "moving_fn": {
            "buckets_path": "the_sum",
            "window": 10,
            "script": "MovingFunctions.sum(values)"
          }
        }
      }
    }
  }
}

stdDev 函数编辑

此函数接受一个双精度集合和平均值,然后返回该窗口中值的标准差。忽略 nullNaN 值;总和仅针对实际值计算。如果窗口为空,或者所有值都是 null/NaN,则返回 0.0 作为结果。

表 68. stdDev(double[] values) 参数

参数名称 描述

values

要查找标准差的数值窗口

avg

窗口的平均值

response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_date_histo: {
        date_histogram: {
          field: 'date',
          calendar_interval: '1M'
        },
        aggregations: {
          the_sum: {
            sum: {
              field: 'price'
            }
          },
          the_moving_sum: {
            moving_fn: {
              buckets_path: 'the_sum',
              window: 10,
              script: 'MovingFunctions.stdDev(values, MovingFunctions.unweightedAvg(values))'
            }
          }
        }
      }
    }
  }
)
puts response
POST /_search
{
  "size": 0,
  "aggs": {
    "my_date_histo": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "1M"
      },
      "aggs": {
        "the_sum": {
          "sum": { "field": "price" }
        },
        "the_moving_sum": {
          "moving_fn": {
            "buckets_path": "the_sum",
            "window": 10,
            "script": "MovingFunctions.stdDev(values, MovingFunctions.unweightedAvg(values))"
          }
        }
      }
    }
  }
}

必须向标准差函数提供 avg 参数,因为可以在窗口上计算不同样式的平均值(简单、线性加权等)。下面详细介绍的各种移动平均值可用于计算标准差函数的平均值。

unweightedAvg 函数编辑

unweightedAvg 函数计算窗口中所有值的总和,然后除以窗口的大小。它实际上是窗口的简单算术平均值。简单移动平均值不执行任何时间相关的加权,这意味着 simple 移动平均值往往会“滞后”于实际数据。

忽略 nullNaN 值;平均值仅针对实际值计算。如果窗口为空,或者所有值都是 null/NaN,则返回 NaN 作为结果。这意味着平均值计算中使用的计数是非 null、非 NaN 值的计数。

表 69. unweightedAvg(double[] values) 参数

参数名称 描述

values

要查找总和的数值窗口

response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_date_histo: {
        date_histogram: {
          field: 'date',
          calendar_interval: '1M'
        },
        aggregations: {
          the_sum: {
            sum: {
              field: 'price'
            }
          },
          the_movavg: {
            moving_fn: {
              buckets_path: 'the_sum',
              window: 10,
              script: 'MovingFunctions.unweightedAvg(values)'
            }
          }
        }
      }
    }
  }
)
puts response
POST /_search
{
  "size": 0,
  "aggs": {
    "my_date_histo": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "1M"
      },
      "aggs": {
        "the_sum": {
          "sum": { "field": "price" }
        },
        "the_movavg": {
          "moving_fn": {
            "buckets_path": "the_sum",
            "window": 10,
            "script": "MovingFunctions.unweightedAvg(values)"
          }
        }
      }
    }
  }
}

linearWeightedAvg 函数编辑

linearWeightedAvg 函数为序列中的点分配线性加权,以便“较旧”的数据点(例如,窗口开头的那些数据点)对总平均值的贡献线性减少。线性加权有助于减少数据均值的“滞后”,因为较旧的点的权重较小。

如果窗口为空,或者所有值都是 null/NaN,则返回 NaN 作为结果。

表 70. linearWeightedAvg(double[] values) 参数

参数名称 描述

values

要查找总和的数值窗口

response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_date_histo: {
        date_histogram: {
          field: 'date',
          calendar_interval: '1M'
        },
        aggregations: {
          the_sum: {
            sum: {
              field: 'price'
            }
          },
          the_movavg: {
            moving_fn: {
              buckets_path: 'the_sum',
              window: 10,
              script: 'MovingFunctions.linearWeightedAvg(values)'
            }
          }
        }
      }
    }
  }
)
puts response
POST /_search
{
  "size": 0,
  "aggs": {
    "my_date_histo": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "1M"
      },
      "aggs": {
        "the_sum": {
          "sum": { "field": "price" }
        },
        "the_movavg": {
          "moving_fn": {
            "buckets_path": "the_sum",
            "window": 10,
            "script": "MovingFunctions.linearWeightedAvg(values)"
          }
        }
      }
    }
  }
}

ewma 函数编辑

ewma 函数(也称为“单指数”)类似于 linearMovAvg 函数,不同之处在于较旧的数据点的权重呈指数级递减,而不是线性递减。权重衰减的速度可以通过 alpha 设置来控制。较小的值会使权重衰减缓慢,从而提供更大的平滑度并考虑窗口的更大一部分。较大的值会使权重快速衰减,从而减少较旧的值对移动平均值的影响。这往往会使移动平均值更紧密地跟踪数据,但平滑度较低。

忽略 nullNaN 值;平均值仅针对实际值计算。如果窗口为空,或者所有值都是 null/NaN,则返回 NaN 作为结果。这意味着平均值计算中使用的计数是非 null、非 NaN 值的计数。

表 71. ewma(double[] values, double alpha) 参数

参数名称 描述

values

要查找总和的数值窗口

alpha

指数衰减

response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_date_histo: {
        date_histogram: {
          field: 'date',
          calendar_interval: '1M'
        },
        aggregations: {
          the_sum: {
            sum: {
              field: 'price'
            }
          },
          the_movavg: {
            moving_fn: {
              buckets_path: 'the_sum',
              window: 10,
              script: 'MovingFunctions.ewma(values, 0.3)'
            }
          }
        }
      }
    }
  }
)
puts response
POST /_search
{
  "size": 0,
  "aggs": {
    "my_date_histo": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "1M"
      },
      "aggs": {
        "the_sum": {
          "sum": { "field": "price" }
        },
        "the_movavg": {
          "moving_fn": {
            "buckets_path": "the_sum",
            "window": 10,
            "script": "MovingFunctions.ewma(values, 0.3)"
          }
        }
      }
    }
  }
}

holt 函数编辑

holt 函数(也称为“双指数”)包含第二个指数项,用于跟踪数据的趋势。当数据具有潜在的线性趋势时,单指数表现不佳。双指数模型在内部计算两个值:“水平”和“趋势”。

水平计算类似于 ewma,是数据的指数加权视图。不同之处在于,使用先前平滑的值而不是原始值,这使得它能够保持接近原始序列。趋势计算查看当前值和最后一个值之间的差异(例如,平滑数据的斜率或趋势)。趋势值也是指数加权的。

值是通过将水平和趋势分量相乘而产生的。

忽略 nullNaN 值;平均值仅针对实际值计算。如果窗口为空,或者所有值都是 null/NaN,则返回 NaN 作为结果。这意味着平均值计算中使用的计数是非 null、非 NaN 值的计数。

表 72. holt(double[] values, double alpha) 参数

参数名称 描述

values

要查找总和的数值窗口

alpha

水平衰减值

beta

趋势衰减值

response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_date_histo: {
        date_histogram: {
          field: 'date',
          calendar_interval: '1M'
        },
        aggregations: {
          the_sum: {
            sum: {
              field: 'price'
            }
          },
          the_movavg: {
            moving_fn: {
              buckets_path: 'the_sum',
              window: 10,
              script: 'MovingFunctions.holt(values, 0.3, 0.1)'
            }
          }
        }
      }
    }
  }
)
puts response
POST /_search
{
  "size": 0,
  "aggs": {
    "my_date_histo": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "1M"
      },
      "aggs": {
        "the_sum": {
          "sum": { "field": "price" }
        },
        "the_movavg": {
          "moving_fn": {
            "buckets_path": "the_sum",
            "window": 10,
            "script": "MovingFunctions.holt(values, 0.3, 0.1)"
          }
        }
      }
    }
  }
}

在实践中,holtMovAvg 中的 alpha 值与 ewmaMovAvg 中的行为非常相似:较小的值会产生更多的平滑处理和更多的滞后,而较大的值会产生更紧密的跟踪和更少的滞后。beta 的值通常难以看出。较小的值强调长期趋势(例如整个序列中的恒定线性趋势),而较大的值强调短期趋势。

holtWinters 函数编辑

holtWinters 函数(又称“三重指数”)包含第三个指数项,用于跟踪数据的季节性方面。因此,此聚合基于三个组成部分进行平滑处理:“水平”、“趋势”和“季节性”。

水平和趋势计算与 holt 相同。季节性计算查看当前点与一个周期之前的点之间的差异。

Holt-Winters 比其他移动平均线需要更多的手动干预。您需要指定数据的“周期性”:例如,如果您的数据每 7 天有一个周期性趋势,则应设置 period = 7。同样,如果存在月度趋势,则应将其设置为 30。目前没有周期性检测,尽管计划在未来的增强功能中添加。

忽略 nullNaN 值;平均值仅针对实际值计算。如果窗口为空,或者所有值都是 null/NaN,则返回 NaN 作为结果。这意味着平均值计算中使用的计数是非 null、非 NaN 值的计数。

表 73. holtWinters(double[] values, double alpha) 参数

参数名称 描述

values

要查找总和的数值窗口

alpha

水平衰减值

beta

趋势衰减值

gamma

季节性衰减值

period

数据的周期性

multiplicative

如果希望使用乘法 holt-winters,则为 True,如果使用加法 holt-winters,则为 false

response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_date_histo: {
        date_histogram: {
          field: 'date',
          calendar_interval: '1M'
        },
        aggregations: {
          the_sum: {
            sum: {
              field: 'price'
            }
          },
          the_movavg: {
            moving_fn: {
              buckets_path: 'the_sum',
              window: 10,
              script: 'if (values.length > 5*2) {MovingFunctions.holtWinters(values, 0.3, 0.1, 0.1, 5, false)}'
            }
          }
        }
      }
    }
  }
)
puts response
POST /_search
{
  "size": 0,
  "aggs": {
    "my_date_histo": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "1M"
      },
      "aggs": {
        "the_sum": {
          "sum": { "field": "price" }
        },
        "the_movavg": {
          "moving_fn": {
            "buckets_path": "the_sum",
            "window": 10,
            "script": "if (values.length > 5*2) {MovingFunctions.holtWinters(values, 0.3, 0.1, 0.1, 5, false)}"
          }
        }
      }
    }
  }
}

乘法 Holt-Winters 的工作原理是将每个数据点除以季节性值。如果您的任何数据为零,或者数据中存在缺口(因为这会导致除以零),则会出现问题。为了解决这个问题,mult Holt-Winters 会为所有值填充一个非常小的量 (1*10-10),以便所有值都不为零。这会影响结果,但影响很小。如果您的数据不为零,或者您希望在遇到零时看到 NaN,则可以使用 pad: false 禁用此行为

“冷启动”编辑

遗憾的是,由于 Holt-Winters 的性质,它需要两个周期的数据来“引导”算法。这意味着您的 window 必须始终至少是您的周期的两倍。如果不是,则会抛出异常。这也意味着 Holt-Winters 不会为前 2 * period 个桶发出值;当前算法不会进行回溯。

您会在上面的示例中注意到,我们有一个 if () 语句来检查 values 的大小。这是为了确保在调用 holt-winters 函数之前,我们有两个周期的数据(5 * 2,其中 5 是 holtWintersMovAvg 函数中指定的周期)。