Apache Spark 支持
编辑Apache Spark 支持编辑
Apache Spark 是一个快速、通用的集群计算系统。它提供了 Java、Scala 和 Python 的高级 API,以及一个支持通用执行图的优化引擎。 |
||
-- Spark 网站 |
Spark 通过通常将数据*缓存*在内存中,为大型数据集提供快速迭代/类函数功能。与本文档中提到的其他库不同,Apache Spark 是一个不依赖于 Map/Reduce 本身的计算框架,但它确实与 Hadoop 集成,主要是与 HDFS 集成。 elasticsearch-hadoop 允许在 Spark 中以两种方式使用 Elasticsearch:通过自 2.1 版起提供的专用支持或通过自 2.0 版起提供的 Map/Reduce 桥接。自 5.0 版起,elasticsearch-hadoop 中支持 Spark 2.0
安装编辑
与其他库一样,elasticsearch-hadoop 需要在 Spark 的类路径中可用。由于 Spark 具有多种部署模式,因此这可以转换为目标类路径,无论是在单个节点上(如本地模式 - 将在整个文档中使用)还是在每个节点上,具体取决于所需的基础架构。
原生 RDD 支持编辑
在 2.1 中添加。
elasticsearch-hadoop 以 RDD
(弹性分布式数据集)(或更准确地说是*对* RDD
)的形式提供 Elasticsearch 和 Apache Spark 之间的*原生*集成,可以从 Elasticsearch 读取数据。 RDD
以两种*风格*提供:一种用于 Scala(将数据作为 Tuple2
返回,其中包含 Scala 集合),另一种用于 Java(将数据作为 Tuple2
返回,其中包含 java.util
集合)。
只要有可能,请考虑使用*原生*集成,因为它提供了最佳性能和最大灵活性。
配置编辑
要为 Apache Spark 配置 elasticsearch-hadoop,可以在 SparkConf
对象中设置 配置 一章中描述的各种属性
import org.apache.spark.SparkConf val conf = new SparkConf().setAppName(appName).setMaster(master) conf.set("es.index.auto.create", "true")
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); conf.set("es.index.auto.create", "true");
**命令行**对于那些希望通过命令行设置属性的用户(直接设置或从文件加载),请注意 Spark *仅*接受以 "spark." 前缀开头的属性,并将*忽略*其余属性(并且可能会根据版本抛出警告)。要解决此限制,请通过附加 spark.
前缀来定义 elasticsearch-hadoop 属性(因此它们变为 spark.es.
),elasticsearch-hadoop 将自动解析它们
将数据写入 Elasticsearch编辑
使用 elasticsearch-hadoop,任何 RDD
都可以保存到 Elasticsearch,只要其内容可以转换为文档即可。实际上,这意味着 RDD
类型需要是 Map
(无论是 Scala 还是 Java)、JavaBean
还是 Scala case 类。如果不是这种情况,则可以轻松地在 Spark 中*转换*数据或插入自己的自定义 ValueWriter
。
Scala编辑
使用 Scala 时,只需导入 org.elasticsearch.spark
包,该包通过 pimp my library 模式,使用 saveToEs
方法丰富了*任何* RDD
API
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.elasticsearch.spark._ ... val conf = ... val sc = new SparkContext(conf) val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3) val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran") sc.makeRDD( Seq(numbers, airports) ).saveToEs("spark/docs")
Spark Scala 导入 |
|
elasticsearch-hadoop Scala 导入 |
|
通过其 Scala API 启动 Spark |
|
|
|
在 Elasticsearch 的 |
Scala 用户可能会尝试使用 Seq
和 →
符号来声明*根*对象(即 JSON 文档),而不是使用 Map
。虽然类似,但第一个符号会导致略有不同的类型,这些类型无法与 JSON 文档匹配:Seq
是一个有序序列(换句话说,是一个列表),而 →
创建一个 Tuple
,它或多或少是一个有序的、固定数量的元素。因此,列表列表不能用作文档,因为它不能映射到 JSON 对象;但是它可以在一个对象中自由使用。因此,在上面的示例中,使用了 Map(k→v)
而不是 Seq(k→v)
作为上述*隐式*导入的替代方法,可以通过 org.elasticsearch.spark.rdd
包中的 EsSpark
在 Scala 中使用 elasticsearch-hadoop Spark 支持,它充当一个实用程序类,允许显式方法调用。此外,不要使用 Map
(它很方便,但由于结构不同,每个实例需要一个映射),而是使用*case 类*
import org.apache.spark.SparkContext import org.elasticsearch.spark.rdd.EsSpark // define a case class case class Trip(departure: String, arrival: String) val upcomingTrip = Trip("OTP", "SFO") val lastWeekTrip = Trip("MUC", "OTP") val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip)) EsSpark.saveToEs(rdd, "spark/docs")
对于需要指定文档的 ID(或其他元数据字段,如 ttl
或 timestamp
)的情况,可以通过设置相应的 映射 来实现,即 es.mapping.id
。按照前面的示例,要指示 Elasticsearch 使用字段 id
作为文档 ID,请更新 RDD
配置(也可以在 SparkConf
上设置该属性,但由于其全局影响,因此不建议这样做)
EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id"))
Java编辑
Java 用户有一个专用类,它提供类似于 EsSpark
的功能,即 org.elasticsearch.spark.rdd.api.java
中的 JavaEsSpark
(一个类似于 Spark 的 Java API 的包)
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; ... SparkConf conf = ... JavaSparkContext jsc = new JavaSparkContext(conf); Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2); Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran"); JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports)); JavaEsSpark.saveToEs(javaRDD, "spark/docs");
Spark Java 导入 |
|
elasticsearch-hadoop Java 导入 |
|
通过其 Java API 启动 Spark |
|
为了简化示例,使用 Guava(Spark 的依赖项) |
|
在两个集合上创建一个简单的 |
|
在 Elasticsearch 的 |
可以使用 Java 5 *static* 导入进一步简化代码。此外,Map
(由于其*松散*结构,其映射是动态的)可以用 JavaBean
替换
public class TripBean implements Serializable { private String departure, arrival; public TripBean(String departure, String arrival) { setDeparture(departure); setArrival(arrival); } public TripBean() {} public String getDeparture() { return departure; } public String getArrival() { return arrival; } public void setDeparture(String dep) { departure = dep; } public void setArrival(String arr) { arrival = arr; } }
import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark; ... TripBean upcoming = new TripBean("OTP", "SFO"); TripBean lastWeek = new TripBean("MUC", "OTP"); JavaRDD<TripBean> javaRDD = jsc.parallelize( ImmutableList.of(upcoming, lastWeek)); saveToEs(javaRDD, "spark/docs");
设置文档 ID(或其他元数据字段,如 ttl
或 timestamp
)与其 Scala 对应项类似,但根据您使用的是 JDK 类还是其他一些实用程序(如 Guava),可能会更加冗长
JavaEsSpark.saveToEs(javaRDD, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));
将现有 JSON 写入 Elasticsearch编辑
对于 RDD
中的数据已经是 JSON 的情况,elasticsearch-hadoop 允许直接索引*而不*应用任何转换;数据按原样获取并直接发送到 Elasticsearch。因此,在这种情况下,elasticsearch-hadoop 期望包含 String
或字节数组(byte[]
/Array[Byte]
)的 RDD
,假设每个条目都表示一个 JSON 文档。如果 RDD
没有正确的签名,则无法应用 saveJsonToEs
方法(在 Scala 中,它们将不可用)。
Scala编辑
val json1 = """{"reason" : "business", "airport" : "SFO"}""" val json2 = """{"participants" : 5, "airport" : "OTP"}""" new SparkContext(conf).makeRDD(Seq(json1, json2)) .saveJsonToEs("spark/json-trips")
Java编辑
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}"; String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}"; JavaSparkContext jsc = ... JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); JavaEsSpark.saveJsonToEs(stringRDD, "spark/json-trips");
|
|
注意 |
|
通过专用的 |
写入动态/多资源编辑
如果需要根据数据内容将写入 Elasticsearch 的数据索引到不同的存储桶下,可以使用 es.resource.write
字段,该字段接受在运行时从文档内容解析的模式。按照前面提到的 媒体示例,可以将其配置如下
Scala编辑
val game = Map( "media_type"->"game", "title" -> "FF VI", "year" -> "1994") val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010") val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien") sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection-{media_type}/doc")
对于要写入的每个文档/对象,elasticsearch-hadoop 将提取 media_type
字段并使用其值来确定目标资源。
Java编辑
正如预期的那样,Java 中的情况惊人地相似
Map<String, ?> game = ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994"); Map<String, ?> book = ... Map<String, ?> cd = ... JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(game, book, cd)); saveToEs(javaRDD, "my-collection-{media_type}/doc");
处理文档元数据编辑
Elasticsearch 允许每个文档都有自己的 元数据。如上所述,通过各种 映射 选项,可以自定义这些参数,以便从其所属文档中提取它们的值。此外,您甚至可以包含/排除将哪些数据发送回 Elasticsearch。在 Spark 中,elasticsearch-hadoop 扩展了此功能,允许通过使用 *对* RDD
在文档本身 *外部* 提供元数据。换句话说,对于包含键值对的 RDD
,可以从键中提取元数据,并将该值用作文档源。
元数据通过 org.elasticsearch.spark.rdd
包中的 Metadata
Java 枚举 进行描述,该枚举标识其类型 - id
、ttl
、version
等… 因此,RDD
键可以是包含每个文档的 Metadata
及其关联值的 Map
。如果 RDD
键的类型不是 Map
,则 elasticsearch-hadoop 会将该对象视为表示文档 ID 并相应地使用它。这听起来比实际情况复杂,所以让我们看一些例子。
Scala编辑
对 RDD
,或者简单地说,签名为 RDD[(K,V)]
的 RDD
可以利用 saveToEsWithMeta
方法,这些方法可以通过 *隐式* 导入 org.elasticsearch.spark
包或 EsSpark
对象获得。要手动指定每个文档的 ID,只需在 RDD
中传入 Object
(不是 Map
类型)即可
val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // instance of SparkContext val sc = ... val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo))) airportsRDD.saveToEsWithMeta("airports/2015")
|
|
|
|
由于 |
当需要指定多个 ID 时,应使用键的类型为 org.elasticsearch.spark.rdd.Metadata
的 scala.collection.Map
import org.elasticsearch.spark.rdd.Metadata._ val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // metadata for each document // note it's not required for them to have the same structure val otpMeta = Map(ID -> 1, TTL -> "3h") val mucMeta = Map(ID -> 2, VERSION -> "23") val sfoMeta = Map(ID -> 3) // instance of SparkContext val sc = ... val airportsRDD = sc.makeRDD( Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo))) airportsRDD.saveToEsWithMeta("airports/2015")
导入 |
|
用于 |
|
用于 |
|
用于 |
|
元数据和文档被组装成一个 *对* |
|
使用 |
Java编辑
类似地,在 Java 端,JavaEsSpark
提供了 saveToEsWithMeta
方法,这些方法应用于 JavaPairRDD
(Java 中 RDD[(K,V)]
的等效项)。因此,要根据 ID 保存文档,可以使用
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC"); JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs(ImmutableList.of( new Tuple2<Object, Object>(1, otp), new Tuple2<Object, Object>(2, jfk))); JavaEsSpark.saveToEsWithMeta(pairRDD, target);
通过使用 Scala |
|
第一个文档的元组,围绕 ID ( |
|
第二个文档的元组,围绕 ID ( |
|
使用键作为 ID,值作为文档,相应地保存 |
当需要指定多个 ID 时,可以选择使用填充了 org.elasticsearch.spark.rdd.Metadata
类型的键的 java.util.Map
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; import org.elasticsearch.spark.rdd.Metadata; import static org.elasticsearch.spark.rdd.Metadata.*; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran"); // metadata for each document // note it's not required for them to have the same structure Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs<(ImmutableList.of( new Tuple2<Object, Object>(otpMeta, otp), new Tuple2<Object, Object>(sfoMeta, sfo))); JavaEsSpark.saveToEsWithMeta(pairRDD, target);
描述可以声明的文档元数据的 |
|
枚举的静态导入,以简短格式( |
|
|
|
|
|
|
|
关联 |
|
对包含文档及其各自元数据的 |
从 Elasticsearch 读取数据编辑
对于读取,应该定义将数据从 Elasticsearch *流式传输* 到 Spark 的 Elasticsearch RDD
。
Scala编辑
与写入类似,org.elasticsearch.spark
包使用 esRDD
方法丰富了 SparkContext
API
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.elasticsearch.spark._ ... val conf = ... val sc = new SparkContext(conf) val RDD = sc.esRDD("radio/artists")
可以重载该方法以指定其他查询,甚至可以指定配置 Map
(覆盖 SparkConf
)
... import org.elasticsearch.spark._ ... val conf = ... val sc = new SparkContext(conf) sc.esRDD("radio/artists", "?q=me*")
默认情况下,来自 Elasticsearch 的文档作为 Tuple2
返回,其中第一个元素包含文档 ID,第二个元素包含通过 Scala 集合 表示的实际文档,即一个 `Map[String, Any]`,其中键表示字段名称,值表示它们各自的值。
Java编辑
Java 用户有一个专用的 JavaPairRDD
,它的工作方式与其 Scala 对应物相同,但是返回的 Tuple2
值(或第二个元素)将文档作为本机 java.util
集合返回。
import org.apache.spark.api.java.JavaSparkContext; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; ... SparkConf conf = ... JavaSparkContext jsc = new JavaSparkContext(conf); JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "radio/artists");
Spark Java 导入 |
|
elasticsearch-hadoop Java 导入 |
|
通过其 Java API 启动 Spark |
|
为索引 |
以类似的方式,可以使用重载的 esRDD
方法来指定查询或传递 Map
对象以进行高级配置。让我们看看它是如何工作的,但这次使用 Java 静态导入。此外,让我们丢弃文档 ID 并仅检索 RDD
值
import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.*; ... JavaRDD<Map<String, Object>> rdd = esRDD(jsc, "radio/artists", "?q=me*") .values();
静态导入 |
|
创建一个 |
|
仅返回 |
通过使用 JavaEsSpark
API,可以获得 Spark 专用的 JavaPairRDD
,它比基本 RDD
更适合 Java 环境(因为它具有 Scala 签名)。此外,专用的 RDD
将 Elasticsearch 文档作为正确的 Java 集合返回,因此您不必处理 Scala 集合(这通常是 RDD
的情况)。这在使用 Java 8 时尤其强大,我们强烈建议您使用它,因为它的 lambda 表达式 使集合处理 *非常* 简洁。
例如,假设您要过滤 RDD
中的文档,并仅返回包含 mega
的值的文档(请忽略您可以并且应该直接通过 Elasticsearch 进行过滤的事实)。
在 Java 8 之前的版本中,代码如下所示
JavaRDD<Map<String, Object>> esRDD = esRDD(jsc, "radio/artists", "?q=me*").values(); JavaRDD<Map<String, Object>> filtered = esRDD.filter( new Function<Map<String, Object>, Boolean>() { @Override public Boolean call(Map<String, Object> map) throws Exception { returns map.contains("mega"); } });
使用 Java 8,过滤变成了一行代码
JavaRDD<Map<String, Object>> esRDD = esRDD(jsc, "radio/artists", "?q=me*").values(); JavaRDD<Map<String, Object>> filtered = esRDD.filter(doc -> doc.contains("mega"));
以 JSON 格式读取数据编辑
如果需要 Elasticsearch 的结果采用 JSON 格式(通常要将其发送到其他系统),则可以使用专用的 esJsonRDD
方法。在这种情况下,连接器将按从 Elasticsearch 接收到的原样返回文档内容,而无需任何处理,在 Scala 中为 RDD[(String, String)]
,在 Java 中为 JavaPairRDD[String, String]
,其中键表示文档 ID,值表示其 JSON 格式的实际内容。
类型转换编辑
elasticsearch-hadoop 会自动将 Spark 内置类型转换为 Elasticsearch 类型(以及反向转换),如下表所示
表 4. Scala 类型转换表
Scala 类型 | Elasticsearch 类型 |
---|---|
|
|
|
|
|
空 |
|
根据表格的 |
|
|
|
|
case 类 |
|
|
|
此外,以下_隐式_转换适用于 Java 类型
表 5. Java 类型转换表
Java 类型 | Elasticsearch 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Java Bean |
|
转换尽力而为;保证内置的 Java 和 Scala 类型能够正确转换,但是对于 Java 或 Scala 中的用户类型,则不作任何保证。如上表所示,当在 Scala 中遇到 case
类或在 Java 中遇到 JavaBean
时,转换器会尝试_解包_其内容并将其保存为 对象
。请注意,这只适用于顶级用户对象 - 如果用户对象中嵌套了其他用户对象,则转换可能会失败,因为转换器不会执行嵌套_解包_。这样做是有意为之的,因为转换器必须对数据进行_序列化_和_反序列化_,而用户类型由于数据丢失而引入了歧义;这可以通过某种类型的映射来解决,但这会使项目过于接近 ORM 的领域,并且可以说引入了过多的复杂性,而收益却微乎其微;借助 Spark 中的处理功能和 elasticsearch-hadoop 中的可插拔性,可以轻松地将对象转换为其他类型,如果需要,只需付出最小的努力和最大的控制力。
**地理类型**值得一提的是,Elasticsearch 中仅提供的富数据类型,例如 GeoPoint
或 GeoShape
,可以通过将其结构转换为上表中可用的基元来支持。例如,根据其存储方式,geo_point
可能会作为 String
或 Traversable
返回。
Spark Streaming 支持编辑
添加于 5.0 版本。
Spark Streaming 是核心 Spark API 的扩展,支持对实时数据流进行可扩展、高吞吐量、容错的流处理。 |
||
-- Spark 网站 |
Spark Streaming 是核心 Spark 功能之上的扩展,允许对流数据进行近乎实时的处理。Spark Streaming 围绕 DStream
或_离散流_的概念工作。DStream
通过将新到达的记录收集到一个小的 RDD
中并执行它来操作。这会在每隔几秒钟重复一次,并在一个称为_微批处理_的过程中使用新的 RDD
。DStream
api 包含许多与 RDD
api 相同的处理操作,以及一些其他特定于流的方法。从 5.0 版本开始,elasticsearch-hadoop 提供了与 Spark Streaming 的原生集成。
使用 elasticsearch-hadoop Spark Streaming 支持时,可以将 Elasticsearch 作为输出位置,以便从 Spark Streaming 作业中将数据索引到其中,就像可以持久化 RDD
的结果一样。但是,与 RDD
不同,由于 DStream
的连续性,您无法使用它从 Elasticsearch 中读取数据。
Spark Streaming 支持提供特殊的优化,以便在运行处理窗口非常小的作业时,在 Spark 执行器上节省网络资源。因此,应该优先使用这种集成,而不是在从 DStream
的 foreachRDD
调用返回的 RDD
上调用 saveToEs
。
将 DStream
写入 Elasticsearch编辑
与 RDD
一样,只要可以将任何 DStream
的内容转换为文档,就可以将其保存到 Elasticsearch 中。在实践中,这意味着 DStream
类型需要是 Map
(Scala 或 Java 类型)、JavaBean
或 Scala case 类。如果不是这种情况,则可以轻松地在 Spark 中_转换_数据或插入自己的自定义 ValueWriter
。
Scala编辑
使用 Scala 时,只需导入 org.elasticsearch.spark.streaming
包,该包通过 _pimp my library_ 模式,使用 saveToEs
方法丰富了 DStream
API
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.elasticsearch.spark.streaming._ ... val conf = ... val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)) val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3) val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran") val rdd = sc.makeRDD(Seq(numbers, airports)) val microbatches = mutable.Queue(rdd) ssc.queueStream(microbatches).saveToEs("spark/docs") ssc.start() ssc.awaitTermination()
Spark 和 Spark Streaming Scala 导入 |
|
elasticsearch-hadoop Spark Streaming 导入 |
|
通过其 Scala API 启动 Spark |
|
通过将 SparkContext 传递给 SparkStreaming 上下文来启动它。微批处理将每秒处理一次。 |
|
|
|
从 |
|
启动 Spark Streaming 作业并等待它最终完成。 |
作为上述_隐式_导入的替代方法,可以使用 org.elasticsearch.spark.streaming
包中的 EsSparkStreaming
在 Scala 中使用 elasticsearch-hadoop Spark Streaming 支持,它充当一个实用程序类,允许显式方法调用。此外,不要使用 Map
(它很方便,但由于结构不同,每个实例需要一个映射),而应使用_case 类_
import org.apache.spark.SparkContext import org.elasticsearch.spark.streaming.EsSparkStreaming // define a case class case class Trip(departure: String, arrival: String) val upcomingTrip = Trip("OTP", "SFO") val lastWeekTrip = Trip("MUC", "OTP") val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip)) val microbatches = mutable.Queue(rdd) val dstream = ssc.queueStream(microbatches) EsSparkStreaming.saveToEs(dstream, "spark/docs") ssc.start()
|
|
定义一个名为 |
|
围绕 |
|
将 |
|
启动流处理 |
启动 SparkStreamingContext 后,将无法添加或配置新的 DStream
。上下文停止后,将无法重新启动。每个 JVM 一次只能有一个活动的 SparkStreamingContext。另请注意,以编程方式停止 SparkStreamingContext 时,它会停止底层的 SparkContext,除非指示不要这样做。
对于需要指定文档的 ID(或其他元数据字段,如 ttl
或 timestamp
)的情况,可以通过设置相应的 映射(即 es.mapping.id
)来实现。在上一个示例之后,要指示 Elasticsearch 使用字段 id
作为文档 ID,请更新 DStream
配置(也可以在 SparkConf
上设置该属性,但由于其全局影响,不建议这样做)
EsSparkStreaming.saveToEs(dstream, "spark/docs", Map("es.mapping.id" -> "id"))
Java编辑
Java 用户有一个专用类,它提供类似于 EsSparkStreaming
的功能,即 org.elasticsearch.spark.streaming.api.java
包中的 JavaEsSparkStreaming
(类似于 Spark 的 Java API)
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.api.java.JavaDStream; import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming; ... SparkConf conf = ... JavaSparkContext jsc = new JavaSparkContext(conf); JavaStreamingContext jssc = new JavaSparkStreamingContext(jsc, Seconds.apply(1)); Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2); Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran"); JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports)); Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>(); microbatches.add(javaRDD); JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches); JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs"); jssc.start()
Spark 和 Spark Streaming Java 导入 |
|
elasticsearch-hadoop Java 导入 |
|
通过其 Java API 启动 Spark 和 Spark Streaming。微批处理将每秒处理一次。 |
|
为了简化示例,使用 Guava(Spark 的依赖项) |
|
在微批处理上创建一个简单的 |
|
在 Elasticsearch 的 |
|
执行流作业。 |
可以使用 Java 5 *static* 导入进一步简化代码。此外,Map
(由于其*松散*结构,其映射是动态的)可以用 JavaBean
替换
public class TripBean implements Serializable { private String departure, arrival; public TripBean(String departure, String arrival) { setDeparture(departure); setArrival(arrival); } public TripBean() {} public String getDeparture() { return departure; } public String getArrival() { return arrival; } public void setDeparture(String dep) { departure = dep; } public void setArrival(String arr) { arrival = arr; } }
import static org.elasticsearch.spark.rdd.api.java.JavaEsSparkStreaming; ... TripBean upcoming = new TripBean("OTP", "SFO"); TripBean lastWeek = new TripBean("MUC", "OTP"); JavaRDD<TripBean> javaRDD = jsc.parallelize(ImmutableList.of(upcoming, lastWeek)); Queue<JavaRDD<TripBean>> microbatches = new LinkedList<JavaRDD<TripBean>>(); microbatches.add(javaRDD); JavaDStream<TripBean> javaDStream = jssc.queueStream(microbatches); saveToEs(javaDStream, "spark/docs"); jssc.start()
静态导入 |
|
定义一个包含 |
|
调用 |
|
运行该 Streaming 作业 |
设置文档 ID(或其他元数据字段,如 ttl
或 timestamp
)与其 Scala 对应项类似,但根据您使用的是 JDK 类还是其他一些实用程序(如 Guava),可能会更加冗长
JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));
将现有 JSON 写入 Elasticsearch编辑
对于 DStream
流式传输的数据已经序列化为 JSON 的情况,elasticsearch-hadoop 允许直接索引_而无需_应用任何转换;数据按原样获取并直接发送到 Elasticsearch。因此,在这种情况下,elasticsearch-hadoop 需要一个包含 String
或字节数组(byte[]
/Array[Byte]
)的 DStream
,假设每个条目都表示一个 JSON 文档。如果 DStream
没有正确的签名,则无法应用 saveJsonToEs
方法(在 Scala 中,它们将不可用)。
Scala编辑
val json1 = """{"reason" : "business", "airport" : "SFO"}""" val json2 = """{"participants" : 5, "airport" : "OTP"}""" val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)) val rdd = sc.makeRDD(Seq(json1, json2)) val microbatch = mutable.Queue(rdd) ssc.queueStream(microbatch).saveJsonToEs("spark/json-trips") ssc.start()
Java编辑
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}"; String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}"; JavaSparkContext jsc = ... JavaStreamingContext jssc = ... JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); Queue<JavaRDD<String>> microbatches = new LinkedList<JavaRDD<String>>(); microbatches.add(stringRDD); JavaDStream<String> stringDStream = jssc.queueStream(microbatches); JavaEsSparkStreaming.saveJsonToEs(stringRDD, "spark/json-trips"); jssc.start()
|
|
创建一个 |
|
注意 |
|
配置流以通过专用 |
|
启动流式作业 |
写入动态/多资源编辑
如果需要根据数据内容将写入 Elasticsearch 的数据索引到不同的存储桶下,可以使用 es.resource.write
字段,该字段接受在运行时从文档内容解析的模式。按照前面提到的 媒体示例,可以将其配置如下
Scala编辑
val game = Map( "media_type" -> "game", "title" -> "FF VI", "year" -> "1994") val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010") val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien") val batch = sc.makeRDD(Seq(game, book, cd)) val microbatches = mutable.Queue(batch) ssc.queueStream(microbatches).saveToEs("my-collection-{media_type}/doc") ssc.start()
对于要写入的每个文档/对象,elasticsearch-hadoop 将提取 media_type
字段并使用其值来确定目标资源。
Java编辑
正如预期的那样,Java 中的情况惊人地相似
Map<String, ?> game = ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994"); Map<String, ?> book = ... Map<String, ?> cd = ... JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(game, book, cd)); Queue<JavaRDD<Map<String, ?>>> microbatches = ... JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches); saveToEs(javaDStream, "my-collection-{media_type}/doc"); jssc.start();
处理文档元数据编辑
Elasticsearch 允许每个文档都有自己的 元数据。如上所述,通过各种 映射 选项,可以自定义这些参数,以便从其所属文档中提取它们的值。此外,您甚至可以包含/排除将哪些数据发送回 Elasticsearch。在 Spark 中,elasticsearch-hadoop 扩展了此功能,允许通过使用 键值对 RDD
在文档外部提供元数据。
这在 Spark Streaming 中没有什么不同。对于包含键值对的 DStream
,可以从键中提取元数据,并将该值用作文档源。
元数据通过 org.elasticsearch.spark.rdd
包中的 Metadata
Java 枚举 进行描述,该枚举标识其类型 - id
、ttl
、version
等… 因此,DStream
的键可以是包含每个文档的 Metadata
及其关联值的 Map
。如果 DStream
键的类型不是 Map
,则 elasticsearch-hadoop 会将该对象视为表示文档 ID,并相应地使用它。这听起来比实际情况复杂,因此让我们看一些示例。
Scala编辑
键值对 DStream
,或者简单地说,签名为 DStream[(K,V)]
的 DStream
可以利用 saveToEsWithMeta
方法,这些方法可以通过隐式导入 org.elasticsearch.spark.streaming
包或 EsSparkStreaming
对象来使用。要为每个文档手动指定 ID,只需在 DStream
中传入 Object
(不是 Map
类型)即可
val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // instance of SparkContext val sc = ... // instance of StreamingContext val ssc = ... val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo))) val microbatches = mutable.Queue(airportsRDD) ssc.queueStream(microbatches) .saveToEsWithMeta("airports/2015") ssc.start()
|
|
|
|
我们构造一个 |
|
由于生成的 |
当需要指定多个 ID 时,应使用键的类型为 org.elasticsearch.spark.rdd.Metadata
的 scala.collection.Map
import org.elasticsearch.spark.rdd.Metadata._ val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // metadata for each document // note it's not required for them to have the same structure val otpMeta = Map(ID -> 1, TTL -> "3h") val mucMeta = Map(ID -> 2, VERSION -> "23") val sfoMeta = Map(ID -> 3) // instance of SparkContext val sc = ... // instance of StreamingContext val ssc = ... val airportsRDD = sc.makeRDD( Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo))) val microbatches = mutable.Queue(airportsRDD) ssc.queueStream(microbatches) .saveToEsWithMeta("airports/2015") ssc.start()
导入 |
|
用于 |
|
用于 |
|
用于 |
|
元数据和文档被组装成一个 *对* |
|
|
|
|
Java编辑
类似地,在 Java 端,JavaEsSparkStreaming
提供了应用于 JavaPairDStream
(Java 中 DStream[(K,V)]
的等效项)的 saveToEsWithMeta
方法。
由于 Java API 的限制,这往往需要做更多的工作。例如,您不能直接从 JavaPairRDD
队列创建 JavaPairDStream
。相反,您必须创建一个 Tuple2
对象的常规 JavaDStream
,并将 JavaDStream
转换为 JavaPairDStream
。这听起来很复杂,但它是 API 限制的一个简单解决方法。
首先,我们将创建一个对函数,它接收一个 Tuple2
对象,并将其原样返回给框架
public static class ExtractTuples implements PairFunction<Tuple2<Object, Object>, Object, Object>, Serializable { @Override public Tuple2<Object, Object> call(Tuple2<Object, Object> tuple2) throws Exception { return tuple2; } }
然后,我们将对函数应用于 Tuple2
的 JavaDStream
以创建一个 JavaPairDStream
并保存它
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC"); JavaSparkContext jsc = ... JavaStreamingContext jssc = ... // create an RDD of between the id and the docs JavaRDD<Tuple2<?, ?>> rdd = jsc.parallelize( ImmutableList.of( new Tuple2<Object, Object>(1, otp), new Tuple2<Object, Object>(2, jfk))); Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ... JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); JavaPairDStream<?, ?> pairDStream = dstream.mapToPair(new ExtractTuples()); JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target); jssc.start();
创建一个常规的 |
|
第一个文档的元组,围绕 ID ( |
|
第二个文档的元组,围绕 ID ( |
|
从元组 |
|
通过将我们的 |
|
|
当需要指定多个 ID 时,可以选择使用填充了 org.elasticsearch.spark.rdd.Metadata
类型的键的 java.util.Map
。我们将使用相同的类型技巧将 JavaDStream
重新打包为 JavaPairDStream
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming; import org.elasticsearch.spark.rdd.Metadata; import static org.elasticsearch.spark.rdd.Metadata.*; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran"); // metadata for each document // note it's not required for them to have the same structure Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaRDD<Tuple2<?, ?>> pairRdd = jsc.parallelize<(ImmutableList.of( new Tuple2<Object, Object>(otpMeta, otp), new Tuple2<Object, Object>(sfoMeta, sfo))); Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ... JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); JavaPairDStream<?, ?> pairDStream = dstream.mapToPair(new ExtractTuples()) JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target); jssc.start();
描述可以声明的文档元数据的 |
|
枚举的静态导入,以简短格式( |
|
|
|
|
|
|
|
关联 |
|
从 |
|
通过将 |
|
在包含文档及其各自元数据的 |
Spark Streaming 类型转换编辑
elasticsearch-hadoop Spark Streaming 支持利用与常规 Spark 类型映射相同的类型映射。为了保持一致性,此处重复列出了这些映射
表 6. Scala 类型转换表
Scala 类型 | Elasticsearch 类型 |
---|---|
|
|
|
|
|
空 |
|
根据表格的 |
|
|
|
|
case 类 |
|
|
|
此外,以下_隐式_转换适用于 Java 类型
表 7. Java 类型转换表
Java 类型 | Elasticsearch 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Java Bean |
|
地理类型值得一提的是,仅在 Elasticsearch 中可用的丰富数据类型(例如 GeoPoint
或 GeoShape
)可以通过将其结构转换为上表中可用的基元来支持。例如,根据其存储,geo_point
可能会作为 String
或 Traversable
返回。
Spark SQL 支持编辑
在 2.1 中添加。
Spark SQL 是一个用于结构化数据处理的 Spark 模块。它提供了一种称为 DataFrame 的编程抽象,还可以充当分布式 SQL 查询引擎。 |
||
-- Spark 网站 |
除了核心 Spark 支持之外,elasticsearch-hadoop 还提供了与 Spark SQL 的集成。换句话说,Elasticsearch 成为 Spark SQL 的本机源,因此可以从 Spark SQL 透明地索引和查询数据。
Spark SQL 使用结构化数据 - 换句话说,所有条目都应具有相同的结构(相同数量的字段,具有相同的类型和名称)。不支持使用非结构化数据(具有不同结构的文档),并且会导致问题。对于这种情况,请使用 PairRDD
。
支持的 Spark SQL 版本编辑
Spark SQL 在成为一个成熟组件的同时,在不同版本之间仍在经历重大变化。Spark SQL 在 1.3 版本中成为一个稳定的组件,但是它与以前的版本不 向后兼容。此外,Spark 2.0 引入了一些重大变化,这些变化通过 Dataset
API 破坏了向后兼容性。elasticsearch-hadoop 通过两个不同的 jar 支持 Spark SQL 1.3-1.6 和 Spark SQL 2.0:elasticsearch-spark-1.x-<version>.jar
和 elasticsearch-hadoop-<version>.jar
支持 Spark SQL 1.3-1.6(或更高版本),而 elasticsearch-spark-2.0-<version>.jar
支持 Spark SQL 2.0。换句话说,除非您使用的是 Spark 2.0,否则请使用 elasticsearch-spark-1.x-<version>.jar
Spark SQL 支持在 org.elasticsearch.spark.sql
包下提供。
API 差异从 elasticsearch-hadoop 用户的角度来看,Spark SQL 1.3-1.6 和 Spark 2.0 之间的差异相当小。这份 文档 详细描述了这些差异,下面将简要介绍
-
DataFrame
与Dataset
- 1.3+ 中 Spark SQL 的核心单元是
DataFrame
。此 API 在 Spark 2.0 中仍然存在,但是它基于Dataset
- 统一 API 与专用 Java/Scala API
- 在 Spark SQL 2.0 中,通过引入
SparkSession
以及对 `Dataset`、`DataFrame` 和 `RDD` 使用相同的底层代码,API 得到了进一步统一。
从概念上讲,DataFrame
是 Dataset[Row]
,因此下面的文档将重点介绍 Spark SQL 1.3-1.6。
将 DataFrame
(Spark SQL 1.3+)写入 Elasticsearch编辑
使用 elasticsearch-hadoop,可以将 DataFrame
(或任何 Dataset
)索引到 Elasticsearch。
Scala编辑
在 Scala 中,只需导入 org.elasticsearch.spark.sql
包,它将使用 saveToEs
方法丰富给定的 DataFrame
类;虽然这些方法与 org.elasticsearch.spark
包中的方法具有相同的签名,但它们是为 DataFrame
实现而设计的。
// reusing the example from Spark SQL documentation import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SQLContext._ import org.elasticsearch.spark.sql._ ... // sc = existing SparkContext val sqlContext = new SQLContext(sc) // case class used to define the DataFrame case class Person(name: String, surname: String, age: Int) // create DataFrame val people = sc.textFile("people.txt") .map(_.split(",")) .map(p => Person(p(0), p(1), p(2).trim.toInt)) .toDF() people.saveToEs("spark/people")
Spark SQL 包导入 |
|
elasticsearch-hadoop Spark 包导入 |
|
将文本文件读取为*普通* |
|
通过 |
默认情况下,elasticsearch-hadoop 将忽略空值,以便不写入任何字段。由于 DataFrame
旨在被视为结构化表格数据,因此您可以通过将 es.spark.dataframe.write.null
设置切换为 true
来启用将空值写入 DataFrame
对象的空值字段。
Java编辑
类似地,对于 Java 使用,专用包 org.elasticsearch.spark.sql.api.java
通过 JavaEsSpark SQL
提供类似的功能
import org.apache.spark.sql.api.java.*; import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL; ... DataFrame people = ... JavaEsSparkSQL.saveToEs(people, "spark/people");
Spark SQL Java 导入 |
|
elasticsearch-hadoop Spark SQL Java 导入 |
|
在 Elasticsearch 中的 |
同样,使用 Java 5 *静态*导入,这可以进一步简化为
为了最大限度地控制 Elasticsearch 中 DataFrame
的映射,强烈建议您事先创建映射。有关更多信息,请参阅本章。
将现有 JSON 写入 Elasticsearch编辑
使用 Spark SQL 时,如果输入数据为 JSON 格式,只需通过 SQLContext
/JavaSQLContext
jsonFile
方法将其转换为 DataFrame
(在 Spark SQL 1.3 中)或 Dataset
(对于 Spark SQL 2.0)(如 Spark 文档中所述)。
使用纯 SQL 从 Elasticsearch 读取数据编辑
索引及其映射必须在创建临时表之前存在
Spark SQL 1.2 引入了一个新的API,用于从外部数据源读取数据,elasticsearch-hadoop 支持该 API,从而简化了与 Elasticsearch 交互所需的 SQL 配置。此外,它在幕后了解 Spark 执行的操作,因此可以优化数据和查询(例如过滤或剪枝),从而提高性能。
Spark SQL 中的数据源编辑
使用 Spark SQL 时,elasticsearch-hadoop 允许通过 SQLContext
load
方法访问 Elasticsearch。换句话说,以*声明性*方式创建由 Elasticsearch 支持的 DataFrame
/Dataset
val sql = new SQLContext... // Spark 1.3 style val df = sql.load( "spark/index", "org.elasticsearch.spark.sql")
|
|
要加载的路径或资源 - 在这种情况下,Elasticsearch 中的索引/类型 |
|
数据源提供程序 - |
在 Spark 1.4 中,将使用以下类似的 API 调用
|
|
数据源提供程序 - |
|
要加载的路径或资源 - 在这种情况下,Elasticsearch 中的索引/类型 |
在 Spark 1.5 中,这可以进一步简化为
无论使用哪种 API,一旦创建,就可以自由访问 DataFrame
来操作数据。
*sources*声明还允许传入特定选项,即
名称 | 默认值 | 描述 |
---|---|---|
|
必需 |
Elasticsearch 索引/类型 |
|
|
是否将 Spark SQL 转换为 Elasticsearch 查询 DSL(*下推*) |
|
|
是否使用*精确*(非分析)匹配(分析) |
可在 Spark 1.6 或更高版本中使用 |
||
|
|
是否告诉 Spark 对下推的过滤器应用自己的过滤 |
下一节将解释这两个选项。要指定选项(包括通用的 elasticsearch-hadoop 选项),只需将 Map
传递给上述方法即可
例如
val sql = new SQLContext... // options for Spark 1.3 need to include the target path/resource val options13 = Map("path" -> "spark/index", "pushdown" -> "true", "es.nodes" -> "someNode", "es.port" -> "9200") // Spark 1.3 style val spark13DF = sql.load("org.elasticsearch.spark.sql", options13) // options for Spark 1.4 - the path/resource is specified separately val options = Map("pushdown" -> "true", "es.nodes" -> "someNode", "es.port" -> "9200") // Spark 1.4 style val spark14DF = sql.read.format("org.elasticsearch.spark.sql") .options(options) .load("spark/index")
sqlContext.sql( "CREATE TEMPORARY TABLE myIndex " + "USING org.elasticsearch.spark.sql " + "OPTIONS (resource 'spark/index', nodes 'someNode')" ) "
Spark 的临时表名 |
|
|
|
elasticsearch-hadoop 配置选项,必填选项为 |
请注意,由于 SQL 解析器的原因,不允许使用 .
(以及用于分隔的其他常见字符);连接器尝试通过自动追加 es.
前缀来解决此问题,但这仅适用于指定只有一个 .
的配置选项(如上面的 es.nodes
)。因此,如果需要具有多个 .
的属性,则应使用上面的 SQLContext.load
或 SQLContext.read
方法并将属性作为 Map
传递。
下推操作编辑
使用 elasticsearch-hadoop 作为 Spark source
的一个重要的*隐藏*特性是,连接器可以理解 DataFrame
/SQL 中执行的操作,并且默认情况下会将它们*转换*为相应的QueryDSL。换句话说,连接器直接在源头*下推*操作,在源头有效地过滤掉数据,以便*仅*将所需的数据流回 Spark。这显著提高了查询性能,并最大限度地减少了 Spark 和 Elasticsearch 集群上的 CPU、内存和 I/O,因为只返回需要的数据(而不是返回大量数据,然后由 Spark 处理和丢弃)。请注意,即使指定了查询,下推操作也会应用 - 连接器将根据指定的 SQL *增强*查询。
另外,elasticsearch-hadoop 支持 Spark(1.3.0 及更高版本)中可用的*所有* `Filter`,同时保留与 Spark 1.3.0 的向后二进制兼容性,将 SQL 操作完全下推到 Elasticsearch,而无需任何用户干预。
已优化为下推过滤器的运算符
SQL 语法 | ES 1.x/2.x 语法 | ES 5.x 语法 |
---|---|---|
= null , is_null |
missing |
must_not.exists |
= (严格) |
term |
term |
= (非严格) |
match |
match |
> , < , >= , ⇐ |
range |
range |
is_not_null |
exists |
exists |
in (严格) |
terms |
terms |
in (非严格) |
or.filters |
bool.should |
and |
and.filters |
bool.filter |
or |
or.filters |
bool.should [bool.filter] |
not |
not.filter |
bool.must_not |
StringStartsWith |
wildcard(arg*) |
wildcard(arg*) |
StringEndsWith |
wildcard(*arg) |
wildcard(*arg) |
StringContains |
wildcard(*arg*) |
wildcard(*arg*) |
EqualNullSafe (严格) |
term |
term |
EqualNullSafe (非严格) |
match |
match |
例如,请考虑以下 Spark SQL
// as a DataFrame val df = sqlContext.read().format("org.elasticsearch.spark.sql").load("spark/trips") df.printSchema() // root //|-- departure: string (nullable = true) //|-- arrival: string (nullable = true) //|-- days: long (nullable = true) val filter = df.filter(df("arrival").equalTo("OTP").and(df("days").gt(3))
或在纯 SQL 中
CREATE TEMPORARY TABLE trips USING org.elasticsearch.spark.sql OPTIONS (path "spark/trips") SELECT departure FROM trips WHERE arrival = "OTP" and days > 3
连接器将查询转换为
{ "query" : { "filtered" : { "query" : { "match_all" : {} }, "filter" : { "and" : [{ "query" : { "match" : { "arrival" : "OTP" } } }, { "days" : { "gt" : 3 } } ] } } } }
此外,下推过滤器可以作用于*分析*的词条(默认值),也可以配置为*严格*并提供*精确*匹配(仅作用于*未分析*的字段)。除非手动指定映射,否则强烈建议保留默认值。Elasticsearch 参考文档中详细讨论了此主题和其他主题。
请注意,自 elasticsearch-hadoop 2.2 for Spark 1.6 或更高版本起,double.filtering
允许 Spark(默认)处理/评估已下推到 Elasticsearch 的过滤器。关闭此功能(尤其是在处理大型数据集时)可以加快速度。但是,应该注意语义,因为关闭此功能可能会返回不同的结果(取决于数据的索引方式,*分析*与*未分析*)。通常,在启用*严格*时,也可以禁用 double.filtering
。
数据源作为表编辑
从 Spark SQL 1.2 开始,还可以通过将数据源声明为 Spark 临时表(由 elasticsearch-hadoop 支持)来访问它
sqlContext.sql( "CREATE TEMPORARY TABLE myIndex " + "USING org.elasticsearch.spark.sql " + "OPTIONS (resource 'spark/index', " + "scroll_size '20')" )
Spark 的临时表名 |
|
|
|
elasticsearch-hadoop 配置选项,必填选项为 |
|
由于使用 |
定义后,将自动获取架构。因此,您可以立即发出查询
val all = sqlContext.sql("SELECT * FROM myIndex WHERE id <= 10")
由于 elasticsearch-hadoop 知道正在进行的查询,因此它可以*优化*对 Elasticsearch 发出的请求。例如,给定以下查询
val names = sqlContext.sql("SELECT name FROM myIndex WHERE id >=1 AND id <= 10")
它知道只需要 name
和 id
字段(第一个返回给用户,第二个用于 Spark 的内部过滤),因此只会请求这些数据,从而使查询效率很高。
从 Elasticsearch 读取 DataFrame
(Spark SQL 1.3)编辑
正如您可能已经猜到的那样,可以定义一个由 Elasticsearch 文档支持的 DataFrame
。甚至更好的是,让它们由查询结果支持,从而有效地创建数据的动态实时*视图*。
Scala编辑
通过 org.elasticsearch.spark.sql
包,esDF
方法在 SQLContext
API 上可用
import org.apache.spark.sql.SQLContext import org.elasticsearch.spark.sql._ ... val sql = new SQLContext(sc) val people = sql.esDF("spark/people") // check the associated schema println(people.schema.treeString) // root // |-- name: string (nullable = true) // |-- surname: string (nullable = true) // |-- age: long (nullable = true)
Spark SQL Scala 导入 |
|
elasticsearch-hadoop SQL Scala 导入 |
|
创建一个由 Elasticsearch 中的 |
|
从 Elasticsearch 发现的与 |
|
请注意,如 映射和类型 一章中所述,在使用默认 Elasticsearch 映射时, |
就像 Spark *core* 支持一样,可以指定其他参数,例如查询。这是一个非常*强大*的概念,因为可以在源(Elasticsearch)过滤数据,并且仅对结果使用 Spark
**控制 DataFrame
架构**在某些情况下,特别是当 Elasticsearch 中的索引包含大量字段时,希望创建一个仅包含其中*一部分*的 DataFrame
。虽然可以通过官方 Spark API 或通过专用查询修改 DataFrame
(通过处理其支持的 RDD
),但 elasticsearch-hadoop 允许用户指定在创建 DataFrame
时要包含和排除 Elasticsearch 中的哪些字段。
通过 es.read.field.include
和 es.read.field.exclude
属性,可以指示要包含或排除索引映射中的哪些字段。语法类似于 Elasticsearch 包含/排除。可以使用逗号指定多个值。默认情况下,不指定任何值,这意味着包含所有属性/字段,并且不排除任何属性/字段。请注意,这些属性可以包含前导和尾随通配符。包含字段层次结构的一部分而不包含尾随通配符并不意味着包含整个层次结构。但是,在大多数情况下,仅包含层次结构的一部分是没有意义的,因此应包含尾随通配符。
例如
# include es.read.field.include = *name, address.* # exclude es.read.field.exclude = *.created
由于 SparkSQL 使用 DataFrame
架构的方式,elasticsearch-hadoop 需要在执行实际查询*之前*了解 Elasticsearch 返回了哪些字段。虽然可以通过底层 Elasticsearch 查询手动限制字段,但 elasticsearch-hadoop 不知道这一点,结果可能会不同,或者更糟的是,会发生错误。请改用上面的属性,Elasticsearch 将与用户查询一起正确使用这些属性。
Java编辑
对于 Java 用户,JavaEsSpark SQL
提供了专用 API。它与 EsSpark SQL
惊人地相似,但是它允许通过 Java 集合而不是 Scala 集合传递配置选项;除此之外,使用两者完全相同
import org.apache.spark.sql.api.java.JavaSQLContext; import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL; ... SQLContext sql = new SQLContext(sc); DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people");
更好的是,DataFrame
可以由查询结果支持
Spark SQL 类型转换编辑
elasticsearch-hadoop 会自动将 Spark 内置类型转换为 Elasticsearch 类型(以及反向转换),如下表所示
虽然 Spark SQL DataType
在 Scala 和 Java 中都有等效项,因此 RDD 转换可以应用,但语义略有不同 - 特别是对于 java.sql
类型,因为 Spark SQL 处理它们的方式不同
表 8. Spark SQL 1.3+ 转换表
Spark SQL DataType |
Elasticsearch 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**地理类型转换表**除了上表之外,对于 Spark SQL 1.3 或更高版本,elasticsearch-hadoop 对地理类型执行自动架构检测,即 Elasticsearch geo_point
和 geo_shape
。由于每种类型都允许多种格式(geo_point
接受以 4 种不同方式指定的纬度和经度,而 geo_shape
允许各种类型(目前为 9 种)),并且映射不提供此类信息,因此 elasticsearch-hadoop 将在启动时*采样*确定的地理字段,并检索包含所有相关字段的任意文档;它将解析它,从而确定必要的架构(例如,它可以判断 geo_point
是指定为 StringType
还是 ArrayType
)。
由于 Spark SQL 是强类型的,因此每个地理字段在*所有*文档中都需要具有相同的格式。否则,返回的数据将不适合检测到的架构,从而导致错误。
Spark 结构化流支持编辑
在 6.0 中添加。
结构化流 提供快速、可扩展、容错、端到端的精确一次流处理,而用户无需推理流。 |
||
-- Spark 文档 |
作为 Spark 2.0 中的实验性功能发布,Spark 结构化流提供了一个内置于 Spark SQL 集成中的统一流和批处理接口。从 elasticsearch-hadoop 6.0 开始,我们提供将流数据索引到 Elasticsearch 的本机功能。
与 Spark SQL 一样,结构化流使用*结构化*数据。所有条目都应具有*相同*的结构(相同数量的字段,具有相同的类型和名称)。*不支持*使用非结构化数据(具有不同结构的文档),并且会导致问题。对于这种情况,请使用 DStream
。
支持的 Spark 结构化流版本编辑
从 Spark v2.2.0 开始,Spark 结构化流被认为是*普遍可用*的。因此,elasticsearch-hadoop 对结构化流的支持(在 elasticsearch-hadoop 6.0+ 中可用)仅与 Spark 2.2.0 及更高版本兼容。与其之前的 Spark SQL 类似,结构化流在接口被认为*稳定*之前,可能会在版本之间发生重大变化。
Spark 结构化流支持在 org.elasticsearch.spark.sql
和 org.elasticsearch.spark.sql.streaming
包下可用。它以 Dataset[_]
api 的形式与 Spark SQL 共享一个统一的接口。客户端可以以几乎与常规批处理 Dataset
完全相同的方式与流式 Dataset
交互,只有少数例外。
将流式 Datasets
(Spark SQL 2.0+)写入 Elasticsearch编辑
使用 elasticsearch-hadoop,可以将流支持的 Dataset
索引到 Elasticsearch。
Scala编辑
在 Scala 中,要将基于流的 Dataset
和 DataFrame
保存到 Elasticsearch,只需将流配置为使用 "es"
格式写出,如下所示
import org.apache.spark.sql.SparkSession ... val spark = SparkSession.builder() .appName("EsStreamingExample") .getOrCreate() // case class used to define the DataFrame case class Person(name: String, surname: String, age: Int) // create DataFrame val people = spark.readStream .textFile("/path/to/people/files/*") .map(_.split(",")) .map(p => Person(p(0), p(1), p(2).trim.toInt)) people.writeStream .option("checkpointLocation", "/save/location") .format("es") .start("spark/people")
Spark SQL 导入 |
|
创建 |
|
调用 |
|
连续读取文本文件目录并将它们转换为 |
|
提供一个位置来保存流式查询的偏移量和提交日志 |
|
使用 |
Spark 在基于批处理和基于流的 Dataset
之间没有基于类型的区别。虽然您可能能够导入 org.elasticsearch.spark.sql
包以将 saveToEs
方法添加到您的 Dataset
或 DataFrame
中,但如果在基于流的 Dataset
或 DataFrame
上调用这些方法,则会引发非法参数异常。
Java编辑
以类似的方式,"es"
格式也可用于 Java
import org.apache.spark.sql.SparkSession ... SparkSession spark = SparkSession .builder() .appName("JavaStructuredNetworkWordCount") .getOrCreate(); // java bean style class public static class PersonBean { private String name; private String surname; private int age; ... } Dataset<PersonBean> people = spark.readStream() .textFile("/path/to/people/files/*") .map(new MapFunction<String, PersonBean>() { @Override public PersonBean call(String value) throws Exception { return someFunctionThatParsesStringToJavaBeans(value.split(",")); } }, Encoders.<PersonBean>bean(PersonBean.class)); people.writeStream() .option("checkpointLocation", "/save/location") .format("es") .start("spark/people");
Spark SQL Java 导入。可以使用与 Scala 相同的会话类 |
|
创建 SparkSession。也可以使用旧的 |
|
我们创建一个 java bean 类来用作我们的数据格式 |
|
使用 |
|
将我们的字符串数据转换为 PersonBean |
|
设置一个位置来保存我们流的状态 |
|
使用 |
将现有 JSON 写入 Elasticsearch编辑
使用 Spark SQL 时,如果输入数据为 JSON 格式,只需通过 DataStreamReader
的 json
格式将其转换为 Dataset
(对于 Spark SQL 2.0)(如 Spark 文档中所述)。
Spark 结构化流中的接收器提交日志编辑
Spark 结构化流式传输宣传了一种端到端的容错精确一次处理模型,该模型通过使用偏移量检查点和为每个流式查询维护提交日志来实现。执行流式查询时,大多数源和接收器都要求您指定“checkpointLocation”以持久化作业的状态。如果发生中断,使用相同的检查点位置启动新的流式查询将恢复作业状态并从中断处继续。我们在配置的检查点位置下的特殊目录中维护 elasticsearch-hadoop 的 Elasticsearch 接收器实现的提交日志
$> ls /path/to/checkpoint/location metadata offsets/ sinks/ $> ls /path/to/checkpoint/location/sinks elasticsearch/ $> ls /path/to/checkpoint/location/sinks/elasticsearch 12.compact 13 14 15 16 17 18
提交日志目录中的每个文件对应于已提交的批处理 ID。日志实现会定期压缩日志以避免混乱。您可以通过多种方式设置日志目录的位置
- 使用
es.spark.sql.streaming.sink.log.path
设置显式日志位置(见下文)。 - 如果未设置,则将使用
checkpointLocation
指定的路径。 - 如果未设置,则将通过将 SparkSession 中的
spark.sql.streaming.checkpointLocation
的值与Dataset
的给定查询名称组合来构造路径。 - 如果不存在查询名称,则在上述情况下将使用随机 UUID 代替查询名称
- 如果未提供上述任何设置,则
start
调用将抛出异常
以下是影响 Elasticsearch 提交日志行为的配置列表
-
es.spark.sql.streaming.sink.log.enabled
(默认值true
) - 启用或禁用流式作业的提交日志。默认情况下,日志处于启用状态,并且将跳过具有相同批处理 ID 的输出批处理以避免重复写入。当此设置为
false
时,提交日志将被禁用,并且所有输出都将发送到 Elasticsearch,而不管它们是否已在之前的执行中发送。 -
es.spark.sql.streaming.sink.log.path
- 设置存储此流式查询的日志数据的位置。如果未设置此值,则 Elasticsearch 接收器会将其提交日志存储在
checkpointLocation
中给定的路径下。任何兼容 HDFS 客户端的 URI 都是可以接受的。 -
es.spark.sql.streaming.sink.log.cleanupDelay
(默认值10m
) - 提交日志通过 Spark 的 HDFS 客户端进行管理。某些兼容 HDFS 的文件系统(如 Amazon 的 S3)以异步方式传播文件更改。为了解决这个问题,在一组日志文件被压缩后,客户端将在清理旧文件之前等待这段时间。
-
es.spark.sql.streaming.sink.log.deletion
(默认值true
) - 确定日志是否应删除不再需要的旧日志。在提交每个批处理后,客户端将检查是否有任何已压缩且可以安全删除的提交日志。如果设置为
false
,则日志将跳过此清理步骤,为每个批处理保留一个提交文件。 -
es.spark.sql.streaming.sink.log.compactInterval
(默认值10
) - 设置在压缩日志文件之前要处理的批处理数。默认情况下,每 10 个批处理,提交日志将被压缩成一个包含所有先前提交的批处理 ID 的文件。
Spark 结构化流式传输类型转换编辑
结构化流式传输使用与 Spark SQL 集成完全相同的类型转换规则。
如果使用自动索引创建,请查看本节以获取更多信息。
elasticsearch-hadoop 会自动将 Spark 内置类型转换为 Elasticsearch 类型,如下表所示
虽然 Spark SQL DataType
在 Scala 和 Java 中都有等效项,因此 RDD 转换可以应用,但语义略有不同 - 特别是对于 java.sql
类型,因为 Spark SQL 处理它们的方式不同
表 9. Spark SQL 1.3+ 转换表
Spark SQL DataType |
Elasticsearch 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
使用 Map/Reduce 层编辑
将 Spark 与 Elasticsearch 结合使用的另一种方法是通过 Map/Reduce 层,即利用 elasticsearch-hadoop 中专用的 Input/OuputFormat
。但是,除非您坚持使用 elasticsearch-hadoop 2.0,否则我们*强烈*建议您使用原生集成,因为它提供了更好的性能和灵活性。
配置编辑
通过 elasticsearch-hadoop,Spark 可以通过其专用的 InputFormat
与 Elasticsearch 集成,并且在写入时,可以通过 OutputFormat
与 Elasticsearch 集成。这些内容在Map/Reduce一章中有详细介绍,因此请参阅该章节以获取深入的解释。
简而言之,您需要使用目标 Elasticsearch 集群和索引(可能还有一个查询)设置一个基本的 Hadoop Configuration
对象,然后就可以开始了。
从 Spark 的角度来看,唯一需要做的是设置序列化 - Spark 默认依赖于 Java 序列化,这很方便,但效率相当低。这就是 Hadoop 本身引入自己的序列化机制和自己的类型(即 Writable
s)的原因。因此,InputFormat
和 OutputFormat
s 需要返回 Spark 无法理解的 Writable
s。好消息是,您可以轻松启用不同的序列化(Kryo),它可以自动处理转换,并且效率也很高。
SparkConf sc = new SparkConf(); //.setMaster("local"); sc.set("spark.serializer", KryoSerializer.class.getName()); // needed only when using the Java API JavaSparkContext jsc = new JavaSparkContext(sc);
或者,如果您更喜欢 Scala
请注意,Kryo 序列化用作处理 Writable
类型的变通方法;您可以选择直接转换类型(从 Writable
到 Serializable
类型)- 这很好,但是对于入门来说,上面的一行代码似乎是最有效的。
从 Elasticsearch 读取数据编辑
要读取数据,只需传入 org.elasticsearch.hadoop.mr.EsInputFormat
类 - 因为它同时支持 旧的
和 新的
Map/Reduce API,因此您可以自由使用 SparkContext
's、hadoopRDD
(为了简洁起见,我们推荐使用此方法)或 newAPIHadoopRDD
上的任何一种方法。无论您选择哪种方法,都要坚持下去,以避免将来出现混淆和问题。
*旧的* (org.apache.hadoop.mapred
) API编辑
JobConf conf = new JobConf(); conf.set("es.resource", "radio/artists"); conf.set("es.query", "?q=me*"); JavaPairRDD esRDD = jsc.hadoopRDD(conf, EsInputFormat.class, Text.class, MapWritable.class); long docCount = esRDD.count();
创建 Hadoop 对象(使用旧 API) |
|
配置源(索引) |
|
设置查询(可选) |
|
通过 |
Scala 版本如下
val conf = new JobConf() conf.set("es.resource", "radio/artists") conf.set("es.query", "?q=me*") val esRDD = sc.hadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]], classOf[Text], classOf[MapWritable])) val docCount = esRDD.count();
*新的* (org.apache.hadoop.mapreduce
) API编辑
不出所料,mapreduce
API 版本非常相似 - 将 hadoopRDD
替换为 newAPIHadoopRDD
,将 JobConf
替换为 Configuration
。就是这样。
Configuration conf = new Configuration(); conf.set("es.resource", "radio/artists"); conf.set("es.query", "?q=me*"); JavaPairRDD esRDD = jsc.newAPIHadoopRDD(conf, EsInputFormat.class, Text.class, MapWritable.class); long docCount = esRDD.count();
创建 Hadoop 对象(使用新 API) |
|
配置源(索引) |
|
设置查询(可选) |
|
通过 |
Scala 版本如下
从 PySpark 使用连接器编辑
由于其Map/Reduce层,elasticsearch-hadoop 也可以从 PySpark 中使用,以读取和写入 Elasticsearch 数据。为此,以下是Spark 文档中的一个片段(请确保切换到 Python 片段)
$ ./bin/pyspark --driver-class-path=/path/to/elasticsearch-hadoop.jar >>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults >>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\ "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) >>> rdd.first() # the result is a MapWritable that is converted to a Python dict (u'Elasticsearch ID', {u'field1': True, u'field2': u'Some Text', u'field3': 12345})
此外,也可以使用 SQL 加载器
from pyspark.sql import SQLContext sqlContext = SQLContext(sc) df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type") df.printSchema()