Apache Spark 支持
编辑Apache Spark 支持
编辑
Apache Spark 是一个快速、通用的集群计算系统。它提供 Java、Scala 和 Python 的高级 API,以及支持通用执行图的优化引擎。 |
||
-- Spark 网站 |
Spark 提供了对大型数据集的快速迭代/函数式能力,通常通过在内存中缓存数据来实现。与本文档中提到的其他库不同,Apache Spark 是一个计算框架,它本身并不与 Map/Reduce 绑定,但它确实与 Hadoop 集成,主要是与 HDFS 集成。elasticsearch-hadoop 允许以两种方式在 Spark 中使用 Elasticsearch:通过自 2.1 版本以来提供的专用支持,或者通过自 2.0 版本以来的 Map/Reduce 桥接。自 5.0 版本以来,elasticsearch-hadoop 支持 Spark 2.0。
安装
编辑与其他库一样,elasticsearch-hadoop 需要在 Spark 的类路径中可用。由于 Spark 具有多种部署模式,这可以转换为目标类路径,无论它是在一个节点上(如本地模式的情况 - 将在整个文档中使用)还是每个节点,具体取决于所需的基础设施。
原生 RDD 支持
编辑在 2.1 版本中添加。
elasticsearch-hadoop 以 RDD
(弹性分布式数据集)(或更精确地说是 Pair RDD
)的形式提供 Elasticsearch 和 Apache Spark 之间的原生集成,该 RDD 可以从 Elasticsearch 中读取数据。RDD
提供两种风格:一种用于 Scala(将数据作为带有 Scala 集合的 Tuple2
返回),另一种用于 Java(将数据作为包含 java.util
集合的 Tuple2
返回)。
在可能的情况下,请考虑使用原生集成,因为它提供了最佳性能和最大灵活性。
配置
编辑要为 Apache Spark 配置 elasticsearch-hadoop,可以设置 配置 章节中描述的各种属性,这些属性位于 SparkConf
对象中。
import org.apache.spark.SparkConf val conf = new SparkConf().setAppName(appName).setMaster(master) conf.set("es.index.auto.create", "true")
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); conf.set("es.index.auto.create", "true");
命令行对于那些想要通过命令行设置属性(直接或从文件加载)的人,请注意 Spark 仅接受以“spark.”为前缀的属性,并会忽略其余属性(并且根据版本,可能会抛出警告)。为了解决这个限制,请通过附加 spark.
前缀来定义 elasticsearch-hadoop 属性(因此它们变为 spark.es.
),并且 elasticsearch-hadoop 将自动解析它们。
将数据写入 Elasticsearch
编辑使用 elasticsearch-hadoop,只要可以将 RDD
的内容转换为文档,就可以将任何 RDD
保存到 Elasticsearch。实际上,这意味着 RDD
类型需要是 Map
(无论是 Scala 还是 Java 的)、JavaBean
或 Scala case class。如果不是这种情况,可以在 Spark 中轻松转换数据或插入自定义的 ValueWriter
。
Scala
编辑使用 Scala 时,只需导入 org.elasticsearch.spark
包,该包通过 pimp my library 模式,使用 saveToEs
方法丰富了任何 RDD
API。
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.elasticsearch.spark._ ... val conf = ... val sc = new SparkContext(conf) val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3) val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran") sc.makeRDD( Seq(numbers, airports) ).saveToEs("spark/docs")
Spark Scala 导入 |
|
elasticsearch-hadoop Scala 导入 |
|
通过其 Scala API 启动 Spark |
|
|
|
在 Elasticsearch 中,将内容(即两个文档(数字和机场))索引在 |
Scala 用户可能倾向于使用 Seq
和 →
表示法来声明根对象(即 JSON 文档),而不是使用 Map
。虽然相似,但第一种表示法会导致稍微不同的类型,这些类型无法与 JSON 文档匹配:Seq
是一个有序序列(换句话说,是一个列表),而 →
创建一个 Tuple
,它或多或少是一个有序的、固定数量的元素。因此,列表的列表不能用作文档,因为它无法映射到 JSON 对象;但是,它可以在其中自由使用。这就是为什么在上面的示例中使用 Map(k→v)
而不是 Seq(k→v)
的原因。
作为上述隐式导入的替代方法,可以通过 org.elasticsearch.spark.rdd
包中的 EsSpark
在 Scala 中使用 elasticsearch-hadoop Spark 支持,它充当实用程序类,允许显式方法调用。此外,可以使用一个case class,而不是 Map
(它很方便,但由于其结构差异,每个实例都需要一个映射)。
import org.apache.spark.SparkContext import org.elasticsearch.spark.rdd.EsSpark // define a case class case class Trip(departure: String, arrival: String) val upcomingTrip = Trip("OTP", "SFO") val lastWeekTrip = Trip("MUC", "OTP") val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip)) EsSpark.saveToEs(rdd, "spark/docs")
对于需要指定文档的 id(或其他元数据字段,如 ttl
或 timestamp
)的情况,可以通过设置适当的映射,即 es.mapping.id
来完成。在前面的示例中,为了指示 Elasticsearch 将字段 id
用作文档 id,请更新 RDD
配置(也可以在 SparkConf
上设置该属性,但由于其全局影响,不建议这样做)。
EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id"))
Java
编辑Java 用户有一个专用类,它提供了与 EsSpark
类似的功能,即 org.elasticsearch.spark.rdd.api.java
中的 JavaEsSpark
(一个类似于 Spark 的 Java API 的包)。
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; ... SparkConf conf = ... JavaSparkContext jsc = new JavaSparkContext(conf); Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2); Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran"); JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports)); JavaEsSpark.saveToEs(javaRDD, "spark/docs");
Spark Java 导入 |
|
elasticsearch-hadoop Java 导入 |
|
通过其 Java API 启动 Spark |
|
为了简化示例,请使用 Guava(Spark 的依赖项)的 |
|
在两个集合上创建一个简单的 |
|
在 Elasticsearch 中,将内容(即两个文档(数字和机场))索引在 |
通过使用 Java 5 静态导入可以进一步简化代码。此外,Map
(由于其松散的结构,其映射是动态的)可以替换为 JavaBean
。
public class TripBean implements Serializable { private String departure, arrival; public TripBean(String departure, String arrival) { setDeparture(departure); setArrival(arrival); } public TripBean() {} public String getDeparture() { return departure; } public String getArrival() { return arrival; } public void setDeparture(String dep) { departure = dep; } public void setArrival(String arr) { arrival = arr; } }
import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark; ... TripBean upcoming = new TripBean("OTP", "SFO"); TripBean lastWeek = new TripBean("MUC", "OTP"); JavaRDD<TripBean> javaRDD = jsc.parallelize( ImmutableList.of(upcoming, lastWeek)); saveToEs(javaRDD, "spark/docs");
静态导入 |
|
定义一个包含 |
|
调用 |
设置文档 id(或其他元数据字段,如 ttl
或 timestamp
)类似于其 Scala 对应物,但根据您使用的是 JDK 类还是其他实用程序(如 Guava),可能稍微冗长一些。
JavaEsSpark.saveToEs(javaRDD, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));
将现有 JSON 写入 Elasticsearch
编辑对于 RDD
中的数据已经是 JSON 的情况,elasticsearch-hadoop 允许直接索引,无需应用任何转换;数据按原样获取并直接发送到 Elasticsearch。因此,在这种情况下,elasticsearch-hadoop 希望 RDD
包含 String
或字节数组 (byte[]
/Array[Byte]
),假设每个条目代表一个 JSON 文档。如果 RDD
没有正确的签名,则不能应用 saveJsonToEs
方法(在 Scala 中它们将不可用)。
Scala
编辑val json1 = """{"reason" : "business", "airport" : "SFO"}""" val json2 = """{"participants" : 5, "airport" : "OTP"}""" new SparkContext(conf).makeRDD(Seq(json1, json2)) .saveJsonToEs("spark/json-trips")
Java
编辑String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}"; String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}"; JavaSparkContext jsc = ... JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); JavaEsSpark.saveJsonToEs(stringRDD, "spark/json-trips");
示例, |
|
请注意 |
|
通过专用的 |
写入动态/多资源
编辑对于需要将写入 Elasticsearch 的数据索引到不同的存储桶(基于数据内容)中的情况,可以使用 es.resource.write
字段,该字段接受在运行时从文档内容解析的模式。按照前面提到的媒体示例,可以按如下方式配置它:
Scala
编辑val game = Map( "media_type"->"game", "title" -> "FF VI", "year" -> "1994") val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010") val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien") sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection-{media_type}/doc")
对于即将写入的每个文档/对象,elasticsearch-hadoop 将提取 media_type
字段并使用其值来确定目标资源。
Java
编辑正如预期的那样,Java 中的情况非常相似。
Map<String, ?> game = ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994"); Map<String, ?> book = ... Map<String, ?> cd = ... JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(game, book, cd)); saveToEs(javaRDD, "my-collection-{media_type}/doc");
处理文档元数据
编辑Elasticsearch 允许每个文档都有自己的元数据。如上所述,通过各种映射选项,可以自定义这些参数,以便从其所属的文档中提取它们的值。此外,甚至可以包含/排除哪些数据部分被发送回 Elasticsearch。在 Spark 中,elasticsearch-hadoop 扩展了此功能,允许通过使用 pair RDD
s 在文档外部提供元数据。换句话说,对于包含键值元组的 RDD
,可以从键中提取元数据,并将值用作文档源。
通过 org.elasticsearch.spark.rdd
包中的 Metadata
Java enum 描述元数据,它标识其类型 - id
、ttl
、version
等。因此,RDD
键可以是包含每个文档的 Metadata
及其关联值的 Map
。如果 RDD
键不是 Map
类型,则 elasticsearch-hadoop 将认为该对象表示文档 id 并相应地使用它。这听起来比实际情况要复杂,让我们来看一些示例。
Scala
编辑键值对 RDD
,或者简单来说签名是 RDD[(K,V)]
的 RDD
,可以利用 saveToEsWithMeta
方法,这些方法可以通过隐式导入 org.elasticsearch.spark
包或 EsSpark
对象来使用。要为每个文档手动指定 id,只需在您的 RDD
中传入 Object
(不是 Map
类型)即可
val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // instance of SparkContext val sc = ... val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo))) airportsRDD.saveToEsWithMeta("airports/2015")
|
|
|
|
由于 |
当需要指定不仅仅是 id 的信息时,应该使用一个键类型为 org.elasticsearch.spark.rdd.Metadata
的 scala.collection.Map
import org.elasticsearch.spark.rdd.Metadata._ val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // metadata for each document // note it's not required for them to have the same structure val otpMeta = Map(ID -> 1, TTL -> "3h") val mucMeta = Map(ID -> 2, VERSION -> "23") val sfoMeta = Map(ID -> 3) // instance of SparkContext val sc = ... val airportsRDD = sc.makeRDD( Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo))) airportsRDD.saveToEsWithMeta("airports/2015")
导入 |
|
用于 |
|
用于 |
|
用于 |
|
元数据和文档被组装成一个键值对 |
|
使用 |
Java
编辑类似地,在 Java 端,JavaEsSpark
提供了应用于 JavaPairRDD
(Java 中 RDD[(K,V)]
的等效项)的 saveToEsWithMeta
方法。因此,要根据文档的 id 保存文档,可以使用
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC"); JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs(ImmutableList.of( new Tuple2<Object, Object>(1, otp), new Tuple2<Object, Object>(2, jfk))); JavaEsSpark.saveToEsWithMeta(pairRDD, target);
通过使用 Scala |
|
第一个文档的元组,包装了 id( |
|
第二个文档的元组,包装了 id( |
|
使用键作为 id,值作为文档,相应地保存 |
当需要指定不仅仅是 id 的信息时,可以选择使用一个使用类型为 org.elasticsearch.spark.rdd.Metadata
的键填充的 java.util.Map
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; import org.elasticsearch.spark.rdd.Metadata; import static org.elasticsearch.spark.rdd.Metadata.*; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran"); // metadata for each document // note it's not required for them to have the same structure Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs<(ImmutableList.of( new Tuple2<Object, Object>(otpMeta, otp), new Tuple2<Object, Object>(sfoMeta, sfo))); JavaEsSpark.saveToEsWithMeta(pairRDD, target);
|
|
用于 |
|
|
|
|
|
|
|
关联 |
|
在包含文档及其各自元数据的 |
从 Elasticsearch 读取数据
编辑对于读取,应该定义从 Elasticsearch 向 Spark流式传输数据的 Elasticsearch RDD
。
Scala
编辑与写入类似,org.elasticsearch.spark
包使用 esRDD
方法丰富了 SparkContext
API
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.elasticsearch.spark._ ... val conf = ... val sc = new SparkContext(conf) val RDD = sc.esRDD("radio/artists")
Spark Scala 导入 |
|
elasticsearch-hadoop Scala 导入 |
|
通过其 Scala API 启动 Spark |
|
为索引 |
可以重载该方法以指定额外的查询,甚至指定一个配置 Map
(覆盖 SparkConf
)
... import org.elasticsearch.spark._ ... val conf = ... val sc = new SparkContext(conf) sc.esRDD("radio/artists", "?q=me*")
默认情况下,Elasticsearch 中的文档以 Tuple2
的形式返回,其中第一个元素是文档 id,第二个元素是通过 Scala 集合 表示的实际文档,即一个 `Map[String, Any]`,其中键表示字段名称,值表示其各自的值。
Java
编辑Java 用户有一个专用的 JavaPairRDD
,它的工作方式与 Scala 对应项相同,但是返回的 Tuple2
值(或第二个元素)以原生 java.util
集合的形式返回文档。
import org.apache.spark.api.java.JavaSparkContext; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; ... SparkConf conf = ... JavaSparkContext jsc = new JavaSparkContext(conf); JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "radio/artists");
Spark Java 导入 |
|
elasticsearch-hadoop Java 导入 |
|
通过其 Java API 启动 Spark |
|
为索引 |
类似地,可以使用重载的 esRDD
方法来指定查询或传递 Map
对象以进行高级配置。让我们看看它是如何工作的,但这次使用 Java 静态导入。此外,让我们丢弃文档 id,仅检索 RDD
值
import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.*; ... JavaRDD<Map<String, Object>> rdd = esRDD(jsc, "radio/artists", "?q=me*") .values();
静态导入 |
|
创建一个 |
|
仅返回 |
通过使用 JavaEsSpark
API,可以获得 Spark 专用的 JavaPairRDD
,在 Java 环境中,它比基本 RDD
更适合(因为它的 Scala 签名)。此外,专用的 RDD
将 Elasticsearch 文档作为正确的 Java 集合返回,因此无需处理 Scala 集合(这通常是 RDD
的情况)。当使用 Java 8 时,这一点尤其强大,我们强烈建议使用 Java 8,因为它的 lambda 表达式 使集合处理极其简洁。
例如,假设有人想从 RDD
中过滤文档,并且仅返回那些包含值 mega
的文档(请忽略可以直接通过 Elasticsearch 执行过滤这一事实)。
在 Java 8 之前的版本中,代码看起来像这样
JavaRDD<Map<String, Object>> esRDD = esRDD(jsc, "radio/artists", "?q=me*").values(); JavaRDD<Map<String, Object>> filtered = esRDD.filter( new Function<Map<String, Object>, Boolean>() { @Override public Boolean call(Map<String, Object> map) throws Exception { returns map.contains("mega"); } });
在 Java 8 中,过滤变成了一行代码
JavaRDD<Map<String, Object>> esRDD = esRDD(jsc, "radio/artists", "?q=me*").values(); JavaRDD<Map<String, Object>> filtered = esRDD.filter(doc -> doc.contains("mega"));
以 JSON 格式读取数据
编辑如果需要以 JSON 格式从 Elasticsearch 获取结果(通常是为了将结果发送到其他系统),可以使用专用的 esJsonRDD
方法。在这种情况下,连接器将按从 Elasticsearch 接收到的文档内容返回文档内容,而无需进行任何处理,以 Scala 中的 RDD[(String, String)]
或 Java 中的 JavaPairRDD[String, String]
的形式返回,其中键表示文档 id,值表示其 JSON 格式的实际内容。
类型转换
编辑elasticsearch-hadoop 会自动将 Spark 内置类型转换为 Elasticsearch 类型(反之亦然),如下表所示
表 4. Scala 类型转换表
Scala 类型 | Elasticsearch 类型 |
---|---|
|
|
|
|
|
空的 |
|
根据表中的 |
|
|
|
|
case class |
|
|
|
此外,以下隐含转换适用于 Java 类型
表 5. Java 类型转换表
Java 类型 | Elasticsearch 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Java Bean |
|
转换过程会尽力而为;内置的 Java 和 Scala 类型保证可以正确转换,但是对于 Java 或 Scala 中的用户类型则不作任何保证。如上表所述,当在 Scala 中遇到 case
类或在 Java 中遇到 JavaBean
时,转换器会尝试解包
其内容并将其保存为 object
。请注意,这仅适用于顶级的用户对象 - 如果用户对象嵌套了其他用户对象,则转换很可能会失败,因为转换器不会执行嵌套的 解包
。这样做是有意为之,因为转换器必须序列化和反序列化数据,并且用户类型由于数据丢失而引入了歧义;可以通过某种类型的映射来解决这个问题,但这会使项目过于接近 ORM 的领域,并且可以说引入了过多的复杂性而几乎没有收益;多亏了 Spark 中的处理功能和 elasticsearch-hadoop 中的可插拔性,可以轻松地将对象转换为其他类型,如果需要,只需付出最少的努力和最大的控制。
Geo 类型 值得一提的是,Elasticsearch 中独有的富数据类型,例如 GeoPoint
或 GeoShape
,通过将其结构转换为上表中可用的基本类型来支持。例如,根据其存储方式,geo_point
可能会作为 String
或 Traversable
返回。
Spark Streaming 支持
编辑5.0 版本新增。
Spark Streaming 是核心 Spark API 的扩展,它支持对实时数据流进行可扩展、高吞吐量、容错的流处理。 |
||
-- Spark 网站 |
Spark Streaming 是核心 Spark 功能之上的扩展,允许近实时处理流数据。Spark Streaming 围绕 DStream
或离散流的概念工作。DStream
的工作方式是将新到达的记录收集到一个小的 RDD
中并执行它。这个过程每隔几秒重复一次,使用一个新的 RDD
,这个过程称为微批处理。DStream
API 包括许多与 RDD
API 相同的处理操作,以及其他一些流式特定的方法。从 5.0 版本开始,elasticsearch-hadoop 提供了与 Spark Streaming 的原生集成。
当使用 elasticsearch-hadoop Spark Streaming 支持时,可以将 Elasticsearch 作为输出位置,以便将数据从 Spark Streaming 作业索引到 Elasticsearch 中,就像持久化 RDD
的结果一样。但是,与 RDD
不同,由于 DStream
的连续性,您无法使用 DStream
从 Elasticsearch 中读取数据。
Spark Streaming 支持提供了特殊的优化,以便在运行处理窗口非常小的作业时,在 Spark 执行器上节省网络资源。因此,应该优先使用这种集成,而不是在 DStream
上调用 foreachRDD
返回的 RDD
上调用 saveToEs
。
将 DStream
写入 Elasticsearch
编辑与 RDD
一样,只要其内容可以转换为文档,任何 DStream
都可以保存到 Elasticsearch。实际上,这意味着 DStream
类型需要是 Map
(Scala 或 Java 的),JavaBean
或 Scala case 类。如果不是这种情况,可以轻松地在 Spark 中转换数据或插入自己的自定义 ValueWriter
。
Scala
编辑当使用 Scala 时,只需导入 org.elasticsearch.spark.streaming
包,它通过 pimp my library 模式,使用 saveToEs
方法丰富了 DStream
API
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.elasticsearch.spark.streaming._ ... val conf = ... val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)) val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3) val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran") val rdd = sc.makeRDD(Seq(numbers, airports)) val microbatches = mutable.Queue(rdd) ssc.queueStream(microbatches).saveToEs("spark/docs") ssc.start() ssc.awaitTermination()
Spark 和 Spark Streaming Scala 导入 |
|
elasticsearch-hadoop Spark Streaming 导入 |
|
通过其 Scala API 启动 Spark |
|
通过传递 SparkContext 启动 SparkStreaming 上下文。微批处理将每秒处理一次。 |
|
|
|
从 |
|
启动 Spark Streaming 作业并等待其最终完成。 |
作为上述隐式导入的替代方案,可以通过 org.elasticsearch.spark.streaming
包中的 EsSparkStreaming
在 Scala 中使用 elasticsearch-hadoop Spark Streaming 支持,它充当实用程序类,允许显式方法调用。此外,可以使用case 类,而不是 Map
(它们很方便,但由于它们的结构不同,每个实例都需要一个映射)
import org.apache.spark.SparkContext import org.elasticsearch.spark.streaming.EsSparkStreaming // define a case class case class Trip(departure: String, arrival: String) val upcomingTrip = Trip("OTP", "SFO") val lastWeekTrip = Trip("MUC", "OTP") val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip)) val microbatches = mutable.Queue(rdd) val dstream = ssc.queueStream(microbatches) EsSparkStreaming.saveToEs(dstream, "spark/docs") ssc.start()
|
|
定义一个名为 |
|
围绕 |
|
配置 |
|
启动流处理 |
一旦启动了 SparkStreamingContext,就无法添加或配置新的 DStream
。一旦上下文停止,就无法重新启动。每个 JVM 一次只能有一个活动的 SparkStreamingContext。另请注意,当以编程方式停止 SparkStreamingContext 时,它会停止底层的 SparkContext,除非指示不停止。
对于需要指定文档 id(或其他元数据字段,如 ttl
或 timestamp
)的情况,可以通过设置适当的 映射,即 es.mapping.id
来实现。按照前面的示例,为了指示 Elasticsearch 使用字段 id
作为文档 id,请更新 DStream
配置(也可以在 SparkConf
上设置该属性,但由于其全局效应,不建议这样做)
EsSparkStreaming.saveToEs(dstream, "spark/docs", Map("es.mapping.id" -> "id"))
Java
编辑Java 用户有一个专用类,它提供类似于 EsSparkStreaming
的功能,即 org.elasticsearch.spark.streaming.api.java
包中的 JavaEsSparkStreaming
(类似于 Spark 的 Java API 的包)
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.api.java.JavaDStream; import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming; ... SparkConf conf = ... JavaSparkContext jsc = new JavaSparkContext(conf); JavaStreamingContext jssc = new JavaSparkStreamingContext(jsc, Seconds.apply(1)); Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2); Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran"); JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports)); Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>(); microbatches.add(javaRDD); JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches); JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs"); jssc.start()
Spark 和 Spark Streaming Java 导入 |
|
elasticsearch-hadoop Java 导入 |
|
通过其 Java API 启动 Spark 和 Spark Streaming。微批处理将每秒处理一次。 |
|
为了简化示例,请使用 Guava(Spark 的依赖项)的 |
|
在微批处理上创建一个简单的 |
|
在 Elasticsearch 中,将内容(即两个文档(数字和机场))索引在 |
|
执行流式作业。 |
通过使用 Java 5 静态导入可以进一步简化代码。此外,Map
(由于其松散的结构,其映射是动态的)可以替换为 JavaBean
。
public class TripBean implements Serializable { private String departure, arrival; public TripBean(String departure, String arrival) { setDeparture(departure); setArrival(arrival); } public TripBean() {} public String getDeparture() { return departure; } public String getArrival() { return arrival; } public void setDeparture(String dep) { departure = dep; } public void setArrival(String arr) { arrival = arr; } }
import static org.elasticsearch.spark.rdd.api.java.JavaEsSparkStreaming; ... TripBean upcoming = new TripBean("OTP", "SFO"); TripBean lastWeek = new TripBean("MUC", "OTP"); JavaRDD<TripBean> javaRDD = jsc.parallelize(ImmutableList.of(upcoming, lastWeek)); Queue<JavaRDD<TripBean>> microbatches = new LinkedList<JavaRDD<TripBean>>(); microbatches.add(javaRDD); JavaDStream<TripBean> javaDStream = jssc.queueStream(microbatches); saveToEs(javaDStream, "spark/docs"); jssc.start()
静态导入 |
|
定义一个包含 |
|
调用 |
|
运行该流式作业 |
设置文档 id(或其他元数据字段,如 ttl
或 timestamp
)类似于其 Scala 对应物,但根据您使用的是 JDK 类还是其他实用程序(如 Guava),可能稍微冗长一些。
JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));
将现有 JSON 写入 Elasticsearch
编辑对于 DStream
流式传输的数据已经序列化为 JSON 的情况,elasticsearch-hadoop 允许直接索引,而不应用任何转换;数据按原样获取并直接发送到 Elasticsearch。因此,在这种情况下,elasticsearch-hadoop 期望 DStream
包含 String
或字节数组 (byte[]
/Array[Byte]
),假设每个条目代表一个 JSON 文档。如果 DStream
没有正确的签名,则无法应用 saveJsonToEs
方法(在 Scala 中它们将不可用)。
Scala
编辑val json1 = """{"reason" : "business", "airport" : "SFO"}""" val json2 = """{"participants" : 5, "airport" : "OTP"}""" val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)) val rdd = sc.makeRDD(Seq(json1, json2)) val microbatch = mutable.Queue(rdd) ssc.queueStream(microbatch).saveJsonToEs("spark/json-trips") ssc.start()
Java
编辑String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}"; String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}"; JavaSparkContext jsc = ... JavaStreamingContext jssc = ... JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); Queue<JavaRDD<String>> microbatches = new LinkedList<JavaRDD<String>>(); microbatches.add(stringRDD); JavaDStream<String> stringDStream = jssc.queueStream(microbatches); JavaEsSparkStreaming.saveJsonToEs(stringRDD, "spark/json-trips"); jssc.start()
|
|
创建一个 |
|
请注意 |
|
配置流,通过专用的 |
|
启动流式作业 |
写入动态/多资源
编辑对于需要将写入 Elasticsearch 的数据索引到不同的存储桶(基于数据内容)中的情况,可以使用 es.resource.write
字段,该字段接受在运行时从文档内容解析的模式。按照前面提到的媒体示例,可以按如下方式配置它:
Scala
编辑val game = Map( "media_type" -> "game", "title" -> "FF VI", "year" -> "1994") val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010") val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien") val batch = sc.makeRDD(Seq(game, book, cd)) val microbatches = mutable.Queue(batch) ssc.queueStream(microbatches).saveToEs("my-collection-{media_type}/doc") ssc.start()
对于即将写入的每个文档/对象,elasticsearch-hadoop 将提取 media_type
字段并使用其值来确定目标资源。
Java
编辑正如预期的那样,Java 中的情况非常相似。
Map<String, ?> game = ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994"); Map<String, ?> book = ... Map<String, ?> cd = ... JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(game, book, cd)); Queue<JavaRDD<Map<String, ?>>> microbatches = ... JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches); saveToEs(javaDStream, "my-collection-{media_type}/doc"); jssc.start();
处理文档元数据
编辑Elasticsearch 允许每个文档都有自己的 元数据。如上所述,通过各种 映射选项,可以自定义这些参数,以便从其所属的文档中提取它们的值。此外,甚至可以包含/排除将哪些数据发送回 Elasticsearch。在 Spark 中,elasticsearch-hadoop 扩展了此功能,允许通过使用 pair RDD
s 在文档外部提供元数据。
在 Spark Streaming 中也是如此。对于包含键值元组的 DStream
s,可以从键中提取元数据,并将值用作文档源。
元数据通过 org.elasticsearch.spark.rdd
包中的 Metadata
Java 枚举 来描述,该枚举标识其类型 - id
、ttl
、version
等。因此,DStream
的键可以是包含每个文档的 Metadata
及其关联值的 Map
。如果 DStream
键的类型不是 Map
,则 elasticsearch-hadoop 会将对象视为表示文档 id 并相应地使用它。这听起来比实际情况复杂,所以让我们看一些示例。
Scala
编辑Pair DStreams
s,或者简单地说,签名 DStream[(K,V)]
的 DStream
s 可以利用 saveToEsWithMeta
方法,这些方法可以通过隐式导入 org.elasticsearch.spark.streaming
包或 EsSparkStreaming
对象获得。要手动指定每个文档的 id,只需在 DStream
中传入 Object
(类型不是 Map
)
val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // instance of SparkContext val sc = ... // instance of StreamingContext val ssc = ... val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo))) val microbatches = mutable.Queue(airportsRDD) ssc.queueStream(microbatches) .saveToEsWithMeta("airports/2015") ssc.start()
|
|
|
|
我们构造一个 |
|
由于生成的 |
当需要指定不仅仅是 id 的信息时,应该使用一个键类型为 org.elasticsearch.spark.rdd.Metadata
的 scala.collection.Map
import org.elasticsearch.spark.rdd.Metadata._ val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // metadata for each document // note it's not required for them to have the same structure val otpMeta = Map(ID -> 1, TTL -> "3h") val mucMeta = Map(ID -> 2, VERSION -> "23") val sfoMeta = Map(ID -> 3) // instance of SparkContext val sc = ... // instance of StreamingContext val ssc = ... val airportsRDD = sc.makeRDD( Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo))) val microbatches = mutable.Queue(airportsRDD) ssc.queueStream(microbatches) .saveToEsWithMeta("airports/2015") ssc.start()
导入 |
|
用于 |
|
用于 |
|
用于 |
|
元数据和文档被组装成一个键值对 |
|
|
|
|
Java
编辑类似地,在 Java 端,JavaEsSparkStreaming
提供了应用于 JavaPairDStream
的 saveToEsWithMeta
方法(在 Java 中等同于 DStream[(K,V)]
)。
由于 Java API 的限制,这通常需要更多的工作。例如,您不能直接从 JavaPairRDD
的队列创建 JavaPairDStream
。相反,您必须创建一个常规的 Tuple2
对象的 JavaDStream
,并将 JavaDStream
转换为 JavaPairDStream
。这听起来很复杂,但它只是一个绕过 API 限制的简单方法。
首先,我们将创建一个 pair 函数,它接受一个 Tuple2
对象作为输入,并将其直接返回给框架。
public static class ExtractTuples implements PairFunction<Tuple2<Object, Object>, Object, Object>, Serializable { @Override public Tuple2<Object, Object> call(Tuple2<Object, Object> tuple2) throws Exception { return tuple2; } }
然后,我们将 pair 函数应用于 Tuple2
的 JavaDStream
,以创建一个 JavaPairDStream
并保存它。
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC"); JavaSparkContext jsc = ... JavaStreamingContext jssc = ... // create an RDD of between the id and the docs JavaRDD<Tuple2<?, ?>> rdd = jsc.parallelize( ImmutableList.of( new Tuple2<Object, Object>(1, otp), new Tuple2<Object, Object>(2, jfk))); Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ... JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); JavaPairDStream<?, ?> pairDStream = dstream.mapToPair(new ExtractTuples()); JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target); jssc.start();
创建一个常规的 |
|
第一个文档的元组,包装了 id( |
|
第二个文档的元组,包装了 id( |
|
从元组 |
|
通过将我们的 |
|
|
当需要指定的不只是 ID 时,可以选择使用一个 java.util.Map
,其中填充了类型为 org.elasticsearch.spark.rdd.Metadata
的键。我们将使用相同的类型技巧将 JavaDStream
重新打包为 JavaPairDStream
。
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming; import org.elasticsearch.spark.rdd.Metadata; import static org.elasticsearch.spark.rdd.Metadata.*; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran"); // metadata for each document // note it's not required for them to have the same structure Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaRDD<Tuple2<?, ?>> pairRdd = jsc.parallelize<(ImmutableList.of( new Tuple2<Object, Object>(otpMeta, otp), new Tuple2<Object, Object>(sfoMeta, sfo))); Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ... JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); JavaPairDStream<?, ?> pairDStream = dstream.mapToPair(new ExtractTuples()) JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target); jssc.start();
|
|
用于 |
|
|
|
|
|
|
|
关联 |
|
从 |
|
通过在其上映射 |
|
在包含文档及其各自元数据的 |
Spark Streaming 类型转换
编辑elasticsearch-hadoop Spark Streaming 支持利用与常规 Spark 类型映射相同的类型映射。为了保持一致性,此处重复这些映射。
表 6. Scala 类型转换表
Scala 类型 | Elasticsearch 类型 |
---|---|
|
|
|
|
|
空的 |
|
根据表中的 |
|
|
|
|
case class |
|
|
|
此外,以下隐含转换适用于 Java 类型
表 7. Java 类型转换表
Java 类型 | Elasticsearch 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Java Bean |
|
地理类型值得再次提及的是,仅在 Elasticsearch 中可用的富数据类型(例如 GeoPoint
或 GeoShape
)通过将其结构转换为上表中可用的原语来支持。例如,根据其存储,geo_point
可能作为 String
或 Traversable
返回。
Spark SQL 支持
编辑在 2.1 版本中添加。
Spark SQL 是一个用于结构化数据处理的 Spark 模块。它提供了一个名为 DataFrames 的编程抽象,也可以充当分布式 SQL 查询引擎。 |
||
-- Spark 网站 |
除了核心 Spark 支持之外,elasticsearch-hadoop 还提供与 Spark SQL 的集成。换句话说,Elasticsearch 成为 Spark SQL 的原生数据源,以便可以从 Spark SQL 透明地索引和查询数据。
Spark SQL 处理结构化数据,换句话说,所有条目都应具有相同的结构(相同数量的字段,具有相同的类型和名称)。不支持使用非结构化数据(具有不同结构的文档),并且会导致问题。对于这种情况,请使用 PairRDD
。
支持的 Spark SQL 版本
编辑Spark SQL 虽然正在成为一个成熟的组件,但在各个版本之间仍在经历重大变化。Spark SQL 在 1.3 版本中成为一个稳定的组件,但它与之前的版本不向后兼容。此外,Spark 2.0 引入了重大更改,通过 Dataset
API 打破了向后兼容性。elasticsearch-hadoop 通过两个不同的 jar 支持 Spark SQL 1.3-1.6 和 Spark SQL 2.0:elasticsearch-spark-1.x-<version>.jar
和 elasticsearch-hadoop-<version>.jar
支持 Spark SQL 1.3-1.6(或更高版本),而 elasticsearch-spark-2.0-<version>.jar
支持 Spark SQL 2.0。换句话说,除非您使用 Spark 2.0,否则请使用 elasticsearch-spark-1.x-<version>.jar
。
Spark SQL 支持位于 org.elasticsearch.spark.sql
包下。
API 差异从 elasticsearch-hadoop 用户的角度来看,Spark SQL 1.3-1.6 和 Spark 2.0 之间的差异已经相当统一。此文档详细描述了以下简要提及的差异
-
DataFrame
vsDataset
- Spark SQL 1.3+ 中的核心单元是
DataFrame
。此 API 保留在 Spark 2.0 中,但在其底层基于Dataset
。 - 统一的 API 与专用的 Java/Scala API
- 在 Spark SQL 2.0 中,通过引入
SparkSession
并为 `Dataset`、`DataFrame` 和 `RDD` 使用相同的支持代码,API 进一步统一。
从概念上讲,DataFrame
是 Dataset[Row]
,因此以下文档将重点介绍 Spark SQL 1.3-1.6。
将 DataFrame
(Spark SQL 1.3+) 写入 Elasticsearch
编辑使用 elasticsearch-hadoop,可以将 DataFrame
(或任何 Dataset
)索引到 Elasticsearch。
Scala
编辑在 Scala 中,只需导入 org.elasticsearch.spark.sql
包,该包会使用 saveToEs
方法丰富给定的 DataFrame
类;虽然它们与 org.elasticsearch.spark
包具有相同的签名,但它们是为 DataFrame
实现设计的。
// reusing the example from Spark SQL documentation import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SQLContext._ import org.elasticsearch.spark.sql._ ... // sc = existing SparkContext val sqlContext = new SQLContext(sc) // case class used to define the DataFrame case class Person(name: String, surname: String, age: Int) // create DataFrame val people = sc.textFile("people.txt") .map(_.split(",")) .map(p => Person(p(0), p(1), p(2).trim.toInt)) .toDF() people.saveToEs("spark/people")
Spark SQL 包导入 |
|
elasticsearch-hadoop Spark 包导入 |
|
读取文本文件作为普通的 |
|
通过 |
默认情况下,elasticsearch-hadoop 会忽略空值,而不会写入任何字段。由于 DataFrame
旨在被视为结构化的表格数据,因此您可以通过将 es.spark.dataframe.write.null
设置为 true
,仅为 DataFrame
对象启用将空值写入为空值字段。
Java
编辑类似地,对于 Java 用法,专用包 org.elasticsearch.spark.sql.api.java
通过 JavaEsSpark SQL
提供类似的功能。
import org.apache.spark.sql.api.java.*; import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL; ... DataFrame people = ... JavaEsSparkSQL.saveToEs(people, "spark/people");
Spark SQL Java 导入 |
|
elasticsearch-hadoop Spark SQL Java 导入 |
|
在 Elasticsearch 中 |
同样,对于 Java 5 静态导入,可以将其进一步简化为
为了最大程度地控制 DataFrame
在 Elasticsearch 中的映射,强烈建议您预先创建映射。有关更多信息,请参见本章。
将现有 JSON 写入 Elasticsearch
编辑使用 Spark SQL 时,如果输入数据为 JSON 格式,只需将其转换为 DataFrame
(在 Spark SQL 1.3 中) 或 Dataset
(在 Spark SQL 2.0 中) (如 Spark 文档中所述),通过 SQLContext
/JavaSQLContext
jsonFile
方法。
使用纯 SQL 从 Elasticsearch 读取
编辑索引及其映射必须在创建临时表之前存在。
Spark SQL 1.2 引入了一个新的API,用于从外部数据源读取,elasticsearch-hadoop 支持该 API,从而简化了与 Elasticsearch 交互所需的 SQL 配置。此外,在幕后,它会理解 Spark 执行的操作,从而优化数据和进行的查询(例如过滤或修剪),从而提高性能。
Spark SQL 中的数据源
编辑使用 Spark SQL 时,elasticsearch-hadoop 允许通过 SQLContext
load
方法访问 Elasticsearch。换句话说,以声明式方式创建由 Elasticsearch 支持的 DataFrame
/Dataset
。
val sql = new SQLContext... // Spark 1.3 style val df = sql.load( "spark/index", "org.elasticsearch.spark.sql")
|
|
要加载的路径或资源 - 在这种情况下为 Elasticsearch 中的索引/类型 |
|
数据源提供程序 - |
在 Spark 1.4 中,将使用以下类似的 API 调用
|
|
数据源提供程序 - |
|
要加载的路径或资源 - 在这种情况下为 Elasticsearch 中的索引/类型 |
在 Spark 1.5 中,可以将其进一步简化为
无论使用哪个 API 创建,都可以自由访问 DataFrame
以操作数据。
sources 声明还允许传入特定选项,即
名称 | 默认值 | 描述 |
---|---|---|
|
必需 |
Elasticsearch 索引/类型 |
|
|
是否将 Spark SQL 转换为 Elasticsearch Query DSL(下推) |
|
|
是否使用精确(未分析)匹配或不使用(已分析)匹配 |
可在 Spark 1.6 或更高版本中使用 |
||
|
|
是否告诉 Spark 对下推的过滤器应用其自身的过滤 |
下一节将解释这两个选项。要指定选项(包括通用的 elasticsearch-hadoop 选项),只需将 Map
传递给上述方法
例如
val sql = new SQLContext... // options for Spark 1.3 need to include the target path/resource val options13 = Map("path" -> "spark/index", "pushdown" -> "true", "es.nodes" -> "someNode", "es.port" -> "9200") // Spark 1.3 style val spark13DF = sql.load("org.elasticsearch.spark.sql", options13) // options for Spark 1.4 - the path/resource is specified separately val options = Map("pushdown" -> "true", "es.nodes" -> "someNode", "es.port" -> "9200") // Spark 1.4 style val spark14DF = sql.read.format("org.elasticsearch.spark.sql") .options(options) .load("spark/index")
sqlContext.sql( "CREATE TEMPORARY TABLE myIndex " + "USING org.elasticsearch.spark.sql " + "OPTIONS (resource 'spark/index', nodes 'someNode')" ) "
Spark 的临时表名称 |
|
|
|
elasticsearch-hadoop 配置选项,其中强制性的一个是 |
请注意,由于 SQL 解析器的限制,.
(以及其他用于分隔的常用字符) 不被允许;连接器会尝试通过自动附加 es.
前缀来解决这个问题,但这只适用于使用一个 .
指定配置选项的情况(例如上面的 es.nodes
)。因此,如果需要具有多个 .
的属性,则应使用上面的 SQLContext.load
或 SQLContext.read
方法,并将属性作为 Map
传递。
下推操作
编辑使用 elasticsearch-hadoop 作为 Spark source
的一个重要的隐藏特性是,该连接器理解在 DataFrame
/SQL 中执行的操作,并且默认情况下会将其转换为相应的 QueryDSL。换句话说,连接器直接在数据源处下推操作,在那里高效地过滤掉数据,从而仅将所需的数据流回 Spark。这显著提高了查询性能,并最大限度地减少了 Spark 和 Elasticsearch 集群上的 CPU、内存和 I/O,因为只返回所需的数据(而不是批量返回数据,然后由 Spark 处理和丢弃)。请注意,即使指定了查询,下推操作也适用 - 连接器会根据指定的 SQL 增强它。
另外,elasticsearch-hadoop 支持 Spark (1.3.0 及更高版本) 中所有可用的 `Filter`,同时保持与 Spark 1.3.0 的向后二进制兼容性,将 SQL 操作完全下推到 Elasticsearch,而无需任何用户干预。
已优化为下推过滤器的运算符
SQL 语法 | ES 1.x/2.x 语法 | ES 5.x 语法 |
---|---|---|
= null , is_null |
missing |
must_not.exists |
= (严格) |
term |
term |
= (非严格) |
match |
match |
> , < , >= , ⇐ |
range |
range |
is_not_null |
exists |
exists |
in (严格) |
terms |
terms |
in (非严格) |
or.filters |
bool.should |
and |
and.filters |
bool.filter |
or |
or.filters |
bool.should [bool.filter] |
not |
not.filter |
bool.must_not |
StringStartsWith |
wildcard(arg*) |
wildcard(arg*) |
StringEndsWith |
wildcard(*arg) |
wildcard(*arg) |
StringContains |
wildcard(*arg*) |
wildcard(*arg*) |
EqualNullSafe (严格) |
term |
term |
EqualNullSafe (非严格) |
match |
match |
例如,考虑以下 Spark SQL
// as a DataFrame val df = sqlContext.read().format("org.elasticsearch.spark.sql").load("spark/trips") df.printSchema() // root //|-- departure: string (nullable = true) //|-- arrival: string (nullable = true) //|-- days: long (nullable = true) val filter = df.filter(df("arrival").equalTo("OTP").and(df("days").gt(3))
或者用纯 SQL
CREATE TEMPORARY TABLE trips USING org.elasticsearch.spark.sql OPTIONS (path "spark/trips") SELECT departure FROM trips WHERE arrival = "OTP" and days > 3
连接器将查询转换为
{ "query" : { "filtered" : { "query" : { "match_all" : {} }, "filter" : { "and" : [{ "query" : { "match" : { "arrival" : "OTP" } } }, { "days" : { "gt" : 3 } } ] } } } }
此外,下推过滤器可以处理 analyzed
词条(默认),或者可以配置为严格并提供 exact
匹配(仅适用于 not-analyzed
字段)。除非手动指定映射,否则强烈建议保留默认设置。有关此主题和其他主题,请参阅 Elasticsearch 参考文档。
请注意,自 elasticsearch-hadoop 2.2 起,适用于 Spark 1.6 或更高版本的 double.filtering
,允许将已下推到 Elasticsearch 的过滤器也由 Spark 处理/评估(默认)或不处理。关闭此功能,尤其是在处理大数据量时,会加快速度。但是,应该注意语义,因为关闭此功能可能会返回不同的结果(取决于数据的索引方式,analyzed
与 not_analyzed
)。一般来说,当开启严格模式时,也可以禁用 double.filtering
。
数据源作为表
编辑自 Spark SQL 1.2 起,也可以通过将数据源声明为 Spark 临时表(由 elasticsearch-hadoop 支持)来访问数据源
sqlContext.sql( "CREATE TEMPORARY TABLE myIndex " + "USING org.elasticsearch.spark.sql " + "OPTIONS (resource 'spark/index', " + "scroll_size '20')" )
Spark 的临时表名称 |
|
|
|
elasticsearch-hadoop 配置选项,其中强制性的一个是 |
|
由于使用 |
定义后,将自动拾取模式。因此,可以立即发出查询
val all = sqlContext.sql("SELECT * FROM myIndex WHERE id <= 10")
由于 elasticsearch-hadoop 知道正在进行的查询,因此它可以优化对 Elasticsearch 完成的请求。例如,给定以下查询
val names = sqlContext.sql("SELECT name FROM myIndex WHERE id >=1 AND id <= 10")
它知道只需要 name
和 id
字段(第一个返回给用户,第二个用于 Spark 的内部过滤),因此将仅请求这些数据,从而使查询非常高效。
从 Elasticsearch 读取 DataFrame
s (Spark SQL 1.3)
编辑正如您可能已经猜到的,可以定义由 Elasticsearch 文档支持的 DataFrame
。甚至更好的是,让它们由查询结果支持,从而有效地创建对数据的动态实时视图。
Scala
编辑通过 org.elasticsearch.spark.sql
包,esDF
方法在 SQLContext
API 上可用
import org.apache.spark.sql.SQLContext import org.elasticsearch.spark.sql._ ... val sql = new SQLContext(sc) val people = sql.esDF("spark/people") // check the associated schema println(people.schema.treeString) // root // |-- name: string (nullable = true) // |-- surname: string (nullable = true) // |-- age: long (nullable = true)
Spark SQL Scala 导入 |
|
elasticsearch-hadoop SQL Scala 导入 |
|
创建一个由 Elasticsearch 中 |
|
从 Elasticsearch 发现的 |
|
请注意,在使用默认 Elasticsearch 映射时, |
就像 Spark core 支持一样,可以指定其他参数,例如查询。这是一个非常强大的概念,因为可以在源头(Elasticsearch)过滤数据,并且仅在结果上使用 Spark
控制 DataFrame
模式在某些情况下,尤其是当 Elasticsearch 中的索引包含大量字段时,需要创建一个仅包含其中一部分字段的 DataFrame
。虽然可以通过官方 Spark API 或通过专用查询来修改 DataFrame
(通过对其支持的 RDD
进行操作),但 elasticsearch-hadoop 允许用户指定在创建 DataFrame
时从 Elasticsearch 中包含和排除哪些字段。
通过 es.read.field.include
和 es.read.field.exclude
属性,可以指示从索引映射中包含或排除哪些字段。语法类似于 Elasticsearch 的 include/exclude。可以通过使用逗号来指定多个值。默认情况下,不指定任何值,这意味着包含所有属性/字段,并且不排除任何属性/字段。请注意,这些属性可以包含前导和尾随通配符。包含字段层次结构的一部分而不包含尾随通配符并不意味着包含整个层次结构。但是,在大多数情况下,仅包含层次结构的一部分是没有意义的,因此应该包含尾随通配符。
例如
# include es.read.field.include = *name, address.* # exclude es.read.field.exclude = *.created
由于 SparkSQL 使用 DataFrame
模式的方式,elasticsearch-hadoop 需要在执行实际查询之前知道从 Elasticsearch 返回哪些字段。虽然可以通过底层的 Elasticsearch 查询手动限制字段,但 elasticsearch-hadoop 并不知道这一点,结果可能会有所不同,或者更糟糕的是,会发生错误。请改用上面的属性,Elasticsearch 将在用户查询的同时正确使用这些属性。
Java
编辑对于 Java 用户,可以通过 JavaEsSpark SQL
提供专用 API。它与 EsSpark SQL
非常相似,但是它允许通过 Java 集合而不是 Scala 集合传入配置选项;除此之外,使用两者是完全相同的
import org.apache.spark.sql.api.java.JavaSQLContext; import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL; ... SQLContext sql = new SQLContext(sc); DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people");
更好的是,DataFrame
可以由查询结果支持
Spark SQL 类型转换
编辑elasticsearch-hadoop 会自动将 Spark 内置类型转换为 Elasticsearch 类型(反之亦然),如下表所示
虽然 Spark SQL DataType
s 在 Scala 和 Java 中都有等效项,因此可以应用 RDD 转换,但语义略有不同 - 特别是由于 Spark SQL 处理它们的方式,java.sql
类型存在差异。
表 8. Spark SQL 1.3+ 转换表
Spark SQL DataType |
Elasticsearch 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
地理类型转换表除了上表之外,对于 Spark SQL 1.3 或更高版本,elasticsearch-hadoop 会对地理类型执行自动模式检测,即 Elasticsearch geo_point
和 geo_shape
。由于每种类型都允许多种格式(geo_point
接受以 4 种不同方式指定的纬度和经度,而 geo_shape
允许各种类型(当前为 9 种))并且映射不提供此类信息,因此 elasticsearch-hadoop 将在启动时采样确定的地理字段,并检索包含所有相关字段的任意文档;它将解析它,从而确定必要的模式(例如,它可以判断 geo_point
是指定为 StringType
还是 ArrayType
)。
由于 Spark SQL 是强类型的,因此每个地理字段都需要在所有文档中具有相同的格式。如果不是这样,则返回的数据将不符合检测到的模式,从而导致错误。
Spark 结构化流支持
编辑在 6.0 中添加。
结构化流提供了快速、可扩展、容错、端到端且仅一次的流处理,而无需用户考虑流式传输。 |
||
-- Spark 文档 |
Spark 2.0 中以实验性功能发布,Spark 结构化流提供了集成到 Spark SQL 集成中的统一流式传输和批处理接口。从 elasticsearch-hadoop 6.0 开始,我们提供了将流式数据索引到 Elasticsearch 的本机功能。
与 Spark SQL 一样,结构化流使用结构化数据。所有条目都应具有相同的结构(相同数量的字段,具有相同的类型和名称)。不支持使用非结构化数据(具有不同结构的文档),并且会导致问题。对于这种情况,请使用 DStream
。
支持的 Spark 结构化流版本
编辑自 Spark v2.2.0 起,Spark 结构化流被认为是通用可用的。因此,elasticsearch-hadoop 对结构化流的支持(在 elasticsearch-hadoop 6.0+ 中可用)仅与 Spark 2.2.0 及更高版本兼容。与之前的 Spark SQL 类似,结构化流在接口被认为是稳定的之前,可能会在版本之间发生重大更改。
Spark 结构化流式处理支持位于 org.elasticsearch.spark.sql
和 org.elasticsearch.spark.sql.streaming
包下。它以 Dataset[_]
API 的形式与 Spark SQL 共享统一的接口。客户端可以像使用常规批处理 Dataset
一样,几乎以完全相同的方式与流式 Dataset
进行交互,只有 少数例外。
将流式 Dataset
(Spark SQL 2.0+) 写入 Elasticsearch
编辑通过 elasticsearch-hadoop,流支持的 Dataset
可以被索引到 Elasticsearch。
Scala
编辑在 Scala 中,要将基于流的 Dataset
和 DataFrame
保存到 Elasticsearch,只需配置流使用 "es"
格式写入,如下所示
import org.apache.spark.sql.SparkSession ... val spark = SparkSession.builder() .appName("EsStreamingExample") .getOrCreate() // case class used to define the DataFrame case class Person(name: String, surname: String, age: Int) // create DataFrame val people = spark.readStream .textFile("/path/to/people/files/*") .map(_.split(",")) .map(p => Person(p(0), p(1), p(2).trim.toInt)) people.writeStream .option("checkpointLocation", "/save/location") .format("es") .start("spark/people")
Spark SQL 导入 |
|
创建 |
|
不要调用 |
|
连续读取文本文件目录,并将其转换为 |
|
提供一个位置来保存流式查询的偏移量和提交日志 |
|
使用 |
Spark 不会对基于批处理和流的 Dataset
进行类型区分。虽然您可能能够导入 org.elasticsearch.spark.sql
包以将 saveToEs
方法添加到您的 Dataset
或 DataFrame
,但是如果在基于流的 Dataset
或 DataFrame
上调用这些方法,它将抛出一个非法参数异常。
Java
编辑类似地,"es"
格式也适用于 Java 用法
import org.apache.spark.sql.SparkSession ... SparkSession spark = SparkSession .builder() .appName("JavaStructuredNetworkWordCount") .getOrCreate(); // java bean style class public static class PersonBean { private String name; private String surname; private int age; ... } Dataset<PersonBean> people = spark.readStream() .textFile("/path/to/people/files/*") .map(new MapFunction<String, PersonBean>() { @Override public PersonBean call(String value) throws Exception { return someFunctionThatParsesStringToJavaBeans(value.split(",")); } }, Encoders.<PersonBean>bean(PersonBean.class)); people.writeStream() .option("checkpointLocation", "/save/location") .format("es") .start("spark/people");
Spark SQL Java 导入。可以使用与 Scala 相同的会话类 |
|
创建 SparkSession。也可以使用旧的 |
|
我们创建一个 java bean 类作为我们的数据格式 |
|
使用 |
|
将我们的字符串数据转换为 PersonBean |
|
设置一个位置以保存我们流的状态 |
|
使用 |
将现有 JSON 写入 Elasticsearch
编辑当使用 Spark SQL 时,如果输入数据是 JSON 格式,只需将其转换为 Dataset
(对于 Spark SQL 2.0)(如 Spark 文档中所述),通过 DataStreamReader
的 json
格式。
Spark 结构化流中的 Sink 提交日志
编辑Spark 结构化流提供了一个端到端的容错、恰好一次的处理模型,该模型通过使用偏移量检查点和为每个流式查询维护提交日志来实现。在执行流式查询时,大多数源和接收器都需要您指定一个“checkpointLocation”以持久化作业的状态。如果发生中断,使用相同的检查点位置启动新的流式查询将恢复作业的状态并从中断处继续。我们在配置的检查点位置下的一个特殊目录中为 elasticsearch-hadoop 的 Elasticsearch 接收器实现维护一个提交日志
$> ls /path/to/checkpoint/location metadata offsets/ sinks/ $> ls /path/to/checkpoint/location/sinks elasticsearch/ $> ls /path/to/checkpoint/location/sinks/elasticsearch 12.compact 13 14 15 16 17 18
提交日志目录中的每个文件都对应于已提交的批处理 ID。日志实现会定期压缩日志以避免混乱。您可以通过多种方式设置日志目录的位置
- 使用
es.spark.sql.streaming.sink.log.path
设置显式日志位置(见下文)。 - 如果未设置该值,则将使用
checkpointLocation
指定的路径。 - 如果未设置该值,则将通过将 SparkSession 中的
spark.sql.streaming.checkpointLocation
的值与Dataset
给定的查询名称组合来构造路径。 - 如果不存在查询名称,则在上述情况下将使用随机 UUID 代替查询名称
- 如果未提供上述任何设置,则
start
调用将抛出异常
以下是影响 Elasticsearch 提交日志行为的配置列表
-
es.spark.sql.streaming.sink.log.enabled
(默认值true
) - 启用或禁用流式作业的提交日志。默认情况下,日志处于启用状态,并且将跳过具有相同批处理 ID 的输出批次,以避免重复写入。当此值设置为
false
时,提交日志将被禁用,并且所有输出都将发送到 Elasticsearch,无论它们是否在先前的执行中发送过。 -
es.spark.sql.streaming.sink.log.path
- 设置存储此流式查询的日志数据的位置。如果未设置此值,则 Elasticsearch 接收器将将其提交日志存储在
checkpointLocation
中给出的路径下。任何兼容 HDFS 客户端的 URI 都是可接受的。 -
es.spark.sql.streaming.sink.log.cleanupDelay
(默认值10m
) - 提交日志是通过 Spark 的 HDFS 客户端管理的。某些兼容 HDFS 的文件系统(例如 Amazon 的 S3)以异步方式传播文件更改。为了解决这个问题,在压缩一组日志文件后,客户端将等待这段时间,然后清理旧文件。
-
es.spark.sql.streaming.sink.log.deletion
(默认值true
) - 确定日志是否应删除不再需要的旧日志。在提交每个批处理后,客户端将检查是否有任何已压缩且可以安全删除的提交日志。如果设置为
false
,则日志将跳过此清理步骤,为每个批处理保留一个提交文件。 -
es.spark.sql.streaming.sink.log.compactInterval
(默认值10
) - 设置在压缩日志文件之前要处理的批次数。默认情况下,每 10 个批处理,提交日志将压缩为一个包含所有先前提交的批处理 ID 的文件。
Spark 结构化流类型转换
编辑结构化流使用与 Spark SQL 集成完全相同的类型转换规则。
如果使用自动索引创建,请查看本节以获取更多信息。
elasticsearch-hadoop 会自动将 Spark 内置类型转换为 Elasticsearch 类型,如下表所示
虽然 Spark SQL DataType
s 在 Scala 和 Java 中都有等效项,因此可以应用 RDD 转换,但语义略有不同 - 特别是由于 Spark SQL 处理它们的方式,java.sql
类型存在差异。
表 9. Spark SQL 1.3+ 转换表
Spark SQL DataType |
Elasticsearch 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
使用 Map/Reduce 层
编辑另一种将 Spark 与 Elasticsearch 一起使用的方法是通过 Map/Reduce 层,即利用 elasticsearch-hadoop 中专用的 Input/OuputFormat
。但是,除非有人被困在 elasticsearch-hadoop 2.0 上,否则我们强烈建议使用本机集成,因为它提供了明显更好的性能和灵活性。
配置
编辑通过 elasticsearch-hadoop,Spark 可以通过其专用的 InputFormat
与 Elasticsearch 集成,在写入的情况下,通过 OutputFormat
集成。这些在 Map/Reduce 章节中进行了详细描述,因此请参阅该章节以获取深入解释。
简而言之,需要设置一个基本的 Hadoop Configuration
对象,其中包含目标 Elasticsearch 集群和索引,可能还有查询,然后就可以开始了。
从 Spark 的角度来看,唯一需要的是设置序列化 - Spark 默认依赖于 Java 序列化,这很方便,但效率相当低。这就是 Hadoop 本身引入了自己的序列化机制及其自身类型(即 Writable
)的原因。因此,InputFormat
和 OutputFormat
需要返回 Writable
,而 Spark 默认情况下不理解 Writable
。好消息是,可以轻松启用不同的序列化 (Kryo),它可以自动处理转换,并且效率也相当高。
SparkConf sc = new SparkConf(); //.setMaster("local"); sc.set("spark.serializer", KryoSerializer.class.getName()); // needed only when using the Java API JavaSparkContext jsc = new JavaSparkContext(sc);
或者,如果您喜欢 Scala
请注意,Kryo 序列化用作处理 Writable
类型的变通方法;可以选择直接转换类型(从 Writable
到 Serializable
类型) - 这很好,但对于入门来说,上面的单行代码似乎是最有效的。
从 Elasticsearch 读取数据
编辑要读取数据,只需传入 org.elasticsearch.hadoop.mr.EsInputFormat
类 - 由于它同时支持 old
和 new
Map/Reduce API,您可以自由地在 SparkContext
的 hadoopRDD
(我们建议为了简洁起见)或 newAPIHadoopRDD
上使用任一方法。无论您选择哪个,请坚持使用它,以避免在以后出现混乱和问题。
旧 (org.apache.hadoop.mapred
) API
编辑JobConf conf = new JobConf(); conf.set("es.resource", "radio/artists"); conf.set("es.query", "?q=me*"); JavaPairRDD esRDD = jsc.hadoopRDD(conf, EsInputFormat.class, Text.class, MapWritable.class); long docCount = esRDD.count();
创建 Hadoop 对象(使用旧的 API) |
|
配置源(索引) |
|
设置查询(可选) |
|
通过 |
下面的 Scala 版本
val conf = new JobConf() conf.set("es.resource", "radio/artists") conf.set("es.query", "?q=me*") val esRDD = sc.hadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]], classOf[Text], classOf[MapWritable])) val docCount = esRDD.count();
新 (org.apache.hadoop.mapreduce
) API
编辑正如预期的那样,mapreduce
API 版本非常相似 - 将 hadoopRDD
替换为 newAPIHadoopRDD
,将 JobConf
替换为 Configuration
。就是这样。
Configuration conf = new Configuration(); conf.set("es.resource", "radio/artists"); conf.set("es.query", "?q=me*"); JavaPairRDD esRDD = jsc.newAPIHadoopRDD(conf, EsInputFormat.class, Text.class, MapWritable.class); long docCount = esRDD.count();
创建 Hadoop 对象(使用新的 API) |
|
配置源(索引) |
|
设置查询(可选) |
|
通过 |
下面的 Scala 版本
从 PySpark 使用连接器
编辑得益于其 Map/Reduce 层,elasticsearch-hadoop 也可以从 PySpark 中使用,以便从 Elasticsearch 中读取和写入数据。为此,下面是来自 Spark 文档 的一个片段(请确保切换到 Python 代码片段)
$ ./bin/pyspark --driver-class-path=/path/to/elasticsearch-hadoop.jar >>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults >>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\ "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) >>> rdd.first() # the result is a MapWritable that is converted to a Python dict (u'Elasticsearch ID', {u'field1': True, u'field2': u'Some Text', u'field3': 12345})
此外,SQL 加载器也可以使用
from pyspark.sql import SQLContext sqlContext = SQLContext(sc) df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type") df.printSchema()