Apache Spark 支持编辑

Apache Spark 是一个快速、通用的集群计算系统。它提供了 Java、Scala 和 Python 的高级 API,以及一个支持通用执行图的优化引擎。

-- Spark 网站

Spark 通过通常将数据*缓存*在内存中,为大型数据集提供快速迭代/类函数功能。与本文档中提到的其他库不同,Apache Spark 是一个不依赖于 Map/Reduce 本身的计算框架,但它确实与 Hadoop 集成,主要是与 HDFS 集成。 elasticsearch-hadoop 允许在 Spark 中以两种方式使用 Elasticsearch:通过自 2.1 版起提供的专用支持或通过自 2.0 版起提供的 Map/Reduce 桥接。自 5.0 版起,elasticsearch-hadoop 中支持 Spark 2.0

安装编辑

与其他库一样,elasticsearch-hadoop 需要在 Spark 的类路径中可用。由于 Spark 具有多种部署模式,因此这可以转换为目标类路径,无论是在单个节点上(如本地模式 - 将在整个文档中使用)还是在每个节点上,具体取决于所需的基础架构。

原生 RDD 支持编辑

在 2.1 中添加。

elasticsearch-hadoop 以 RDD(弹性分布式数据集)(或更准确地说是*对* RDD)的形式提供 Elasticsearch 和 Apache Spark 之间的*原生*集成,可以从 Elasticsearch 读取数据。 RDD 以两种*风格*提供:一种用于 Scala(将数据作为 Tuple2 返回,其中包含 Scala 集合),另一种用于 Java(将数据作为 Tuple2 返回,其中包含 java.util 集合)。

只要有可能,请考虑使用*原生*集成,因为它提供了最佳性能和最大灵活性。

配置编辑

要为 Apache Spark 配置 elasticsearch-hadoop,可以在 SparkConf 对象中设置 配置 一章中描述的各种属性

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName(appName).setMaster(master)
conf.set("es.index.auto.create", "true")
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
conf.set("es.index.auto.create", "true");

**命令行**对于那些希望通过命令行设置属性的用户(直接设置或从文件加载),请注意 Spark *仅*接受以 "spark." 前缀开头的属性,并将*忽略*其余属性(并且可能会根据版本抛出警告)。要解决此限制,请通过附加 spark. 前缀来定义 elasticsearch-hadoop 属性(因此它们变为 spark.es.),elasticsearch-hadoop 将自动解析它们

$ ./bin/spark-submit --conf spark.es.resource=index/type ... 

请注意 es.resource 属性,它已变为 spark.es.resource

将数据写入 Elasticsearch编辑

使用 elasticsearch-hadoop,任何 RDD 都可以保存到 Elasticsearch,只要其内容可以转换为文档即可。实际上,这意味着 RDD 类型需要是 Map(无论是 Scala 还是 Java)、JavaBean 还是 Scala case 类。如果不是这种情况,则可以轻松地在 Spark 中*转换*数据或插入自己的自定义 ValueWriter

Scala编辑

使用 Scala 时,只需导入 org.elasticsearch.spark 包,该包通过 pimp my library 模式,使用 saveToEs 方法丰富了*任何* RDD API

import org.apache.spark.SparkContext    
import org.apache.spark.SparkContext._

import org.elasticsearch.spark._        

...

val conf = ...
val sc = new SparkContext(conf)         

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

sc.makeRDD( 
  Seq(numbers, airports)
).saveToEs("spark/docs") 

Spark Scala 导入

elasticsearch-hadoop Scala 导入

通过其 Scala API 启动 Spark

makeRDD 根据指定的集合创建一个临时 RDD;可以传入任何其他 RDD(Java 或 Scala)

在 Elasticsearch 的 spark/docs 下索引内容(即两个*文档*(数字和机场))

Scala 用户可能会尝试使用 Seq 符号来声明*根*对象(即 JSON 文档),而不是使用 Map。虽然类似,但第一个符号会导致略有不同的类型,这些类型无法与 JSON 文档匹配:Seq 是一个有序序列(换句话说,是一个列表),而 创建一个 Tuple,它或多或少是一个有序的、固定数量的元素。因此,列表列表不能用作文档,因为它不能映射到 JSON 对象;但是它可以在一个对象中自由使用。因此,在上面的示例中,使用了 Map(k→v) 而不是 Seq(k→v)

作为上述*隐式*导入的替代方法,可以通过 org.elasticsearch.spark.rdd 包中的 EsSpark 在 Scala 中使用 elasticsearch-hadoop Spark 支持,它充当一个实用程序类,允许显式方法调用。此外,不要使用 Map(它很方便,但由于结构不同,每个实例需要一个映射),而是使用*case 类*

import org.apache.spark.SparkContext
import org.elasticsearch.spark.rdd.EsSpark                        

// define a case class
case class Trip(departure: String, arrival: String)               

val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")

val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))             
EsSpark.saveToEs(rdd, "spark/docs")                               

EsSpark 导入

定义一个名为 Trip 的 case 类

围绕 Trip 实例创建一个 RDD

通过 EsSpark 显式索引 RDD

对于需要指定文档的 ID(或其他元数据字段,如 ttltimestamp)的情况,可以通过设置相应的 映射 来实现,即 es.mapping.id。按照前面的示例,要指示 Elasticsearch 使用字段 id 作为文档 ID,请更新 RDD 配置(也可以在 SparkConf 上设置该属性,但由于其全局影响,因此不建议这样做)

EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id"))
Java编辑

Java 用户有一个专用类,它提供类似于 EsSpark 的功能,即 org.elasticsearch.spark.rdd.api.java 中的 JavaEsSpark(一个类似于 Spark 的 Java API 的包)

import org.apache.spark.api.java.JavaSparkContext;                              
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;                        
...

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);                              

Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);                   
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
JavaEsSpark.saveToEs(javaRDD, "spark/docs");                                    

Spark Java 导入

elasticsearch-hadoop Java 导入

通过其 Java API 启动 Spark

为了简化示例,使用 Guava(Spark 的依赖项)Immutable* 方法来简化 MapList 的创建

在两个集合上创建一个简单的 RDD;可以传入任何其他 RDD(Java 或 Scala)

在 Elasticsearch 的 spark/docs 下索引内容(即两个*文档*(数字和机场))

可以使用 Java 5 *static* 导入进一步简化代码。此外,Map(由于其*松散*结构,其映射是动态的)可以用 JavaBean 替换

public class TripBean implements Serializable {
   private String departure, arrival;

   public TripBean(String departure, String arrival) {
       setDeparture(departure);
       setArrival(arrival);
   }

   public TripBean() {}

   public String getDeparture() { return departure; }
   public String getArrival() { return arrival; }
   public void setDeparture(String dep) { departure = dep; }
   public void setArrival(String arr) { arrival = arr; }
}
import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark;                
...

TripBean upcoming = new TripBean("OTP", "SFO");
TripBean lastWeek = new TripBean("MUC", "OTP");

JavaRDD<TripBean> javaRDD = jsc.parallelize(
                            ImmutableList.of(upcoming, lastWeek));        
saveToEs(javaRDD, "spark/docs");                                          

静态导入 JavaEsSpark

定义一个包含 TripBean 实例的 RDDTripBean 是一个 JavaBean

调用 saveToEs 方法,而不必再次键入 JavaEsSpark

设置文档 ID(或其他元数据字段,如 ttltimestamp)与其 Scala 对应项类似,但根据您使用的是 JDK 类还是其他一些实用程序(如 Guava),可能会更加冗长

JavaEsSpark.saveToEs(javaRDD, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));

将现有 JSON 写入 Elasticsearch编辑

对于 RDD 中的数据已经是 JSON 的情况,elasticsearch-hadoop 允许直接索引*而不*应用任何转换;数据按原样获取并直接发送到 Elasticsearch。因此,在这种情况下,elasticsearch-hadoop 期望包含 String 或字节数组(byte[]/Array[Byte])的 RDD,假设每个条目都表示一个 JSON 文档。如果 RDD 没有正确的签名,则无法应用 saveJsonToEs 方法(在 Scala 中,它们将不可用)。

Scala编辑
val json1 = """{"reason" : "business", "airport" : "SFO"}"""      
val json2 = """{"participants" : 5, "airport" : "OTP"}"""

new SparkContext(conf).makeRDD(Seq(json1, json2))
                      .saveJsonToEs("spark/json-trips") 

RDD 中条目的示例 - JSON 按*原样*写入,不进行任何转换,它不应包含换行符,如 \n 或 \r\n

通过专用的 saveJsonToEs 方法索引 JSON 数据

Java编辑
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";  
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";

JavaSparkContext jsc = ...
JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); 
JavaEsSpark.saveJsonToEs(stringRDD, "spark/json-trips");             

RDD 中条目的示例 - JSON 按*原样*写入,不进行任何转换,它不应包含换行符,如 \n 或 \r\n

注意 RDD<String> 签名

通过专用的 saveJsonToEs 方法索引 JSON 数据

写入动态/多资源编辑

如果需要根据数据内容将写入 Elasticsearch 的数据索引到不同的存储桶下,可以使用 es.resource.write 字段,该字段接受在运行时从文档内容解析的模式。按照前面提到的 媒体示例,可以将其配置如下

Scala编辑
val game = Map(
  "media_type"->"game", 
      "title" -> "FF VI",
       "year" -> "1994")
val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010")
val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien")

sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection-{media_type}/doc")  

用于拆分数据的文档 *键*。可以声明任何字段(但请确保它在所有文档中都可用)

根据其资源模式保存每个对象,在本例中,根据 media_type

对于要写入的每个文档/对象,elasticsearch-hadoop 将提取 media_type 字段并使用其值来确定目标资源。

Java编辑

正如预期的那样,Java 中的情况惊人地相似

Map<String, ?> game =
  ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994");
Map<String, ?> book = ...
Map<String, ?> cd = ...

JavaRDD<Map<String, ?>> javaRDD =
                jsc.parallelize(ImmutableList.of(game, book, cd));
saveToEs(javaRDD, "my-collection-{media_type}/doc");  

根据其资源模式保存每个对象,在本例中为 media_type

处理文档元数据编辑

Elasticsearch 允许每个文档都有自己的 元数据。如上所述,通过各种 映射 选项,可以自定义这些参数,以便从其所属文档中提取它们的值。此外,您甚至可以包含/排除将哪些数据发送回 Elasticsearch。在 Spark 中,elasticsearch-hadoop 扩展了此功能,允许通过使用 *对* RDD 在文档本身 *外部* 提供元数据。换句话说,对于包含键值对的 RDD,可以从键中提取元数据,并将该值用作文档源。

元数据通过 org.elasticsearch.spark.rdd 包中的 Metadata Java 枚举 进行描述,该枚举标识其类型 - idttlversion 等…​ 因此,RDD 键可以是包含每个文档的 Metadata 及其关联值的 Map。如果 RDD 键的类型不是 Map,则 elasticsearch-hadoop 会将该对象视为表示文档 ID 并相应地使用它。这听起来比实际情况复杂,所以让我们看一些例子。

Scala编辑

RDD,或者简单地说,签名为 RDD[(K,V)]RDD 可以利用 saveToEsWithMeta 方法,这些方法可以通过 *隐式* 导入 org.elasticsearch.spark 包或 EsSpark 对象获得。要手动指定每个文档的 ID,只需在 RDD 中传入 Object(不是 Map 类型)即可

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// instance of SparkContext
val sc = ...

val airportsRDD = 
  sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))  
airportsRDD.saveToEsWithMeta("airports/2015")    

airportsRDD 是一个 *键值* 对 RDD;它是从一个 Seqtuple 创建的

Seq 中每个元组的键表示其关联值/文档的 *ID*;换句话说,文档 otp 的 ID 为 1muc 的 ID 为 2sfo 的 ID 为 3

由于 airportsRDD 是一个对 RDD,因此它可以使用 saveToEsWithMeta 方法。这告诉 elasticsearch-hadoop 要特别注意 RDD 键并将它们用作元数据,在本例中用作文档 ID。如果改为使用 saveToEs,则 elasticsearch-hadoop 会将 RDD 元组(即键和值)视为文档的一部分。

当需要指定多个 ID 时,应使用键的类型为 org.elasticsearch.spark.rdd.Metadatascala.collection.Map

import org.elasticsearch.spark.rdd.Metadata._          

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// metadata for each document
// note it's not required for them to have the same structure
val otpMeta = Map(ID -> 1, TTL -> "3h")                
val mucMeta = Map(ID -> 2, VERSION -> "23")            
val sfoMeta = Map(ID -> 3)                             

// instance of SparkContext
val sc = ...

val airportsRDD = sc.makeRDD( 
  Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
airportsRDD.saveToEsWithMeta("airports/2015") 

导入 Metadata 枚举

用于 otp 文档的元数据。在本例中,ID 的值为 1,TTL 的值为 3h

用于 muc 文档的元数据。在本例中,ID 的值为 2,VERSION 的值为 23

用于 sfo 文档的元数据。在本例中,ID 的值为 3

元数据和文档被组装成一个 *对* RDD

使用 saveToEsWithMeta 方法相应地保存 RDD

Java编辑

类似地,在 Java 端,JavaEsSpark 提供了 saveToEsWithMeta 方法,这些方法应用于 JavaPairRDD(Java 中 RDD[(K,V)] 的等效项)。因此,要根据 ID 保存文档,可以使用

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC");

JavaSparkContext jsc = ...

// create a pair RDD between the id and the docs
JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs(ImmutableList.of( 
        new Tuple2<Object, Object>(1, otp),          
        new Tuple2<Object, Object>(2, jfk)));        
JavaEsSpark.saveToEsWithMeta(pairRDD, target);       

通过使用 Scala Tuple2 类(围绕文档 ID 和文档本身)创建 JavaPairRDD

第一个文档的元组,围绕 ID (1) 和文档 (otp) 本身

第二个文档的元组,围绕 ID (2) 和 jfk

使用键作为 ID,值作为文档,相应地保存 JavaPairRDD

当需要指定多个 ID 时,可以选择使用填充了 org.elasticsearch.spark.rdd.Metadata 类型的键的 java.util.Map

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.elasticsearch.spark.rdd.Metadata;          

import static org.elasticsearch.spark.rdd.Metadata.*; 

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran");

// metadata for each document
// note it's not required for them to have the same structure
Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); 
Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); 

JavaSparkContext jsc = ...

// create a pair RDD between the id and the docs
JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs<(ImmutableList.of(
        new Tuple2<Object, Object>(otpMeta, otp),    
        new Tuple2<Object, Object>(sfoMeta, sfo)));  
JavaEsSpark.saveToEsWithMeta(pairRDD, target);       

描述可以声明的文档元数据的 Metadata enum

枚举的静态导入,以简短格式(IDTTL 等…​)引用其值

otp 文档的元数据

sfo 文档的元数据

otp(作为值)与其元数据(作为键)之间的元组

关联 sfo 及其元数据的元组

对包含文档及其各自元数据的 JavaPairRDD 调用 saveToEsWithMeta

从 Elasticsearch 读取数据编辑

对于读取,应该定义将数据从 Elasticsearch *流式传输* 到 Spark 的 Elasticsearch RDD

Scala编辑

与写入类似,org.elasticsearch.spark 包使用 esRDD 方法丰富了 SparkContext API

import org.apache.spark.SparkContext    
import org.apache.spark.SparkContext._

import org.elasticsearch.spark._        

...

val conf = ...
val sc = new SparkContext(conf)         

val RDD = sc.esRDD("radio/artists")     

Spark Scala 导入

elasticsearch-hadoop Scala 导入

通过其 Scala API 启动 Spark

为索引 radio/artists 创建了一个专用的 RDD

可以重载该方法以指定其他查询,甚至可以指定配置 Map(覆盖 SparkConf

...
import org.elasticsearch.spark._

...
val conf = ...
val sc = new SparkContext(conf)

sc.esRDD("radio/artists", "?q=me*") 

创建一个 RDD,从索引 radio/artists 中流式传输与 me* 匹配的所有文档

默认情况下,来自 Elasticsearch 的文档作为 Tuple2 返回,其中第一个元素包含文档 ID,第二个元素包含通过 Scala 集合 表示的实际文档,即一个 `Map[String, Any]`,其中键表示字段名称,值表示它们各自的值。

Java编辑

Java 用户有一个专用的 JavaPairRDD,它的工作方式与其 Scala 对应物相同,但是返回的 Tuple2 值(或第二个元素)将文档作为本机 java.util 集合返回。

import org.apache.spark.api.java.JavaSparkContext;               
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;             
...

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);               

JavaPairRDD<String, Map<String, Object>> esRDD =
                        JavaEsSpark.esRDD(jsc, "radio/artists"); 

Spark Java 导入

elasticsearch-hadoop Java 导入

通过其 Java API 启动 Spark

为索引 radio/artists 创建了一个专用的 JavaPairRDD

以类似的方式,可以使用重载的 esRDD 方法来指定查询或传递 Map 对象以进行高级配置。让我们看看它是如何工作的,但这次使用 Java 静态导入。此外,让我们丢弃文档 ID 并仅检索 RDD

import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.*;   

...
JavaRDD<Map<String, Object>> rdd =
        esRDD(jsc, "radio/artists", "?q=me*")  
            .values(); 

静态导入 JavaEsSpark

创建一个 RDD,从索引 radio/artists 中流式传输以 me 开头的所有文档。请注意,由于静态导入,该方法不必完全限定

仅返回 PairRDD 的 *值* - 因此结果的类型为 JavaRDD 而 *不是* JavaPairRDD

通过使用 JavaEsSpark API,可以获得 Spark 专用的 JavaPairRDD,它比基本 RDD 更适合 Java 环境(因为它具有 Scala 签名)。此外,专用的 RDD 将 Elasticsearch 文档作为正确的 Java 集合返回,因此您不必处理 Scala 集合(这通常是 RDD 的情况)。这在使用 Java 8 时尤其强大,我们强烈建议您使用它,因为它的 lambda 表达式 使集合处理 *非常* 简洁。

例如,假设您要过滤 RDD 中的文档,并仅返回包含 mega 的值的文档(请忽略您可以并且应该直接通过 Elasticsearch 进行过滤的事实)。

在 Java 8 之前的版本中,代码如下所示

JavaRDD<Map<String, Object>> esRDD =
                        esRDD(jsc, "radio/artists", "?q=me*").values();
JavaRDD<Map<String, Object>> filtered = esRDD.filter(
    new Function<Map<String, Object>, Boolean>() {
      @Override
      public Boolean call(Map<String, Object> map) throws Exception {
          returns map.contains("mega");
      }
    });

使用 Java 8,过滤变成了一行代码

JavaRDD<Map<String, Object>> esRDD =
                        esRDD(jsc, "radio/artists", "?q=me*").values();
JavaRDD<Map<String, Object>> filtered = esRDD.filter(doc ->
                                                doc.contains("mega"));
以 JSON 格式读取数据编辑

如果需要 Elasticsearch 的结果采用 JSON 格式(通常要将其发送到其他系统),则可以使用专用的 esJsonRDD 方法。在这种情况下,连接器将按从 Elasticsearch 接收到的原样返回文档内容,而无需任何处理,在 Scala 中为 RDD[(String, String)],在 Java 中为 JavaPairRDD[String, String],其中键表示文档 ID,值表示其 JSON 格式的实际内容。

类型转换编辑

处理多值/数组字段时,请参阅节,特别是这些配置选项。重要提示:如果使用自动索引创建,请查看节以获取更多信息。

elasticsearch-hadoop 会自动将 Spark 内置类型转换为 Elasticsearch 类型(以及反向转换),如下表所示

表 4. Scala 类型转换表

Scala 类型 Elasticsearch 类型

None

null

Unit

null

Nil

数组

Some[T]

根据表格的 T

Map

对象

Traversable

数组

case 类

对象(参见 Map

Product

数组

此外,以下_隐式_转换适用于 Java 类型

表 5. Java 类型转换表

Java 类型 Elasticsearch 类型

null

null

String

字符串

Boolean

布尔值

Byte

字节

Short

短整型

Integer

整型

Long

长整型

Double

双精度浮点数

Float

浮点数

Number

浮点数双精度浮点数(取决于大小)

java.util.Calendar

日期字符串 格式)

java.util.Date

日期字符串 格式)

java.util.Timestamp

日期字符串 格式)

byte[]

字符串 (BASE64)

Object[]

数组

Iterable

数组

Map

对象

Java Bean

对象(参见 Map

转换尽力而为;保证内置的 Java 和 Scala 类型能够正确转换,但是对于 Java 或 Scala 中的用户类型,则不作任何保证。如上表所示,当在 Scala 中遇到 case 类或在 Java 中遇到 JavaBean 时,转换器会尝试_解包_其内容并将其保存为 对象。请注意,这只适用于顶级用户对象 - 如果用户对象中嵌套了其他用户对象,则转换可能会失败,因为转换器不会执行嵌套_解包_。这样做是有意为之的,因为转换器必须对数据进行_序列化_和_反序列化_,而用户类型由于数据丢失而引入了歧义;这可以通过某种类型的映射来解决,但这会使项目过于接近 ORM 的领域,并且可以说引入了过多的复杂性,而收益却微乎其微;借助 Spark 中的处理功能和 elasticsearch-hadoop 中的可插拔性,可以轻松地将对象转换为其他类型,如果需要,只需付出最小的努力和最大的控制力。

**地理类型**值得一提的是,Elasticsearch 中仅提供的富数据类型,例如 GeoPointGeoShape,可以通过将其结构转换为上表中可用的基元来支持。例如,根据其存储方式,geo_point 可能会作为 StringTraversable 返回。

Spark Streaming 支持编辑

添加于 5.0 版本。

Spark Streaming 是核心 Spark API 的扩展,支持对实时数据流进行可扩展、高吞吐量、容错的流处理。

-- Spark 网站

Spark Streaming 是核心 Spark 功能之上的扩展,允许对流数据进行近乎实时的处理。Spark Streaming 围绕 DStream 或_离散流_的概念工作。DStream 通过将新到达的记录收集到一个小的 RDD 中并执行它来操作。这会在每隔几秒钟重复一次,并在一个称为_微批处理_的过程中使用新的 RDDDStream api 包含许多与 RDD api 相同的处理操作,以及一些其他特定于流的方法。从 5.0 版本开始,elasticsearch-hadoop 提供了与 Spark Streaming 的原生集成。

使用 elasticsearch-hadoop Spark Streaming 支持时,可以将 Elasticsearch 作为输出位置,以便从 Spark Streaming 作业中将数据索引到其中,就像可以持久化 RDD 的结果一样。但是,与 RDD 不同,由于 DStream 的连续性,您无法使用它从 Elasticsearch 中读取数据。

Spark Streaming 支持提供特殊的优化,以便在运行处理窗口非常小的作业时,在 Spark 执行器上节省网络资源。因此,应该优先使用这种集成,而不是在从 DStreamforeachRDD 调用返回的 RDD 上调用 saveToEs

DStream 写入 Elasticsearch编辑

RDD 一样,只要可以将任何 DStream 的内容转换为文档,就可以将其保存到 Elasticsearch 中。在实践中,这意味着 DStream 类型需要是 Map(Scala 或 Java 类型)、JavaBean 或 Scala case 类。如果不是这种情况,则可以轻松地在 Spark 中_转换_数据或插入自己的自定义 ValueWriter

Scala编辑

使用 Scala 时,只需导入 org.elasticsearch.spark.streaming 包,该包通过 _pimp my library_ 模式,使用 saveToEs 方法丰富了 DStream API

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._               
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._

import org.elasticsearch.spark.streaming._           

...

val conf = ...
val sc = new SparkContext(conf)                      
val ssc = new StreamingContext(sc, Seconds(1))       

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

val rdd = sc.makeRDD(Seq(numbers, airports))
val microbatches = mutable.Queue(rdd)                

ssc.queueStream(microbatches).saveToEs("spark/docs") 

ssc.start()
ssc.awaitTermination() 

Spark 和 Spark Streaming Scala 导入

elasticsearch-hadoop Spark Streaming 导入

通过其 Scala API 启动 Spark

通过将 SparkContext 传递给 SparkStreaming 上下文来启动它。微批处理将每秒处理一次。

makeRDD 根据指定的集合创建一个临时 RDD;可以传入任何其他 RDD(Java 或 Scala)。创建一个 `RDD` 队列来表示要执行的微批处理。

RDD 中创建一个 DStream,并将内容(即两个_文档_(数字和机场))索引到 {es} 中的 `spark/docs` 下

启动 Spark Streaming 作业并等待它最终完成。

作为上述_隐式_导入的替代方法,可以使用 org.elasticsearch.spark.streaming 包中的 EsSparkStreaming 在 Scala 中使用 elasticsearch-hadoop Spark Streaming 支持,它充当一个实用程序类,允许显式方法调用。此外,不要使用 Map(它很方便,但由于结构不同,每个实例需要一个映射),而应使用_case 类_

import org.apache.spark.SparkContext
import org.elasticsearch.spark.streaming.EsSparkStreaming         

// define a case class
case class Trip(departure: String, arrival: String)               

val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")

val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
val microbatches = mutable.Queue(rdd)                             
val dstream = ssc.queueStream(microbatches)

EsSparkStreaming.saveToEs(dstream, "spark/docs")                  

ssc.start()                                                       

EsSparkStreaming 导入

定义一个名为 Trip 的 case 类

围绕 Trip 实例的 RDD 创建一个 DStream

DStream 配置为通过 EsSparkStreaming 显式索引

启动流处理

启动 SparkStreamingContext 后,将无法添加或配置新的 DStream。上下文停止后,将无法重新启动。每个 JVM 一次只能有一个活动的 SparkStreamingContext。另请注意,以编程方式停止 SparkStreamingContext 时,它会停止底层的 SparkContext,除非指示不要这样做。

对于需要指定文档的 ID(或其他元数据字段,如 ttltimestamp)的情况,可以通过设置相应的 映射(即 es.mapping.id)来实现。在上一个示例之后,要指示 Elasticsearch 使用字段 id 作为文档 ID,请更新 DStream 配置(也可以在 SparkConf 上设置该属性,但由于其全局影响,不建议这样做)

EsSparkStreaming.saveToEs(dstream, "spark/docs", Map("es.mapping.id" -> "id"))
Java编辑

Java 用户有一个专用类,它提供类似于 EsSparkStreaming 的功能,即 org.elasticsearch.spark.streaming.api.java 包中的 JavaEsSparkStreaming(类似于 Spark 的 Java API

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;                                              
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;

import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;         
...

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);                              
JavaStreamingContext jssc = new JavaSparkStreamingContext(jsc, Seconds.apply(1));

Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);                   
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>();
microbatches.add(javaRDD);                                                      
JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches);

JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs");                       

jssc.start()                                                                    

Spark 和 Spark Streaming Java 导入

elasticsearch-hadoop Java 导入

通过其 Java API 启动 Spark 和 Spark Streaming。微批处理将每秒处理一次。

为了简化示例,使用 Guava(Spark 的依赖项)Immutable* 方法来简化 MapList 的创建

在微批处理上创建一个简单的 DStream;可以传入任何其他 RDD(Java 或 Scala)

在 Elasticsearch 的 spark/docs 下索引内容(即两个*文档*(数字和机场))

执行流作业。

可以使用 Java 5 *static* 导入进一步简化代码。此外,Map(由于其*松散*结构,其映射是动态的)可以用 JavaBean 替换

public class TripBean implements Serializable {
   private String departure, arrival;

   public TripBean(String departure, String arrival) {
       setDeparture(departure);
       setArrival(arrival);
   }

   public TripBean() {}

   public String getDeparture() { return departure; }
   public String getArrival() { return arrival; }
   public void setDeparture(String dep) { departure = dep; }
   public void setArrival(String arr) { arrival = arr; }
}
import static org.elasticsearch.spark.rdd.api.java.JavaEsSparkStreaming;  
...

TripBean upcoming = new TripBean("OTP", "SFO");
TripBean lastWeek = new TripBean("MUC", "OTP");

JavaRDD<TripBean> javaRDD = jsc.parallelize(ImmutableList.of(upcoming, lastWeek));
Queue<JavaRDD<TripBean>> microbatches = new LinkedList<JavaRDD<TripBean>>();
microbatches.add(javaRDD);
JavaDStream<TripBean> javaDStream = jssc.queueStream(microbatches);       

saveToEs(javaDStream, "spark/docs");                                          

jssc.start()                                                              

静态导入 JavaEsSparkStreaming

定义一个包含 TripBean 实例的 DStreamTripBean 是一个 JavaBean

调用 saveToEs 方法,无需再次键入 JavaEsSparkStreaming

运行该 Streaming 作业

设置文档 ID(或其他元数据字段,如 ttltimestamp)与其 Scala 对应项类似,但根据您使用的是 JDK 类还是其他一些实用程序(如 Guava),可能会更加冗长

JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));

将现有 JSON 写入 Elasticsearch编辑

对于 DStream 流式传输的数据已经序列化为 JSON 的情况,elasticsearch-hadoop 允许直接索引_而无需_应用任何转换;数据按原样获取并直接发送到 Elasticsearch。因此,在这种情况下,elasticsearch-hadoop 需要一个包含 String 或字节数组(byte[]/Array[Byte])的 DStream,假设每个条目都表示一个 JSON 文档。如果 DStream 没有正确的签名,则无法应用 saveJsonToEs 方法(在 Scala 中,它们将不可用)。

Scala编辑
val json1 = """{"reason" : "business", "airport" : "SFO"}"""      
val json2 = """{"participants" : 5, "airport" : "OTP"}"""

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

val rdd = sc.makeRDD(Seq(json1, json2))
val microbatch = mutable.Queue(rdd)
ssc.queueStream(microbatch).saveJsonToEs("spark/json-trips")      

ssc.start()                                                       

DStream 中条目的示例 - JSON _按原样写入_,不进行任何转换

配置流以通过专用 saveJsonToEs 方法对 JSON 数据进行索引

启动流式作业

Java编辑
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";  
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";

JavaSparkContext jsc = ...
JavaStreamingContext jssc = ...
JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
Queue<JavaRDD<String>> microbatches = new LinkedList<JavaRDD<String>>();      
microbatches.add(stringRDD);
JavaDStream<String> stringDStream = jssc.queueStream(microbatches);  

JavaEsSparkStreaming.saveJsonToEs(stringRDD, "spark/json-trips");    

jssc.start()                                                         

DStream 中条目的示例 - JSON _按原样写入_,不进行任何转换

创建一个 RDD,将其放入队列中,并从排队的 RDD 中创建一个 DStream,将每个 RDD 视为一个微批次。

注意 JavaDStream<String> 签名

配置流以通过专用 saveJsonToEs 方法对 JSON 数据进行索引

启动流式作业

写入动态/多资源编辑

如果需要根据数据内容将写入 Elasticsearch 的数据索引到不同的存储桶下,可以使用 es.resource.write 字段,该字段接受在运行时从文档内容解析的模式。按照前面提到的 媒体示例,可以将其配置如下

Scala编辑
val game = Map(
  "media_type" -> "game", 
       "title" -> "FF VI",
        "year" -> "1994")
val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010")
val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien")

val batch = sc.makeRDD(Seq(game, book, cd))
val microbatches = mutable.Queue(batch)
ssc.queueStream(microbatches).saveToEs("my-collection-{media_type}/doc")  
ssc.start()

用于拆分数据的文档 *键*。可以声明任何字段(但请确保它在所有文档中都可用)

根据其资源模式保存每个对象,在本例中,根据 media_type

对于要写入的每个文档/对象,elasticsearch-hadoop 将提取 media_type 字段并使用其值来确定目标资源。

Java编辑

正如预期的那样,Java 中的情况惊人地相似

Map<String, ?> game =
  ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994");
Map<String, ?> book = ...
Map<String, ?> cd = ...

JavaRDD<Map<String, ?>> javaRDD =
                jsc.parallelize(ImmutableList.of(game, book, cd));
Queue<JavaRDD<Map<String, ?>>> microbatches = ...
JavaDStream<Map<String, ?>> javaDStream =
                jssc.queueStream(microbatches);

saveToEs(javaDStream, "my-collection-{media_type}/doc");  
jssc.start();

根据其资源模式保存每个对象,在本例中为 media_type

处理文档元数据编辑

Elasticsearch 允许每个文档都有自己的 元数据。如上所述,通过各种 映射 选项,可以自定义这些参数,以便从其所属文档中提取它们的值。此外,您甚至可以包含/排除将哪些数据发送回 Elasticsearch。在 Spark 中,elasticsearch-hadoop 扩展了此功能,允许通过使用 键值对 RDD 在文档外部提供元数据。

这在 Spark Streaming 中没有什么不同。对于包含键值对的 DStream,可以从键中提取元数据,并将该值用作文档源。

元数据通过 org.elasticsearch.spark.rdd 包中的 Metadata Java 枚举 进行描述,该枚举标识其类型 - idttlversion 等…​ 因此,DStream 的键可以是包含每个文档的 Metadata 及其关联值的 Map。如果 DStream 键的类型不是 Map,则 elasticsearch-hadoop 会将该对象视为表示文档 ID,并相应地使用它。这听起来比实际情况复杂,因此让我们看一些示例。

Scala编辑

键值对 DStream,或者简单地说,签名为 DStream[(K,V)]DStream 可以利用 saveToEsWithMeta 方法,这些方法可以通过隐式导入 org.elasticsearch.spark.streaming 包或 EsSparkStreaming 对象来使用。要为每个文档手动指定 ID,只需在 DStream 中传入 Object(不是 Map 类型)即可

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// instance of SparkContext
val sc = ...
// instance of StreamingContext
val ssc = ...

val airportsRDD = 
  sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))  
val microbatches = mutable.Queue(airportsRDD)

ssc.queueStream(microbatches)        
  .saveToEsWithMeta("airports/2015") 
ssc.start()

airportsRDD 是一个 *键值* 对 RDD;它是从一个 Seqtuple 创建的

Seq 中每个元组的键表示其关联值/文档的 *ID*;换句话说,文档 otp 的 ID 为 1muc 的 ID 为 2sfo 的 ID 为 3

我们构造一个 DStream,它继承 RDD 的类型签名

由于生成的 DStream 是一个键值对 DStream,因此它可以使用 saveToEsWithMeta 方法。这告诉 elasticsearch-hadoop 要特别注意 DStream 键并将它们用作元数据,在本例中用作文档 ID。如果改为使用 saveToEs,则 elasticsearch-hadoop 会将 DStream 元组(即键和值)视为文档的一部分。

当需要指定多个 ID 时,应使用键的类型为 org.elasticsearch.spark.rdd.Metadatascala.collection.Map

import org.elasticsearch.spark.rdd.Metadata._          

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// metadata for each document
// note it's not required for them to have the same structure
val otpMeta = Map(ID -> 1, TTL -> "3h")                
val mucMeta = Map(ID -> 2, VERSION -> "23")            
val sfoMeta = Map(ID -> 3)                             

// instance of SparkContext
val sc = ...
// instance of StreamingContext
val ssc = ...

val airportsRDD = sc.makeRDD( 
  Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
val microbatches = mutable.Queue(airportsRDD)

ssc.queueStream(microbatches)        
  .saveToEsWithMeta("airports/2015") 
ssc.start()

导入 Metadata 枚举

用于 otp 文档的元数据。在本例中,ID 的值为 1,TTL 的值为 3h

用于 muc 文档的元数据。在本例中,ID 的值为 2,VERSION 的值为 23

用于 sfo 文档的元数据。在本例中,ID 的值为 3

元数据和文档被组装成一个 *对* RDD

DStream 继承 RDD 的签名,成为一个键值对 DStream

DStream 配置为使用 saveToEsWithMeta 方法相应地对数据进行索引

Java编辑

类似地,在 Java 端,JavaEsSparkStreaming 提供了应用于 JavaPairDStream(Java 中 DStream[(K,V)] 的等效项)的 saveToEsWithMeta 方法。

由于 Java API 的限制,这往往需要做更多的工作。例如,您不能直接从 JavaPairRDD 队列创建 JavaPairDStream。相反,您必须创建一个 Tuple2 对象的常规 JavaDStream,并将 JavaDStream 转换为 JavaPairDStream。这听起来很复杂,但它是 API 限制的一个简单解决方法。

首先,我们将创建一个对函数,它接收一个 Tuple2 对象,并将其原样返回给框架

public static class ExtractTuples implements PairFunction<Tuple2<Object, Object>, Object, Object>, Serializable {
    @Override
    public Tuple2<Object, Object> call(Tuple2<Object, Object> tuple2) throws Exception {
        return tuple2;
    }
}

然后,我们将对函数应用于 Tuple2JavaDStream 以创建一个 JavaPairDStream 并保存它

import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC");

JavaSparkContext jsc = ...
JavaStreamingContext jssc = ...

// create an RDD of between the id and the docs
JavaRDD<Tuple2<?, ?>> rdd = jsc.parallelize(         
      ImmutableList.of(
        new Tuple2<Object, Object>(1, otp),          
        new Tuple2<Object, Object>(2, jfk)));        

Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ...
JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); 

JavaPairDStream<?, ?> pairDStream = dstream.mapToPair(new ExtractTuples()); 

JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target);       
jssc.start();

创建一个常规的 JavaRDD,其中包含围绕文档 ID 和文档本身的 Scala Tuple2

第一个文档的元组,围绕 ID (1) 和文档 (otp) 本身

第二个文档的元组,围绕 ID (2) 和 jfk

从元组 RDD 中组装一个常规的 JavaDStream

通过将我们的 Tuple2 标识函数传递给 mapToPair 方法,将 JavaDStream 转换为 JavaPairDStream。这将允许将类型转换为 JavaPairDStream。此函数可以替换为作业中的任何内容,这些内容将从单个条目中提取要索引的 ID 和文档。

JavaPairRDD 配置为使用键作为 ID,值作为文档来相应地对数据进行索引

当需要指定多个 ID 时,可以选择使用填充了 org.elasticsearch.spark.rdd.Metadata 类型的键的 java.util.Map。我们将使用相同的类型技巧将 JavaDStream 重新打包为 JavaPairDStream

import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;
import org.elasticsearch.spark.rdd.Metadata;          

import static org.elasticsearch.spark.rdd.Metadata.*; 

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran");

// metadata for each document
// note it's not required for them to have the same structure
Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); 
Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); 

JavaSparkContext jsc = ...

// create a pair RDD between the id and the docs
JavaRDD<Tuple2<?, ?>> pairRdd = jsc.parallelize<(ImmutableList.of(
        new Tuple2<Object, Object>(otpMeta, otp),    
        new Tuple2<Object, Object>(sfoMeta, sfo)));  

Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ...
JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); 

JavaPairDStream<?, ?> pairDStream = dstream.mapToPair(new ExtractTuples()) 

JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target);       
jssc.start();

描述可以声明的文档元数据的 Metadata enum

枚举的静态导入,以简短格式(IDTTL 等…​)引用其值

otp 文档的元数据

sfo 文档的元数据

otp(作为值)与其元数据(作为键)之间的元组

关联 sfo 及其元数据的元组

JavaRDD 中创建一个 JavaDStream

通过将 Tuple2 标识函数映射到 JavaDStream 上,将其重新打包为 JavaPairDStream

在包含文档及其各自元数据的 JavaPairDStream 上调用 saveToEsWithMeta

Spark Streaming 类型转换编辑

elasticsearch-hadoop Spark Streaming 支持利用与常规 Spark 类型映射相同的类型映射。为了保持一致性,此处重复列出了这些映射

表 6. Scala 类型转换表

Scala 类型 Elasticsearch 类型

None

null

Unit

null

Nil

数组

Some[T]

根据表格的 T

Map

对象

Traversable

数组

case 类

对象(参见 Map

Product

数组

此外,以下_隐式_转换适用于 Java 类型

表 7. Java 类型转换表

Java 类型 Elasticsearch 类型

null

null

String

字符串

Boolean

布尔值

Byte

字节

Short

短整型

Integer

整型

Long

长整型

Double

双精度浮点数

Float

浮点数

Number

浮点数双精度浮点数(取决于大小)

java.util.Calendar

日期字符串 格式)

java.util.Date

日期字符串 格式)

java.util.Timestamp

日期字符串 格式)

byte[]

字符串 (BASE64)

Object[]

数组

Iterable

数组

Map

对象

Java Bean

对象(参见 Map

地理类型值得一提的是,仅在 Elasticsearch 中可用的丰富数据类型(例如 GeoPointGeoShape)可以通过将其结构转换为上表中可用的基元来支持。例如,根据其存储,geo_point 可能会作为 StringTraversable 返回。

Spark SQL 支持编辑

在 2.1 中添加。

Spark SQL 是一个用于结构化数据处理的 Spark 模块。它提供了一种称为 DataFrame 的编程抽象,还可以充当分布式 SQL 查询引擎。

-- Spark 网站

除了核心 Spark 支持之外,elasticsearch-hadoop 还提供了与 Spark SQL 的集成。换句话说,Elasticsearch 成为 Spark SQL 的本机源,因此可以从 Spark SQL 透明地索引和查询数据。

Spark SQL 使用结构化数据 - 换句话说,所有条目都应具有相同的结构(相同数量的字段,具有相同的类型和名称)。不支持使用非结构化数据(具有不同结构的文档),并且会导致问题。对于这种情况,请使用 PairRDD

支持的 Spark SQL 版本编辑

Spark SQL 在成为一个成熟组件的同时,在不同版本之间仍在经历重大变化。Spark SQL 在 1.3 版本中成为一个稳定的组件,但是它与以前的版本 向后兼容。此外,Spark 2.0 引入了一些重大变化,这些变化通过 Dataset API 破坏了向后兼容性。elasticsearch-hadoop 通过两个不同的 jar 支持 Spark SQL 1.3-1.6 和 Spark SQL 2.0:elasticsearch-spark-1.x-<version>.jarelasticsearch-hadoop-<version>.jar 支持 Spark SQL 1.3-1.6(或更高版本),而 elasticsearch-spark-2.0-<version>.jar 支持 Spark SQL 2.0。换句话说,除非您使用的是 Spark 2.0,否则请使用 elasticsearch-spark-1.x-<version>.jar

Spark SQL 支持在 org.elasticsearch.spark.sql 包下提供。

API 差异从 elasticsearch-hadoop 用户的角度来看,Spark SQL 1.3-1.6 和 Spark 2.0 之间的差异相当小。这份 文档 详细描述了这些差异,下面将简要介绍

DataFrameDataset
1.3+ 中 Spark SQL 的核心单元是 DataFrame。此 API 在 Spark 2.0 中仍然存在,但是它基于 Dataset
统一 API 与专用 Java/Scala API
在 Spark SQL 2.0 中,通过引入 SparkSession 以及对 `Dataset`、`DataFrame` 和 `RDD` 使用相同的底层代码,API 得到了进一步统一

从概念上讲,DataFrameDataset[Row],因此下面的文档将重点介绍 Spark SQL 1.3-1.6。

DataFrame(Spark SQL 1.3+)写入 Elasticsearch编辑

使用 elasticsearch-hadoop,可以将 DataFrame(或任何 Dataset)索引到 Elasticsearch。

Scala编辑

在 Scala 中,只需导入 org.elasticsearch.spark.sql 包,它将使用 saveToEs 方法丰富给定的 DataFrame 类;虽然这些方法与 org.elasticsearch.spark 包中的方法具有相同的签名,但它们是为 DataFrame 实现而设计的。

// reusing the example from Spark SQL documentation

import org.apache.spark.sql.SQLContext    
import org.apache.spark.sql.SQLContext._

import org.elasticsearch.spark.sql._      

...

// sc = existing SparkContext
val sqlContext = new SQLContext(sc)

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

//  create DataFrame
val people = sc.textFile("people.txt")    
        .map(_.split(","))
        .map(p => Person(p(0), p(1), p(2).trim.toInt))
        .toDF()

people.saveToEs("spark/people")           

Spark SQL 包导入

elasticsearch-hadoop Spark 包导入

将文本文件读取为*普通* RDD 并将其映射到 DataFrame(使用 Person 案例类)

通过 saveToEs 方法将生成的 DataFrame 索引到 Elasticsearch

默认情况下,elasticsearch-hadoop 将忽略空值,以便不写入任何字段。由于 DataFrame 旨在被视为结构化表格数据,因此您可以通过将 es.spark.dataframe.write.null 设置切换为 true 来启用将空值写入 DataFrame 对象的空值字段。

Java编辑

类似地,对于 Java 使用,专用包 org.elasticsearch.spark.sql.api.java 通过 JavaEsSpark SQL 提供类似的功能

import org.apache.spark.sql.api.java.*;                      
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;  
...

DataFrame people = ...
JavaEsSparkSQL.saveToEs(people, "spark/people");                     

Spark SQL Java 导入

elasticsearch-hadoop Spark SQL Java 导入

在 Elasticsearch 中的 spark/people 下索引 DataFrame

同样,使用 Java 5 *静态*导入,这可以进一步简化为

import static org.elasticsearch.spark.sql.api.java.JavaEsSpark SQL; 
...
saveToEs("spark/people");                                          

静态导入 JavaEsSpark SQL

调用 saveToEs 方法,而不必再次键入 JavaEsSpark

为了最大限度地控制 Elasticsearch 中 DataFrame 的映射,强烈建议您事先创建映射。有关更多信息,请参阅本章

将现有 JSON 写入 Elasticsearch编辑

使用 Spark SQL 时,如果输入数据为 JSON 格式,只需通过 SQLContext/JavaSQLContext jsonFile 方法将其转换为 DataFrame(在 Spark SQL 1.3 中)或 Dataset(对于 Spark SQL 2.0)(如 Spark 文档中所述)。

使用纯 SQL 从 Elasticsearch 读取数据编辑

索引及其映射必须在创建临时表之前存在

Spark SQL 1.2 引入了一个新的API,用于从外部数据源读取数据,elasticsearch-hadoop 支持该 API,从而简化了与 Elasticsearch 交互所需的 SQL 配置。此外,它在幕后了解 Spark 执行的操作,因此可以优化数据和查询(例如过滤或剪枝),从而提高性能。

Spark SQL 中的数据源编辑

使用 Spark SQL 时,elasticsearch-hadoop 允许通过 SQLContext load 方法访问 Elasticsearch。换句话说,以*声明性*方式创建由 Elasticsearch 支持的 DataFrame/Dataset

val sql = new SQLContext...
// Spark 1.3 style
val df = sql.load( 
  "spark/index",   
  "org.elasticsearch.spark.sql") 

SQLContext *实验性* load 方法,用于任意数据源

要加载的路径或资源 - 在这种情况下,Elasticsearch 中的索引/类型

数据源提供程序 - org.elasticsearch.spark.sql

在 Spark 1.4 中,将使用以下类似的 API 调用

// Spark 1.4 style
val df = sql.read      
  .format("org.elasticsearch.spark.sql") 
  .load("spark/index") 

SQLContext *实验性* read 方法,用于任意数据源

数据源提供程序 - org.elasticsearch.spark.sql

要加载的路径或资源 - 在这种情况下,Elasticsearch 中的索引/类型

在 Spark 1.5 中,这可以进一步简化为

// Spark 1.5 style
val df = sql.read.format("es")
  .load("spark/index")

使用 es 作为别名,而不是 DataSource 提供程序的完整包名称

无论使用哪种 API,一旦创建,就可以自由访问 DataFrame 来操作数据。

*sources*声明还允许传入特定选项,即

名称 默认值 描述

路径

必需

Elasticsearch 索引/类型

下推

true

是否将 Spark SQL 转换为 Elasticsearch 查询 DSL(*下推*)

严格

false

是否使用*精确*(非分析)匹配(分析)

可在 Spark 1.6 或更高版本中使用

double.filtering

true

是否告诉 Spark 对下推的过滤器应用自己的过滤

下一节将解释这两个选项。要指定选项(包括通用的 elasticsearch-hadoop 选项),只需将 Map 传递给上述方法即可

例如

val sql = new SQLContext...
// options for Spark 1.3 need to include the target path/resource
val options13 = Map("path" -> "spark/index",
                    "pushdown" -> "true",     
                    "es.nodes" -> "someNode", 
                     "es.port" -> "9200")

// Spark 1.3 style
val spark13DF = sql.load("org.elasticsearch.spark.sql", options13) 

// options for Spark 1.4 - the path/resource is specified separately
val options = Map("pushdown" -> "true",     
                  "es.nodes" -> "someNode", 
                   "es.port" -> "9200")

// Spark 1.4 style
val spark14DF = sql.read.format("org.elasticsearch.spark.sql")
                        .options(options) 
                        .load("spark/index")

pushdown 选项 - 特定于 Spark 数据源

es.nodes 配置选项

在定义/加载源时传递选项

sqlContext.sql(
   "CREATE TEMPORARY TABLE myIndex    " + 
   "USING org.elasticsearch.spark.sql " + 
   "OPTIONS (resource 'spark/index', nodes 'someNode')" ) " 

Spark 的临时表名

USING 子句,用于标识数据源提供程序,在本例中为 org.elasticsearch.spark.sql

elasticsearch-hadoop 配置选项,必填选项为 resourcees. 前缀是固定的,因为 SQL 解析器

请注意,由于 SQL 解析器的原因,不允许使用 .(以及用于分隔的其他常见字符);连接器尝试通过自动追加 es. 前缀来解决此问题,但这仅适用于指定只有一个 . 的配置选项(如上面的 es.nodes)。因此,如果需要具有多个 . 的属性,则应使用上面的 SQLContext.loadSQLContext.read 方法并将属性作为 Map 传递。

下推操作编辑

使用 elasticsearch-hadoop 作为 Spark source 的一个重要的*隐藏*特性是,连接器可以理解 DataFrame/SQL 中执行的操作,并且默认情况下会将它们*转换*为相应的QueryDSL。换句话说,连接器直接在源头*下推*操作,在源头有效地过滤掉数据,以便*仅*将所需的数据流回 Spark。这显著提高了查询性能,并最大限度地减少了 Spark 和 Elasticsearch 集群上的 CPU、内存和 I/O,因为只返回需要的数据(而不是返回大量数据,然后由 Spark 处理和丢弃)。请注意,即使指定了查询,下推操作也会应用 - 连接器将根据指定的 SQL *增强*查询。

另外,elasticsearch-hadoop 支持 Spark(1.3.0 及更高版本)中可用的*所有* `Filter`,同时保留与 Spark 1.3.0 的向后二进制兼容性,将 SQL 操作完全下推到 Elasticsearch,而无需任何用户干预。

已优化为下推过滤器的运算符

SQL 语法 ES 1.x/2.x 语法 ES 5.x 语法

= null , is_null

missing

must_not.exists

= (严格)

term

term

= (非严格)

match

match

> , < , >= , ⇐

range

range

is_not_null

exists

exists

in (严格)

terms

terms

in (非严格)

or.filters

bool.should

and

and.filters

bool.filter

or

or.filters

bool.should [bool.filter]

not

not.filter

bool.must_not

StringStartsWith

wildcard(arg*)

wildcard(arg*)

StringEndsWith

wildcard(*arg)

wildcard(*arg)

StringContains

wildcard(*arg*)

wildcard(*arg*)

EqualNullSafe (严格)

term

term

EqualNullSafe (非严格)

match

match

例如,请考虑以下 Spark SQL

// as a DataFrame
val df = sqlContext.read().format("org.elasticsearch.spark.sql").load("spark/trips")

df.printSchema()
// root
//|-- departure: string (nullable = true)
//|-- arrival: string (nullable = true)
//|-- days: long (nullable = true)

val filter = df.filter(df("arrival").equalTo("OTP").and(df("days").gt(3))

或在纯 SQL 中

CREATE TEMPORARY TABLE trips USING org.elasticsearch.spark.sql OPTIONS (path "spark/trips")
SELECT departure FROM trips WHERE arrival = "OTP" and days > 3

连接器将查询转换为

{
  "query" : {
    "filtered" : {
      "query" : {
        "match_all" : {}

      },
      "filter" : {
        "and" : [{
            "query" : {
              "match" : {
                "arrival" : "OTP"
              }
            }
          }, {
            "days" : {
              "gt" : 3
            }
          }
        ]
      }
    }
  }
}

此外,下推过滤器可以作用于*分析*的词条(默认值),也可以配置为*严格*并提供*精确*匹配(仅作用于*未分析*的字段)。除非手动指定映射,否则强烈建议保留默认值。Elasticsearch 参考文档中详细讨论了此主题和其他主题。

请注意,自 elasticsearch-hadoop 2.2 for Spark 1.6 或更高版本起,double.filtering 允许 Spark(默认)处理/评估已下推到 Elasticsearch 的过滤器。关闭此功能(尤其是在处理大型数据集时)可以加快速度。但是,应该注意语义,因为关闭此功能可能会返回不同的结果(取决于数据的索引方式,*分析*与*未分析*)。通常,在启用*严格*时,也可以禁用 double.filtering

数据源作为表编辑

从 Spark SQL 1.2 开始,还可以通过将数据源声明为 Spark 临时表(由 elasticsearch-hadoop 支持)来访问它

sqlContext.sql(
   "CREATE TEMPORARY TABLE myIndex    " + 
   "USING org.elasticsearch.spark.sql " + 
   "OPTIONS (resource 'spark/index', " + 
            "scroll_size '20')" ) 

Spark 的临时表名

USING 子句,用于标识数据源提供程序,在本例中为 org.elasticsearch.spark.sql

elasticsearch-hadoop 配置选项,必填选项为 resource。为了方便起见,可以使用 es 前缀或跳过它。

由于使用 . 会导致语法异常,因此应将其替换为 _ 样式。因此,在本例中,es.scroll.size 变为 scroll_size(因为可以删除前导 es)。请注意,这仅适用于 Spark 1.3,因为 Spark 1.4 的解析器更严格。有关更多信息,请参阅上一章。

定义后,将自动获取架构。因此,您可以立即发出查询

val all = sqlContext.sql("SELECT * FROM myIndex WHERE id <= 10")

由于 elasticsearch-hadoop 知道正在进行的查询,因此它可以*优化*对 Elasticsearch 发出的请求。例如,给定以下查询

val names = sqlContext.sql("SELECT name FROM myIndex WHERE id >=1 AND id <= 10")

它知道只需要 nameid 字段(第一个返回给用户,第二个用于 Spark 的内部过滤),因此只会请求这些数据,从而使查询效率很高。

从 Elasticsearch 读取 DataFrame(Spark SQL 1.3)编辑

正如您可能已经猜到的那样,可以定义一个由 Elasticsearch 文档支持的 DataFrame。甚至更好的是,让它们由查询结果支持,从而有效地创建数据的动态实时*视图*。

Scala编辑

通过 org.elasticsearch.spark.sql 包,esDF 方法在 SQLContext API 上可用

import org.apache.spark.sql.SQLContext        

import org.elasticsearch.spark.sql._          
...

val sql = new SQLContext(sc)

val people = sql.esDF("spark/people")         

// check the associated schema
println(people.schema.treeString)             
// root
//  |-- name: string (nullable = true)
//  |-- surname: string (nullable = true)
//  |-- age: long (nullable = true)           

Spark SQL Scala 导入

elasticsearch-hadoop SQL Scala 导入

创建一个由 Elasticsearch 中的 spark/people 索引支持的 DataFrame

从 Elasticsearch 发现的与 DataFrame 关联的架构

请注意,如 映射和类型 一章中所述,在使用默认 Elasticsearch 映射时,age 字段是如何转换为 Long 的。

就像 Spark *core* 支持一样,可以指定其他参数,例如查询。这是一个非常*强大*的概念,因为可以在源(Elasticsearch)过滤数据,并且仅对结果使用 Spark

// get only the Smiths
val smiths = sqlContext.esDF("spark/people","?q=Smith") 

Elasticsearch 查询,其结果包含 DataFrame

**控制 DataFrame 架构**在某些情况下,特别是当 Elasticsearch 中的索引包含大量字段时,希望创建一个仅包含其中*一部分*的 DataFrame。虽然可以通过官方 Spark API 或通过专用查询修改 DataFrame(通过处理其支持的 RDD),但 elasticsearch-hadoop 允许用户指定在创建 DataFrame 时要包含和排除 Elasticsearch 中的哪些字段。

通过 es.read.field.includees.read.field.exclude 属性,可以指示要包含或排除索引映射中的哪些字段。语法类似于 Elasticsearch 包含/排除。可以使用逗号指定多个值。默认情况下,不指定任何值,这意味着包含所有属性/字段,并且不排除任何属性/字段。请注意,这些属性可以包含前导和尾随通配符。包含字段层次结构的一部分而不包含尾随通配符并不意味着包含整个层次结构。但是,在大多数情况下,仅包含层次结构的一部分是没有意义的,因此应包含尾随通配符。

例如

# include
es.read.field.include = *name, address.*
# exclude
es.read.field.exclude = *.created

由于 SparkSQL 使用 DataFrame 架构的方式,elasticsearch-hadoop 需要在执行实际查询*之前*了解 Elasticsearch 返回了哪些字段。虽然可以通过底层 Elasticsearch 查询手动限制字段,但 elasticsearch-hadoop 不知道这一点,结果可能会不同,或者更糟的是,会发生错误。请改用上面的属性,Elasticsearch 将与用户查询一起正确使用这些属性。

Java编辑

对于 Java 用户,JavaEsSpark SQL 提供了专用 API。它与 EsSpark SQL 惊人地相似,但是它允许通过 Java 集合而不是 Scala 集合传递配置选项;除此之外,使用两者完全相同

import org.apache.spark.sql.api.java.JavaSQLContext;          
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;   
...
SQLContext sql = new SQLContext(sc);

DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people");  

Spark SQL 导入

elasticsearch-hadoop 导入

创建一个由 Elasticsearch 索引支持的 Java DataFrame

更好的是,DataFrame 可以由查询结果支持

DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people", "?q=Smith"); 

支持 elasticsearch-hadoop DataFrame 的 Elasticsearch 查询

Spark SQL 类型转换编辑

处理多值/数组字段时,请参阅节,特别是这些配置选项。重要提示:如果使用自动索引创建,请查看节以获取更多信息。

elasticsearch-hadoop 会自动将 Spark 内置类型转换为 Elasticsearch 类型(以及反向转换),如下表所示

虽然 Spark SQL DataType 在 Scala 和 Java 中都有等效项,因此 RDD 转换可以应用,但语义略有不同 - 特别是对于 java.sql 类型,因为 Spark SQL 处理它们的方式不同

表 8. Spark SQL 1.3+ 转换表

Spark SQL DataType Elasticsearch 类型

null

null

ByteType

字节

ShortType

短整型

IntegerType

整型

LongType

长整型

FloatType

浮点数

DoubleType

双精度浮点数

StringType

字符串

BinaryType

字符串 (BASE64)

BooleanType

布尔值

DateType

日期字符串 格式)

TimestampType

long(unix 时间)

ArrayType

数组

MapType

对象

StructType

对象

**地理类型转换表**除了上表之外,对于 Spark SQL 1.3 或更高版本,elasticsearch-hadoop 对地理类型执行自动架构检测,即 Elasticsearch geo_pointgeo_shape。由于每种类型都允许多种格式(geo_point 接受以 4 种不同方式指定的纬度和经度,而 geo_shape 允许各种类型(目前为 9 种)),并且映射不提供此类信息,因此 elasticsearch-hadoop 将在启动时*采样*确定的地理字段,并检索包含所有相关字段的任意文档;它将解析它,从而确定必要的架构(例如,它可以判断 geo_point 是指定为 StringType 还是 ArrayType)。

由于 Spark SQL 是强类型的,因此每个地理字段在*所有*文档中都需要具有相同的格式。否则,返回的数据将不适合检测到的架构,从而导致错误。

Spark 结构化流支持编辑

在 6.0 中添加。

结构化流 提供快速、可扩展、容错、端到端的精确一次流处理,而用户无需推理流。

-- Spark 文档

作为 Spark 2.0 中的实验性功能发布,Spark 结构化流提供了一个内置于 Spark SQL 集成中的统一流和批处理接口。从 elasticsearch-hadoop 6.0 开始,我们提供将流数据索引到 Elasticsearch 的本机功能。

与 Spark SQL 一样,结构化流使用*结构化*数据。所有条目都应具有*相同*的结构(相同数量的字段,具有相同的类型和名称)。*不支持*使用非结构化数据(具有不同结构的文档),并且会导致问题。对于这种情况,请使用 DStream

支持的 Spark 结构化流版本编辑

从 Spark v2.2.0 开始,Spark 结构化流被认为是*普遍可用*的。因此,elasticsearch-hadoop 对结构化流的支持(在 elasticsearch-hadoop 6.0+ 中可用)仅与 Spark 2.2.0 及更高版本兼容。与其之前的 Spark SQL 类似,结构化流在接口被认为*稳定*之前,可能会在版本之间发生重大变化。

Spark 结构化流支持在 org.elasticsearch.spark.sqlorg.elasticsearch.spark.sql.streaming 包下可用。它以 Dataset[_] api 的形式与 Spark SQL 共享一个统一的接口。客户端可以以几乎与常规批处理 Dataset 完全相同的方式与流式 Dataset 交互,只有少数例外

将流式 Datasets(Spark SQL 2.0+)写入 Elasticsearch编辑

使用 elasticsearch-hadoop,可以将流支持的 Dataset 索引到 Elasticsearch。

Scala编辑

在 Scala 中,要将基于流的 DatasetDataFrame 保存到 Elasticsearch,只需将流配置为使用 "es" 格式写出,如下所示

import org.apache.spark.sql.SparkSession    

...

val spark = SparkSession.builder()
   .appName("EsStreamingExample")           
   .getOrCreate()

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

//  create DataFrame
val people = spark.readStream                   
        .textFile("/path/to/people/files/*")    
        .map(_.split(","))
        .map(p => Person(p(0), p(1), p(2).trim.toInt))

people.writeStream
      .option("checkpointLocation", "/save/location")      
      .format("es")
      .start("spark/people")                               

Spark SQL 导入

创建 SparkSession

调用 readStream 而不是 read 以获取 DataStreamReader 的实例

连续读取文本文件目录并将它们转换为 Person 对象

提供一个位置来保存流式查询的偏移量和提交日志

使用 "es" 格式启动流,以将 Dataset 的内容连续索引到 Elasticsearch

Spark 在基于批处理和基于流的 Dataset 之间没有基于类型的区别。虽然您可能能够导入 org.elasticsearch.spark.sql 包以将 saveToEs 方法添加到您的 DatasetDataFrame 中,但如果在基于流的 DatasetDataFrame 上调用这些方法,则会引发非法参数异常。

Java编辑

以类似的方式,"es" 格式也可用于 Java

import org.apache.spark.sql.SparkSession    

...

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")     
  .getOrCreate();

// java bean style class
public static class PersonBean {
  private String name;
  private String surname;                        
  private int age;

  ...
}

Dataset<PersonBean> people = spark.readStream()         
        .textFile("/path/to/people/files/*")
        .map(new MapFunction<String, PersonBean>() {
            @Override
            public PersonBean call(String value) throws Exception {
                return someFunctionThatParsesStringToJavaBeans(value.split(","));         
            }
        }, Encoders.<PersonBean>bean(PersonBean.class));

people.writeStream()
    .option("checkpointLocation", "/save/location")       
    .format("es")
    .start("spark/people");                               

Spark SQL Java 导入。可以使用与 Scala 相同的会话类

创建 SparkSession。也可以使用旧的 SQLContext api

我们创建一个 java bean 类来用作我们的数据格式

使用 readStream() 方法获取 DataStreamReader 以开始构建我们的流

将我们的字符串数据转换为 PersonBean

设置一个位置来保存我们流的状态

使用 "es" 格式,我们不断将 Elasticsearch 中的 Dataset 索引到 spark/people

将现有 JSON 写入 Elasticsearch编辑

使用 Spark SQL 时,如果输入数据为 JSON 格式,只需通过 DataStreamReaderjson 格式将其转换为 Dataset(对于 Spark SQL 2.0)(如 Spark 文档中所述)。

Spark 结构化流中的接收器提交日志编辑

Spark 结构化流式传输宣传了一种端到端的容错精确一次处理模型,该模型通过使用偏移量检查点和为每个流式查询维护提交日志来实现。执行流式查询时,大多数源和接收器都要求您指定“checkpointLocation”以持久化作业的状态。如果发生中断,使用相同的检查点位置启动新的流式查询将恢复作业状态并从中断处继续。我们在配置的检查点位置下的特殊目录中维护 elasticsearch-hadoop 的 Elasticsearch 接收器实现的提交日志

$> ls /path/to/checkpoint/location
metadata  offsets/  sinks/
$> ls /path/to/checkpoint/location/sinks
elasticsearch/
$> ls /path/to/checkpoint/location/sinks/elasticsearch
12.compact  13  14  15 16  17  18

提交日志目录中的每个文件对应于已提交的批处理 ID。日志实现会定期压缩日志以避免混乱。您可以通过多种方式设置日志目录的位置

  1. 使用 es.spark.sql.streaming.sink.log.path 设置显式日志位置(见下文)。
  2. 如果未设置,则将使用 checkpointLocation 指定的路径。
  3. 如果未设置,则将通过将 SparkSession 中的 spark.sql.streaming.checkpointLocation 的值与 Dataset 的给定查询名称组合来构造路径。
  4. 如果不存在查询名称,则在上述情况下将使用随机 UUID 代替查询名称
  5. 如果未提供上述任何设置,则 start 调用将抛出异常

以下是影响 Elasticsearch 提交日志行为的配置列表

es.spark.sql.streaming.sink.log.enabled(默认值 true
启用或禁用流式作业的提交日志。默认情况下,日志处于启用状态,并且将跳过具有相同批处理 ID 的输出批处理以避免重复写入。当此设置为 false 时,提交日志将被禁用,并且所有输出都将发送到 Elasticsearch,而不管它们是否已在之前的执行中发送。
es.spark.sql.streaming.sink.log.path
设置存储此流式查询的日志数据的位置。如果未设置此值,则 Elasticsearch 接收器会将其提交日志存储在 checkpointLocation 中给定的路径下。任何兼容 HDFS 客户端的 URI 都是可以接受的。
es.spark.sql.streaming.sink.log.cleanupDelay(默认值 10m
提交日志通过 Spark 的 HDFS 客户端进行管理。某些兼容 HDFS 的文件系统(如 Amazon 的 S3)以异步方式传播文件更改。为了解决这个问题,在一组日志文件被压缩后,客户端将在清理旧文件之前等待这段时间。
es.spark.sql.streaming.sink.log.deletion(默认值 true
确定日志是否应删除不再需要的旧日志。在提交每个批处理后,客户端将检查是否有任何已压缩且可以安全删除的提交日志。如果设置为 false,则日志将跳过此清理步骤,为每个批处理保留一个提交文件。
es.spark.sql.streaming.sink.log.compactInterval(默认值 10
设置在压缩日志文件之前要处理的批处理数。默认情况下,每 10 个批处理,提交日志将被压缩成一个包含所有先前提交的批处理 ID 的文件。

Spark 结构化流式传输类型转换编辑

结构化流式传输使用与 Spark SQL 集成完全相同的类型转换规则。

在处理多值/数组字段时,请参阅节,特别是这些配置选项。

如果使用自动索引创建,请查看节以获取更多信息。

elasticsearch-hadoop 会自动将 Spark 内置类型转换为 Elasticsearch 类型,如下表所示

虽然 Spark SQL DataType 在 Scala 和 Java 中都有等效项,因此 RDD 转换可以应用,但语义略有不同 - 特别是对于 java.sql 类型,因为 Spark SQL 处理它们的方式不同

表 9. Spark SQL 1.3+ 转换表

Spark SQL DataType Elasticsearch 类型

null

null

ByteType

字节

ShortType

短整型

IntegerType

整型

LongType

长整型

FloatType

浮点数

DoubleType

双精度浮点数

StringType

字符串

BinaryType

字符串 (BASE64)

BooleanType

布尔值

DateType

日期字符串 格式)

TimestampType

long(unix 时间)

ArrayType

数组

MapType

对象

StructType

对象

使用 Map/Reduce 层编辑

将 Spark 与 Elasticsearch 结合使用的另一种方法是通过 Map/Reduce 层,即利用 elasticsearch-hadoop 中专用的 Input/OuputFormat。但是,除非您坚持使用 elasticsearch-hadoop 2.0,否则我们*强烈*建议您使用原生集成,因为它提供了更好的性能和灵活性。

配置编辑

通过 elasticsearch-hadoop,Spark 可以通过其专用的 InputFormat 与 Elasticsearch 集成,并且在写入时,可以通过 OutputFormat 与 Elasticsearch 集成。这些内容在Map/Reduce一章中有详细介绍,因此请参阅该章节以获取深入的解释。

简而言之,您需要使用目标 Elasticsearch 集群和索引(可能还有一个查询)设置一个基本的 Hadoop Configuration 对象,然后就可以开始了。

从 Spark 的角度来看,唯一需要做的是设置序列化 - Spark 默认依赖于 Java 序列化,这很方便,但效率相当低。这就是 Hadoop 本身引入自己的序列化机制和自己的类型(即 Writables)的原因。因此,InputFormatOutputFormats 需要返回 Spark 无法理解的 Writables。好消息是,您可以轻松启用不同的序列化(Kryo),它可以自动处理转换,并且效率也很高。

SparkConf sc = new SparkConf(); //.setMaster("local");
sc.set("spark.serializer", KryoSerializer.class.getName()); 

// needed only when using the Java API
JavaSparkContext jsc = new JavaSparkContext(sc);

使用 Spark 启用 Kryo 序列化支持

或者,如果您更喜欢 Scala

val sc = new SparkConf(...)
sc.set("spark.serializer", classOf[KryoSerializer].getName) 

使用 Spark 启用 Kryo 序列化支持

请注意,Kryo 序列化用作处理 Writable 类型的变通方法;您可以选择直接转换类型(从 WritableSerializable 类型)- 这很好,但是对于入门来说,上面的一行代码似乎是最有效的。

从 Elasticsearch 读取数据编辑

要读取数据,只需传入 org.elasticsearch.hadoop.mr.EsInputFormat 类 - 因为它同时支持 旧的新的 Map/Reduce API,因此您可以自由使用 SparkContext's、hadoopRDD(为了简洁起见,我们推荐使用此方法)或 newAPIHadoopRDD 上的任何一种方法。无论您选择哪种方法,都要坚持下去,以避免将来出现混淆和问题。

*旧的* (org.apache.hadoop.mapred) API编辑
JobConf conf = new JobConf();                             
conf.set("es.resource", "radio/artists");                 
conf.set("es.query", "?q=me*");                           

JavaPairRDD esRDD = jsc.hadoopRDD(conf, EsInputFormat.class,
                          Text.class, MapWritable.class); 
long docCount = esRDD.count();

创建 Hadoop 对象(使用旧 API)

配置源(索引)

设置查询(可选)

通过 EsInputFormat 在 Elasticsearch 之上创建一个 Spark RDD - 键表示文档 ID,值表示文档本身

Scala 版本如下

val conf = new JobConf()                                   
conf.set("es.resource", "radio/artists")                   
conf.set("es.query", "?q=me*")                             
val esRDD = sc.hadoopRDD(conf,
                classOf[EsInputFormat[Text, MapWritable]], 
                classOf[Text], classOf[MapWritable]))
val docCount = esRDD.count();

创建 Hadoop 对象(使用旧 API)

配置源(索引)

设置查询(可选)

通过 EsInputFormat 在 Elasticsearch 之上创建一个 Spark RDD

*新的* (org.apache.hadoop.mapreduce) API编辑

不出所料,mapreduce API 版本非常相似 - 将 hadoopRDD 替换为 newAPIHadoopRDD,将 JobConf 替换为 Configuration。就是这样。

Configuration conf = new Configuration();       
conf.set("es.resource", "radio/artists");       
conf.set("es.query", "?q=me*");                 

JavaPairRDD esRDD = jsc.newAPIHadoopRDD(conf, EsInputFormat.class,
                Text.class, MapWritable.class); 
long docCount = esRDD.count();

创建 Hadoop 对象(使用新 API)

配置源(索引)

设置查询(可选)

通过 EsInputFormat 在 Elasticsearch 之上创建一个 Spark RDD - 键表示文档 ID,值表示文档本身

Scala 版本如下

val conf = new Configuration()                             
conf.set("es.resource", "radio/artists")                   
conf.set("es.query", "?q=me*")                             
val esRDD = sc.newAPIHadoopRDD(conf,
                classOf[EsInputFormat[Text, MapWritable]], 
                classOf[Text], classOf[MapWritable]))
val docCount = esRDD.count();

创建 Hadoop 对象(使用新 API)

配置源(索引)

设置查询(可选)

通过 EsInputFormat 在 Elasticsearch 之上创建一个 Spark RDD

从 PySpark 使用连接器编辑

由于其Map/Reduce层,elasticsearch-hadoop 也可以从 PySpark 中使用,以读取和写入 Elasticsearch 数据。为此,以下是Spark 文档中的一个片段(请确保切换到 Python 片段)

$ ./bin/pyspark --driver-class-path=/path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}   # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
    "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first()         # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

此外,也可以使用 SQL 加载器

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()