Apache Spark 支持
编辑Apache Spark 支持
编辑
Apache Spark 是一个快速且通用的集群计算系统。它提供 Java、Scala 和 Python 中的高级 API,以及支持通用执行图的优化引擎。 |
||
-- Spark 网站 |
Spark 通过通常将数据缓存在内存中,在大型数据集上提供快速迭代/类似函数的功能。与本文档中提到的其他库相反,Apache Spark 是一个计算框架,它本身不依赖于 Map/Reduce,但它确实与 Hadoop 集成,主要是与 HDFS 集成。elasticsearch-hadoop 允许 Elasticsearch 以两种方式在 Spark 中使用:通过自 2.1 版以来提供的专用支持,或通过自 2.0 版以来的 Map/Reduce 桥接。自 5.0 版开始,elasticsearch-hadoop 支持 Spark 2.0。
安装
编辑与其他库一样,elasticsearch-hadoop 需要在 Spark 的类路径中可用。由于 Spark 具有多种部署模式,因此这可以转换为目标类路径,无论它是在一个节点上(如本地模式 - 将在整个文档中使用),还是根据所需的基础设施在每个节点上。
原生 RDD 支持
编辑在 2.1 中添加。
elasticsearch-hadoop 在 Elasticsearch 和 Apache Spark 之间提供了原生集成,以 RDD
(弹性分布式数据集)(或更准确地说是Pair RDD
)的形式,该数据集可以读取 Elasticsearch 中的数据。该 RDD
提供两种风格:一种用于 Scala(它将数据作为带有 Scala 集合的 Tuple2
返回),另一种用于 Java(它将数据作为包含 java.util
集合的 Tuple2
返回)。
在可能的情况下,请考虑使用原生集成,因为它提供了最佳性能和最大灵活性。
配置
编辑要为 Apache Spark 配置 elasticsearch-hadoop,可以在 配置 章节中描述的 SparkConf
对象中设置各种属性。
import org.apache.spark.SparkConf val conf = new SparkConf().setAppName(appName).setMaster(master) conf.set("es.index.auto.create", "true")
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); conf.set("es.index.auto.create", "true");
命令行对于希望通过命令行(直接或通过从文件加载)设置属性的用户,请注意 Spark仅接受以“spark.”为前缀的属性,并将忽略其余属性(并且根据版本可能会发出警告)。为了解决此限制,通过附加 spark.
前缀来定义 elasticsearch-hadoop 属性(因此它们变为 spark.es.
),elasticsearch-hadoop 将自动解析它们。
将数据写入 Elasticsearch
编辑使用 elasticsearch-hadoop,任何 RDD
都可以保存到 Elasticsearch,只要其内容可以转换为文档即可。在实践中,这意味着 RDD
类型需要是 Map
(无论是 Scala 还是 Java),一个 JavaBean
或一个 Scala case class。如果不是这种情况,可以轻松地在 Spark 中转换数据或插入他们自己的自定义 ValueWriter
。
Scala
编辑当使用 Scala 时,只需导入 org.elasticsearch.spark
包,该包通过 增强我的库 模式,用 saveToEs
方法丰富任何 RDD
API。
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.elasticsearch.spark._ ... val conf = ... val sc = new SparkContext(conf) val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3) val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran") sc.makeRDD( Seq(numbers, airports) ).saveToEs("spark/docs")
Spark Scala 导入 |
|
elasticsearch-hadoop Scala 导入 |
|
通过其 Scala API 启动 Spark |
|
|
|
将内容(即两个文档(数字和机场))索引到 Elasticsearch 中的 |
Scala 用户可能会尝试使用 Seq
和 →
符号来声明根对象(即 JSON 文档),而不是使用 Map
。虽然相似,但第一个符号会导致略微不同的类型,这些类型无法与 JSON 文档匹配:Seq
是一个有序序列(换句话说,是一个列表),而 →
创建一个 Tuple
,它或多或少是一个有序的、固定数量的元素。因此,列表列表不能用作文档,因为它无法映射到 JSON 对象;但是它可以在其中自由使用。因此,在上面的示例中,使用 Map(k→v)
而不是 Seq(k→v)
。
作为上述隐式导入的替代方案,可以通过 org.elasticsearch.spark.rdd
包中的 EsSpark
在 Scala 中使用 elasticsearch-hadoop Spark 支持,它充当实用程序类,允许显式方法调用。此外,不要使用 Map
(它们很方便,但由于其结构差异,每个实例都需要一个映射),而是使用case class。
import org.apache.spark.SparkContext import org.elasticsearch.spark.rdd.EsSpark // define a case class case class Trip(departure: String, arrival: String) val upcomingTrip = Trip("OTP", "SFO") val lastWeekTrip = Trip("MUC", "OTP") val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip)) EsSpark.saveToEs(rdd, "spark/docs")
对于需要指定文档 ID(或其他元数据字段,如 ttl
或 timestamp
)的情况,可以通过设置相应的 映射(即 es.mapping.id
)来实现。按照前面的示例,为了指示 Elasticsearch 使用字段 id
作为文档 ID,请更新 RDD
配置(也可以在 SparkConf
上设置属性,但由于其全局影响,不建议这样做)。
EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id"))
Java
编辑Java 用户有一个专用的类,它提供了与 EsSpark
类似的功能,即 org.elasticsearch.spark.rdd.api.java
中的 JavaEsSpark
(一个类似于 Spark 的 Java API 的包)。
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; ... SparkConf conf = ... JavaSparkContext jsc = new JavaSparkContext(conf); Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2); Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran"); JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports)); JavaEsSpark.saveToEs(javaRDD, "spark/docs");
Spark Java 导入 |
|
elasticsearch-hadoop Java 导入 |
|
通过其 Java API 启动 Spark |
|
为了简化示例,请使用 Guava(Spark 的依赖项) |
|
在两个集合上创建一个简单的 |
|
将内容(即两个文档(数字和机场))索引到 Elasticsearch 中的 |
可以通过使用 Java 5 的静态导入进一步简化代码。此外,Map
(其映射由于其松散结构而具有动态性)可以用 JavaBean
替换。
public class TripBean implements Serializable { private String departure, arrival; public TripBean(String departure, String arrival) { setDeparture(departure); setArrival(arrival); } public TripBean() {} public String getDeparture() { return departure; } public String getArrival() { return arrival; } public void setDeparture(String dep) { departure = dep; } public void setArrival(String arr) { arrival = arr; } }
import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark; ... TripBean upcoming = new TripBean("OTP", "SFO"); TripBean lastWeek = new TripBean("MUC", "OTP"); JavaRDD<TripBean> javaRDD = jsc.parallelize( ImmutableList.of(upcoming, lastWeek)); saveToEs(javaRDD, "spark/docs");
静态导入 |
|
定义一个包含 |
|
调用 |
设置文档 ID(或其他元数据字段,如 ttl
或 timestamp
)类似于其 Scala 对应物,尽管根据您是否使用 JDK 类或其他实用程序(如 Guava)可能会稍微冗长一些。
JavaEsSpark.saveToEs(javaRDD, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));
将现有的 JSON 写入 Elasticsearch
编辑对于 RDD
中的数据已存在于 JSON 中的情况,elasticsearch-hadoop 允许直接索引,无需应用任何转换;数据按原样获取并直接发送到 Elasticsearch。因此,在这种情况下,elasticsearch-hadoop 期望一个包含 String
或字节数组(byte[]
/Array[Byte]
)的 RDD
,假设每个条目都表示一个 JSON 文档。如果 RDD
没有正确的签名,则无法应用 saveJsonToEs
方法(在 Scala 中,它们将不可用)。
Scala
编辑val json1 = """{"reason" : "business", "airport" : "SFO"}""" val json2 = """{"participants" : 5, "airport" : "OTP"}""" new SparkContext(conf).makeRDD(Seq(json1, json2)) .saveJsonToEs("spark/json-trips")
Java
编辑String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}"; String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}"; JavaSparkContext jsc = ... JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); JavaEsSpark.saveJsonToEs(stringRDD, "spark/json-trips");
|
|
注意 |
|
通过专用的 |
写入动态/多资源
编辑对于需要将数据索引到不同存储桶(基于数据内容)的 Elasticsearch 中的情况,可以使用 es.resource.write
字段,该字段接受一个在运行时从文档内容解析的模式。按照前面提到的 媒体示例,可以将其配置如下。
Scala
编辑val game = Map( "media_type"->"game", "title" -> "FF VI", "year" -> "1994") val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010") val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien") sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection-{media_type}/doc")
对于每个即将写入的文档/对象,elasticsearch-hadoop 将提取 media_type
字段并使用其值来确定目标资源。
Java
编辑正如预期的那样,Java 中的事情非常相似。
Map<String, ?> game = ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994"); Map<String, ?> book = ... Map<String, ?> cd = ... JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(game, book, cd)); saveToEs(javaRDD, "my-collection-{media_type}/doc");
处理文档元数据
编辑Elasticsearch 允许每个文档拥有自己的元数据。如上所述,通过各种映射选项,可以自定义这些参数,以便从其所属的文档中提取其值。此外,甚至可以包含/排除发送回 Elasticsearch 的数据部分。在 Spark 中,elasticsearch-hadoop 扩展了此功能,允许通过使用键值对 RDD
在文档本身外部提供元数据。换句话说,对于包含键值元组的RDD
,元数据可以从键中提取,而值用作文档源。
元数据通过 org.elasticsearch.spark.rdd
包中的 Metadata
Java 枚举进行描述,该枚举标识其类型 - id
、ttl
、version
等。因此,RDD
键可以是包含每个文档的 Metadata
及其关联值的 Map
。如果 RDD
键不是 Map
类型,则 elasticsearch-hadoop 将认为该对象表示文档 ID 并相应地使用它。这听起来比实际情况更复杂,所以让我们看一些例子。
Scala
编辑键值对 RDD
,或者简单地说,签名为 RDD[(K,V)]
的 RDD
可以利用 saveToEsWithMeta
方法,这些方法可以通过 org.elasticsearch.spark
包的隐式导入或 EsSpark
对象获得。要为每个文档手动指定 ID,只需在您的 RDD
中传入 Object
(不是 Map
类型)。
val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // instance of SparkContext val sc = ... val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo))) airportsRDD.saveToEsWithMeta("airports/2015")
|
|
|
|
由于 |
当需要指定除 ID 之外的更多信息时,应使用键类型为 org.elasticsearch.spark.rdd.Metadata
的 scala.collection.Map
。
import org.elasticsearch.spark.rdd.Metadata._ val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // metadata for each document // note it's not required for them to have the same structure val otpMeta = Map(ID -> 1, TTL -> "3h") val mucMeta = Map(ID -> 2, VERSION -> "23") val sfoMeta = Map(ID -> 3) // instance of SparkContext val sc = ... val airportsRDD = sc.makeRDD( Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo))) airportsRDD.saveToEsWithMeta("airports/2015")
导入 |
|
用于 |
|
用于 |
|
用于 |
|
元数据和文档被组装成一个键值对 |
|
使用 |
Java
编辑类似地,在 Java 端,JavaEsSpark
提供了应用于 JavaPairRDD
(RDD[(K,V)]
在 Java 中的等效项)的 saveToEsWithMeta
方法。因此,要根据文档 ID 保存文档,可以使用
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC"); JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs(ImmutableList.of( new Tuple2<Object, Object>(1, otp), new Tuple2<Object, Object>(2, jfk))); JavaEsSpark.saveToEsWithMeta(pairRDD, target);
通过使用围绕文档 ID 和文档本身的 Scala |
|
围绕 ID ( |
|
围绕 ID ( |
|
使用键作为 ID 和值作为文档相应地保存 |
当需要指定除 ID 之外的更多信息时,可以选择使用填充了键类型为 org.elasticsearch.spark.rdd.Metadata
的 java.util.Map
。
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; import org.elasticsearch.spark.rdd.Metadata; import static org.elasticsearch.spark.rdd.Metadata.*; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran"); // metadata for each document // note it's not required for them to have the same structure Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs<(ImmutableList.of( new Tuple2<Object, Object>(otpMeta, otp), new Tuple2<Object, Object>(sfoMeta, sfo))); JavaEsSpark.saveToEsWithMeta(pairRDD, target);
|
|
用于 |
|
|
|
|
|
|
|
将 |
|
在包含文档及其各自元数据的 |
从 Elasticsearch 读取数据
编辑对于读取,应定义从 Elasticsearch 将数据流式传输到 Spark 的 Elasticsearch RDD
。
Scala
编辑与写入类似,org.elasticsearch.spark
包使用 esRDD
方法丰富了 SparkContext
API。
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.elasticsearch.spark._ ... val conf = ... val sc = new SparkContext(conf) val RDD = sc.esRDD("radio/artists")
Spark Scala 导入 |
|
elasticsearch-hadoop Scala 导入 |
|
通过其 Scala API 启动 Spark |
|
为索引 |
该方法可以重载以指定其他查询甚至配置 Map
(覆盖 SparkConf
)。
... import org.elasticsearch.spark._ ... val conf = ... val sc = new SparkContext(conf) sc.esRDD("radio/artists", "?q=me*")
默认情况下,Elasticsearch 中的文档以 Tuple2
的形式返回,第一个元素是文档 ID,第二个元素是通过 Scala 集合表示的实际文档,即一个 `Map[String, Any]`,其中键表示字段名称,值表示其各自的值。
Java
编辑Java 用户有一个专用的 JavaPairRDD
,其工作方式与其 Scala 对应项相同,但是返回的 Tuple2
值(或第二个元素)将文档作为本机 java.util
集合返回。
import org.apache.spark.api.java.JavaSparkContext; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; ... SparkConf conf = ... JavaSparkContext jsc = new JavaSparkContext(conf); JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "radio/artists");
Spark Java 导入 |
|
elasticsearch-hadoop Java 导入 |
|
通过其 Java API 启动 Spark |
|
为索引 |
类似地,可以使用重载的 esRDD
方法来指定查询或传递 Map
对象以进行高级配置。让我们看看它是什么样子,但这次使用Java 静态导入。此外,让我们丢弃文档 ID 并仅检索 RDD
值。
import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.*; ... JavaRDD<Map<String, Object>> rdd = esRDD(jsc, "radio/artists", "?q=me*") .values();
静态导入 |
|
创建一个 |
|
仅返回 |
通过使用 JavaEsSpark
API,可以获得 Spark 专用的 JavaPairRDD
,它在 Java 环境中比基本 RDD
更适合(因为它具有 Scala 签名)。此外,专用的 RDD
将 Elasticsearch 文档作为正确的 Java 集合返回,因此不必处理 Scala 集合(这通常是使用 RDD
的情况)。当使用 Java 8 时,这一点尤其强大,我们强烈建议使用它,因为它的lambda 表达式使集合处理变得极其简洁。
为此,让我们假设想要过滤 RDD
中的文档,并仅返回包含包含 mega
的值的那些文档(请忽略可以直接通过 Elasticsearch 进行过滤的事实)。
在 Java 8 之前的版本中,代码如下所示
JavaRDD<Map<String, Object>> esRDD = esRDD(jsc, "radio/artists", "?q=me*").values(); JavaRDD<Map<String, Object>> filtered = esRDD.filter( new Function<Map<String, Object>, Boolean>() { @Override public Boolean call(Map<String, Object> map) throws Exception { returns map.contains("mega"); } });
使用 Java 8,过滤变成了一行代码
JavaRDD<Map<String, Object>> esRDD = esRDD(jsc, "radio/artists", "?q=me*").values(); JavaRDD<Map<String, Object>> filtered = esRDD.filter(doc -> doc.contains("mega"));
以 JSON 格式读取数据
编辑如果需要以 JSON 格式获取 Elasticsearch 的结果(通常是发送到其他系统),则可以使用专用的 esJsonRDD
方法。在这种情况下,连接器将按原样返回从 Elasticsearch 收到的文档内容,而无需任何处理,在 Scala 中以 RDD[(String, String)]
或在 Java 中以 JavaPairRDD[String, String]
的形式返回,其中键表示文档 ID,值表示其以 JSON 格式表示的实际内容。
类型转换
编辑elasticsearch-hadoop 自动将 Spark 内置类型转换为 Elasticsearch 类型(反之亦然),如下表所示。
表 4. Scala 类型转换表
Scala 类型 | Elasticsearch 类型 |
---|---|
|
|
|
|
|
空 |
|
|
|
|
|
|
case class |
|
|
|
此外,以下隐式转换适用于 Java 类型。
表 5. Java 类型转换表
Java 类型 | Elasticsearch 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Java Bean |
|
转换尽最大努力完成;内置的 Java 和 Scala 类型保证能够正确转换,但是对于 Java 或 Scala 中的用户类型没有保证。如上表所述,当在 Scala 中遇到 case
类或在 Java 中遇到 JavaBean
时,转换器将尝试 unwrap
其内容并将其保存为 object
。请注意,这仅适用于顶级用户对象 - 如果用户对象嵌套了其他用户对象,则转换很可能失败,因为转换器不执行嵌套的 unwrapping
。这样做是有意的,因为转换器必须序列化和反序列化数据,而用户类型由于数据丢失而引入了歧义;这可以通过某种类型的映射来解决,但这会使项目过于接近 ORM 的领域,并且可以说引入的复杂性过高而收益甚微;由于 Spark 中的处理功能和 elasticsearch-hadoop 中的可插拔性,可以轻松地将对象转换为其他类型,如果需要,只需付出最少的努力即可获得最大的控制。
地理类型值得一提的是,Elasticsearch 中独有的丰富数据类型,例如 GeoPoint
或 GeoShape
,通过将其结构转换为上表中提供的基本类型来支持。例如,基于其存储方式,geo_point
可能会作为 String
或 Traversable
返回。
Spark Streaming 支持
编辑在 5.0 中添加。
Spark Streaming 是核心 Spark API 的扩展,它支持对实时数据流进行可扩展、高吞吐量、容错的流处理。 |
||
-- Spark 网站 |
Spark Streaming 是核心 Spark 功能的扩展,允许对流数据进行近乎实时的处理。Spark Streaming 围绕 DStream
或离散化流的概念工作。 DStreams
通过将新到达的记录收集到一个小的 RDD
中并执行它来运行。此操作每隔几秒钟重复一次,并在一个称为微批处理的过程中使用新的 RDD
。 DStream
api 包含许多与 RDD
api 相同的处理操作,以及一些其他特定于流的方法。从 5.0 版开始,elasticsearch-hadoop 提供了与 Spark Streaming 的原生集成。
使用 elasticsearch-hadoop Spark Streaming 支持时,可以将 Elasticsearch 作为目标输出位置,以便从 Spark Streaming 作业中将数据索引到其中,这与持久化 RDD
的结果的方式相同。但是,与 RDD
不同,由于其连续性,无法使用 DStream
从 Elasticsearch 中读取数据。
Spark Streaming 支持提供了特殊的优化,以便在使用非常小的处理窗口运行作业时,可以节省 Spark 执行器上的网络资源。因此,应该优先使用此集成,而不是在 DStream
上的 foreachRDD
调用返回的 RDD
上调用 saveToEs
。
将 DStream
写入 Elasticsearch
编辑与 RDD
一样,只要其内容可以转换为文档,任何 DStream
都可以保存到 Elasticsearch 中。在实践中,这意味着 DStream
类型需要是 Map
(Scala 或 Java 中的任何一个)、JavaBean
或 Scala case 类。如果不是这种情况,可以轻松地在 Spark 中转换数据或插入他们自己的自定义 ValueWriter
。
Scala
编辑使用 Scala 时,只需导入 org.elasticsearch.spark.streaming
包,该包通过 pimp my library 模式,使用 saveToEs
方法丰富了 DStream
API
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.elasticsearch.spark.streaming._ ... val conf = ... val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)) val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3) val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran") val rdd = sc.makeRDD(Seq(numbers, airports)) val microbatches = mutable.Queue(rdd) ssc.queueStream(microbatches).saveToEs("spark/docs") ssc.start() ssc.awaitTermination()
Spark 和 Spark Streaming Scala 导入 |
|
elasticsearch-hadoop Spark Streaming 导入 |
|
通过其 Scala API 启动 Spark |
|
通过传递 SparkContext 启动 SparkStreaming 上下文。微批次将每秒处理一次。 |
|
|
|
从 |
|
启动 Spark Streaming 作业并等待其最终完成。 |
作为上述隐式导入的替代方案,可以通过 org.elasticsearch.spark.streaming
包中的 EsSparkStreaming
在 Scala 中使用 elasticsearch-hadoop Spark Streaming 支持,它充当实用程序类,允许显式方法调用。此外,不要使用 Map
(它们很方便,但由于其结构差异,每个实例都需要一个映射),而应使用案例类
import org.apache.spark.SparkContext import org.elasticsearch.spark.streaming.EsSparkStreaming // define a case class case class Trip(departure: String, arrival: String) val upcomingTrip = Trip("OTP", "SFO") val lastWeekTrip = Trip("MUC", "OTP") val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip)) val microbatches = mutable.Queue(rdd) val dstream = ssc.queueStream(microbatches) EsSparkStreaming.saveToEs(dstream, "spark/docs") ssc.start()
|
|
定义一个名为 |
|
围绕 |
|
通过 |
|
启动流式处理过程 |
启动 SparkStreamingContext 后,无法添加或配置新的 DStream
。上下文停止后,无法重新启动。每个 JVM 每次只能有一个活动的 SparkStreamingContext。还要注意,以编程方式停止 SparkStreamingContext 时,它会停止底层的 SparkContext,除非指示它不要这样做。
对于需要指定文档的 id(或其他元数据字段,如 ttl
或 timestamp
)的情况,可以通过设置相应的 映射(即 es.mapping.id
)来实现。按照前面的示例,要指示 Elasticsearch 使用字段 id
作为文档 id,请更新 DStream
配置(也可以在 SparkConf
上设置属性,但由于其全局影响,不建议这样做)
EsSparkStreaming.saveToEs(dstream, "spark/docs", Map("es.mapping.id" -> "id"))
Java
编辑Java 用户有一个专用类,它提供了与 EsSparkStreaming
类似的功能,即 org.elasticsearch.spark.streaming.api.java
包中的 JavaEsSparkStreaming
(一个类似于 Spark 的 Java API 的包)
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.api.java.JavaDStream; import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming; ... SparkConf conf = ... JavaSparkContext jsc = new JavaSparkContext(conf); JavaStreamingContext jssc = new JavaSparkStreamingContext(jsc, Seconds.apply(1)); Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2); Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran"); JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports)); Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>(); microbatches.add(javaRDD); JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches); JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs"); jssc.start()
Spark 和 Spark Streaming Java 导入 |
|
elasticsearch-hadoop Java 导入 |
|
通过其 Java API 启动 Spark 和 Spark Streaming。微批次将每秒处理一次。 |
|
为了简化示例,请使用 Guava(Spark 的依赖项) |
|
在微批次上创建一个简单的 |
|
将内容(即两个文档(数字和机场))索引到 Elasticsearch 中的 |
|
执行流式处理作业。 |
可以通过使用 Java 5 的静态导入进一步简化代码。此外,Map
(其映射由于其松散结构而具有动态性)可以用 JavaBean
替换。
public class TripBean implements Serializable { private String departure, arrival; public TripBean(String departure, String arrival) { setDeparture(departure); setArrival(arrival); } public TripBean() {} public String getDeparture() { return departure; } public String getArrival() { return arrival; } public void setDeparture(String dep) { departure = dep; } public void setArrival(String arr) { arrival = arr; } }
import static org.elasticsearch.spark.rdd.api.java.JavaEsSparkStreaming; ... TripBean upcoming = new TripBean("OTP", "SFO"); TripBean lastWeek = new TripBean("MUC", "OTP"); JavaRDD<TripBean> javaRDD = jsc.parallelize(ImmutableList.of(upcoming, lastWeek)); Queue<JavaRDD<TripBean>> microbatches = new LinkedList<JavaRDD<TripBean>>(); microbatches.add(javaRDD); JavaDStream<TripBean> javaDStream = jssc.queueStream(microbatches); saveToEs(javaDStream, "spark/docs"); jssc.start()
静态导入 |
|
定义一个包含 |
|
调用 |
|
运行该流式处理作业 |
设置文档 ID(或其他元数据字段,如 ttl
或 timestamp
)类似于其 Scala 对应物,尽管根据您是否使用 JDK 类或其他实用程序(如 Guava)可能会稍微冗长一些。
JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));
将现有 JSON 写入 Elasticsearch
编辑对于 DStream
流式传输的数据已作为 JSON 序列化的案例,elasticsearch-hadoop 允许直接索引无需应用任何转换;数据按原样获取并直接发送到 Elasticsearch。因此,在这种情况下,elasticsearch-hadoop 期望 DStream
包含 String
或字节数组 (byte[]
/Array[Byte]
),假设每个条目代表一个 JSON 文档。如果 DStream
没有正确的签名,则无法应用 saveJsonToEs
方法(在 Scala 中,它们将不可用)。
Scala
编辑val json1 = """{"reason" : "business", "airport" : "SFO"}""" val json2 = """{"participants" : 5, "airport" : "OTP"}""" val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)) val rdd = sc.makeRDD(Seq(json1, json2)) val microbatch = mutable.Queue(rdd) ssc.queueStream(microbatch).saveJsonToEs("spark/json-trips") ssc.start()
Java
编辑String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}"; String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}"; JavaSparkContext jsc = ... JavaStreamingContext jssc = ... JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); Queue<JavaRDD<String>> microbatches = new LinkedList<JavaRDD<String>>(); microbatches.add(stringRDD); JavaDStream<String> stringDStream = jssc.queueStream(microbatches); JavaEsSparkStreaming.saveJsonToEs(stringRDD, "spark/json-trips"); jssc.start()
|
|
创建 |
|
注意 |
|
通过专用的 |
|
启动流作业 |
写入动态/多资源
编辑对于需要将数据索引到不同存储桶(基于数据内容)的 Elasticsearch 中的情况,可以使用 es.resource.write
字段,该字段接受一个在运行时从文档内容解析的模式。按照前面提到的 媒体示例,可以将其配置如下。
Scala
编辑val game = Map( "media_type" -> "game", "title" -> "FF VI", "year" -> "1994") val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010") val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien") val batch = sc.makeRDD(Seq(game, book, cd)) val microbatches = mutable.Queue(batch) ssc.queueStream(microbatches).saveToEs("my-collection-{media_type}/doc") ssc.start()
对于每个即将写入的文档/对象,elasticsearch-hadoop 将提取 media_type
字段并使用其值来确定目标资源。
Java
编辑正如预期的那样,Java 中的事情非常相似。
Map<String, ?> game = ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994"); Map<String, ?> book = ... Map<String, ?> cd = ... JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(game, book, cd)); Queue<JavaRDD<Map<String, ?>>> microbatches = ... JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches); saveToEs(javaDStream, "my-collection-{media_type}/doc"); jssc.start();
处理文档元数据
编辑Elasticsearch 允许每个文档都有自己的 元数据。如上所述,通过各种 映射 选项,可以自定义这些参数,以便从其所属的文档中提取其值。此外,甚至可以包含/排除发送回 Elasticsearch 的数据部分。在 Spark 中,elasticsearch-hadoop 扩展了此功能,允许通过使用 对 RDD
在文档本身外部提供元数据。
在 Spark Streaming 中,这没有什么不同。对于包含键值元组的 DStreams
,可以从键中提取元数据,并将值用作文档源。
元数据通过 org.elasticsearch.spark.rdd
包中的 Metadata
Java 枚举 进行描述,该枚举标识其类型 - id
、ttl
、version
等。因此,DStream
的键可以是一个 Map
,其中包含每个文档的 Metadata
及其关联的值。如果 DStream
的键不是 Map
类型,则 elasticsearch-hadoop 会将该对象视为表示文档 ID 并相应地使用它。这听起来可能比实际情况复杂,让我们看一些示例。
Scala
编辑配对 DStreams
,或者简单地说,具有签名 DStream[(K,V)]
的 DStreams
可以利用 org.elasticsearch.spark.streaming
包或 EsSparkStreaming
对象通过隐式导入提供的 saveToEsWithMeta
方法。要手动为每个文档指定 ID,只需在 DStream
中传入 Object
(不是 Map
类型)。
val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // instance of SparkContext val sc = ... // instance of StreamingContext val ssc = ... val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo))) val microbatches = mutable.Queue(airportsRDD) ssc.queueStream(microbatches) .saveToEsWithMeta("airports/2015") ssc.start()
|
|
|
|
我们构建一个 |
|
由于生成的 |
当需要指定除 ID 之外的更多信息时,应使用键类型为 org.elasticsearch.spark.rdd.Metadata
的 scala.collection.Map
。
import org.elasticsearch.spark.rdd.Metadata._ val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // metadata for each document // note it's not required for them to have the same structure val otpMeta = Map(ID -> 1, TTL -> "3h") val mucMeta = Map(ID -> 2, VERSION -> "23") val sfoMeta = Map(ID -> 3) // instance of SparkContext val sc = ... // instance of StreamingContext val ssc = ... val airportsRDD = sc.makeRDD( Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo))) val microbatches = mutable.Queue(airportsRDD) ssc.queueStream(microbatches) .saveToEsWithMeta("airports/2015") ssc.start()
导入 |
|
用于 |
|
用于 |
|
用于 |
|
元数据和文档被组装成一个键值对 |
|
|
|
|
Java
编辑类似地,在 Java 端,JavaEsSparkStreaming
提供了应用于 JavaPairDStream
(DStream[(K,V)]
在 Java 中的等价物)的 saveToEsWithMeta
方法。
由于 Java API 的限制,这往往需要做更多工作。例如,您不能直接从 JavaPairRDD
队列创建 JavaPairDStream
。相反,您必须创建一个常规的 Tuple2
对象的 JavaDStream
,并将 JavaDStream
转换为 JavaPairDStream
。这听起来很复杂,但这是对 API 限制的一个简单的解决方法。
首先,我们将创建一个配对函数,它接收一个 Tuple2
对象,并将其直接返回给框架。
public static class ExtractTuples implements PairFunction<Tuple2<Object, Object>, Object, Object>, Serializable { @Override public Tuple2<Object, Object> call(Tuple2<Object, Object> tuple2) throws Exception { return tuple2; } }
然后,我们将配对函数应用于 Tuple2
的 JavaDStream
以创建 JavaPairDStream
并保存它。
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC"); JavaSparkContext jsc = ... JavaStreamingContext jssc = ... // create an RDD of between the id and the docs JavaRDD<Tuple2<?, ?>> rdd = jsc.parallelize( ImmutableList.of( new Tuple2<Object, Object>(1, otp), new Tuple2<Object, Object>(2, jfk))); Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ... JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); JavaPairDStream<?, ?> pairDStream = dstream.mapToPair(new ExtractTuples()); JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target); jssc.start();
创建一个常规的 Scala |
|
围绕 ID ( |
|
围绕 ID ( |
|
从元组 |
|
通过将我们的 |
|
|
当需要指定的内容不仅仅是 ID 时,可以选择使用一个 java.util.Map
,该 Map 使用 org.elasticsearch.spark.rdd.Metadata
类型的键进行填充。我们将使用相同的类型转换技巧将 JavaDStream
重新打包为 JavaPairDStream
。
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming; import org.elasticsearch.spark.rdd.Metadata; import static org.elasticsearch.spark.rdd.Metadata.*; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran"); // metadata for each document // note it's not required for them to have the same structure Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaRDD<Tuple2<?, ?>> pairRdd = jsc.parallelize<(ImmutableList.of( new Tuple2<Object, Object>(otpMeta, otp), new Tuple2<Object, Object>(sfoMeta, sfo))); Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ... JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); JavaPairDStream<?, ?> pairDStream = dstream.mapToPair(new ExtractTuples()) JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target); jssc.start();
|
|
用于 |
|
|
|
|
|
|
|
将 |
|
从 |
|
通过在其上映射 |
|
在包含文档及其相应元数据的 |
Spark Streaming 类型转换
编辑elasticsearch-hadoop Spark Streaming 支持利用与常规 Spark 类型映射相同的类型映射。为了保持一致性,此处重复了映射。
表 6. Scala 类型转换表
Scala 类型 | Elasticsearch 类型 |
---|---|
|
|
|
|
|
空 |
|
|
|
|
|
|
case class |
|
|
|
此外,以下隐式转换适用于 Java 类型。
表 7. Java 类型转换表
Java 类型 | Elasticsearch 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Java Bean |
|
**地理类型**值得再次提及的是,仅在 Elasticsearch 中可用的丰富数据类型,例如 GeoPoint
或 GeoShape
通过将其结构转换为上表中提供的基本类型来支持。例如,根据其存储,geo_point
可能作为 String
或 Traversable
返回。
Spark SQL 支持
编辑在 2.1 中添加。
Spark SQL 是一个用于结构化数据处理的 Spark 模块。它提供了一个称为 DataFrame 的编程抽象,并且还可以充当分布式 SQL 查询引擎。 |
||
-- Spark 网站 |
在核心 Spark 支持的基础上,elasticsearch-hadoop 还提供了与 Spark SQL 的集成。换句话说,Elasticsearch 成为 Spark SQL 的*原生*数据源,以便可以从 Spark SQL *透明地*索引和查询数据。
Spark SQL 使用*结构化*数据 - 换句话说,预期所有条目都具有*相同*的结构(相同数量的字段,相同类型和名称)。不支持使用非结构化数据(具有不同结构的文档),这会导致问题。对于此类情况,请使用 PairRDD
。
支持的 Spark SQL 版本
编辑Spark SQL 虽然已成为一个成熟的组件,但在版本之间仍在经历重大变化。Spark SQL 在 1.3 版中成为一个稳定的组件,但是它与以前的版本不 向后兼容。此外,Spark 2.0 引入了重大更改,这些更改通过 Dataset
API 打破了向后兼容性。elasticsearch-hadoop 支持 Spark SQL 1.3-1.6 和 Spark SQL 2.0 这两个版本,通过两个不同的 jar:elasticsearch-spark-1.x-<version>.jar
和 elasticsearch-hadoop-<version>.jar
支持 Spark SQL 1.3-1.6(或更高版本),而 elasticsearch-spark-2.0-<version>.jar
支持 Spark SQL 2.0。换句话说,除非您使用的是 Spark 2.0,否则请使用 elasticsearch-spark-1.x-<version>.jar
。
Spark SQL 支持位于 org.elasticsearch.spark.sql
包下。
**API 差异**从 elasticsearch-hadoop 用户的角度来看,Spark SQL 1.3-1.6 和 Spark 2.0 之间的差异相当统一。这篇文档详细描述了差异,下面简要提到了这些差异。
-
DataFrame
与Dataset
- 1.3+ 版中 Spark SQL 的核心单元是
DataFrame
。此 API 保留于 Spark 2.0 中,但在其下基于Dataset
。 - 统一 API 与专用 Java/Scala API
- 在 Spark SQL 2.0 中,通过引入
SparkSession
并对 `Dataset`、`DataFrame` 和 `RDD` 使用相同的底层代码,API 进一步统一。
从概念上讲,DataFrame
是一个 Dataset[Row]
,以下文档将重点介绍 Spark SQL 1.3-1.6。
将 DataFrame
(Spark SQL 1.3+)写入 Elasticsearch
编辑使用 elasticsearch-hadoop,可以将 DataFrame
(或任何 Dataset
)索引到 Elasticsearch。
Scala
编辑在 Scala 中,只需导入 org.elasticsearch.spark.sql
包,该包使用 saveToEs
方法丰富给定的 DataFrame
类;虽然这些方法与 org.elasticsearch.spark
包具有相同的签名,但它们是为 DataFrame
实现设计的。
// reusing the example from Spark SQL documentation import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SQLContext._ import org.elasticsearch.spark.sql._ ... // sc = existing SparkContext val sqlContext = new SQLContext(sc) // case class used to define the DataFrame case class Person(name: String, surname: String, age: Int) // create DataFrame val people = sc.textFile("people.txt") .map(_.split(",")) .map(p => Person(p(0), p(1), p(2).trim.toInt)) .toDF() people.saveToEs("spark/people")
Spark SQL 包导入 |
|
elasticsearch-hadoop Spark 包导入 |
|
将文本文件作为*普通* |
|
通过 |
默认情况下,elasticsearch-hadoop 会忽略空值,而不是写入任何字段。由于 DataFrame
旨在被视为结构化表格数据,因此您可以通过将 es.spark.dataframe.write.null
设置切换到 true
来启用将空值作为空值字段写入仅限 DataFrame
对象。
Java
编辑类似地,对于 Java 使用,专用包 org.elasticsearch.spark.sql.api.java
通过 JavaEsSpark SQL
提供类似的功能。
import org.apache.spark.sql.api.java.*; import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL; ... DataFrame people = ... JavaEsSparkSQL.saveToEs(people, "spark/people");
Spark SQL Java 导入 |
|
elasticsearch-hadoop Spark SQL Java 导入 |
|
在 Elasticsearch 中的 |
同样,使用 Java 5 *静态*导入,这可以进一步简化为
为了最大程度地控制在 Elasticsearch 中 DataFrame
的映射,强烈建议事先创建映射。有关更多信息,请参阅此章节。
将现有 JSON 写入 Elasticsearch
编辑使用 Spark SQL 时,如果输入数据为 JSON 格式,只需将其转换为 DataFrame
(在 Spark SQL 1.3 中)或 Dataset
(对于 Spark SQL 2.0)(如 Spark 文档中所述)通过 SQLContext
/JavaSQLContext
的 jsonFile
方法。
使用纯 SQL 从 Elasticsearch 读取数据
编辑在创建临时表之前,索引及其映射必须存在。
Spark SQL 1.2 引入了一个用于从外部数据源读取数据的新 API,该 API 由 elasticsearch-hadoop 支持,简化了与 Elasticsearch 交互所需的 SQL 配置。此外,它在后台了解 Spark 执行的操作,因此可以优化数据和查询(例如过滤或剪枝),从而提高性能。
Spark SQL 中的数据源
编辑使用 Spark SQL 时,elasticsearch-hadoop 允许通过 SQLContext
的 load
方法访问 Elasticsearch。换句话说,以声明式的方式创建由 Elasticsearch 支持的 DataFrame
/Dataset
。
val sql = new SQLContext... // Spark 1.3 style val df = sql.load( "spark/index", "org.elasticsearch.spark.sql")
|
|
要加载的路径或资源 - 在此情况下为 Elasticsearch 中的索引/类型。 |
|
数据源提供程序 - |
在 Spark 1.4 中,可以使用以下类似的 API 调用。
|
|
数据源提供程序 - |
|
要加载的路径或资源 - 在此情况下为 Elasticsearch 中的索引/类型。 |
在 Spark 1.5 中,可以进一步简化为:
无论使用哪种 API,一旦创建,就可以自由地访问 DataFrame
来操作数据。
sources 声明还允许传入特定的选项,即
名称 | 默认值 | 描述 |
---|---|---|
|
必需 |
Elasticsearch 索引/类型 |
|
|
是否将 Spark SQL 转换为 Elasticsearch 查询 DSL(下推)。 |
|
|
是否使用精确(未分析)匹配或非精确(已分析)匹配。 |
可在 Spark 1.6 或更高版本中使用。 |
||
|
|
是否告诉 Spark 对下推的过滤器应用其自身的过滤。 |
这两个选项将在下一节中解释。要指定选项(包括通用 elasticsearch-hadoop 选项),只需将 Map
传递给上述方法即可。
例如:
val sql = new SQLContext... // options for Spark 1.3 need to include the target path/resource val options13 = Map("path" -> "spark/index", "pushdown" -> "true", "es.nodes" -> "someNode", "es.port" -> "9200") // Spark 1.3 style val spark13DF = sql.load("org.elasticsearch.spark.sql", options13) // options for Spark 1.4 - the path/resource is specified separately val options = Map("pushdown" -> "true", "es.nodes" -> "someNode", "es.port" -> "9200") // Spark 1.4 style val spark14DF = sql.read.format("org.elasticsearch.spark.sql") .options(options) .load("spark/index")
sqlContext.sql( "CREATE TEMPORARY TABLE myIndex " + "USING org.elasticsearch.spark.sql " + "OPTIONS (resource 'spark/index', nodes 'someNode')" ) "
Spark 的临时表名称。 |
|
|
|
elasticsearch-hadoop 配置选项,其中必选项为 |
请注意,由于 SQL 解析器的原因,不允许使用 .
(以及其他用于分隔的常用字符);连接器尝试通过自动附加 es.
前缀来解决此问题,但这仅适用于指定只有一个 .
的配置选项(如上面的 es.nodes
)。因此,如果需要具有多个 .
的属性,则应使用上面的 SQLContext.load
或 SQLContext.read
方法并将属性作为 Map
传递。
下推操作
编辑使用 elasticsearch-hadoop 作为 Spark source
的一个重要的隐藏功能是,连接器理解在 DataFrame
/SQL 中执行的操作,并且默认情况下会将它们转换为相应的 QueryDSL。换句话说,连接器将操作下推到数据源,在那里有效地过滤掉数据,以便仅将所需的数据流回 Spark。这显着提高了查询性能,并最大限度地减少了 Spark 和 Elasticsearch 集群上的 CPU、内存和 I/O,因为仅返回所需的数据(而不是批量返回数据,然后再由 Spark 处理和丢弃)。请注意,即使指定了查询,下推操作也适用 - 连接器会根据指定的 SQL 对其进行增强。
顺便说一句,elasticsearch-hadoop 支持 Spark(1.3.0 及更高版本)中可用的所有`Filter`,同时保留与 Spark 1.3.0 的向后二进制兼容性,将 SQL 操作完全下推到 Elasticsearch,无需任何用户干预。
已作为下推过滤器优化的运算符
SQL 语法 | ES 1.x/2.x 语法 | ES 5.x 语法 |
---|---|---|
= null , is_null |
missing |
must_not.exists |
= (严格) |
term |
term |
= (非严格) |
match |
match |
> , < , >= , ⇐ |
range |
range |
is_not_null |
exists |
exists |
in (严格) |
terms |
terms |
in (非严格) |
or.filters |
bool.should |
and |
and.filters |
bool.filter |
or |
or.filters |
bool.should [bool.filter] |
not |
not.filter |
bool.must_not |
StringStartsWith |
wildcard(arg*) |
wildcard(arg*) |
StringEndsWith |
wildcard(*arg) |
wildcard(*arg) |
StringContains |
wildcard(*arg*) |
wildcard(*arg*) |
EqualNullSafe (严格) |
term |
term |
EqualNullSafe (非严格) |
match |
match |
例如,考虑以下 Spark SQL:
// as a DataFrame val df = sqlContext.read().format("org.elasticsearch.spark.sql").load("spark/trips") df.printSchema() // root //|-- departure: string (nullable = true) //|-- arrival: string (nullable = true) //|-- days: long (nullable = true) val filter = df.filter(df("arrival").equalTo("OTP").and(df("days").gt(3))
或在纯 SQL 中:
CREATE TEMPORARY TABLE trips USING org.elasticsearch.spark.sql OPTIONS (path "spark/trips") SELECT departure FROM trips WHERE arrival = "OTP" and days > 3
连接器将查询转换为:
{ "query" : { "filtered" : { "query" : { "match_all" : {} }, "filter" : { "and" : [{ "query" : { "match" : { "arrival" : "OTP" } } }, { "days" : { "gt" : 3 } } ] } } } }
此外,下推过滤器可以作用于analyzed
项(默认值),也可以配置为严格并提供exact
匹配(仅作用于not-analyzed
字段)。除非手动指定映射,否则强烈建议保留默认值。Elasticsearch 参考文档中详细讨论了此主题和其他主题。
请注意,double.filtering
(从 elasticsearch-hadoop 2.2 开始,适用于 Spark 1.6 或更高版本)允许也由 Spark 处理/评估已下推到 Elasticsearch 的过滤器(默认值)或不处理/评估。关闭此功能,尤其是在处理大型数据集时,可以加快速度。但是,应注意语义,因为关闭此功能可能会返回不同的结果(具体取决于数据的索引方式,analyzed
与 not_analyzed
)。通常,在打开strict 时,也可以禁用 double.filtering
。
数据源作为表
编辑从 Spark SQL 1.2 开始,还可以通过将其声明为 Spark 临时表(由 elasticsearch-hadoop 支持)来访问数据源。
sqlContext.sql( "CREATE TEMPORARY TABLE myIndex " + "USING org.elasticsearch.spark.sql " + "OPTIONS (resource 'spark/index', " + "scroll_size '20')" )
Spark 的临时表名称。 |
|
|
|
elasticsearch-hadoop 配置选项,其中必选项为 |
|
由于使用 |
定义后,将自动获取模式。因此,可以立即发出查询。
val all = sqlContext.sql("SELECT * FROM myIndex WHERE id <= 10")
由于 elasticsearch-hadoop 了解正在执行的查询,因此它可以优化对 Elasticsearch 的请求。例如,给定以下查询:
val names = sqlContext.sql("SELECT name FROM myIndex WHERE id >=1 AND id <= 10")
它知道仅需要 name
和 id
字段(第一个字段返回给用户,第二个字段用于 Spark 的内部过滤),因此将仅请求此数据,从而使查询非常高效。
从 Elasticsearch 读取 DataFrame
(Spark SQL 1.3)
编辑您可能已经猜到了,可以定义一个由 Elasticsearch 文档支持的 DataFrame
。或者更好的是,让它们由查询结果支持,有效地创建对数据的动态、实时的视图。
Scala
编辑通过 org.elasticsearch.spark.sql
包,esDF
方法在 SQLContext
API 上可用。
import org.apache.spark.sql.SQLContext import org.elasticsearch.spark.sql._ ... val sql = new SQLContext(sc) val people = sql.esDF("spark/people") // check the associated schema println(people.schema.treeString) // root // |-- name: string (nullable = true) // |-- surname: string (nullable = true) // |-- age: long (nullable = true)
Spark SQL Scala 导入 |
|
elasticsearch-hadoop SQL Scala 导入 |
|
创建由 Elasticsearch 中的 |
|
从 Elasticsearch 中发现的与 |
|
请注意,在使用默认 Elasticsearch 映射时, |
与 Spark 核心支持一样,可以指定其他参数,例如查询。这是一个非常强大的概念,因为可以在源(Elasticsearch)处过滤数据,并且仅在结果上使用 Spark。
控制 DataFrame
模式在某些情况下,尤其是在 Elasticsearch 中的索引包含大量字段时,希望创建一个仅包含其中一部分字段的 DataFrame
。虽然可以通过官方 Spark API 或通过专用查询修改 DataFrame
(通过处理其支持的 RDD
),但 elasticsearch-hadoop 允许用户在创建 DataFrame
时指定要从 Elasticsearch 中包含和排除哪些字段。
通过 es.read.field.include
和 es.read.field.exclude
属性,可以指示要从索引映射中包含或排除哪些字段。语法类似于 Elasticsearch include/exclude。可以通过使用逗号来指定多个值。默认情况下,未指定任何值,这意味着包含所有属性/字段,并且不排除任何属性/字段。请注意,这些属性可以包含前导和尾随通配符。包含字段层次结构的一部分而没有尾随通配符并不意味着整个层次结构都被包含在内。但是,在大多数情况下,仅包含层次结构的一部分没有意义,因此应包含尾随通配符。
例如:
# include es.read.field.include = *name, address.* # exclude es.read.field.exclude = *.created
由于 SparkSQL 与 DataFrame
模式的工作方式,elasticsearch-hadoop 需要在执行实际查询之前了解从 Elasticsearch 返回的字段。虽然可以通过底层 Elasticsearch 查询手动限制字段,但 elasticsearch-hadoop 并不知道这一点,并且结果可能不同,或者更糟糕的是,会发生错误。请改用上述属性,Elasticsearch 将与用户查询一起正确使用这些属性。
Java
编辑对于 Java 用户,可以通过 JavaEsSpark SQL
使用专用的 API。它与 EsSpark SQL
非常相似,但允许通过 Java 集合而不是 Scala 集合传递配置选项;除此之外,使用这两个 API 的方式完全相同。
import org.apache.spark.sql.api.java.JavaSQLContext; import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL; ... SQLContext sql = new SQLContext(sc); DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people");
更好的是,DataFrame
可以由查询结果支持。
Spark SQL 类型转换
编辑elasticsearch-hadoop 自动将 Spark 内置类型转换为 Elasticsearch 类型(反之亦然),如下表所示。
虽然 Spark SQL 的 DataType
s 在 Scala 和 Java 中都有等价类型,因此 RDD 转换可以适用,但语义略有不同——特别是对于 java.sql
类型,因为 Spark SQL 处理它们的方式有所不同。
表 8. Spark SQL 1.3+ 转换表
Spark SQL DataType |
Elasticsearch 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**地理类型转换表**除了上表之外,对于 Spark SQL 1.3 或更高版本,elasticsearch-hadoop 会自动检测地理类型的模式,即 Elasticsearch 的 geo_point
和 geo_shape
。由于每种类型都允许多种格式(geo_point
接受以 4 种不同方式指定的经纬度,而 geo_shape
允许多种类型(目前为 9 种)),并且映射不提供此类信息,因此 elasticsearch-hadoop 会在启动时采样确定的地理字段并检索包含所有相关字段的任意文档;它将解析它并因此确定必要的模式(例如,它可以判断 geo_point
是指定为 StringType
还是 ArrayType
)。
由于 Spark SQL 是强类型的,因此每个地理字段需要在所有文档中具有相同的格式。否则,返回的数据将不符合检测到的模式,从而导致错误。
Spark 结构化流支持
编辑在 6.0 版本中添加。
结构化流 提供快速、可扩展、容错、端到端精确一次的流处理,而无需用户考虑流处理。 |
||
-- Spark 文档 |
作为 Spark 2.0 中的实验性功能发布,Spark 结构化流提供了一个统一的流和批处理接口,该接口内置于 Spark SQL 集成中。从 elasticsearch-hadoop 6.0 开始,我们提供了将流数据索引到 Elasticsearch 的原生功能。
与 Spark SQL 一样,结构化流使用结构化数据。所有条目都应具有相同的结构(相同数量的字段,类型和名称相同)。不支持使用非结构化数据(具有不同结构的文档),这会导致问题。对于此类情况,请使用 DStream
s。
支持的 Spark 结构化流版本
编辑Spark 结构化流从 Spark v2.2.0 开始被认为是普遍可用的。因此,elasticsearch-hadoop 对结构化流的支持(在 elasticsearch-hadoop 6.0+ 中可用)仅与 Spark 2.2.0 及更高版本兼容。与之前的 Spark SQL 类似,在接口被认为稳定之前,结构化流可能会在版本之间发生重大变化。
Spark 结构化流支持在 org.elasticsearch.spark.sql
和 org.elasticsearch.spark.sql.streaming
包下可用。它与 Spark SQL 共享一个统一的接口,形式为 Dataset[_]
api。客户端可以以几乎与常规批处理 Dataset
s 完全相同的方式与流 Dataset
s 交互,仅有少数例外。
将流 Datasets
(Spark SQL 2.0+)写入 Elasticsearch
编辑使用 elasticsearch-hadoop,可以将流支持的 Dataset
s 索引到 Elasticsearch。
Scala
编辑在 Scala 中,要将基于流的 Dataset
s 和 DataFrame
s 保存到 Elasticsearch,只需配置流以使用 "es"
格式写入,如下所示
import org.apache.spark.sql.SparkSession ... val spark = SparkSession.builder() .appName("EsStreamingExample") .getOrCreate() // case class used to define the DataFrame case class Person(name: String, surname: String, age: Int) // create DataFrame val people = spark.readStream .textFile("/path/to/people/files/*") .map(_.split(",")) .map(p => Person(p(0), p(1), p(2).trim.toInt)) people.writeStream .option("checkpointLocation", "/save/location") .format("es") .start("spark/people")
Spark SQL 导入 |
|
创建 |
|
不要调用 |
|
连续读取文本文件目录并将它们转换为 |
|
提供一个位置来保存流查询的偏移量和提交日志 |
|
使用 |
Spark 在基于批处理和基于流的 Dataset
s 之间没有基于类型的区别。虽然您可以导入 org.elasticsearch.spark.sql
包以向您的 Dataset
或 DataFrame
添加 saveToEs
方法,但如果这些方法在基于流的 Dataset
s 或 DataFrame
s 上调用,则会抛出非法参数异常。
Java
编辑类似地,"es"
格式也可用于 Java。
import org.apache.spark.sql.SparkSession ... SparkSession spark = SparkSession .builder() .appName("JavaStructuredNetworkWordCount") .getOrCreate(); // java bean style class public static class PersonBean { private String name; private String surname; private int age; ... } Dataset<PersonBean> people = spark.readStream() .textFile("/path/to/people/files/*") .map(new MapFunction<String, PersonBean>() { @Override public PersonBean call(String value) throws Exception { return someFunctionThatParsesStringToJavaBeans(value.split(",")); } }, Encoders.<PersonBean>bean(PersonBean.class)); people.writeStream() .option("checkpointLocation", "/save/location") .format("es") .start("spark/people");
Spark SQL Java 导入。可以使用与 Scala 相同的会话类 |
|
创建 SparkSession。也可以使用旧的 |
|
我们创建一个 Java bean 类作为我们的数据格式 |
|
使用 |
|
将我们的字符串数据转换为我们的 PersonBean |
|
设置一个保存流状态的位置 |
|
使用 |
将现有的 JSON 写入 Elasticsearch
编辑在使用 Spark SQL 时,如果输入数据为 JSON 格式,只需将其转换为 Dataset
(对于 Spark SQL 2.0)(如 Spark 文档 中所述)通过 DataStreamReader
的 json
格式。
Spark 结构化流中的接收器提交日志
编辑Spark 结构化流宣传了一种端到端容错精确一次处理模型,该模型通过使用偏移量检查点和维护每个流查询的提交日志来实现。在执行流查询时,大多数源和接收器都需要您指定一个“checkpointLocation”以持久化作业的状态。如果发生中断,则使用相同的检查点位置启动新的流查询将恢复作业的状态并从中断处继续执行。我们为 elasticsearch-hadoop 的 Elasticsearch 接收器实现维护了一个提交日志,该日志位于配置的检查点位置下的特殊目录中
$> ls /path/to/checkpoint/location metadata offsets/ sinks/ $> ls /path/to/checkpoint/location/sinks elasticsearch/ $> ls /path/to/checkpoint/location/sinks/elasticsearch 12.compact 13 14 15 16 17 18
提交日志目录中的每个文件都对应于已提交的批处理 ID。日志实现定期压缩日志以避免混乱。您可以通过多种方式设置日志目录的位置
- 使用
es.spark.sql.streaming.sink.log.path
设置显式日志位置(请参见下文)。 - 如果未设置,则将使用
checkpointLocation
指定的路径。 - 如果未设置,则将通过组合 SparkSession 中的
spark.sql.streaming.checkpointLocation
的值和Dataset
给定的查询名称来构造路径。 - 如果不存在查询名称,则在上述情况下将使用随机 UUID 代替查询名称
- 如果未提供上述任何设置,则
start
调用将抛出异常
以下是影响 Elasticsearch 提交日志行为的配置列表
-
es.spark.sql.streaming.sink.log.enabled
(默认值为true
) - 启用或禁用流作业的提交日志。默认情况下,日志已启用,并且将跳过具有相同批处理 ID 的输出批处理以避免重复写入。当此设置为
false
时,提交日志将被禁用,并且所有输出都将发送到 Elasticsearch,而不管它们是否已在先前的执行中发送过。 -
es.spark.sql.streaming.sink.log.path
- 设置存储此流查询的日志数据的位置。如果未设置此值,则 Elasticsearch 接收器将在
checkpointLocation
中给定的路径下存储其提交日志。任何与 HDFS 客户端兼容的 URI 都可以接受。 -
es.spark.sql.streaming.sink.log.cleanupDelay
(默认值为10m
) - 提交日志通过 Spark 的 HDFS 客户端进行管理。某些与 HDFS 兼容的文件系统(如 Amazon 的 S3)以异步方式传播文件更改。为了解决此问题,在一组日志文件被压缩后,客户端将等待此时间段,然后再清理旧文件。
-
es.spark.sql.streaming.sink.log.deletion
(默认值为true
) - 确定是否应删除不再需要的旧日志。在每个批处理提交后,客户端将检查是否有任何已压缩且可以安全删除的提交日志。如果设置为
false
,则日志将跳过此清理步骤,为每个批处理留下一个提交文件。 -
es.spark.sql.streaming.sink.log.compactInterval
(默认值为10
) - 设置在压缩日志文件之前要处理的批处理数量。默认情况下,每 10 个批处理,提交日志将被压缩成一个包含所有先前已提交批处理 ID 的单个文件。
Spark 结构化流类型转换
编辑结构化流使用与 Spark SQL 集成完全相同的类型转换规则。
如果使用自动索引创建,请查看此部分以获取更多信息。
elasticsearch-hadoop 会自动将 Spark 内置类型转换为 Elasticsearch 类型,如下表所示
虽然 Spark SQL 的 DataType
s 在 Scala 和 Java 中都有等价类型,因此 RDD 转换可以适用,但语义略有不同——特别是对于 java.sql
类型,因为 Spark SQL 处理它们的方式有所不同。
表 9. Spark SQL 1.3+ 转换表
Spark SQL DataType |
Elasticsearch 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
使用 Map/Reduce 层
编辑使用 Spark 与 Elasticsearch 的另一种方法是通过 Map/Reduce 层,即利用 elasticsearch-hadoop 中专用的 Input/OuputFormat
。但是,除非您坚持使用 elasticsearch-hadoop 2.0,否则我们强烈建议使用原生集成,因为它提供了更好的性能和灵活性。
配置
编辑通过 elasticsearch-hadoop,Spark 可以通过其专用的 InputFormat
与 Elasticsearch 集成,在写入的情况下,通过 OutputFormat
集成。这些在Map/Reduce章节中有详细描述,因此请参阅该章节以获取深入的解释。
简而言之,需要使用目标 Elasticsearch 集群和索引设置一个基本的 Hadoop Configuration
对象,可能还需要一个查询,然后就可以开始了。
从Spark的角度来看,唯一需要做的就是设置序列化 - Spark默认依赖于Java序列化,这虽然方便,但效率相当低下。这就是Hadoop本身引入自己的序列化机制及其类型(即Writable
)的原因。因此,InputFormat
和OutputFormat
需要返回Writable
,而Spark默认情况下并不理解这些类型。好消息是,可以轻松启用不同的序列化方式(例如Kryo),它可以自动处理转换,并且效率也相当高。
SparkConf sc = new SparkConf(); //.setMaster("local"); sc.set("spark.serializer", KryoSerializer.class.getName()); // needed only when using the Java API JavaSparkContext jsc = new JavaSparkContext(sc);
或者如果您更喜欢Scala
请注意,Kryo序列化用作处理Writable
类型的变通方法;可以选择直接转换类型(从Writable
到Serializable
类型) - 这也很好,但对于入门来说,上面的一行代码似乎是最有效的。
从Elasticsearch读取数据
编辑要读取数据,只需传入org.elasticsearch.hadoop.mr.EsInputFormat
类 - 因为它支持旧
和新
的Map/Reduce API,您可以自由地在SparkContext
的hadoopRDD
(我们推荐出于简洁性考虑)或newAPIHadoopRDD
上使用任何一种方法。无论您选择哪种方法,请坚持使用它,以避免将来出现混淆和问题。
旧(org.apache.hadoop.mapred
) API
编辑JobConf conf = new JobConf(); conf.set("es.resource", "radio/artists"); conf.set("es.query", "?q=me*"); JavaPairRDD esRDD = jsc.hadoopRDD(conf, EsInputFormat.class, Text.class, MapWritable.class); long docCount = esRDD.count();
创建Hadoop对象(使用旧API) |
|
配置数据源(索引) |
|
设置查询(可选) |
|
通过 |
Scala版本如下
val conf = new JobConf() conf.set("es.resource", "radio/artists") conf.set("es.query", "?q=me*") val esRDD = sc.hadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]], classOf[Text], classOf[MapWritable])) val docCount = esRDD.count();
新(org.apache.hadoop.mapreduce
) API
编辑正如预期的那样,mapreduce
API版本非常相似 - 将hadoopRDD
替换为newAPIHadoopRDD
,并将JobConf
替换为Configuration
。就是这样。
Configuration conf = new Configuration(); conf.set("es.resource", "radio/artists"); conf.set("es.query", "?q=me*"); JavaPairRDD esRDD = jsc.newAPIHadoopRDD(conf, EsInputFormat.class, Text.class, MapWritable.class); long docCount = esRDD.count();
创建Hadoop对象(使用新API) |
|
配置数据源(索引) |
|
设置查询(可选) |
|
通过 |
Scala版本如下
从PySpark使用连接器
编辑由于其Map/Reduce层,elasticsearch-hadoop也可以从PySpark中使用,用于读写Elasticsearch中的数据。为此,以下是Spark文档中的一个代码片段(确保切换到Python代码片段)
$ ./bin/pyspark --driver-class-path=/path/to/elasticsearch-hadoop.jar >>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults >>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\ "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) >>> rdd.first() # the result is a MapWritable that is converted to a Python dict (u'Elasticsearch ID', {u'field1': True, u'field2': u'Some Text', u'field3': 12345})
此外,SQL加载器也可以使用
from pyspark.sql import SQLContext sqlContext = SQLContext(sc) df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type") df.printSchema()