Apache Spark 支持

编辑

Apache Spark 是一个快速、通用的集群计算系统。它提供 Java、Scala 和 Python 的高级 API,以及支持通用执行图的优化引擎。

-- Spark 网站

Spark 提供了对大型数据集的快速迭代/函数式能力,通常通过在内存中缓存数据来实现。与本文档中提到的其他库不同,Apache Spark 是一个计算框架,它本身并不与 Map/Reduce 绑定,但它确实与 Hadoop 集成,主要是与 HDFS 集成。elasticsearch-hadoop 允许以两种方式在 Spark 中使用 Elasticsearch:通过自 2.1 版本以来提供的专用支持,或者通过自 2.0 版本以来的 Map/Reduce 桥接。自 5.0 版本以来,elasticsearch-hadoop 支持 Spark 2.0。

安装

编辑

与其他库一样,elasticsearch-hadoop 需要在 Spark 的类路径中可用。由于 Spark 具有多种部署模式,这可以转换为目标类路径,无论它是在一个节点上(如本地模式的情况 - 将在整个文档中使用)还是每个节点,具体取决于所需的基础设施。

原生 RDD 支持

编辑

在 2.1 版本中添加。

elasticsearch-hadoop 以 RDD (弹性分布式数据集)(或更精确地说是 Pair RDD)的形式提供 Elasticsearch 和 Apache Spark 之间的原生集成,该 RDD 可以从 Elasticsearch 中读取数据。RDD 提供两种风格:一种用于 Scala(将数据作为带有 Scala 集合的 Tuple2 返回),另一种用于 Java(将数据作为包含 java.util 集合的 Tuple2 返回)。

在可能的情况下,请考虑使用原生集成,因为它提供了最佳性能和最大灵活性。

配置

编辑

要为 Apache Spark 配置 elasticsearch-hadoop,可以设置 配置 章节中描述的各种属性,这些属性位于 SparkConf 对象中。

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName(appName).setMaster(master)
conf.set("es.index.auto.create", "true")
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
conf.set("es.index.auto.create", "true");

命令行对于那些想要通过命令行设置属性(直接或从文件加载)的人,请注意 Spark 接受以“spark.”为前缀的属性,并会忽略其余属性(并且根据版本,可能会抛出警告)。为了解决这个限制,请通过附加 spark. 前缀来定义 elasticsearch-hadoop 属性(因此它们变为 spark.es.),并且 elasticsearch-hadoop 将自动解析它们。

$ ./bin/spark-submit --conf spark.es.resource=index/type ... 

请注意 es.resource 属性变成了 spark.es.resource

将数据写入 Elasticsearch

编辑

使用 elasticsearch-hadoop,只要可以将 RDD 的内容转换为文档,就可以将任何 RDD 保存到 Elasticsearch。实际上,这意味着 RDD 类型需要是 Map(无论是 Scala 还是 Java 的)、JavaBean 或 Scala case class。如果不是这种情况,可以在 Spark 中轻松转换数据或插入自定义的 ValueWriter

Scala
编辑

使用 Scala 时,只需导入 org.elasticsearch.spark 包,该包通过 pimp my library 模式,使用 saveToEs 方法丰富了任何 RDD API。

import org.apache.spark.SparkContext    
import org.apache.spark.SparkContext._

import org.elasticsearch.spark._        

...

val conf = ...
val sc = new SparkContext(conf)         

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

sc.makeRDD( 
  Seq(numbers, airports)
).saveToEs("spark/docs") 

Spark Scala 导入

elasticsearch-hadoop Scala 导入

通过其 Scala API 启动 Spark

makeRDD 基于指定的集合创建一个临时的 RDD;可以传入任何其他 RDD(在 Java 或 Scala 中)。

在 Elasticsearch 中,将内容(即两个文档(数字和机场))索引在 spark/docs 下。

Scala 用户可能倾向于使用 Seq 表示法来声明对象(即 JSON 文档),而不是使用 Map。虽然相似,但第一种表示法会导致稍微不同的类型,这些类型无法与 JSON 文档匹配:Seq 是一个有序序列(换句话说,是一个列表),而 创建一个 Tuple,它或多或少是一个有序的、固定数量的元素。因此,列表的列表不能用作文档,因为它无法映射到 JSON 对象;但是,它可以在其中自由使用。这就是为什么在上面的示例中使用 Map(k→v) 而不是 Seq(k→v) 的原因。

作为上述隐式导入的替代方法,可以通过 org.elasticsearch.spark.rdd 包中的 EsSpark 在 Scala 中使用 elasticsearch-hadoop Spark 支持,它充当实用程序类,允许显式方法调用。此外,可以使用一个case class,而不是 Map(它很方便,但由于其结构差异,每个实例都需要一个映射)。

import org.apache.spark.SparkContext
import org.elasticsearch.spark.rdd.EsSpark                        

// define a case class
case class Trip(departure: String, arrival: String)               

val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")

val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))             
EsSpark.saveToEs(rdd, "spark/docs")                               

EsSpark 导入

定义一个名为 Trip 的 case class

围绕 Trip 实例创建一个 RDD

通过 EsSpark 显式索引 RDD

对于需要指定文档的 id(或其他元数据字段,如 ttltimestamp)的情况,可以通过设置适当的映射,即 es.mapping.id 来完成。在前面的示例中,为了指示 Elasticsearch 将字段 id 用作文档 id,请更新 RDD 配置(也可以在 SparkConf 上设置该属性,但由于其全局影响,不建议这样做)。

EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id"))
Java
编辑

Java 用户有一个专用类,它提供了与 EsSpark 类似的功能,即 org.elasticsearch.spark.rdd.api.java 中的 JavaEsSpark(一个类似于 Spark 的 Java API 的包)。

import org.apache.spark.api.java.JavaSparkContext;                              
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;                        
...

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);                              

Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);                   
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
JavaEsSpark.saveToEs(javaRDD, "spark/docs");                                    

Spark Java 导入

elasticsearch-hadoop Java 导入

通过其 Java API 启动 Spark

为了简化示例,请使用 Guava(Spark 的依赖项)的 Immutable* 方法来创建简单的 MapList

在两个集合上创建一个简单的 RDD;可以传入任何其他 RDD(在 Java 或 Scala 中)。

在 Elasticsearch 中,将内容(即两个文档(数字和机场))索引在 spark/docs 下。

通过使用 Java 5 静态导入可以进一步简化代码。此外,Map(由于其松散的结构,其映射是动态的)可以替换为 JavaBean

public class TripBean implements Serializable {
   private String departure, arrival;

   public TripBean(String departure, String arrival) {
       setDeparture(departure);
       setArrival(arrival);
   }

   public TripBean() {}

   public String getDeparture() { return departure; }
   public String getArrival() { return arrival; }
   public void setDeparture(String dep) { departure = dep; }
   public void setArrival(String arr) { arrival = arr; }
}
import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark;                
...

TripBean upcoming = new TripBean("OTP", "SFO");
TripBean lastWeek = new TripBean("MUC", "OTP");

JavaRDD<TripBean> javaRDD = jsc.parallelize(
                            ImmutableList.of(upcoming, lastWeek));        
saveToEs(javaRDD, "spark/docs");                                          

静态导入 JavaEsSpark

定义一个包含 TripBean 实例的 RDDTripBean 是一个 JavaBean)。

调用 saveToEs 方法,而无需再次键入 JavaEsSpark

设置文档 id(或其他元数据字段,如 ttltimestamp)类似于其 Scala 对应物,但根据您使用的是 JDK 类还是其他实用程序(如 Guava),可能稍微冗长一些。

JavaEsSpark.saveToEs(javaRDD, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));

将现有 JSON 写入 Elasticsearch

编辑

对于 RDD 中的数据已经是 JSON 的情况,elasticsearch-hadoop 允许直接索引,无需应用任何转换;数据按原样获取并直接发送到 Elasticsearch。因此,在这种情况下,elasticsearch-hadoop 希望 RDD 包含 String 或字节数组 (byte[]/Array[Byte]),假设每个条目代表一个 JSON 文档。如果 RDD 没有正确的签名,则不能应用 saveJsonToEs 方法(在 Scala 中它们将不可用)。

Scala
编辑
val json1 = """{"reason" : "business", "airport" : "SFO"}"""      
val json2 = """{"participants" : 5, "airport" : "OTP"}"""

new SparkContext(conf).makeRDD(Seq(json1, json2))
                      .saveJsonToEs("spark/json-trips") 

示例,RDD 中的一个条目 - JSON 按原样写入,没有任何转换,它不应包含换行符,如 \n 或 \r\n。

通过专用的 saveJsonToEs 方法索引 JSON 数据。

Java
编辑
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";  
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";

JavaSparkContext jsc = ...
JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); 
JavaEsSpark.saveJsonToEs(stringRDD, "spark/json-trips");             

示例,RDD 中的一个条目 - JSON 按原样写入,没有任何转换,它不应包含换行符,如 \n 或 \r\n。

请注意 RDD<String> 签名。

通过专用的 saveJsonToEs 方法索引 JSON 数据。

写入动态/多资源

编辑

对于需要将写入 Elasticsearch 的数据索引到不同的存储桶(基于数据内容)中的情况,可以使用 es.resource.write 字段,该字段接受在运行时从文档内容解析的模式。按照前面提到的媒体示例,可以按如下方式配置它:

Scala
编辑
val game = Map(
  "media_type"->"game", 
      "title" -> "FF VI",
       "year" -> "1994")
val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010")
val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien")

sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection-{media_type}/doc")  

用于拆分数据的文档。可以声明任何字段(但请确保它在所有文档中都可用)。

根据其资源模式保存每个对象,在本例中是基于 media_type

对于即将写入的每个文档/对象,elasticsearch-hadoop 将提取 media_type 字段并使用其值来确定目标资源。

Java
编辑

正如预期的那样,Java 中的情况非常相似。

Map<String, ?> game =
  ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994");
Map<String, ?> book = ...
Map<String, ?> cd = ...

JavaRDD<Map<String, ?>> javaRDD =
                jsc.parallelize(ImmutableList.of(game, book, cd));
saveToEs(javaRDD, "my-collection-{media_type}/doc");  

根据其资源模式保存每个对象,在本例中为 media_type

处理文档元数据

编辑

Elasticsearch 允许每个文档都有自己的元数据。如上所述,通过各种映射选项,可以自定义这些参数,以便从其所属的文档中提取它们的值。此外,甚至可以包含/排除哪些数据部分被发送回 Elasticsearch。在 Spark 中,elasticsearch-hadoop 扩展了此功能,允许通过使用 pair RDDs 在文档外部提供元数据。换句话说,对于包含键值元组的 RDD,可以从键中提取元数据,并将值用作文档源。

通过 org.elasticsearch.spark.rdd 包中的 Metadata Java enum 描述元数据,它标识其类型 - idttlversion 等。因此,RDD 键可以是包含每个文档的 Metadata 及其关联值的 Map。如果 RDD 键不是 Map 类型,则 elasticsearch-hadoop 将认为该对象表示文档 id 并相应地使用它。这听起来比实际情况要复杂,让我们来看一些示例。

Scala
编辑

键值对 RDD,或者简单来说签名是 RDD[(K,V)]RDD,可以利用 saveToEsWithMeta 方法,这些方法可以通过隐式导入 org.elasticsearch.spark 包或 EsSpark 对象来使用。要为每个文档手动指定 id,只需在您的 RDD 中传入 Object(不是 Map 类型)即可

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// instance of SparkContext
val sc = ...

val airportsRDD = 
  sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))  
airportsRDD.saveToEsWithMeta("airports/2015")    

airportsRDD 是一个键值RDD;它是由 tupleSeq 创建的

Seq 中每个元组的键表示其关联的值/文档的id;换句话说,文档 otp 的 id 是 1muc 的 id 是 2sfo 的 id 是 3

由于 airportsRDD 是一个键值对 RDD,因此它具有可用的 saveToEsWithMeta 方法。这告诉 elasticsearch-hadoop 要特别注意 RDD 的键,并将它们用作元数据,在这种情况下作为文档 id。如果改为使用 saveToEs,则 elasticsearch-hadoop 会将 RDD 元组(即键和值)都视为文档的一部分。

当需要指定不仅仅是 id 的信息时,应该使用一个键类型为 org.elasticsearch.spark.rdd.Metadatascala.collection.Map

import org.elasticsearch.spark.rdd.Metadata._          

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// metadata for each document
// note it's not required for them to have the same structure
val otpMeta = Map(ID -> 1, TTL -> "3h")                
val mucMeta = Map(ID -> 2, VERSION -> "23")            
val sfoMeta = Map(ID -> 3)                             

// instance of SparkContext
val sc = ...

val airportsRDD = sc.makeRDD( 
  Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
airportsRDD.saveToEsWithMeta("airports/2015") 

导入 Metadata 枚举

用于 otp 文档的元数据。在这种情况下,ID 的值为 1,TTL 的值为 3h

用于 muc 文档的元数据。在这种情况下,ID 的值为 2,VERSION 的值为 23

用于 sfo 文档的元数据。在这种情况下,ID 的值为 3

元数据和文档被组装成一个键值对 RDD

使用 saveToEsWithMeta 方法相应地保存 RDD

Java
编辑

类似地,在 Java 端,JavaEsSpark 提供了应用于 JavaPairRDD(Java 中 RDD[(K,V)] 的等效项)的 saveToEsWithMeta 方法。因此,要根据文档的 id 保存文档,可以使用

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC");

JavaSparkContext jsc = ...

// create a pair RDD between the id and the docs
JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs(ImmutableList.of( 
        new Tuple2<Object, Object>(1, otp),          
        new Tuple2<Object, Object>(2, jfk)));        
JavaEsSpark.saveToEsWithMeta(pairRDD, target);       

通过使用 Scala Tuple2 类包装文档 id 和文档本身来创建一个 JavaPairRDD

第一个文档的元组,包装了 id(1)和文档(otp)本身

第二个文档的元组,包装了 id(2)和 jfk

使用键作为 id,值作为文档,相应地保存 JavaPairRDD

当需要指定不仅仅是 id 的信息时,可以选择使用一个使用类型为 org.elasticsearch.spark.rdd.Metadata 的键填充的 java.util.Map

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.elasticsearch.spark.rdd.Metadata;          

import static org.elasticsearch.spark.rdd.Metadata.*; 

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran");

// metadata for each document
// note it's not required for them to have the same structure
Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); 
Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); 

JavaSparkContext jsc = ...

// create a pair RDD between the id and the docs
JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs<(ImmutableList.of(
        new Tuple2<Object, Object>(otpMeta, otp),    
        new Tuple2<Object, Object>(sfoMeta, sfo)));  
JavaEsSpark.saveToEsWithMeta(pairRDD, target);       

Metadata enum 描述了可以声明的文档元数据

用于 enum 的静态导入,以便以短格式(IDTTL 等)引用其值

otp 文档的元数据

sfo 文档的元数据

otp(作为值)与其元数据(作为键)之间的元组

关联 sfo 及其元数据的元组

在包含文档及其各自元数据的 JavaPairRDD 上调用 saveToEsWithMeta

从 Elasticsearch 读取数据

编辑

对于读取,应该定义从 Elasticsearch 向 Spark流式传输数据的 Elasticsearch RDD

Scala
编辑

与写入类似,org.elasticsearch.spark 包使用 esRDD 方法丰富了 SparkContext API

import org.apache.spark.SparkContext    
import org.apache.spark.SparkContext._

import org.elasticsearch.spark._        

...

val conf = ...
val sc = new SparkContext(conf)         

val RDD = sc.esRDD("radio/artists")     

Spark Scala 导入

elasticsearch-hadoop Scala 导入

通过其 Scala API 启动 Spark

为索引 radio/artists 创建一个专用的 Elasticsearch RDD

可以重载该方法以指定额外的查询,甚至指定一个配置 Map(覆盖 SparkConf

...
import org.elasticsearch.spark._

...
val conf = ...
val sc = new SparkContext(conf)

sc.esRDD("radio/artists", "?q=me*") 

创建一个 RDD,该 RDD 流式传输索引 radio/artists 中所有匹配 me* 的文档

默认情况下,Elasticsearch 中的文档以 Tuple2 的形式返回,其中第一个元素是文档 id,第二个元素是通过 Scala 集合 表示的实际文档,即一个 `Map[String, Any]`,其中键表示字段名称,值表示其各自的值。

Java
编辑

Java 用户有一个专用的 JavaPairRDD,它的工作方式与 Scala 对应项相同,但是返回的 Tuple2 值(或第二个元素)以原生 java.util 集合的形式返回文档。

import org.apache.spark.api.java.JavaSparkContext;               
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;             
...

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);               

JavaPairRDD<String, Map<String, Object>> esRDD =
                        JavaEsSpark.esRDD(jsc, "radio/artists"); 

Spark Java 导入

elasticsearch-hadoop Java 导入

通过其 Java API 启动 Spark

为索引 radio/artists 创建一个专用的 Elasticsearch JavaPairRDD

类似地,可以使用重载的 esRDD 方法来指定查询或传递 Map 对象以进行高级配置。让我们看看它是如何工作的,但这次使用 Java 静态导入。此外,让我们丢弃文档 id,仅检索 RDD

import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.*;   

...
JavaRDD<Map<String, Object>> rdd =
        esRDD(jsc, "radio/artists", "?q=me*")  
            .values(); 

静态导入 JavaEsSpark

创建一个 RDD,该 RDD 流式传输索引 radio/artists 中所有以 me 开头的文档。请注意,由于静态导入,该方法不必完全限定

仅返回 PairRDD - 这就是为什么结果类型是 JavaRDD不是 JavaPairRDD 的原因

通过使用 JavaEsSpark API,可以获得 Spark 专用的 JavaPairRDD,在 Java 环境中,它比基本 RDD 更适合(因为它的 Scala 签名)。此外,专用的 RDD 将 Elasticsearch 文档作为正确的 Java 集合返回,因此无需处理 Scala 集合(这通常是 RDD 的情况)。当使用 Java 8 时,这一点尤其强大,我们强烈建议使用 Java 8,因为它的 lambda 表达式 使集合处理极其简洁。

例如,假设有人想从 RDD 中过滤文档,并且仅返回那些包含值 mega 的文档(请忽略可以直接通过 Elasticsearch 执行过滤这一事实)。

在 Java 8 之前的版本中,代码看起来像这样

JavaRDD<Map<String, Object>> esRDD =
                        esRDD(jsc, "radio/artists", "?q=me*").values();
JavaRDD<Map<String, Object>> filtered = esRDD.filter(
    new Function<Map<String, Object>, Boolean>() {
      @Override
      public Boolean call(Map<String, Object> map) throws Exception {
          returns map.contains("mega");
      }
    });

在 Java 8 中,过滤变成了一行代码

JavaRDD<Map<String, Object>> esRDD =
                        esRDD(jsc, "radio/artists", "?q=me*").values();
JavaRDD<Map<String, Object>> filtered = esRDD.filter(doc ->
                                                doc.contains("mega"));
以 JSON 格式读取数据
编辑

如果需要以 JSON 格式从 Elasticsearch 获取结果(通常是为了将结果发送到其他系统),可以使用专用的 esJsonRDD 方法。在这种情况下,连接器将按从 Elasticsearch 接收到的文档内容返回文档内容,而无需进行任何处理,以 Scala 中的 RDD[(String, String)] 或 Java 中的 JavaPairRDD[String, String] 的形式返回,其中键表示文档 id,值表示其 JSON 格式的实际内容。

类型转换

编辑

在处理多值/数组字段时,请参阅部分,特别是这些配置选项。重要提示:如果使用自动索引创建,请查看部分以获取更多信息。

elasticsearch-hadoop 会自动将 Spark 内置类型转换为 Elasticsearch 类型(反之亦然),如下表所示

表 4. Scala 类型转换表

Scala 类型 Elasticsearch 类型

None

null

Unit

null

Nil

空的 array

Some[T]

根据表中的 T

Map

object

Traversable

array

case class

object(参见 Map

Product

array

此外,以下隐含转换适用于 Java 类型

表 5. Java 类型转换表

Java 类型 Elasticsearch 类型

null

null

String

string

Boolean

boolean

Byte

byte

Short

short

Integer

int

Long

long

Double

double

Float

float

Number

floatdouble(取决于大小)

java.util.Calendar

datestring 格式)

java.util.Date

datestring 格式)

java.util.Timestamp

datestring 格式)

byte[]

string (BASE64)

Object[]

array

Iterable

array

Map

object

Java Bean

object(参见 Map

转换过程会尽力而为;内置的 Java 和 Scala 类型保证可以正确转换,但是对于 Java 或 Scala 中的用户类型则不作任何保证。如上表所述,当在 Scala 中遇到 case 类或在 Java 中遇到 JavaBean 时,转换器会尝试解包其内容并将其保存为 object。请注意,这仅适用于顶级的用户对象 - 如果用户对象嵌套了其他用户对象,则转换很可能会失败,因为转换器不会执行嵌套的 解包。这样做是有意为之,因为转换器必须序列化反序列化数据,并且用户类型由于数据丢失而引入了歧义;可以通过某种类型的映射来解决这个问题,但这会使项目过于接近 ORM 的领域,并且可以说引入了过多的复杂性而几乎没有收益;多亏了 Spark 中的处理功能和 elasticsearch-hadoop 中的可插拔性,可以轻松地将对象转换为其他类型,如果需要,只需付出最少的努力和最大的控制。

Geo 类型 值得一提的是,Elasticsearch 中独有的富数据类型,例如 GeoPointGeoShape,通过将其结构转换为上表中可用的基本类型来支持。例如,根据其存储方式,geo_point 可能会作为 StringTraversable 返回。

Spark Streaming 支持

编辑

5.0 版本新增。

Spark Streaming 是核心 Spark API 的扩展,它支持对实时数据流进行可扩展、高吞吐量、容错的流处理。

-- Spark 网站

Spark Streaming 是核心 Spark 功能之上的扩展,允许近实时处理流数据。Spark Streaming 围绕 DStream离散流的概念工作。DStream 的工作方式是将新到达的记录收集到一个小的 RDD 中并执行它。这个过程每隔几秒重复一次,使用一个新的 RDD,这个过程称为微批处理DStream API 包括许多与 RDD API 相同的处理操作,以及其他一些流式特定的方法。从 5.0 版本开始,elasticsearch-hadoop 提供了与 Spark Streaming 的原生集成。

当使用 elasticsearch-hadoop Spark Streaming 支持时,可以将 Elasticsearch 作为输出位置,以便将数据从 Spark Streaming 作业索引到 Elasticsearch 中,就像持久化 RDD 的结果一样。但是,与 RDD 不同,由于 DStream 的连续性,您无法使用 DStream 从 Elasticsearch 中读取数据。

Spark Streaming 支持提供了特殊的优化,以便在运行处理窗口非常小的作业时,在 Spark 执行器上节省网络资源。因此,应该优先使用这种集成,而不是在 DStream 上调用 foreachRDD 返回的 RDD 上调用 saveToEs

DStream 写入 Elasticsearch

编辑

RDD 一样,只要其内容可以转换为文档,任何 DStream 都可以保存到 Elasticsearch。实际上,这意味着 DStream 类型需要是 Map(Scala 或 Java 的),JavaBean 或 Scala case 类。如果不是这种情况,可以轻松地在 Spark 中转换数据或插入自己的自定义 ValueWriter

Scala
编辑

当使用 Scala 时,只需导入 org.elasticsearch.spark.streaming 包,它通过 pimp my library 模式,使用 saveToEs 方法丰富了 DStream API

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._               
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._

import org.elasticsearch.spark.streaming._           

...

val conf = ...
val sc = new SparkContext(conf)                      
val ssc = new StreamingContext(sc, Seconds(1))       

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

val rdd = sc.makeRDD(Seq(numbers, airports))
val microbatches = mutable.Queue(rdd)                

ssc.queueStream(microbatches).saveToEs("spark/docs") 

ssc.start()
ssc.awaitTermination() 

Spark 和 Spark Streaming Scala 导入

elasticsearch-hadoop Spark Streaming 导入

通过其 Scala API 启动 Spark

通过传递 SparkContext 启动 SparkStreaming 上下文。微批处理将每秒处理一次。

makeRDD 基于指定的集合创建一个临时的 RDD;任何其他 RDD(在 Java 或 Scala 中)都可以传入。创建一个 `RDD` 队列,表示要执行的微批处理。

RDD`s 创建一个 DStream,并将内容(即 {es} 中 `spark/docs` 下的两个_文档_(数字和机场))编入索引。

启动 Spark Streaming 作业并等待其最终完成。

作为上述隐式导入的替代方案,可以通过 org.elasticsearch.spark.streaming 包中的 EsSparkStreaming 在 Scala 中使用 elasticsearch-hadoop Spark Streaming 支持,它充当实用程序类,允许显式方法调用。此外,可以使用case 类,而不是 Map(它们很方便,但由于它们的结构不同,每个实例都需要一个映射)

import org.apache.spark.SparkContext
import org.elasticsearch.spark.streaming.EsSparkStreaming         

// define a case class
case class Trip(departure: String, arrival: String)               

val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")

val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
val microbatches = mutable.Queue(rdd)                             
val dstream = ssc.queueStream(microbatches)

EsSparkStreaming.saveToEs(dstream, "spark/docs")                  

ssc.start()                                                       

EsSparkStreaming 导入

定义一个名为 Trip 的 case class

围绕 Trip 实例的 RDD 创建一个 DStream

配置 DStream 以通过 EsSparkStreaming 显式索引

启动流处理

一旦启动了 SparkStreamingContext,就无法添加或配置新的 DStream。一旦上下文停止,就无法重新启动。每个 JVM 一次只能有一个活动的 SparkStreamingContext。另请注意,当以编程方式停止 SparkStreamingContext 时,它会停止底层的 SparkContext,除非指示不停止。

对于需要指定文档 id(或其他元数据字段,如 ttltimestamp)的情况,可以通过设置适当的 映射,即 es.mapping.id 来实现。按照前面的示例,为了指示 Elasticsearch 使用字段 id 作为文档 id,请更新 DStream 配置(也可以在 SparkConf 上设置该属性,但由于其全局效应,不建议这样做)

EsSparkStreaming.saveToEs(dstream, "spark/docs", Map("es.mapping.id" -> "id"))
Java
编辑

Java 用户有一个专用类,它提供类似于 EsSparkStreaming 的功能,即 org.elasticsearch.spark.streaming.api.java 包中的 JavaEsSparkStreaming(类似于 Spark 的 Java API 的包)

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;                                              
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;

import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;         
...

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);                              
JavaStreamingContext jssc = new JavaSparkStreamingContext(jsc, Seconds.apply(1));

Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);                   
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>();
microbatches.add(javaRDD);                                                      
JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches);

JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs");                       

jssc.start()                                                                    

Spark 和 Spark Streaming Java 导入

elasticsearch-hadoop Java 导入

通过其 Java API 启动 Spark 和 Spark Streaming。微批处理将每秒处理一次。

为了简化示例,请使用 Guava(Spark 的依赖项)的 Immutable* 方法来创建简单的 MapList

在微批处理上创建一个简单的 DStream;任何其他 RDD(在 Java 或 Scala 中)都可以传入

在 Elasticsearch 中,将内容(即两个文档(数字和机场))索引在 spark/docs 下。

执行流式作业。

通过使用 Java 5 静态导入可以进一步简化代码。此外,Map(由于其松散的结构,其映射是动态的)可以替换为 JavaBean

public class TripBean implements Serializable {
   private String departure, arrival;

   public TripBean(String departure, String arrival) {
       setDeparture(departure);
       setArrival(arrival);
   }

   public TripBean() {}

   public String getDeparture() { return departure; }
   public String getArrival() { return arrival; }
   public void setDeparture(String dep) { departure = dep; }
   public void setArrival(String arr) { arrival = arr; }
}
import static org.elasticsearch.spark.rdd.api.java.JavaEsSparkStreaming;  
...

TripBean upcoming = new TripBean("OTP", "SFO");
TripBean lastWeek = new TripBean("MUC", "OTP");

JavaRDD<TripBean> javaRDD = jsc.parallelize(ImmutableList.of(upcoming, lastWeek));
Queue<JavaRDD<TripBean>> microbatches = new LinkedList<JavaRDD<TripBean>>();
microbatches.add(javaRDD);
JavaDStream<TripBean> javaDStream = jssc.queueStream(microbatches);       

saveToEs(javaDStream, "spark/docs");                                          

jssc.start()                                                              

静态导入 JavaEsSparkStreaming

定义一个包含 TripBean 实例的 DStreamTripBean 是一个 JavaBean

调用 saveToEs 方法,而无需再次键入 JavaEsSparkStreaming

运行该流式作业

设置文档 id(或其他元数据字段,如 ttltimestamp)类似于其 Scala 对应物,但根据您使用的是 JDK 类还是其他实用程序(如 Guava),可能稍微冗长一些。

JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));

将现有 JSON 写入 Elasticsearch

编辑

对于 DStream 流式传输的数据已经序列化为 JSON 的情况,elasticsearch-hadoop 允许直接索引,而不应用任何转换;数据按原样获取并直接发送到 Elasticsearch。因此,在这种情况下,elasticsearch-hadoop 期望 DStream 包含 String 或字节数组 (byte[]/Array[Byte]),假设每个条目代表一个 JSON 文档。如果 DStream 没有正确的签名,则无法应用 saveJsonToEs 方法(在 Scala 中它们将不可用)。

Scala
编辑
val json1 = """{"reason" : "business", "airport" : "SFO"}"""      
val json2 = """{"participants" : 5, "airport" : "OTP"}"""

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

val rdd = sc.makeRDD(Seq(json1, json2))
val microbatch = mutable.Queue(rdd)
ssc.queueStream(microbatch).saveJsonToEs("spark/json-trips")      

ssc.start()                                                       

DStream 中的条目示例 - JSON 按原样写入,无需任何转换

配置流,通过专用的 saveJsonToEs 方法索引 JSON 数据

启动流式作业

Java
编辑
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";  
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";

JavaSparkContext jsc = ...
JavaStreamingContext jssc = ...
JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
Queue<JavaRDD<String>> microbatches = new LinkedList<JavaRDD<String>>();      
microbatches.add(stringRDD);
JavaDStream<String> stringDStream = jssc.queueStream(microbatches);  

JavaEsSparkStreaming.saveJsonToEs(stringRDD, "spark/json-trips");    

jssc.start()                                                         

DStream 中的条目示例 - JSON 按原样写入,无需任何转换

创建一个 RDD,将其放入队列,并从排队的 RDD 创建一个 DStream,将每个视为一个微批处理。

请注意 JavaDStream<String> 签名

配置流,通过专用的 saveJsonToEs 方法索引 JSON 数据

启动流式作业

写入动态/多资源

编辑

对于需要将写入 Elasticsearch 的数据索引到不同的存储桶(基于数据内容)中的情况,可以使用 es.resource.write 字段,该字段接受在运行时从文档内容解析的模式。按照前面提到的媒体示例,可以按如下方式配置它:

Scala
编辑
val game = Map(
  "media_type" -> "game", 
       "title" -> "FF VI",
        "year" -> "1994")
val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010")
val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien")

val batch = sc.makeRDD(Seq(game, book, cd))
val microbatches = mutable.Queue(batch)
ssc.queueStream(microbatches).saveToEs("my-collection-{media_type}/doc")  
ssc.start()

用于拆分数据的文档。可以声明任何字段(但请确保它在所有文档中都可用)。

根据其资源模式保存每个对象,在本例中是基于 media_type

对于即将写入的每个文档/对象,elasticsearch-hadoop 将提取 media_type 字段并使用其值来确定目标资源。

Java
编辑

正如预期的那样,Java 中的情况非常相似。

Map<String, ?> game =
  ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994");
Map<String, ?> book = ...
Map<String, ?> cd = ...

JavaRDD<Map<String, ?>> javaRDD =
                jsc.parallelize(ImmutableList.of(game, book, cd));
Queue<JavaRDD<Map<String, ?>>> microbatches = ...
JavaDStream<Map<String, ?>> javaDStream =
                jssc.queueStream(microbatches);

saveToEs(javaDStream, "my-collection-{media_type}/doc");  
jssc.start();

根据其资源模式保存每个对象,在本例中为 media_type

处理文档元数据

编辑

Elasticsearch 允许每个文档都有自己的 元数据。如上所述,通过各种 映射选项,可以自定义这些参数,以便从其所属的文档中提取它们的值。此外,甚至可以包含/排除将哪些数据发送回 Elasticsearch。在 Spark 中,elasticsearch-hadoop 扩展了此功能,允许通过使用 pair RDDs 在文档外部提供元数据。

在 Spark Streaming 中也是如此。对于包含键值元组的 DStreams,可以从键中提取元数据,并将值用作文档源。

元数据通过 org.elasticsearch.spark.rdd 包中的 Metadata Java 枚举 来描述,该枚举标识其类型 - idttlversion 等。因此,DStream 的键可以是包含每个文档的 Metadata 及其关联值的 Map。如果 DStream 键的类型不是 Map,则 elasticsearch-hadoop 会将对象视为表示文档 id 并相应地使用它。这听起来比实际情况复杂,所以让我们看一些示例。

Scala
编辑

Pair DStreamss,或者简单地说,签名 DStream[(K,V)]DStreams 可以利用 saveToEsWithMeta 方法,这些方法可以通过隐式导入 org.elasticsearch.spark.streaming 包或 EsSparkStreaming 对象获得。要手动指定每个文档的 id,只需在 DStream 中传入 Object(类型不是 Map

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// instance of SparkContext
val sc = ...
// instance of StreamingContext
val ssc = ...

val airportsRDD = 
  sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))  
val microbatches = mutable.Queue(airportsRDD)

ssc.queueStream(microbatches)        
  .saveToEsWithMeta("airports/2015") 
ssc.start()

airportsRDD 是一个键值RDD;它是由 tupleSeq 创建的

Seq 中每个元组的键表示其关联的值/文档的id;换句话说,文档 otp 的 id 是 1muc 的 id 是 2sfo 的 id 是 3

我们构造一个 DStream,它继承了 RDD 的类型签名

由于生成的 DStream 是一个键值对 DStream,它可以使用 saveToEsWithMeta 方法。这会告诉 elasticsearch-hadoop 特别关注 DStream 的键,并将它们用作元数据,在这种情况下用作文档 ID。如果使用 saveToEs,则 elasticsearch-hadoop 会将 DStream 元组(即键和值)都视为文档的一部分。

当需要指定不仅仅是 id 的信息时,应该使用一个键类型为 org.elasticsearch.spark.rdd.Metadatascala.collection.Map

import org.elasticsearch.spark.rdd.Metadata._          

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// metadata for each document
// note it's not required for them to have the same structure
val otpMeta = Map(ID -> 1, TTL -> "3h")                
val mucMeta = Map(ID -> 2, VERSION -> "23")            
val sfoMeta = Map(ID -> 3)                             

// instance of SparkContext
val sc = ...
// instance of StreamingContext
val ssc = ...

val airportsRDD = sc.makeRDD( 
  Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
val microbatches = mutable.Queue(airportsRDD)

ssc.queueStream(microbatches)        
  .saveToEsWithMeta("airports/2015") 
ssc.start()

导入 Metadata 枚举

用于 otp 文档的元数据。在这种情况下,ID 的值为 1,TTL 的值为 3h

用于 muc 文档的元数据。在这种情况下,ID 的值为 2,VERSION 的值为 23

用于 sfo 文档的元数据。在这种情况下,ID 的值为 3

元数据和文档被组装成一个键值对 RDD

DStream 继承了 RDD 的签名,成为一个键值对 DStream

DStream 被配置为使用 saveToEsWithMeta 方法来相应地索引数据。

Java
编辑

类似地,在 Java 端,JavaEsSparkStreaming 提供了应用于 JavaPairDStreamsaveToEsWithMeta 方法(在 Java 中等同于 DStream[(K,V)])。

由于 Java API 的限制,这通常需要更多的工作。例如,您不能直接从 JavaPairRDD 的队列创建 JavaPairDStream。相反,您必须创建一个常规的 Tuple2 对象的 JavaDStream,并将 JavaDStream 转换为 JavaPairDStream。这听起来很复杂,但它只是一个绕过 API 限制的简单方法。

首先,我们将创建一个 pair 函数,它接受一个 Tuple2 对象作为输入,并将其直接返回给框架。

public static class ExtractTuples implements PairFunction<Tuple2<Object, Object>, Object, Object>, Serializable {
    @Override
    public Tuple2<Object, Object> call(Tuple2<Object, Object> tuple2) throws Exception {
        return tuple2;
    }
}

然后,我们将 pair 函数应用于 Tuple2JavaDStream,以创建一个 JavaPairDStream 并保存它。

import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC");

JavaSparkContext jsc = ...
JavaStreamingContext jssc = ...

// create an RDD of between the id and the docs
JavaRDD<Tuple2<?, ?>> rdd = jsc.parallelize(         
      ImmutableList.of(
        new Tuple2<Object, Object>(1, otp),          
        new Tuple2<Object, Object>(2, jfk)));        

Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ...
JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); 

JavaPairDStream<?, ?> pairDStream = dstream.mapToPair(new ExtractTuples()); 

JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target);       
jssc.start();

创建一个常规的 JavaRDD,其中包含用文档 ID 和文档本身包装的 Scala Tuple2

第一个文档的元组,包装了 id(1)和文档(otp)本身

第二个文档的元组,包装了 id(2)和 jfk

从元组 RDD 中组装一个常规的 JavaDStream

通过将我们的 Tuple2 标识函数传递给 mapToPair 方法,将 JavaDStream 转换为 JavaPairDStream。这将允许将类型转换为 JavaPairDStream。此函数可以替换为作业中任何从单个条目中提取 ID 和要索引的文档的内容。

JavaPairRDD 配置为使用键作为 ID,值作为文档来相应地索引数据。

当需要指定的不只是 ID 时,可以选择使用一个 java.util.Map,其中填充了类型为 org.elasticsearch.spark.rdd.Metadata 的键。我们将使用相同的类型技巧将 JavaDStream 重新打包为 JavaPairDStream

import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;
import org.elasticsearch.spark.rdd.Metadata;          

import static org.elasticsearch.spark.rdd.Metadata.*; 

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran");

// metadata for each document
// note it's not required for them to have the same structure
Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); 
Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); 

JavaSparkContext jsc = ...

// create a pair RDD between the id and the docs
JavaRDD<Tuple2<?, ?>> pairRdd = jsc.parallelize<(ImmutableList.of(
        new Tuple2<Object, Object>(otpMeta, otp),    
        new Tuple2<Object, Object>(sfoMeta, sfo)));  

Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ...
JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); 

JavaPairDStream<?, ?> pairDStream = dstream.mapToPair(new ExtractTuples()) 

JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target);       
jssc.start();

Metadata enum 描述了可以声明的文档元数据

用于 enum 的静态导入,以便以短格式(IDTTL 等)引用其值

otp 文档的元数据

sfo 文档的元数据

otp(作为值)与其元数据(作为键)之间的元组

关联 sfo 及其元数据的元组

JavaRDD 中创建一个 JavaDStream

通过在其上映射 Tuple2 标识函数,将 JavaDStream 重新打包为 JavaPairDStream

在包含文档及其各自元数据的 JavaPairDStream 上调用 saveToEsWithMeta

Spark Streaming 类型转换

编辑

elasticsearch-hadoop Spark Streaming 支持利用与常规 Spark 类型映射相同的类型映射。为了保持一致性,此处重复这些映射。

表 6. Scala 类型转换表

Scala 类型 Elasticsearch 类型

None

null

Unit

null

Nil

空的 array

Some[T]

根据表中的 T

Map

object

Traversable

array

case class

object(参见 Map

Product

array

此外,以下隐含转换适用于 Java 类型

表 7. Java 类型转换表

Java 类型 Elasticsearch 类型

null

null

String

string

Boolean

boolean

Byte

byte

Short

short

Integer

int

Long

long

Double

double

Float

float

Number

floatdouble(取决于大小)

java.util.Calendar

datestring 格式)

java.util.Date

datestring 格式)

java.util.Timestamp

datestring 格式)

byte[]

string (BASE64)

Object[]

array

Iterable

array

Map

object

Java Bean

object(参见 Map

地理类型值得再次提及的是,仅在 Elasticsearch 中可用的富数据类型(例如 GeoPointGeoShape)通过将其结构转换为上表中可用的原语来支持。例如,根据其存储,geo_point 可能作为 StringTraversable 返回。

Spark SQL 支持

编辑

在 2.1 版本中添加。

Spark SQL 是一个用于结构化数据处理的 Spark 模块。它提供了一个名为 DataFrames 的编程抽象,也可以充当分布式 SQL 查询引擎。

-- Spark 网站

除了核心 Spark 支持之外,elasticsearch-hadoop 还提供与 Spark SQL 的集成。换句话说,Elasticsearch 成为 Spark SQL 的原生数据源,以便可以从 Spark SQL 透明地索引和查询数据。

Spark SQL 处理结构化数据,换句话说,所有条目都应具有相同的结构(相同数量的字段,具有相同的类型和名称)。支持使用非结构化数据(具有不同结构的文档),并且会导致问题。对于这种情况,请使用 PairRDD

支持的 Spark SQL 版本

编辑

Spark SQL 虽然正在成为一个成熟的组件,但在各个版本之间仍在经历重大变化。Spark SQL 在 1.3 版本中成为一个稳定的组件,但它与之前的版本向后兼容。此外,Spark 2.0 引入了重大更改,通过 Dataset API 打破了向后兼容性。elasticsearch-hadoop 通过两个不同的 jar 支持 Spark SQL 1.3-1.6 和 Spark SQL 2.0:elasticsearch-spark-1.x-<version>.jarelasticsearch-hadoop-<version>.jar 支持 Spark SQL 1.3-1.6(或更高版本),而 elasticsearch-spark-2.0-<version>.jar 支持 Spark SQL 2.0。换句话说,除非您使用 Spark 2.0,否则请使用 elasticsearch-spark-1.x-<version>.jar

Spark SQL 支持位于 org.elasticsearch.spark.sql 包下。

API 差异从 elasticsearch-hadoop 用户的角度来看,Spark SQL 1.3-1.6 和 Spark 2.0 之间的差异已经相当统一。此文档详细描述了以下简要提及的差异

DataFrame vs Dataset
Spark SQL 1.3+ 中的核心单元是 DataFrame。此 API 保留在 Spark 2.0 中,但在其底层基于 Dataset
统一的 API 与专用的 Java/Scala API
在 Spark SQL 2.0 中,通过引入 SparkSession 并为 `Dataset`、`DataFrame` 和 `RDD` 使用相同的支持代码,API 进一步统一

从概念上讲,DataFrameDataset[Row],因此以下文档将重点介绍 Spark SQL 1.3-1.6。

DataFrame (Spark SQL 1.3+) 写入 Elasticsearch

编辑

使用 elasticsearch-hadoop,可以将 DataFrame(或任何 Dataset)索引到 Elasticsearch。

Scala
编辑

在 Scala 中,只需导入 org.elasticsearch.spark.sql 包,该包会使用 saveToEs 方法丰富给定的 DataFrame 类;虽然它们与 org.elasticsearch.spark 包具有相同的签名,但它们是为 DataFrame 实现设计的。

// reusing the example from Spark SQL documentation

import org.apache.spark.sql.SQLContext    
import org.apache.spark.sql.SQLContext._

import org.elasticsearch.spark.sql._      

...

// sc = existing SparkContext
val sqlContext = new SQLContext(sc)

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

//  create DataFrame
val people = sc.textFile("people.txt")    
        .map(_.split(","))
        .map(p => Person(p(0), p(1), p(2).trim.toInt))
        .toDF()

people.saveToEs("spark/people")           

Spark SQL 包导入

elasticsearch-hadoop Spark 包导入

读取文本文件作为普通RDD 并将其映射到 DataFrame(使用 Person case 类)。

通过 saveToEs 方法将生成的 DataFrame 索引到 Elasticsearch。

默认情况下,elasticsearch-hadoop 会忽略空值,而不会写入任何字段。由于 DataFrame 旨在被视为结构化的表格数据,因此您可以通过将 es.spark.dataframe.write.null 设置为 true,仅为 DataFrame 对象启用将空值写入为空值字段。

Java
编辑

类似地,对于 Java 用法,专用包 org.elasticsearch.spark.sql.api.java 通过 JavaEsSpark SQL 提供类似的功能。

import org.apache.spark.sql.api.java.*;                      
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;  
...

DataFrame people = ...
JavaEsSparkSQL.saveToEs(people, "spark/people");                     

Spark SQL Java 导入

elasticsearch-hadoop Spark SQL Java 导入

在 Elasticsearch 中 spark/people 下索引 DataFrame

同样,对于 Java 5 静态导入,可以将其进一步简化为

import static org.elasticsearch.spark.sql.api.java.JavaEsSpark SQL; 
...
saveToEs("spark/people");                                          

静态导入 JavaEsSpark SQL

调用 saveToEs 方法,而无需再次键入 JavaEsSpark

为了最大程度地控制 DataFrame 在 Elasticsearch 中的映射,强烈建议您预先创建映射。有关更多信息,请参见章。

将现有 JSON 写入 Elasticsearch

编辑

使用 Spark SQL 时,如果输入数据为 JSON 格式,只需将其转换为 DataFrame (在 Spark SQL 1.3 中) 或 Dataset (在 Spark SQL 2.0 中) (如 Spark 文档中所述),通过 SQLContext/JavaSQLContext jsonFile 方法。

使用纯 SQL 从 Elasticsearch 读取

编辑

索引及其映射必须在创建临时表之前存在。

Spark SQL 1.2 引入了一个新的API,用于从外部数据源读取,elasticsearch-hadoop 支持该 API,从而简化了与 Elasticsearch 交互所需的 SQL 配置。此外,在幕后,它会理解 Spark 执行的操作,从而优化数据和进行的查询(例如过滤或修剪),从而提高性能。

Spark SQL 中的数据源

编辑

使用 Spark SQL 时,elasticsearch-hadoop 允许通过 SQLContext load 方法访问 Elasticsearch。换句话说,以声明式方式创建由 Elasticsearch 支持的 DataFrame/Dataset

val sql = new SQLContext...
// Spark 1.3 style
val df = sql.load( 
  "spark/index",   
  "org.elasticsearch.spark.sql") 

SQLContext 实验性 load 方法,用于任意数据源

要加载的路径或资源 - 在这种情况下为 Elasticsearch 中的索引/类型

数据源提供程序 - org.elasticsearch.spark.sql

在 Spark 1.4 中,将使用以下类似的 API 调用

// Spark 1.4 style
val df = sql.read      
  .format("org.elasticsearch.spark.sql") 
  .load("spark/index") 

SQLContext 实验性 read 方法,用于任意数据源

数据源提供程序 - org.elasticsearch.spark.sql

要加载的路径或资源 - 在这种情况下为 Elasticsearch 中的索引/类型

在 Spark 1.5 中,可以将其进一步简化为

// Spark 1.5 style
val df = sql.read.format("es")
  .load("spark/index")

使用 es 作为 DataSource 提供程序的别名,而不是完整包名称

无论使用哪个 API 创建,都可以自由访问 DataFrame 以操作数据。

sources 声明还允许传入特定选项,即

名称 默认值 描述

path

必需

Elasticsearch 索引/类型

pushdown

true

是否将 Spark SQL 转换为 Elasticsearch Query DSL(下推

strict

false

是否使用精确(未分析)匹配或不使用(已分析)匹配

可在 Spark 1.6 或更高版本中使用

double.filtering

true

是否告诉 Spark 对下推的过滤器应用其自身的过滤

下一节将解释这两个选项。要指定选项(包括通用的 elasticsearch-hadoop 选项),只需将 Map 传递给上述方法

例如

val sql = new SQLContext...
// options for Spark 1.3 need to include the target path/resource
val options13 = Map("path" -> "spark/index",
                    "pushdown" -> "true",     
                    "es.nodes" -> "someNode", 
                     "es.port" -> "9200")

// Spark 1.3 style
val spark13DF = sql.load("org.elasticsearch.spark.sql", options13) 

// options for Spark 1.4 - the path/resource is specified separately
val options = Map("pushdown" -> "true",     
                  "es.nodes" -> "someNode", 
                   "es.port" -> "9200")

// Spark 1.4 style
val spark14DF = sql.read.format("org.elasticsearch.spark.sql")
                        .options(options) 
                        .load("spark/index")

pushdown 选项 - 特定于 Spark 数据源

es.nodes 配置选项

在定义/加载数据源时传递选项

sqlContext.sql(
   "CREATE TEMPORARY TABLE myIndex    " + 
   "USING org.elasticsearch.spark.sql " + 
   "OPTIONS (resource 'spark/index', nodes 'someNode')" ) " 

Spark 的临时表名称

USING 子句,用于标识数据源提供程序,在这种情况下为 org.elasticsearch.spark.sql

elasticsearch-hadoop 配置选项,其中强制性的一个是 resource。由于 SQL 解析器的限制,es. 前缀是固定的。

请注意,由于 SQL 解析器的限制,. (以及其他用于分隔的常用字符) 不被允许;连接器会尝试通过自动附加 es. 前缀来解决这个问题,但这只适用于使用一个 . 指定配置选项的情况(例如上面的 es.nodes)。因此,如果需要具有多个 . 的属性,则应使用上面的 SQLContext.loadSQLContext.read 方法,并将属性作为 Map 传递。

下推操作

编辑

使用 elasticsearch-hadoop 作为 Spark source 的一个重要的隐藏特性是,该连接器理解在 DataFrame/SQL 中执行的操作,并且默认情况下会将其转换为相应的 QueryDSL。换句话说,连接器直接在数据源处下推操作,在那里高效地过滤掉数据,从而将所需的数据流回 Spark。这显著提高了查询性能,并最大限度地减少了 Spark 和 Elasticsearch 集群上的 CPU、内存和 I/O,因为只返回所需的数据(而不是批量返回数据,然后由 Spark 处理和丢弃)。请注意,即使指定了查询,下推操作也适用 - 连接器会根据指定的 SQL 增强它。

另外,elasticsearch-hadoop 支持 Spark (1.3.0 及更高版本) 中所有可用的 `Filter`,同时保持与 Spark 1.3.0 的向后二进制兼容性,将 SQL 操作完全下推到 Elasticsearch,而无需任何用户干预。

已优化为下推过滤器的运算符

SQL 语法 ES 1.x/2.x 语法 ES 5.x 语法

= null , is_null

missing

must_not.exists

= (严格)

term

term

= (非严格)

match

match

> , < , >= , ⇐

range

range

is_not_null

exists

exists

in (严格)

terms

terms

in (非严格)

or.filters

bool.should

and

and.filters

bool.filter

or

or.filters

bool.should [bool.filter]

not

not.filter

bool.must_not

StringStartsWith

wildcard(arg*)

wildcard(arg*)

StringEndsWith

wildcard(*arg)

wildcard(*arg)

StringContains

wildcard(*arg*)

wildcard(*arg*)

EqualNullSafe (严格)

term

term

EqualNullSafe (非严格)

match

match

例如,考虑以下 Spark SQL

// as a DataFrame
val df = sqlContext.read().format("org.elasticsearch.spark.sql").load("spark/trips")

df.printSchema()
// root
//|-- departure: string (nullable = true)
//|-- arrival: string (nullable = true)
//|-- days: long (nullable = true)

val filter = df.filter(df("arrival").equalTo("OTP").and(df("days").gt(3))

或者用纯 SQL

CREATE TEMPORARY TABLE trips USING org.elasticsearch.spark.sql OPTIONS (path "spark/trips")
SELECT departure FROM trips WHERE arrival = "OTP" and days > 3

连接器将查询转换为

{
  "query" : {
    "filtered" : {
      "query" : {
        "match_all" : {}

      },
      "filter" : {
        "and" : [{
            "query" : {
              "match" : {
                "arrival" : "OTP"
              }
            }
          }, {
            "days" : {
              "gt" : 3
            }
          }
        ]
      }
    }
  }
}

此外,下推过滤器可以处理 analyzed 词条(默认),或者可以配置为严格并提供 exact 匹配(仅适用于 not-analyzed 字段)。除非手动指定映射,否则强烈建议保留默认设置。有关此主题和其他主题,请参阅 Elasticsearch 参考文档

请注意,自 elasticsearch-hadoop 2.2 起,适用于 Spark 1.6 或更高版本的 double.filtering,允许将已下推到 Elasticsearch 的过滤器也由 Spark 处理/评估(默认)或不处理。关闭此功能,尤其是在处理大数据量时,会加快速度。但是,应该注意语义,因为关闭此功能可能会返回不同的结果(取决于数据的索引方式,analyzednot_analyzed)。一般来说,当开启严格模式时,也可以禁用 double.filtering

数据源作为表

编辑

自 Spark SQL 1.2 起,也可以通过将数据源声明为 Spark 临时表(由 elasticsearch-hadoop 支持)来访问数据源

sqlContext.sql(
   "CREATE TEMPORARY TABLE myIndex    " + 
   "USING org.elasticsearch.spark.sql " + 
   "OPTIONS (resource 'spark/index', " + 
            "scroll_size '20')" ) 

Spark 的临时表名称

USING 子句,用于标识数据源提供程序,在这种情况下为 org.elasticsearch.spark.sql

elasticsearch-hadoop 配置选项,其中强制性的一个是 resource。为了方便起见,可以使用 es 前缀,也可以跳过它。

由于使用 . 可能会导致语法异常,因此应该将其替换为 _ 样式。因此,在本例中,es.scroll.size 变为 scroll_size(因为可以删除前导 es)。请注意,这仅适用于 Spark 1.3,因为 Spark 1.4 有更严格的解析器。有关更多信息,请参阅上面的章节。

定义后,将自动拾取模式。因此,可以立即发出查询

val all = sqlContext.sql("SELECT * FROM myIndex WHERE id <= 10")

由于 elasticsearch-hadoop 知道正在进行的查询,因此它可以优化对 Elasticsearch 完成的请求。例如,给定以下查询

val names = sqlContext.sql("SELECT name FROM myIndex WHERE id >=1 AND id <= 10")

它知道只需要 nameid 字段(第一个返回给用户,第二个用于 Spark 的内部过滤),因此将请求这些数据,从而使查询非常高效。

从 Elasticsearch 读取 DataFrames (Spark SQL 1.3)

编辑

正如您可能已经猜到的,可以定义由 Elasticsearch 文档支持的 DataFrame。甚至更好的是,让它们由查询结果支持,从而有效地创建对数据的动态实时视图

Scala
编辑

通过 org.elasticsearch.spark.sql 包,esDF 方法在 SQLContext API 上可用

import org.apache.spark.sql.SQLContext        

import org.elasticsearch.spark.sql._          
...

val sql = new SQLContext(sc)

val people = sql.esDF("spark/people")         

// check the associated schema
println(people.schema.treeString)             
// root
//  |-- name: string (nullable = true)
//  |-- surname: string (nullable = true)
//  |-- age: long (nullable = true)           

Spark SQL Scala 导入

elasticsearch-hadoop SQL Scala 导入

创建一个由 Elasticsearch 中 spark/people 索引支持的 DataFrame

从 Elasticsearch 发现的 DataFrame 相关模式

请注意,在使用默认 Elasticsearch 映射时,age 字段是如何转换为 Long 的,如映射和类型章节中所述。

就像 Spark core 支持一样,可以指定其他参数,例如查询。这是一个非常强大的概念,因为可以在源头(Elasticsearch)过滤数据,并且仅在结果上使用 Spark

// get only the Smiths
val smiths = sqlContext.esDF("spark/people","?q=Smith") 

Elasticsearch 查询,其结果构成 DataFrame

控制 DataFrame 模式在某些情况下,尤其是当 Elasticsearch 中的索引包含大量字段时,需要创建一个仅包含其中一部分字段的 DataFrame。虽然可以通过官方 Spark API 或通过专用查询来修改 DataFrame (通过对其支持的 RDD 进行操作),但 elasticsearch-hadoop 允许用户指定在创建 DataFrame 时从 Elasticsearch 中包含和排除哪些字段。

通过 es.read.field.includees.read.field.exclude 属性,可以指示从索引映射中包含或排除哪些字段。语法类似于 Elasticsearch 的 include/exclude。可以通过使用逗号来指定多个值。默认情况下,不指定任何值,这意味着包含所有属性/字段,并且不排除任何属性/字段。请注意,这些属性可以包含前导和尾随通配符。包含字段层次结构的一部分而不包含尾随通配符并不意味着包含整个层次结构。但是,在大多数情况下,仅包含层次结构的一部分是没有意义的,因此应该包含尾随通配符。

例如

# include
es.read.field.include = *name, address.*
# exclude
es.read.field.exclude = *.created

由于 SparkSQL 使用 DataFrame 模式的方式,elasticsearch-hadoop 需要在执行实际查询之前知道从 Elasticsearch 返回哪些字段。虽然可以通过底层的 Elasticsearch 查询手动限制字段,但 elasticsearch-hadoop 并不知道这一点,结果可能会有所不同,或者更糟糕的是,会发生错误。请改用上面的属性,Elasticsearch 将在用户查询的同时正确使用这些属性。

Java
编辑

对于 Java 用户,可以通过 JavaEsSpark SQL 提供专用 API。它与 EsSpark SQL 非常相似,但是它允许通过 Java 集合而不是 Scala 集合传入配置选项;除此之外,使用两者是完全相同的

import org.apache.spark.sql.api.java.JavaSQLContext;          
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;   
...
SQLContext sql = new SQLContext(sc);

DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people");  

Spark SQL 导入

elasticsearch-hadoop 导入

创建一个由 Elasticsearch 索引支持的 Java DataFrame

更好的是,DataFrame 可以由查询结果支持

DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people", "?q=Smith"); 

支持 elasticsearch-hadoop DataFrame 的 Elasticsearch 查询

Spark SQL 类型转换

编辑

在处理多值/数组字段时,请参阅部分,特别是这些配置选项。重要提示:如果使用自动索引创建,请查看部分以获取更多信息。

elasticsearch-hadoop 会自动将 Spark 内置类型转换为 Elasticsearch 类型(反之亦然),如下表所示

虽然 Spark SQL DataTypes 在 Scala 和 Java 中都有等效项,因此可以应用 RDD 转换,但语义略有不同 - 特别是由于 Spark SQL 处理它们的方式,java.sql 类型存在差异。

表 8. Spark SQL 1.3+ 转换表

Spark SQL DataType Elasticsearch 类型

null

null

ByteType

byte

ShortType

short

IntegerType

int

LongType

long

FloatType

float

DoubleType

double

StringType

string

BinaryType

string (BASE64)

BooleanType

boolean

DateType

datestring 格式)

TimestampType

long (unix 时间)

ArrayType

array

MapType

object

StructType

object

地理类型转换表除了上表之外,对于 Spark SQL 1.3 或更高版本,elasticsearch-hadoop 会对地理类型执行自动模式检测,即 Elasticsearch geo_pointgeo_shape。由于每种类型都允许多种格式(geo_point 接受以 4 种不同方式指定的纬度和经度,而 geo_shape 允许各种类型(当前为 9 种))并且映射不提供此类信息,因此 elasticsearch-hadoop 将在启动时采样确定的地理字段,并检索包含所有相关字段的任意文档;它将解析它,从而确定必要的模式(例如,它可以判断 geo_point 是指定为 StringType 还是 ArrayType)。

由于 Spark SQL 是强类型的,因此每个地理字段都需要在所有文档中具有相同的格式。如果不是这样,则返回的数据将不符合检测到的模式,从而导致错误。

Spark 结构化流支持

编辑

在 6.0 中添加。

结构化流提供了快速、可扩展、容错、端到端且仅一次的流处理,而无需用户考虑流式传输。

-- Spark 文档

Spark 2.0 中以实验性功能发布,Spark 结构化流提供了集成到 Spark SQL 集成中的统一流式传输和批处理接口。从 elasticsearch-hadoop 6.0 开始,我们提供了将流式数据索引到 Elasticsearch 的本机功能。

与 Spark SQL 一样,结构化流使用结构化数据。所有条目都应具有相同的结构(相同数量的字段,具有相同的类型和名称)。支持使用非结构化数据(具有不同结构的文档),并且会导致问题。对于这种情况,请使用 DStream

支持的 Spark 结构化流版本

编辑

自 Spark v2.2.0 起,Spark 结构化流被认为是通用可用的。因此,elasticsearch-hadoop 对结构化流的支持(在 elasticsearch-hadoop 6.0+ 中可用)仅与 Spark 2.2.0 及更高版本兼容。与之前的 Spark SQL 类似,结构化流在接口被认为是稳定的之前,可能会在版本之间发生重大更改。

Spark 结构化流式处理支持位于 org.elasticsearch.spark.sqlorg.elasticsearch.spark.sql.streaming 包下。它以 Dataset[_] API 的形式与 Spark SQL 共享统一的接口。客户端可以像使用常规批处理 Dataset 一样,几乎以完全相同的方式与流式 Dataset 进行交互,只有 少数例外

将流式 Dataset (Spark SQL 2.0+) 写入 Elasticsearch

编辑

通过 elasticsearch-hadoop,流支持的 Dataset 可以被索引到 Elasticsearch。

Scala
编辑

在 Scala 中,要将基于流的 DatasetDataFrame 保存到 Elasticsearch,只需配置流使用 "es" 格式写入,如下所示

import org.apache.spark.sql.SparkSession    

...

val spark = SparkSession.builder()
   .appName("EsStreamingExample")           
   .getOrCreate()

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

//  create DataFrame
val people = spark.readStream                   
        .textFile("/path/to/people/files/*")    
        .map(_.split(","))
        .map(p => Person(p(0), p(1), p(2).trim.toInt))

people.writeStream
      .option("checkpointLocation", "/save/location")      
      .format("es")
      .start("spark/people")                               

Spark SQL 导入

创建 SparkSession

不要调用 read,而是调用 readStream 来获取 DataStreamReader 的实例

连续读取文本文件目录,并将其转换为 Person 对象

提供一个位置来保存流式查询的偏移量和提交日志

使用 "es" 格式启动流,以将 Dataset 的内容连续索引到 Elasticsearch

Spark 不会对基于批处理和流的 Dataset 进行类型区分。虽然您可能能够导入 org.elasticsearch.spark.sql 包以将 saveToEs 方法添加到您的 DatasetDataFrame,但是如果在基于流的 DatasetDataFrame 上调用这些方法,它将抛出一个非法参数异常。

Java
编辑

类似地,"es" 格式也适用于 Java 用法

import org.apache.spark.sql.SparkSession    

...

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")     
  .getOrCreate();

// java bean style class
public static class PersonBean {
  private String name;
  private String surname;                        
  private int age;

  ...
}

Dataset<PersonBean> people = spark.readStream()         
        .textFile("/path/to/people/files/*")
        .map(new MapFunction<String, PersonBean>() {
            @Override
            public PersonBean call(String value) throws Exception {
                return someFunctionThatParsesStringToJavaBeans(value.split(","));         
            }
        }, Encoders.<PersonBean>bean(PersonBean.class));

people.writeStream()
    .option("checkpointLocation", "/save/location")       
    .format("es")
    .start("spark/people");                               

Spark SQL Java 导入。可以使用与 Scala 相同的会话类

创建 SparkSession。也可以使用旧的 SQLContext api

我们创建一个 java bean 类作为我们的数据格式

使用 readStream() 方法获取 DataStreamReader 以开始构建我们的流

将我们的字符串数据转换为 PersonBean

设置一个位置以保存我们流的状态

使用 "es" 格式,我们持续地将 Dataset 索引到 Elasticsearch 中的 spark/people

将现有 JSON 写入 Elasticsearch

编辑

当使用 Spark SQL 时,如果输入数据是 JSON 格式,只需将其转换为 Dataset(对于 Spark SQL 2.0)(如 Spark 文档中所述),通过 DataStreamReaderjson 格式。

Spark 结构化流中的 Sink 提交日志

编辑

Spark 结构化流提供了一个端到端的容错、恰好一次的处理模型,该模型通过使用偏移量检查点和为每个流式查询维护提交日志来实现。在执行流式查询时,大多数源和接收器都需要您指定一个“checkpointLocation”以持久化作业的状态。如果发生中断,使用相同的检查点位置启动新的流式查询将恢复作业的状态并从中断处继续。我们在配置的检查点位置下的一个特殊目录中为 elasticsearch-hadoop 的 Elasticsearch 接收器实现维护一个提交日志

$> ls /path/to/checkpoint/location
metadata  offsets/  sinks/
$> ls /path/to/checkpoint/location/sinks
elasticsearch/
$> ls /path/to/checkpoint/location/sinks/elasticsearch
12.compact  13  14  15 16  17  18

提交日志目录中的每个文件都对应于已提交的批处理 ID。日志实现会定期压缩日志以避免混乱。您可以通过多种方式设置日志目录的位置

  1. 使用 es.spark.sql.streaming.sink.log.path 设置显式日志位置(见下文)。
  2. 如果未设置该值,则将使用 checkpointLocation 指定的路径。
  3. 如果未设置该值,则将通过将 SparkSession 中的 spark.sql.streaming.checkpointLocation 的值与 Dataset 给定的查询名称组合来构造路径。
  4. 如果不存在查询名称,则在上述情况下将使用随机 UUID 代替查询名称
  5. 如果未提供上述任何设置,则 start 调用将抛出异常

以下是影响 Elasticsearch 提交日志行为的配置列表

es.spark.sql.streaming.sink.log.enabled (默认值 true
启用或禁用流式作业的提交日志。默认情况下,日志处于启用状态,并且将跳过具有相同批处理 ID 的输出批次,以避免重复写入。当此值设置为 false 时,提交日志将被禁用,并且所有输出都将发送到 Elasticsearch,无论它们是否在先前的执行中发送过。
es.spark.sql.streaming.sink.log.path
设置存储此流式查询的日志数据的位置。如果未设置此值,则 Elasticsearch 接收器将将其提交日志存储在 checkpointLocation 中给出的路径下。任何兼容 HDFS 客户端的 URI 都是可接受的。
es.spark.sql.streaming.sink.log.cleanupDelay (默认值 10m
提交日志是通过 Spark 的 HDFS 客户端管理的。某些兼容 HDFS 的文件系统(例如 Amazon 的 S3)以异步方式传播文件更改。为了解决这个问题,在压缩一组日志文件后,客户端将等待这段时间,然后清理旧文件。
es.spark.sql.streaming.sink.log.deletion (默认值 true
确定日志是否应删除不再需要的旧日志。在提交每个批处理后,客户端将检查是否有任何已压缩且可以安全删除的提交日志。如果设置为 false,则日志将跳过此清理步骤,为每个批处理保留一个提交文件。
es.spark.sql.streaming.sink.log.compactInterval (默认值 10
设置在压缩日志文件之前要处理的批次数。默认情况下,每 10 个批处理,提交日志将压缩为一个包含所有先前提交的批处理 ID 的文件。

Spark 结构化流类型转换

编辑

结构化流使用与 Spark SQL 集成完全相同的类型转换规则。

处理多值/数组字段时,请参阅节,特别是这些配置选项。

如果使用自动索引创建,请查看节以获取更多信息。

elasticsearch-hadoop 会自动将 Spark 内置类型转换为 Elasticsearch 类型,如下表所示

虽然 Spark SQL DataTypes 在 Scala 和 Java 中都有等效项,因此可以应用 RDD 转换,但语义略有不同 - 特别是由于 Spark SQL 处理它们的方式,java.sql 类型存在差异。

表 9. Spark SQL 1.3+ 转换表

Spark SQL DataType Elasticsearch 类型

null

null

ByteType

byte

ShortType

short

IntegerType

int

LongType

long

FloatType

float

DoubleType

double

StringType

string

BinaryType

string (BASE64)

BooleanType

boolean

DateType

datestring 格式)

TimestampType

long (unix 时间)

ArrayType

array

MapType

object

StructType

object

使用 Map/Reduce 层

编辑

另一种将 Spark 与 Elasticsearch 一起使用的方法是通过 Map/Reduce 层,即利用 elasticsearch-hadoop 中专用的 Input/OuputFormat。但是,除非有人被困在 elasticsearch-hadoop 2.0 上,否则我们强烈建议使用本机集成,因为它提供了明显更好的性能和灵活性。

配置

编辑

通过 elasticsearch-hadoop,Spark 可以通过其专用的 InputFormat 与 Elasticsearch 集成,在写入的情况下,通过 OutputFormat 集成。这些在 Map/Reduce 章节中进行了详细描述,因此请参阅该章节以获取深入解释。

简而言之,需要设置一个基本的 Hadoop Configuration 对象,其中包含目标 Elasticsearch 集群和索引,可能还有查询,然后就可以开始了。

从 Spark 的角度来看,唯一需要的是设置序列化 - Spark 默认依赖于 Java 序列化,这很方便,但效率相当低。这就是 Hadoop 本身引入了自己的序列化机制及其自身类型(即 Writable)的原因。因此,InputFormatOutputFormat 需要返回 Writable,而 Spark 默认情况下不理解 Writable。好消息是,可以轻松启用不同的序列化 (Kryo),它可以自动处理转换,并且效率也相当高。

SparkConf sc = new SparkConf(); //.setMaster("local");
sc.set("spark.serializer", KryoSerializer.class.getName()); 

// needed only when using the Java API
JavaSparkContext jsc = new JavaSparkContext(sc);

使用 Spark 启用 Kryo 序列化支持

或者,如果您喜欢 Scala

val sc = new SparkConf(...)
sc.set("spark.serializer", classOf[KryoSerializer].getName) 

使用 Spark 启用 Kryo 序列化支持

请注意,Kryo 序列化用作处理 Writable 类型的变通方法;可以选择直接转换类型(从 WritableSerializable 类型) - 这很好,但对于入门来说,上面的单行代码似乎是最有效的。

从 Elasticsearch 读取数据

编辑

要读取数据,只需传入 org.elasticsearch.hadoop.mr.EsInputFormat 类 - 由于它同时支持 oldnew Map/Reduce API,您可以自由地在 SparkContexthadoopRDD(我们建议为了简洁起见)或 newAPIHadoopRDD 上使用任一方法。无论您选择哪个,请坚持使用它,以避免在以后出现混乱和问题。

(org.apache.hadoop.mapred) API
编辑
JobConf conf = new JobConf();                             
conf.set("es.resource", "radio/artists");                 
conf.set("es.query", "?q=me*");                           

JavaPairRDD esRDD = jsc.hadoopRDD(conf, EsInputFormat.class,
                          Text.class, MapWritable.class); 
long docCount = esRDD.count();

创建 Hadoop 对象(使用旧的 API)

配置源(索引)

设置查询(可选)

通过 EsInputFormat 在 Elasticsearch 之上创建一个 Spark RDD - 键表示文档 ID,值表示文档本身

下面的 Scala 版本

val conf = new JobConf()                                   
conf.set("es.resource", "radio/artists")                   
conf.set("es.query", "?q=me*")                             
val esRDD = sc.hadoopRDD(conf,
                classOf[EsInputFormat[Text, MapWritable]], 
                classOf[Text], classOf[MapWritable]))
val docCount = esRDD.count();

创建 Hadoop 对象(使用旧的 API)

配置源(索引)

设置查询(可选)

通过 EsInputFormat 在 Elasticsearch 之上创建一个 Spark RDD

(org.apache.hadoop.mapreduce) API
编辑

正如预期的那样,mapreduce API 版本非常相似 - 将 hadoopRDD 替换为 newAPIHadoopRDD,将 JobConf 替换为 Configuration。就是这样。

Configuration conf = new Configuration();       
conf.set("es.resource", "radio/artists");       
conf.set("es.query", "?q=me*");                 

JavaPairRDD esRDD = jsc.newAPIHadoopRDD(conf, EsInputFormat.class,
                Text.class, MapWritable.class); 
long docCount = esRDD.count();

创建 Hadoop 对象(使用新的 API)

配置源(索引)

设置查询(可选)

通过 EsInputFormat 在 Elasticsearch 之上创建一个 Spark RDD - 键表示文档 ID,值表示文档本身

下面的 Scala 版本

val conf = new Configuration()                             
conf.set("es.resource", "radio/artists")                   
conf.set("es.query", "?q=me*")                             
val esRDD = sc.newAPIHadoopRDD(conf,
                classOf[EsInputFormat[Text, MapWritable]], 
                classOf[Text], classOf[MapWritable]))
val docCount = esRDD.count();

创建 Hadoop 对象(使用新的 API)

配置源(索引)

设置查询(可选)

通过 EsInputFormat 在 Elasticsearch 之上创建一个 Spark RDD

从 PySpark 使用连接器

编辑

得益于其 Map/Reduce 层,elasticsearch-hadoop 也可以从 PySpark 中使用,以便从 Elasticsearch 中读取和写入数据。为此,下面是来自 Spark 文档 的一个片段(请确保切换到 Python 代码片段)

$ ./bin/pyspark --driver-class-path=/path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}   # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
    "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first()         # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

此外,SQL 加载器也可以使用

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()