Apache Spark 支持

编辑

Apache Spark 是一个快速且通用的集群计算系统。它提供 Java、Scala 和 Python 中的高级 API,以及支持通用执行图的优化引擎。

-- Spark 网站

Spark 通过通常将数据缓存在内存中,在大型数据集上提供快速迭代/类似函数的功能。与本文档中提到的其他库相反,Apache Spark 是一个计算框架,它本身不依赖于 Map/Reduce,但它确实与 Hadoop 集成,主要是与 HDFS 集成。elasticsearch-hadoop 允许 Elasticsearch 以两种方式在 Spark 中使用:通过自 2.1 版以来提供的专用支持,或通过自 2.0 版以来的 Map/Reduce 桥接。自 5.0 版开始,elasticsearch-hadoop 支持 Spark 2.0。

安装

编辑

与其他库一样,elasticsearch-hadoop 需要在 Spark 的类路径中可用。由于 Spark 具有多种部署模式,因此这可以转换为目标类路径,无论它是在一个节点上(如本地模式 - 将在整个文档中使用),还是根据所需的基础设施在每个节点上。

原生 RDD 支持

编辑

在 2.1 中添加。

elasticsearch-hadoop 在 Elasticsearch 和 Apache Spark 之间提供了原生集成,以 RDD(弹性分布式数据集)(或更准确地说是Pair RDD)的形式,该数据集可以读取 Elasticsearch 中的数据。该 RDD 提供两种风格:一种用于 Scala(它将数据作为带有 Scala 集合的 Tuple2 返回),另一种用于 Java(它将数据作为包含 java.util 集合的 Tuple2 返回)。

在可能的情况下,请考虑使用原生集成,因为它提供了最佳性能和最大灵活性。

配置

编辑

要为 Apache Spark 配置 elasticsearch-hadoop,可以在 配置 章节中描述的 SparkConf 对象中设置各种属性。

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName(appName).setMaster(master)
conf.set("es.index.auto.create", "true")
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
conf.set("es.index.auto.create", "true");

命令行对于希望通过命令行(直接或通过从文件加载)设置属性的用户,请注意 Spark接受以“spark.”为前缀的属性,并将忽略其余属性(并且根据版本可能会发出警告)。为了解决此限制,通过附加 spark. 前缀来定义 elasticsearch-hadoop 属性(因此它们变为 spark.es.),elasticsearch-hadoop 将自动解析它们。

$ ./bin/spark-submit --conf spark.es.resource=index/type ... 

注意 es.resource 属性已变为 spark.es.resource

将数据写入 Elasticsearch

编辑

使用 elasticsearch-hadoop,任何 RDD 都可以保存到 Elasticsearch,只要其内容可以转换为文档即可。在实践中,这意味着 RDD 类型需要是 Map(无论是 Scala 还是 Java),一个 JavaBean 或一个 Scala case class。如果不是这种情况,可以轻松地在 Spark 中转换数据或插入他们自己的自定义 ValueWriter

Scala
编辑

当使用 Scala 时,只需导入 org.elasticsearch.spark 包,该包通过 增强我的库 模式,用 saveToEs 方法丰富任何 RDD API。

import org.apache.spark.SparkContext    
import org.apache.spark.SparkContext._

import org.elasticsearch.spark._        

...

val conf = ...
val sc = new SparkContext(conf)         

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

sc.makeRDD( 
  Seq(numbers, airports)
).saveToEs("spark/docs") 

Spark Scala 导入

elasticsearch-hadoop Scala 导入

通过其 Scala API 启动 Spark

makeRDD 基于指定的集合创建一个临时 RDD;可以传入任何其他 RDD(在 Java 或 Scala 中)。

将内容(即两个文档(数字和机场))索引到 Elasticsearch 中的 spark/docs 下。

Scala 用户可能会尝试使用 Seq 符号来声明对象(即 JSON 文档),而不是使用 Map。虽然相似,但第一个符号会导致略微不同的类型,这些类型无法与 JSON 文档匹配:Seq 是一个有序序列(换句话说,是一个列表),而 创建一个 Tuple,它或多或少是一个有序的、固定数量的元素。因此,列表列表不能用作文档,因为它无法映射到 JSON 对象;但是它可以在其中自由使用。因此,在上面的示例中,使用 Map(k→v) 而不是 Seq(k→v)

作为上述隐式导入的替代方案,可以通过 org.elasticsearch.spark.rdd 包中的 EsSpark 在 Scala 中使用 elasticsearch-hadoop Spark 支持,它充当实用程序类,允许显式方法调用。此外,不要使用 Map(它们很方便,但由于其结构差异,每个实例都需要一个映射),而是使用case class

import org.apache.spark.SparkContext
import org.elasticsearch.spark.rdd.EsSpark                        

// define a case class
case class Trip(departure: String, arrival: String)               

val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")

val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))             
EsSpark.saveToEs(rdd, "spark/docs")                               

EsSpark 导入

定义一个名为 Trip 的 case class

Trip 实例周围创建一个 RDD

通过 EsSpark 显式索引 RDD

对于需要指定文档 ID(或其他元数据字段,如 ttltimestamp)的情况,可以通过设置相应的 映射(即 es.mapping.id)来实现。按照前面的示例,为了指示 Elasticsearch 使用字段 id 作为文档 ID,请更新 RDD 配置(也可以在 SparkConf 上设置属性,但由于其全局影响,不建议这样做)。

EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id"))
Java
编辑

Java 用户有一个专用的类,它提供了与 EsSpark 类似的功能,即 org.elasticsearch.spark.rdd.api.java 中的 JavaEsSpark(一个类似于 Spark 的 Java API 的包)。

import org.apache.spark.api.java.JavaSparkContext;                              
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;                        
...

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);                              

Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);                   
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
JavaEsSpark.saveToEs(javaRDD, "spark/docs");                                    

Spark Java 导入

elasticsearch-hadoop Java 导入

通过其 Java API 启动 Spark

为了简化示例,请使用 Guava(Spark 的依赖项)Immutable* 方法来创建简单的 MapList

在两个集合上创建一个简单的 RDD;可以传入任何其他 RDD(在 Java 或 Scala 中)。

将内容(即两个文档(数字和机场))索引到 Elasticsearch 中的 spark/docs 下。

可以通过使用 Java 5 的静态导入进一步简化代码。此外,Map(其映射由于其松散结构而具有动态性)可以用 JavaBean 替换。

public class TripBean implements Serializable {
   private String departure, arrival;

   public TripBean(String departure, String arrival) {
       setDeparture(departure);
       setArrival(arrival);
   }

   public TripBean() {}

   public String getDeparture() { return departure; }
   public String getArrival() { return arrival; }
   public void setDeparture(String dep) { departure = dep; }
   public void setArrival(String arr) { arrival = arr; }
}
import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark;                
...

TripBean upcoming = new TripBean("OTP", "SFO");
TripBean lastWeek = new TripBean("MUC", "OTP");

JavaRDD<TripBean> javaRDD = jsc.parallelize(
                            ImmutableList.of(upcoming, lastWeek));        
saveToEs(javaRDD, "spark/docs");                                          

静态导入 JavaEsSpark

定义一个包含 TripBean 实例的 RDDTripBean 是一个 JavaBean)。

调用 saveToEs 方法,而无需再次键入 JavaEsSpark

设置文档 ID(或其他元数据字段,如 ttltimestamp)类似于其 Scala 对应物,尽管根据您是否使用 JDK 类或其他实用程序(如 Guava)可能会稍微冗长一些。

JavaEsSpark.saveToEs(javaRDD, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));

将现有的 JSON 写入 Elasticsearch

编辑

对于 RDD 中的数据已存在于 JSON 中的情况,elasticsearch-hadoop 允许直接索引,无需应用任何转换;数据按原样获取并直接发送到 Elasticsearch。因此,在这种情况下,elasticsearch-hadoop 期望一个包含 String 或字节数组(byte[]/Array[Byte])的 RDD,假设每个条目都表示一个 JSON 文档。如果 RDD 没有正确的签名,则无法应用 saveJsonToEs 方法(在 Scala 中,它们将不可用)。

Scala
编辑
val json1 = """{"reason" : "business", "airport" : "SFO"}"""      
val json2 = """{"participants" : 5, "airport" : "OTP"}"""

new SparkContext(conf).makeRDD(Seq(json1, json2))
                      .saveJsonToEs("spark/json-trips") 

RDD 中条目的示例 - JSON写入时按原样,没有任何转换,它不应包含换行符,如 \n 或 \r\n。

通过专用的 saveJsonToEs 方法索引 JSON 数据。

Java
编辑
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";  
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";

JavaSparkContext jsc = ...
JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); 
JavaEsSpark.saveJsonToEs(stringRDD, "spark/json-trips");             

RDD 中条目的示例 - JSON写入时按原样,没有任何转换,它不应包含换行符,如 \n 或 \r\n。

注意 RDD<String> 签名。

通过专用的 saveJsonToEs 方法索引 JSON 数据。

写入动态/多资源

编辑

对于需要将数据索引到不同存储桶(基于数据内容)的 Elasticsearch 中的情况,可以使用 es.resource.write 字段,该字段接受一个在运行时从文档内容解析的模式。按照前面提到的 媒体示例,可以将其配置如下。

Scala
编辑
val game = Map(
  "media_type"->"game", 
      "title" -> "FF VI",
       "year" -> "1994")
val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010")
val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien")

sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection-{media_type}/doc")  

用于拆分数据的文档。可以声明任何字段(但请确保它在所有文档中都可用)。

根据其资源模式保存每个对象,在本例中基于 media_type

对于每个即将写入的文档/对象,elasticsearch-hadoop 将提取 media_type 字段并使用其值来确定目标资源。

Java
编辑

正如预期的那样,Java 中的事情非常相似。

Map<String, ?> game =
  ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994");
Map<String, ?> book = ...
Map<String, ?> cd = ...

JavaRDD<Map<String, ?>> javaRDD =
                jsc.parallelize(ImmutableList.of(game, book, cd));
saveToEs(javaRDD, "my-collection-{media_type}/doc");  

根据其资源模式保存每个对象,在本例中为 media_type

处理文档元数据

编辑

Elasticsearch 允许每个文档拥有自己的元数据。如上所述,通过各种映射选项,可以自定义这些参数,以便从其所属的文档中提取其值。此外,甚至可以包含/排除发送回 Elasticsearch 的数据部分。在 Spark 中,elasticsearch-hadoop 扩展了此功能,允许通过使用键值对 RDD在文档本身外部提供元数据。换句话说,对于包含键值元组的RDD,元数据可以从键中提取,而值用作文档源。

元数据通过 org.elasticsearch.spark.rdd 包中的 Metadata Java 枚举进行描述,该枚举标识其类型 - idttlversion 等。因此,RDD 键可以是包含每个文档的 Metadata 及其关联值的 Map。如果 RDD 键不是 Map 类型,则 elasticsearch-hadoop 将认为该对象表示文档 ID 并相应地使用它。这听起来比实际情况更复杂,所以让我们看一些例子。

Scala
编辑

键值对 RDD,或者简单地说,签名为 RDD[(K,V)]RDD 可以利用 saveToEsWithMeta 方法,这些方法可以通过 org.elasticsearch.spark 包的隐式导入或 EsSpark 对象获得。要为每个文档手动指定 ID,只需在您的 RDD 中传入 Object(不是 Map 类型)。

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// instance of SparkContext
val sc = ...

val airportsRDD = 
  sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))  
airportsRDD.saveToEsWithMeta("airports/2015")    

airportsRDD 是一个键值对 RDD;它由 tupleSeq 创建。

Seq 中每个元组的键表示其关联值/文档的ID;换句话说,文档 otp 的 ID 为 1muc2sfo3

由于 airportsRDD 是一个键值对 RDD,因此它具有可用的 saveToEsWithMeta 方法。这告诉 elasticsearch-hadoop 特别注意 RDD 键并将它们用作元数据,在本例中用作文档 ID。如果使用 saveToEs 代替,则 elasticsearch-hadoop 将认为 RDD 元组(即键和值)作为文档的一部分。

当需要指定除 ID 之外的更多信息时,应使用键类型为 org.elasticsearch.spark.rdd.Metadatascala.collection.Map

import org.elasticsearch.spark.rdd.Metadata._          

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// metadata for each document
// note it's not required for them to have the same structure
val otpMeta = Map(ID -> 1, TTL -> "3h")                
val mucMeta = Map(ID -> 2, VERSION -> "23")            
val sfoMeta = Map(ID -> 3)                             

// instance of SparkContext
val sc = ...

val airportsRDD = sc.makeRDD( 
  Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
airportsRDD.saveToEsWithMeta("airports/2015") 

导入 Metadata 枚举。

用于 otp 文档的元数据。在本例中,ID 的值为 1,TTL 的值为 3h

用于 muc 文档的元数据。在本例中,ID 的值为 2,VERSION 的值为 23

用于 sfo 文档的元数据。在本例中,ID 的值为 3。

元数据和文档被组装成一个键值对 RDD

使用 saveToEsWithMeta 方法相应地保存 RDD

Java
编辑

类似地,在 Java 端,JavaEsSpark 提供了应用于 JavaPairRDDRDD[(K,V)] 在 Java 中的等效项)的 saveToEsWithMeta 方法。因此,要根据文档 ID 保存文档,可以使用

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC");

JavaSparkContext jsc = ...

// create a pair RDD between the id and the docs
JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs(ImmutableList.of( 
        new Tuple2<Object, Object>(1, otp),          
        new Tuple2<Object, Object>(2, jfk)));        
JavaEsSpark.saveToEsWithMeta(pairRDD, target);       

通过使用围绕文档 ID 和文档本身的 Scala Tuple2 类来创建 JavaPairRDD

围绕 ID (1) 和文档本身 (otp) 封装的第一个文档的元组。

围绕 ID (2) 和 jfk 封装的第二个文档的元组。

使用键作为 ID 和值作为文档相应地保存 JavaPairRDD

当需要指定除 ID 之外的更多信息时,可以选择使用填充了键类型为 org.elasticsearch.spark.rdd.Metadatajava.util.Map

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.elasticsearch.spark.rdd.Metadata;          

import static org.elasticsearch.spark.rdd.Metadata.*; 

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran");

// metadata for each document
// note it's not required for them to have the same structure
Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); 
Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); 

JavaSparkContext jsc = ...

// create a pair RDD between the id and the docs
JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs<(ImmutableList.of(
        new Tuple2<Object, Object>(otpMeta, otp),    
        new Tuple2<Object, Object>(sfoMeta, sfo)));  
JavaEsSpark.saveToEsWithMeta(pairRDD, target);       

Metadata enum 描述可以声明的文档元数据。

用于 enum 的静态导入,以便以简短格式(IDTTL 等)引用其值。

otp 文档的元数据。

sfo 文档的元数据。

otp(作为值)与其元数据(作为键)之间的元组。

sfo 及其元数据关联的元组。

在包含文档及其各自元数据的 JavaPairRDD 上调用 saveToEsWithMeta

从 Elasticsearch 读取数据

编辑

对于读取,应定义从 Elasticsearch 将数据流式传输到 Spark 的 Elasticsearch RDD

Scala
编辑

与写入类似,org.elasticsearch.spark 包使用 esRDD 方法丰富了 SparkContext API。

import org.apache.spark.SparkContext    
import org.apache.spark.SparkContext._

import org.elasticsearch.spark._        

...

val conf = ...
val sc = new SparkContext(conf)         

val RDD = sc.esRDD("radio/artists")     

Spark Scala 导入

elasticsearch-hadoop Scala 导入

通过其 Scala API 启动 Spark

为索引 radio/artists 创建了一个专用的 Elasticsearch RDD

该方法可以重载以指定其他查询甚至配置 Map(覆盖 SparkConf)。

...
import org.elasticsearch.spark._

...
val conf = ...
val sc = new SparkContext(conf)

sc.esRDD("radio/artists", "?q=me*") 

创建一个 RDD,从索引 radio/artists 中流式传输与 me* 匹配的所有文档。

默认情况下,Elasticsearch 中的文档以 Tuple2 的形式返回,第一个元素是文档 ID,第二个元素是通过 Scala 集合表示的实际文档,即一个 `Map[String, Any]`,其中键表示字段名称,值表示其各自的值。

Java
编辑

Java 用户有一个专用的 JavaPairRDD,其工作方式与其 Scala 对应项相同,但是返回的 Tuple2 值(或第二个元素)将文档作为本机 java.util 集合返回。

import org.apache.spark.api.java.JavaSparkContext;               
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;             
...

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);               

JavaPairRDD<String, Map<String, Object>> esRDD =
                        JavaEsSpark.esRDD(jsc, "radio/artists"); 

Spark Java 导入

elasticsearch-hadoop Java 导入

通过其 Java API 启动 Spark

为索引 radio/artists 创建了一个专用的 Elasticsearch JavaPairRDD

类似地,可以使用重载的 esRDD 方法来指定查询或传递 Map 对象以进行高级配置。让我们看看它是什么样子,但这次使用Java 静态导入。此外,让我们丢弃文档 ID 并仅检索 RDD 值。

import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.*;   

...
JavaRDD<Map<String, Object>> rdd =
        esRDD(jsc, "radio/artists", "?q=me*")  
            .values(); 

静态导入 JavaEsSpark 类。

创建一个 RDD,从索引 radio/artists 中流式传输以 me 开头的所有文档。请注意,由于静态导入,该方法不必完全限定。

仅返回 PairRDD - 因此结果的类型为 JavaRDD 而不是 JavaPairRDD

通过使用 JavaEsSpark API,可以获得 Spark 专用的 JavaPairRDD,它在 Java 环境中比基本 RDD 更适合(因为它具有 Scala 签名)。此外,专用的 RDD 将 Elasticsearch 文档作为正确的 Java 集合返回,因此不必处理 Scala 集合(这通常是使用 RDD 的情况)。当使用 Java 8 时,这一点尤其强大,我们强烈建议使用它,因为它的lambda 表达式使集合处理变得极其简洁。

为此,让我们假设想要过滤 RDD 中的文档,并仅返回包含包含 mega 的值的那些文档(请忽略可以直接通过 Elasticsearch 进行过滤的事实)。

在 Java 8 之前的版本中,代码如下所示

JavaRDD<Map<String, Object>> esRDD =
                        esRDD(jsc, "radio/artists", "?q=me*").values();
JavaRDD<Map<String, Object>> filtered = esRDD.filter(
    new Function<Map<String, Object>, Boolean>() {
      @Override
      public Boolean call(Map<String, Object> map) throws Exception {
          returns map.contains("mega");
      }
    });

使用 Java 8,过滤变成了一行代码

JavaRDD<Map<String, Object>> esRDD =
                        esRDD(jsc, "radio/artists", "?q=me*").values();
JavaRDD<Map<String, Object>> filtered = esRDD.filter(doc ->
                                                doc.contains("mega"));
以 JSON 格式读取数据
编辑

如果需要以 JSON 格式获取 Elasticsearch 的结果(通常是发送到其他系统),则可以使用专用的 esJsonRDD 方法。在这种情况下,连接器将按原样返回从 Elasticsearch 收到的文档内容,而无需任何处理,在 Scala 中以 RDD[(String, String)] 或在 Java 中以 JavaPairRDD[String, String] 的形式返回,其中键表示文档 ID,值表示其以 JSON 格式表示的实际内容。

类型转换

编辑

在处理多值/数组字段时,请参阅部分,特别是这些配置选项。重要提示:如果使用自动索引创建,请查看部分以获取更多信息。

elasticsearch-hadoop 自动将 Spark 内置类型转换为 Elasticsearch 类型(反之亦然),如下表所示。

表 4. Scala 类型转换表

Scala 类型 Elasticsearch 类型

None

null

Unit

null

Nil

array

Some[T]

T 根据表格

Map

对象

Traversable

array

case class

object(参见 Map

Product

array

此外,以下隐式转换适用于 Java 类型。

表 5. Java 类型转换表

Java 类型 Elasticsearch 类型

null

null

String

string

Boolean

boolean

Byte

byte

Short

short

Integer

int

Long

long

Double

double

Float

float

Number

floatdouble(取决于大小)

java.util.Calendar

datestring 格式)

java.util.Date

datestring 格式)

java.util.Timestamp

datestring 格式)

byte[]

string(BASE64)

Object[]

array

Iterable

array

Map

对象

Java Bean

object(参见 Map

转换尽最大努力完成;内置的 Java 和 Scala 类型保证能够正确转换,但是对于 Java 或 Scala 中的用户类型没有保证。如上表所述,当在 Scala 中遇到 case 类或在 Java 中遇到 JavaBean 时,转换器将尝试 unwrap 其内容并将其保存为 object。请注意,这仅适用于顶级用户对象 - 如果用户对象嵌套了其他用户对象,则转换很可能失败,因为转换器不执行嵌套的 unwrapping。这样做是有意的,因为转换器必须序列化反序列化数据,而用户类型由于数据丢失而引入了歧义;这可以通过某种类型的映射来解决,但这会使项目过于接近 ORM 的领域,并且可以说引入的复杂性过高而收益甚微;由于 Spark 中的处理功能和 elasticsearch-hadoop 中的可插拔性,可以轻松地将对象转换为其他类型,如果需要,只需付出最少的努力即可获得最大的控制。

地理类型值得一提的是,Elasticsearch 中独有的丰富数据类型,例如 GeoPointGeoShape,通过将其结构转换为上表中提供的基本类型来支持。例如,基于其存储方式,geo_point 可能会作为 StringTraversable 返回。

Spark Streaming 支持

编辑

在 5.0 中添加。

Spark Streaming 是核心 Spark API 的扩展,它支持对实时数据流进行可扩展、高吞吐量、容错的流处理。

-- Spark 网站

Spark Streaming 是核心 Spark 功能的扩展,允许对流数据进行近乎实时的处理。Spark Streaming 围绕 DStream离散化流的概念工作。 DStreams 通过将新到达的记录收集到一个小的 RDD 中并执行它来运行。此操作每隔几秒钟重复一次,并在一个称为微批处理的过程中使用新的 RDDDStream api 包含许多与 RDD api 相同的处理操作,以及一些其他特定于流的方法。从 5.0 版开始,elasticsearch-hadoop 提供了与 Spark Streaming 的原生集成。

使用 elasticsearch-hadoop Spark Streaming 支持时,可以将 Elasticsearch 作为目标输出位置,以便从 Spark Streaming 作业中将数据索引到其中,这与持久化 RDD 的结果的方式相同。但是,与 RDD 不同,由于其连续性,无法使用 DStream 从 Elasticsearch 中读取数据。

Spark Streaming 支持提供了特殊的优化,以便在使用非常小的处理窗口运行作业时,可以节省 Spark 执行器上的网络资源。因此,应该优先使用此集成,而不是在 DStream 上的 foreachRDD 调用返回的 RDD 上调用 saveToEs

DStream 写入 Elasticsearch

编辑

RDD 一样,只要其内容可以转换为文档,任何 DStream 都可以保存到 Elasticsearch 中。在实践中,这意味着 DStream 类型需要是 Map(Scala 或 Java 中的任何一个)、JavaBean 或 Scala case 类。如果不是这种情况,可以轻松地在 Spark 中转换数据或插入他们自己的自定义 ValueWriter

Scala
编辑

使用 Scala 时,只需导入 org.elasticsearch.spark.streaming 包,该包通过 pimp my library 模式,使用 saveToEs 方法丰富了 DStream API

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._               
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._

import org.elasticsearch.spark.streaming._           

...

val conf = ...
val sc = new SparkContext(conf)                      
val ssc = new StreamingContext(sc, Seconds(1))       

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

val rdd = sc.makeRDD(Seq(numbers, airports))
val microbatches = mutable.Queue(rdd)                

ssc.queueStream(microbatches).saveToEs("spark/docs") 

ssc.start()
ssc.awaitTermination() 

Spark 和 Spark Streaming Scala 导入

elasticsearch-hadoop Spark Streaming 导入

通过其 Scala API 启动 Spark

通过传递 SparkContext 启动 SparkStreaming 上下文。微批次将每秒处理一次。

makeRDD 基于指定的集合创建一个临时 RDD;可以传入任何其他 RDD(在 Java 或 Scala 中)。创建一个 `RDD` 队列以表示要执行的微批次。

RDD 创建一个 DStream 并将内容(即两个 _文档_(数字和机场))索引到 {es} 中的 `spark/docs` 下

启动 Spark Streaming 作业并等待其最终完成。

作为上述隐式导入的替代方案,可以通过 org.elasticsearch.spark.streaming 包中的 EsSparkStreaming 在 Scala 中使用 elasticsearch-hadoop Spark Streaming 支持,它充当实用程序类,允许显式方法调用。此外,不要使用 Map(它们很方便,但由于其结构差异,每个实例都需要一个映射),而应使用案例类

import org.apache.spark.SparkContext
import org.elasticsearch.spark.streaming.EsSparkStreaming         

// define a case class
case class Trip(departure: String, arrival: String)               

val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")

val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
val microbatches = mutable.Queue(rdd)                             
val dstream = ssc.queueStream(microbatches)

EsSparkStreaming.saveToEs(dstream, "spark/docs")                  

ssc.start()                                                       

EsSparkStreaming 导入

定义一个名为 Trip 的 case class

围绕 Trip 实例的 RDD 创建一个 DStream

通过 EsSparkStreaming 显式配置要索引的 DStream

启动流式处理过程

启动 SparkStreamingContext 后,无法添加或配置新的 DStream。上下文停止后,无法重新启动。每个 JVM 每次只能有一个活动的 SparkStreamingContext。还要注意,以编程方式停止 SparkStreamingContext 时,它会停止底层的 SparkContext,除非指示它不要这样做。

对于需要指定文档的 id(或其他元数据字段,如 ttltimestamp)的情况,可以通过设置相应的 映射(即 es.mapping.id)来实现。按照前面的示例,要指示 Elasticsearch 使用字段 id 作为文档 id,请更新 DStream 配置(也可以在 SparkConf 上设置属性,但由于其全局影响,不建议这样做)

EsSparkStreaming.saveToEs(dstream, "spark/docs", Map("es.mapping.id" -> "id"))
Java
编辑

Java 用户有一个专用类,它提供了与 EsSparkStreaming 类似的功能,即 org.elasticsearch.spark.streaming.api.java 包中的 JavaEsSparkStreaming(一个类似于 Spark 的 Java API 的包)

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;                                              
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;

import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;         
...

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);                              
JavaStreamingContext jssc = new JavaSparkStreamingContext(jsc, Seconds.apply(1));

Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);                   
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>();
microbatches.add(javaRDD);                                                      
JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches);

JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs");                       

jssc.start()                                                                    

Spark 和 Spark Streaming Java 导入

elasticsearch-hadoop Java 导入

通过其 Java API 启动 Spark 和 Spark Streaming。微批次将每秒处理一次。

为了简化示例,请使用 Guava(Spark 的依赖项)Immutable* 方法来创建简单的 MapList

在微批次上创建一个简单的 DStream;可以传入任何其他 RDD(在 Java 或 Scala 中)

将内容(即两个文档(数字和机场))索引到 Elasticsearch 中的 spark/docs 下。

执行流式处理作业。

可以通过使用 Java 5 的静态导入进一步简化代码。此外,Map(其映射由于其松散结构而具有动态性)可以用 JavaBean 替换。

public class TripBean implements Serializable {
   private String departure, arrival;

   public TripBean(String departure, String arrival) {
       setDeparture(departure);
       setArrival(arrival);
   }

   public TripBean() {}

   public String getDeparture() { return departure; }
   public String getArrival() { return arrival; }
   public void setDeparture(String dep) { departure = dep; }
   public void setArrival(String arr) { arrival = arr; }
}
import static org.elasticsearch.spark.rdd.api.java.JavaEsSparkStreaming;  
...

TripBean upcoming = new TripBean("OTP", "SFO");
TripBean lastWeek = new TripBean("MUC", "OTP");

JavaRDD<TripBean> javaRDD = jsc.parallelize(ImmutableList.of(upcoming, lastWeek));
Queue<JavaRDD<TripBean>> microbatches = new LinkedList<JavaRDD<TripBean>>();
microbatches.add(javaRDD);
JavaDStream<TripBean> javaDStream = jssc.queueStream(microbatches);       

saveToEs(javaDStream, "spark/docs");                                          

jssc.start()                                                              

静态导入 JavaEsSparkStreaming

定义一个包含 TripBean 实例的 DStreamTripBean 是一个 JavaBean

调用 saveToEs 方法,无需再次键入 JavaEsSparkStreaming

运行该流式处理作业

设置文档 ID(或其他元数据字段,如 ttltimestamp)类似于其 Scala 对应物,尽管根据您是否使用 JDK 类或其他实用程序(如 Guava)可能会稍微冗长一些。

JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));

将现有 JSON 写入 Elasticsearch

编辑

对于 DStream 流式传输的数据已作为 JSON 序列化的案例,elasticsearch-hadoop 允许直接索引无需应用任何转换;数据按原样获取并直接发送到 Elasticsearch。因此,在这种情况下,elasticsearch-hadoop 期望 DStream 包含 String 或字节数组 (byte[]/Array[Byte]),假设每个条目代表一个 JSON 文档。如果 DStream 没有正确的签名,则无法应用 saveJsonToEs 方法(在 Scala 中,它们将不可用)。

Scala
编辑
val json1 = """{"reason" : "business", "airport" : "SFO"}"""      
val json2 = """{"participants" : 5, "airport" : "OTP"}"""

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

val rdd = sc.makeRDD(Seq(json1, json2))
val microbatch = mutable.Queue(rdd)
ssc.queueStream(microbatch).saveJsonToEs("spark/json-trips")      

ssc.start()                                                       

DStream 中条目的示例 - JSON写入按原样,没有任何转换

通过专用的 saveJsonToEs 方法配置流以索引 JSON 数据

启动流式处理作业

Java
编辑
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";  
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";

JavaSparkContext jsc = ...
JavaStreamingContext jssc = ...
JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
Queue<JavaRDD<String>> microbatches = new LinkedList<JavaRDD<String>>();      
microbatches.add(stringRDD);
JavaDStream<String> stringDStream = jssc.queueStream(microbatches);  

JavaEsSparkStreaming.saveJsonToEs(stringRDD, "spark/json-trips");    

jssc.start()                                                         

DStream 中条目的示例 - JSON写入按原样,没有任何转换

创建 RDD,将其放入队列中,并从排队的 RDD 中创建一个 DStream,将每个 RDD 视为微批次。

注意 JavaDStream<String> 签名

通过专用的 saveJsonToEs 方法配置流以索引 JSON 数据

启动流作业

写入动态/多资源

编辑

对于需要将数据索引到不同存储桶(基于数据内容)的 Elasticsearch 中的情况,可以使用 es.resource.write 字段,该字段接受一个在运行时从文档内容解析的模式。按照前面提到的 媒体示例,可以将其配置如下。

Scala
编辑
val game = Map(
  "media_type" -> "game", 
       "title" -> "FF VI",
        "year" -> "1994")
val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010")
val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien")

val batch = sc.makeRDD(Seq(game, book, cd))
val microbatches = mutable.Queue(batch)
ssc.queueStream(microbatches).saveToEs("my-collection-{media_type}/doc")  
ssc.start()

用于拆分数据的文档。可以声明任何字段(但请确保它在所有文档中都可用)。

根据其资源模式保存每个对象,在本例中基于 media_type

对于每个即将写入的文档/对象,elasticsearch-hadoop 将提取 media_type 字段并使用其值来确定目标资源。

Java
编辑

正如预期的那样,Java 中的事情非常相似。

Map<String, ?> game =
  ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994");
Map<String, ?> book = ...
Map<String, ?> cd = ...

JavaRDD<Map<String, ?>> javaRDD =
                jsc.parallelize(ImmutableList.of(game, book, cd));
Queue<JavaRDD<Map<String, ?>>> microbatches = ...
JavaDStream<Map<String, ?>> javaDStream =
                jssc.queueStream(microbatches);

saveToEs(javaDStream, "my-collection-{media_type}/doc");  
jssc.start();

根据其资源模式保存每个对象,在本例中为 media_type

处理文档元数据

编辑

Elasticsearch 允许每个文档都有自己的 元数据。如上所述,通过各种 映射 选项,可以自定义这些参数,以便从其所属的文档中提取其值。此外,甚至可以包含/排除发送回 Elasticsearch 的数据部分。在 Spark 中,elasticsearch-hadoop 扩展了此功能,允许通过使用 RDD 在文档本身外部提供元数据。

在 Spark Streaming 中,这没有什么不同。对于包含键值元组的 DStreams,可以从键中提取元数据,并将值用作文档源。

元数据通过 org.elasticsearch.spark.rdd 包中的 Metadata Java 枚举 进行描述,该枚举标识其类型 - idttlversion 等。因此,DStream 的键可以是一个 Map,其中包含每个文档的 Metadata 及其关联的值。如果 DStream 的键不是 Map 类型,则 elasticsearch-hadoop 会将该对象视为表示文档 ID 并相应地使用它。这听起来可能比实际情况复杂,让我们看一些示例。

Scala
编辑

配对 DStreams,或者简单地说,具有签名 DStream[(K,V)]DStreams 可以利用 org.elasticsearch.spark.streaming 包或 EsSparkStreaming 对象通过隐式导入提供的 saveToEsWithMeta 方法。要手动为每个文档指定 ID,只需在 DStream 中传入 Object(不是 Map 类型)。

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// instance of SparkContext
val sc = ...
// instance of StreamingContext
val ssc = ...

val airportsRDD = 
  sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))  
val microbatches = mutable.Queue(airportsRDD)

ssc.queueStream(microbatches)        
  .saveToEsWithMeta("airports/2015") 
ssc.start()

airportsRDD 是一个键值对 RDD;它由 tupleSeq 创建。

Seq 中每个元组的键表示其关联值/文档的ID;换句话说,文档 otp 的 ID 为 1muc2sfo3

我们构建一个 DStream,它继承了 RDD 的类型签名。

由于生成的 DStream 是一个配对 DStream,因此它可以使用 saveToEsWithMeta 方法。这告诉 elasticsearch-hadoop 特别注意 DStream 键并将它们用作元数据,在本例中用作文档 ID。如果使用 saveToEs 代替,则 elasticsearch-hadoop 会将 DStream 元组(即键和值)都视为文档的一部分。

当需要指定除 ID 之外的更多信息时,应使用键类型为 org.elasticsearch.spark.rdd.Metadatascala.collection.Map

import org.elasticsearch.spark.rdd.Metadata._          

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// metadata for each document
// note it's not required for them to have the same structure
val otpMeta = Map(ID -> 1, TTL -> "3h")                
val mucMeta = Map(ID -> 2, VERSION -> "23")            
val sfoMeta = Map(ID -> 3)                             

// instance of SparkContext
val sc = ...
// instance of StreamingContext
val ssc = ...

val airportsRDD = sc.makeRDD( 
  Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
val microbatches = mutable.Queue(airportsRDD)

ssc.queueStream(microbatches)        
  .saveToEsWithMeta("airports/2015") 
ssc.start()

导入 Metadata 枚举。

用于 otp 文档的元数据。在本例中,ID 的值为 1,TTL 的值为 3h

用于 muc 文档的元数据。在本例中,ID 的值为 2,VERSION 的值为 23

用于 sfo 文档的元数据。在本例中,ID 的值为 3。

元数据和文档被组装成一个键值对 RDD

DStreamRDD 继承签名,成为一个配对 DStream

DStream 使用 saveToEsWithMeta 方法配置为相应地索引数据。

Java
编辑

类似地,在 Java 端,JavaEsSparkStreaming 提供了应用于 JavaPairDStreamDStream[(K,V)] 在 Java 中的等价物)的 saveToEsWithMeta 方法。

由于 Java API 的限制,这往往需要做更多工作。例如,您不能直接从 JavaPairRDD 队列创建 JavaPairDStream。相反,您必须创建一个常规的 Tuple2 对象的 JavaDStream,并将 JavaDStream 转换为 JavaPairDStream。这听起来很复杂,但这是对 API 限制的一个简单的解决方法。

首先,我们将创建一个配对函数,它接收一个 Tuple2 对象,并将其直接返回给框架。

public static class ExtractTuples implements PairFunction<Tuple2<Object, Object>, Object, Object>, Serializable {
    @Override
    public Tuple2<Object, Object> call(Tuple2<Object, Object> tuple2) throws Exception {
        return tuple2;
    }
}

然后,我们将配对函数应用于 Tuple2JavaDStream 以创建 JavaPairDStream 并保存它。

import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC");

JavaSparkContext jsc = ...
JavaStreamingContext jssc = ...

// create an RDD of between the id and the docs
JavaRDD<Tuple2<?, ?>> rdd = jsc.parallelize(         
      ImmutableList.of(
        new Tuple2<Object, Object>(1, otp),          
        new Tuple2<Object, Object>(2, jfk)));        

Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ...
JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); 

JavaPairDStream<?, ?> pairDStream = dstream.mapToPair(new ExtractTuples()); 

JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target);       
jssc.start();

创建一个常规的 Scala Tuple2JavaRDD,它包含文档 ID 和文档本身。

围绕 ID (1) 和文档本身 (otp) 封装的第一个文档的元组。

围绕 ID (2) 和 jfk 封装的第二个文档的元组。

从元组 RDD 组装一个常规的 JavaDStream

通过将我们的 Tuple2 标识函数传递给 mapToPair 方法,将 JavaDStream 转换为 JavaPairDStream。这将允许类型转换为 JavaPairDStream。此函数可以替换为作业中的任何内容,这些内容可以从单个条目中提取要索引的 ID 和文档。

JavaPairRDD 使用键作为 ID 和值作为文档配置为相应地索引数据。

当需要指定的内容不仅仅是 ID 时,可以选择使用一个 java.util.Map,该 Map 使用 org.elasticsearch.spark.rdd.Metadata 类型的键进行填充。我们将使用相同的类型转换技巧将 JavaDStream 重新打包为 JavaPairDStream

import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;
import org.elasticsearch.spark.rdd.Metadata;          

import static org.elasticsearch.spark.rdd.Metadata.*; 

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran");

// metadata for each document
// note it's not required for them to have the same structure
Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); 
Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); 

JavaSparkContext jsc = ...

// create a pair RDD between the id and the docs
JavaRDD<Tuple2<?, ?>> pairRdd = jsc.parallelize<(ImmutableList.of(
        new Tuple2<Object, Object>(otpMeta, otp),    
        new Tuple2<Object, Object>(sfoMeta, sfo)));  

Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ...
JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); 

JavaPairDStream<?, ?> pairDStream = dstream.mapToPair(new ExtractTuples()) 

JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target);       
jssc.start();

Metadata enum 描述可以声明的文档元数据。

用于 enum 的静态导入,以便以简短格式(IDTTL 等)引用其值。

otp 文档的元数据。

sfo 文档的元数据。

otp(作为值)与其元数据(作为键)之间的元组。

sfo 及其元数据关联的元组。

JavaRDD 创建一个 JavaDStream

通过在其上映射 Tuple2 标识函数,将 JavaDStream 重新打包到 JavaPairDStream 中。

在包含文档及其相应元数据的 JavaPairDStream 上调用 saveToEsWithMeta

Spark Streaming 类型转换

编辑

elasticsearch-hadoop Spark Streaming 支持利用与常规 Spark 类型映射相同的类型映射。为了保持一致性,此处重复了映射。

表 6. Scala 类型转换表

Scala 类型 Elasticsearch 类型

None

null

Unit

null

Nil

array

Some[T]

T 根据表格

Map

对象

Traversable

array

case class

object(参见 Map

Product

array

此外,以下隐式转换适用于 Java 类型。

表 7. Java 类型转换表

Java 类型 Elasticsearch 类型

null

null

String

string

Boolean

boolean

Byte

byte

Short

short

Integer

int

Long

long

Double

double

Float

float

Number

floatdouble(取决于大小)

java.util.Calendar

datestring 格式)

java.util.Date

datestring 格式)

java.util.Timestamp

datestring 格式)

byte[]

string(BASE64)

Object[]

array

Iterable

array

Map

对象

Java Bean

object(参见 Map

**地理类型**值得再次提及的是,仅在 Elasticsearch 中可用的丰富数据类型,例如 GeoPointGeoShape 通过将其结构转换为上表中提供的基本类型来支持。例如,根据其存储,geo_point 可能作为 StringTraversable 返回。

Spark SQL 支持

编辑

在 2.1 中添加。

Spark SQL 是一个用于结构化数据处理的 Spark 模块。它提供了一个称为 DataFrame 的编程抽象,并且还可以充当分布式 SQL 查询引擎。

-- Spark 网站

在核心 Spark 支持的基础上,elasticsearch-hadoop 还提供了与 Spark SQL 的集成。换句话说,Elasticsearch 成为 Spark SQL 的*原生*数据源,以便可以从 Spark SQL *透明地*索引和查询数据。

Spark SQL 使用*结构化*数据 - 换句话说,预期所有条目都具有*相同*的结构(相同数量的字段,相同类型和名称)。不支持使用非结构化数据(具有不同结构的文档),这会导致问题。对于此类情况,请使用 PairRDD

支持的 Spark SQL 版本

编辑

Spark SQL 虽然已成为一个成熟的组件,但在版本之间仍在经历重大变化。Spark SQL 在 1.3 版中成为一个稳定的组件,但是它与以前的版本 向后兼容。此外,Spark 2.0 引入了重大更改,这些更改通过 Dataset API 打破了向后兼容性。elasticsearch-hadoop 支持 Spark SQL 1.3-1.6 和 Spark SQL 2.0 这两个版本,通过两个不同的 jar:elasticsearch-spark-1.x-<version>.jarelasticsearch-hadoop-<version>.jar 支持 Spark SQL 1.3-1.6(或更高版本),而 elasticsearch-spark-2.0-<version>.jar 支持 Spark SQL 2.0。换句话说,除非您使用的是 Spark 2.0,否则请使用 elasticsearch-spark-1.x-<version>.jar

Spark SQL 支持位于 org.elasticsearch.spark.sql 包下。

**API 差异**从 elasticsearch-hadoop 用户的角度来看,Spark SQL 1.3-1.6 和 Spark 2.0 之间的差异相当统一。这篇文档详细描述了差异,下面简要提到了这些差异。

DataFrameDataset
1.3+ 版中 Spark SQL 的核心单元是 DataFrame。此 API 保留于 Spark 2.0 中,但在其下基于 Dataset
统一 API 与专用 Java/Scala API
在 Spark SQL 2.0 中,通过引入 SparkSession 并对 `Dataset`、`DataFrame` 和 `RDD` 使用相同的底层代码,API 进一步统一

从概念上讲,DataFrame 是一个 Dataset[Row],以下文档将重点介绍 Spark SQL 1.3-1.6。

DataFrame(Spark SQL 1.3+)写入 Elasticsearch

编辑

使用 elasticsearch-hadoop,可以将 DataFrame(或任何 Dataset)索引到 Elasticsearch。

Scala
编辑

在 Scala 中,只需导入 org.elasticsearch.spark.sql 包,该包使用 saveToEs 方法丰富给定的 DataFrame 类;虽然这些方法与 org.elasticsearch.spark 包具有相同的签名,但它们是为 DataFrame 实现设计的。

// reusing the example from Spark SQL documentation

import org.apache.spark.sql.SQLContext    
import org.apache.spark.sql.SQLContext._

import org.elasticsearch.spark.sql._      

...

// sc = existing SparkContext
val sqlContext = new SQLContext(sc)

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

//  create DataFrame
val people = sc.textFile("people.txt")    
        .map(_.split(","))
        .map(p => Person(p(0), p(1), p(2).trim.toInt))
        .toDF()

people.saveToEs("spark/people")           

Spark SQL 包导入

elasticsearch-hadoop Spark 包导入

将文本文件作为*普通* RDD 读取并将其映射到 DataFrame(使用 Person 案例类)。

通过 saveToEs 方法将生成的 DataFrame 索引到 Elasticsearch。

默认情况下,elasticsearch-hadoop 会忽略空值,而不是写入任何字段。由于 DataFrame 旨在被视为结构化表格数据,因此您可以通过将 es.spark.dataframe.write.null 设置切换到 true 来启用将空值作为空值字段写入仅限 DataFrame 对象。

Java
编辑

类似地,对于 Java 使用,专用包 org.elasticsearch.spark.sql.api.java 通过 JavaEsSpark SQL 提供类似的功能。

import org.apache.spark.sql.api.java.*;                      
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;  
...

DataFrame people = ...
JavaEsSparkSQL.saveToEs(people, "spark/people");                     

Spark SQL Java 导入

elasticsearch-hadoop Spark SQL Java 导入

在 Elasticsearch 中的 spark/people 下索引 DataFrame

同样,使用 Java 5 *静态*导入,这可以进一步简化为

import static org.elasticsearch.spark.sql.api.java.JavaEsSpark SQL; 
...
saveToEs("spark/people");                                          

静态导入 JavaEsSpark SQL

调用 saveToEs 方法,而无需再次键入 JavaEsSpark

为了最大程度地控制在 Elasticsearch 中 DataFrame 的映射,强烈建议事先创建映射。有关更多信息,请参阅章节。

将现有 JSON 写入 Elasticsearch

编辑

使用 Spark SQL 时,如果输入数据为 JSON 格式,只需将其转换为 DataFrame(在 Spark SQL 1.3 中)或 Dataset(对于 Spark SQL 2.0)(如 Spark 文档中所述)通过 SQLContext/JavaSQLContextjsonFile 方法。

使用纯 SQL 从 Elasticsearch 读取数据

编辑

在创建临时表之前,索引及其映射必须存在。

Spark SQL 1.2 引入了一个用于从外部数据源读取数据的新 API,该 API 由 elasticsearch-hadoop 支持,简化了与 Elasticsearch 交互所需的 SQL 配置。此外,它在后台了解 Spark 执行的操作,因此可以优化数据和查询(例如过滤或剪枝),从而提高性能。

Spark SQL 中的数据源

编辑

使用 Spark SQL 时,elasticsearch-hadoop 允许通过 SQLContextload 方法访问 Elasticsearch。换句话说,以声明式的方式创建由 Elasticsearch 支持的 DataFrame/Dataset

val sql = new SQLContext...
// Spark 1.3 style
val df = sql.load( 
  "spark/index",   
  "org.elasticsearch.spark.sql") 

SQLContext实验性 load 方法,用于任意数据源。

要加载的路径或资源 - 在此情况下为 Elasticsearch 中的索引/类型。

数据源提供程序 - org.elasticsearch.spark.sql

在 Spark 1.4 中,可以使用以下类似的 API 调用。

// Spark 1.4 style
val df = sql.read      
  .format("org.elasticsearch.spark.sql") 
  .load("spark/index") 

SQLContext实验性 read 方法,用于任意数据源。

数据源提供程序 - org.elasticsearch.spark.sql

要加载的路径或资源 - 在此情况下为 Elasticsearch 中的索引/类型。

在 Spark 1.5 中,可以进一步简化为:

// Spark 1.5 style
val df = sql.read.format("es")
  .load("spark/index")

使用 es 作为别名,而不是 DataSource 提供程序的完整包名称。

无论使用哪种 API,一旦创建,就可以自由地访问 DataFrame 来操作数据。

sources 声明还允许传入特定的选项,即

名称 默认值 描述

path

必需

Elasticsearch 索引/类型

pushdown

true

是否将 Spark SQL 转换为 Elasticsearch 查询 DSL(下推)。

strict

false

是否使用精确(未分析)匹配或非精确(已分析)匹配。

可在 Spark 1.6 或更高版本中使用。

double.filtering

true

是否告诉 Spark 对下推的过滤器应用其自身的过滤。

这两个选项将在下一节中解释。要指定选项(包括通用 elasticsearch-hadoop 选项),只需将 Map 传递给上述方法即可。

例如:

val sql = new SQLContext...
// options for Spark 1.3 need to include the target path/resource
val options13 = Map("path" -> "spark/index",
                    "pushdown" -> "true",     
                    "es.nodes" -> "someNode", 
                     "es.port" -> "9200")

// Spark 1.3 style
val spark13DF = sql.load("org.elasticsearch.spark.sql", options13) 

// options for Spark 1.4 - the path/resource is specified separately
val options = Map("pushdown" -> "true",     
                  "es.nodes" -> "someNode", 
                   "es.port" -> "9200")

// Spark 1.4 style
val spark14DF = sql.read.format("org.elasticsearch.spark.sql")
                        .options(options) 
                        .load("spark/index")

pushdown 选项 - Spark 数据源特有。

es.nodes 配置选项。

在定义/加载源时传递选项。

sqlContext.sql(
   "CREATE TEMPORARY TABLE myIndex    " + 
   "USING org.elasticsearch.spark.sql " + 
   "OPTIONS (resource 'spark/index', nodes 'someNode')" ) " 

Spark 的临时表名称。

USING 子句标识数据源提供程序,在此情况下为 org.elasticsearch.spark.sql

elasticsearch-hadoop 配置选项,其中必选项为 resourcees. 前缀是固定的,因为 SQL 解析器的原因。

请注意,由于 SQL 解析器的原因,不允许使用 .(以及其他用于分隔的常用字符);连接器尝试通过自动附加 es. 前缀来解决此问题,但这仅适用于指定只有一个 . 的配置选项(如上面的 es.nodes)。因此,如果需要具有多个 . 的属性,则应使用上面的 SQLContext.loadSQLContext.read 方法并将属性作为 Map 传递。

下推操作

编辑

使用 elasticsearch-hadoop 作为 Spark source 的一个重要的隐藏功能是,连接器理解在 DataFrame/SQL 中执行的操作,并且默认情况下会将它们转换为相应的 QueryDSL。换句话说,连接器将操作下推到数据源,在那里有效地过滤掉数据,以便将所需的数据流回 Spark。这显着提高了查询性能,并最大限度地减少了 Spark 和 Elasticsearch 集群上的 CPU、内存和 I/O,因为仅返回所需的数据(而不是批量返回数据,然后再由 Spark 处理和丢弃)。请注意,即使指定了查询,下推操作也适用 - 连接器会根据指定的 SQL 对其进行增强

顺便说一句,elasticsearch-hadoop 支持 Spark(1.3.0 及更高版本)中可用的所有`Filter`,同时保留与 Spark 1.3.0 的向后二进制兼容性,将 SQL 操作完全下推到 Elasticsearch,无需任何用户干预。

已作为下推过滤器优化的运算符

SQL 语法 ES 1.x/2.x 语法 ES 5.x 语法

= null , is_null

missing

must_not.exists

= (严格)

term

term

= (非严格)

match

match

> , < , >= , ⇐

range

range

is_not_null

exists

exists

in (严格)

terms

terms

in (非严格)

or.filters

bool.should

and

and.filters

bool.filter

or

or.filters

bool.should [bool.filter]

not

not.filter

bool.must_not

StringStartsWith

wildcard(arg*)

wildcard(arg*)

StringEndsWith

wildcard(*arg)

wildcard(*arg)

StringContains

wildcard(*arg*)

wildcard(*arg*)

EqualNullSafe (严格)

term

term

EqualNullSafe (非严格)

match

match

例如,考虑以下 Spark SQL:

// as a DataFrame
val df = sqlContext.read().format("org.elasticsearch.spark.sql").load("spark/trips")

df.printSchema()
// root
//|-- departure: string (nullable = true)
//|-- arrival: string (nullable = true)
//|-- days: long (nullable = true)

val filter = df.filter(df("arrival").equalTo("OTP").and(df("days").gt(3))

或在纯 SQL 中:

CREATE TEMPORARY TABLE trips USING org.elasticsearch.spark.sql OPTIONS (path "spark/trips")
SELECT departure FROM trips WHERE arrival = "OTP" and days > 3

连接器将查询转换为:

{
  "query" : {
    "filtered" : {
      "query" : {
        "match_all" : {}

      },
      "filter" : {
        "and" : [{
            "query" : {
              "match" : {
                "arrival" : "OTP"
              }
            }
          }, {
            "days" : {
              "gt" : 3
            }
          }
        ]
      }
    }
  }
}

此外,下推过滤器可以作用于analyzed 项(默认值),也可以配置为严格并提供exact 匹配(仅作用于not-analyzed 字段)。除非手动指定映射,否则强烈建议保留默认值。Elasticsearch 参考文档中详细讨论了此主题和其他主题。

请注意,double.filtering(从 elasticsearch-hadoop 2.2 开始,适用于 Spark 1.6 或更高版本)允许也由 Spark 处理/评估已下推到 Elasticsearch 的过滤器(默认值)或不处理/评估。关闭此功能,尤其是在处理大型数据集时,可以加快速度。但是,应注意语义,因为关闭此功能可能会返回不同的结果(具体取决于数据的索引方式,analyzednot_analyzed)。通常,在打开strict 时,也可以禁用 double.filtering

数据源作为表

编辑

从 Spark SQL 1.2 开始,还可以通过将其声明为 Spark 临时表(由 elasticsearch-hadoop 支持)来访问数据源。

sqlContext.sql(
   "CREATE TEMPORARY TABLE myIndex    " + 
   "USING org.elasticsearch.spark.sql " + 
   "OPTIONS (resource 'spark/index', " + 
            "scroll_size '20')" ) 

Spark 的临时表名称。

USING 子句标识数据源提供程序,在此情况下为 org.elasticsearch.spark.sql

elasticsearch-hadoop 配置选项,其中必选项为 resource。可以使用 es 前缀或为了方便起见跳过它。

由于使用 . 会导致语法异常,因此应将其替换为 _ 样式。因此,在此示例中,es.scroll.size 变为 scroll_size(因为可以删除前导 es)。请注意,这仅在 Spark 1.3 中有效,因为 Spark 1.4 具有更严格的解析器。有关更多信息,请参阅上一章。

定义后,将自动获取模式。因此,可以立即发出查询。

val all = sqlContext.sql("SELECT * FROM myIndex WHERE id <= 10")

由于 elasticsearch-hadoop 了解正在执行的查询,因此它可以优化对 Elasticsearch 的请求。例如,给定以下查询:

val names = sqlContext.sql("SELECT name FROM myIndex WHERE id >=1 AND id <= 10")

它知道仅需要 nameid 字段(第一个字段返回给用户,第二个字段用于 Spark 的内部过滤),因此将请求此数据,从而使查询非常高效。

从 Elasticsearch 读取 DataFrame(Spark SQL 1.3)

编辑

您可能已经猜到了,可以定义一个由 Elasticsearch 文档支持的 DataFrame。或者更好的是,让它们由查询结果支持,有效地创建对数据的动态、实时的视图

Scala
编辑

通过 org.elasticsearch.spark.sql 包,esDF 方法在 SQLContext API 上可用。

import org.apache.spark.sql.SQLContext        

import org.elasticsearch.spark.sql._          
...

val sql = new SQLContext(sc)

val people = sql.esDF("spark/people")         

// check the associated schema
println(people.schema.treeString)             
// root
//  |-- name: string (nullable = true)
//  |-- surname: string (nullable = true)
//  |-- age: long (nullable = true)           

Spark SQL Scala 导入

elasticsearch-hadoop SQL Scala 导入

创建由 Elasticsearch 中的 spark/people 索引支持的 DataFrame

从 Elasticsearch 中发现的与 DataFrame 关联的模式。

请注意,在使用默认 Elasticsearch 映射时,age 字段如何转换为 Long,如映射和类型章节中所述。

与 Spark 核心支持一样,可以指定其他参数,例如查询。这是一个非常强大的概念,因为可以在源(Elasticsearch)处过滤数据,并且仅在结果上使用 Spark。

// get only the Smiths
val smiths = sqlContext.esDF("spark/people","?q=Smith") 

其结果构成 DataFrame 的 Elasticsearch 查询。

控制 DataFrame 模式在某些情况下,尤其是在 Elasticsearch 中的索引包含大量字段时,希望创建一个仅包含其中一部分字段的 DataFrame。虽然可以通过官方 Spark API 或通过专用查询修改 DataFrame(通过处理其支持的 RDD),但 elasticsearch-hadoop 允许用户在创建 DataFrame 时指定要从 Elasticsearch 中包含和排除哪些字段。

通过 es.read.field.includees.read.field.exclude 属性,可以指示要从索引映射中包含或排除哪些字段。语法类似于 Elasticsearch include/exclude。可以通过使用逗号来指定多个值。默认情况下,未指定任何值,这意味着包含所有属性/字段,并且不排除任何属性/字段。请注意,这些属性可以包含前导和尾随通配符。包含字段层次结构的一部分而没有尾随通配符并不意味着整个层次结构都被包含在内。但是,在大多数情况下,仅包含层次结构的一部分没有意义,因此应包含尾随通配符。

例如:

# include
es.read.field.include = *name, address.*
# exclude
es.read.field.exclude = *.created

由于 SparkSQL 与 DataFrame 模式的工作方式,elasticsearch-hadoop 需要在执行实际查询之前了解从 Elasticsearch 返回的字段。虽然可以通过底层 Elasticsearch 查询手动限制字段,但 elasticsearch-hadoop 并不知道这一点,并且结果可能不同,或者更糟糕的是,会发生错误。请改用上述属性,Elasticsearch 将与用户查询一起正确使用这些属性。

Java

编辑

对于 Java 用户,可以通过 JavaEsSpark SQL 使用专用的 API。它与 EsSpark SQL 非常相似,但允许通过 Java 集合而不是 Scala 集合传递配置选项;除此之外,使用这两个 API 的方式完全相同。

import org.apache.spark.sql.api.java.JavaSQLContext;          
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;   
...
SQLContext sql = new SQLContext(sc);

DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people");  

Spark SQL 导入

elasticsearch-hadoop 导入

创建一个由 Elasticsearch 索引支持的 Java DataFrame

更好的是,DataFrame 可以由查询结果支持。

DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people", "?q=Smith"); 

支持 elasticsearch-hadoop DataFrame 的 Elasticsearch 查询

Spark SQL 类型转换

编辑

在处理多值/数组字段时,请参阅部分,特别是这些配置选项。重要提示:如果使用自动索引创建,请查看部分以获取更多信息。

elasticsearch-hadoop 自动将 Spark 内置类型转换为 Elasticsearch 类型(反之亦然),如下表所示。

虽然 Spark SQL 的 DataTypes 在 Scala 和 Java 中都有等价类型,因此 RDD 转换可以适用,但语义略有不同——特别是对于 java.sql 类型,因为 Spark SQL 处理它们的方式有所不同。

表 8. Spark SQL 1.3+ 转换表

Spark SQL DataType Elasticsearch 类型

null

null

ByteType

byte

ShortType

short

IntegerType

int

LongType

long

FloatType

float

DoubleType

double

StringType

string

BinaryType

string(BASE64)

BooleanType

boolean

DateType

datestring 格式)

TimestampType

long(Unix 时间戳)

ArrayType

array

MapType

对象

StructType

对象

**地理类型转换表**除了上表之外,对于 Spark SQL 1.3 或更高版本,elasticsearch-hadoop 会自动检测地理类型的模式,即 Elasticsearch 的 geo_pointgeo_shape。由于每种类型都允许多种格式(geo_point 接受以 4 种不同方式指定的经纬度,而 geo_shape 允许多种类型(目前为 9 种)),并且映射不提供此类信息,因此 elasticsearch-hadoop 会在启动时采样确定的地理字段并检索包含所有相关字段的任意文档;它将解析它并因此确定必要的模式(例如,它可以判断 geo_point 是指定为 StringType 还是 ArrayType)。

由于 Spark SQL 是强类型的,因此每个地理字段需要在所有文档中具有相同的格式。否则,返回的数据将不符合检测到的模式,从而导致错误。

Spark 结构化流支持

编辑

在 6.0 版本中添加。

结构化流 提供快速、可扩展、容错、端到端精确一次的流处理,而无需用户考虑流处理。

-- Spark 文档

作为 Spark 2.0 中的实验性功能发布,Spark 结构化流提供了一个统一的流和批处理接口,该接口内置于 Spark SQL 集成中。从 elasticsearch-hadoop 6.0 开始,我们提供了将流数据索引到 Elasticsearch 的原生功能。

与 Spark SQL 一样,结构化流使用结构化数据。所有条目都应具有相同的结构(相同数量的字段,类型和名称相同)。不支持使用非结构化数据(具有不同结构的文档),这会导致问题。对于此类情况,请使用 DStreams。

支持的 Spark 结构化流版本

编辑

Spark 结构化流从 Spark v2.2.0 开始被认为是普遍可用的。因此,elasticsearch-hadoop 对结构化流的支持(在 elasticsearch-hadoop 6.0+ 中可用)仅与 Spark 2.2.0 及更高版本兼容。与之前的 Spark SQL 类似,在接口被认为稳定之前,结构化流可能会在版本之间发生重大变化。

Spark 结构化流支持在 org.elasticsearch.spark.sqlorg.elasticsearch.spark.sql.streaming 包下可用。它与 Spark SQL 共享一个统一的接口,形式为 Dataset[_] api。客户端可以以几乎与常规批处理 Datasets 完全相同的方式与流 Datasets 交互,仅有少数例外

将流 Datasets(Spark SQL 2.0+)写入 Elasticsearch

编辑

使用 elasticsearch-hadoop,可以将流支持的 Datasets 索引到 Elasticsearch。

Scala
编辑

在 Scala 中,要将基于流的 Datasets 和 DataFrames 保存到 Elasticsearch,只需配置流以使用 "es" 格式写入,如下所示

import org.apache.spark.sql.SparkSession    

...

val spark = SparkSession.builder()
   .appName("EsStreamingExample")           
   .getOrCreate()

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

//  create DataFrame
val people = spark.readStream                   
        .textFile("/path/to/people/files/*")    
        .map(_.split(","))
        .map(p => Person(p(0), p(1), p(2).trim.toInt))

people.writeStream
      .option("checkpointLocation", "/save/location")      
      .format("es")
      .start("spark/people")                               

Spark SQL 导入

创建 SparkSession

不要调用 read,而是调用 readStream 以获取 DataStreamReader 的实例

连续读取文本文件目录并将它们转换为 Person 对象

提供一个位置来保存流查询的偏移量和提交日志

使用 "es" 格式启动流,以将 Dataset 的内容持续索引到 Elasticsearch

Spark 在基于批处理和基于流的 Datasets 之间没有基于类型的区别。虽然您可以导入 org.elasticsearch.spark.sql 包以向您的 DatasetDataFrame 添加 saveToEs 方法,但如果这些方法在基于流的 Datasets 或 DataFrames 上调用,则会抛出非法参数异常。

Java
编辑

类似地,"es" 格式也可用于 Java。

import org.apache.spark.sql.SparkSession    

...

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")     
  .getOrCreate();

// java bean style class
public static class PersonBean {
  private String name;
  private String surname;                        
  private int age;

  ...
}

Dataset<PersonBean> people = spark.readStream()         
        .textFile("/path/to/people/files/*")
        .map(new MapFunction<String, PersonBean>() {
            @Override
            public PersonBean call(String value) throws Exception {
                return someFunctionThatParsesStringToJavaBeans(value.split(","));         
            }
        }, Encoders.<PersonBean>bean(PersonBean.class));

people.writeStream()
    .option("checkpointLocation", "/save/location")       
    .format("es")
    .start("spark/people");                               

Spark SQL Java 导入。可以使用与 Scala 相同的会话类

创建 SparkSession。也可以使用旧的 SQLContext api

我们创建一个 Java bean 类作为我们的数据格式

使用 readStream() 方法获取 DataStreamReader 以开始构建我们的流

将我们的字符串数据转换为我们的 PersonBean

设置一个保存流状态的位置

使用 "es" 格式,我们将 Dataset 持续索引到 Elasticsearch 中的 spark/people 下。

将现有的 JSON 写入 Elasticsearch

编辑

在使用 Spark SQL 时,如果输入数据为 JSON 格式,只需将其转换为 Dataset(对于 Spark SQL 2.0)(如 Spark 文档 中所述)通过 DataStreamReaderjson 格式。

Spark 结构化流中的接收器提交日志

编辑

Spark 结构化流宣传了一种端到端容错精确一次处理模型,该模型通过使用偏移量检查点和维护每个流查询的提交日志来实现。在执行流查询时,大多数源和接收器都需要您指定一个“checkpointLocation”以持久化作业的状态。如果发生中断,则使用相同的检查点位置启动新的流查询将恢复作业的状态并从中断处继续执行。我们为 elasticsearch-hadoop 的 Elasticsearch 接收器实现维护了一个提交日志,该日志位于配置的检查点位置下的特殊目录中

$> ls /path/to/checkpoint/location
metadata  offsets/  sinks/
$> ls /path/to/checkpoint/location/sinks
elasticsearch/
$> ls /path/to/checkpoint/location/sinks/elasticsearch
12.compact  13  14  15 16  17  18

提交日志目录中的每个文件都对应于已提交的批处理 ID。日志实现定期压缩日志以避免混乱。您可以通过多种方式设置日志目录的位置

  1. 使用 es.spark.sql.streaming.sink.log.path 设置显式日志位置(请参见下文)。
  2. 如果未设置,则将使用 checkpointLocation 指定的路径。
  3. 如果未设置,则将通过组合 SparkSession 中的 spark.sql.streaming.checkpointLocation 的值和 Dataset 给定的查询名称来构造路径。
  4. 如果不存在查询名称,则在上述情况下将使用随机 UUID 代替查询名称
  5. 如果未提供上述任何设置,则 start 调用将抛出异常

以下是影响 Elasticsearch 提交日志行为的配置列表

es.spark.sql.streaming.sink.log.enabled(默认值为 true
启用或禁用流作业的提交日志。默认情况下,日志已启用,并且将跳过具有相同批处理 ID 的输出批处理以避免重复写入。当此设置为 false 时,提交日志将被禁用,并且所有输出都将发送到 Elasticsearch,而不管它们是否已在先前的执行中发送过。
es.spark.sql.streaming.sink.log.path
设置存储此流查询的日志数据的位置。如果未设置此值,则 Elasticsearch 接收器将在 checkpointLocation 中给定的路径下存储其提交日志。任何与 HDFS 客户端兼容的 URI 都可以接受。
es.spark.sql.streaming.sink.log.cleanupDelay(默认值为 10m
提交日志通过 Spark 的 HDFS 客户端进行管理。某些与 HDFS 兼容的文件系统(如 Amazon 的 S3)以异步方式传播文件更改。为了解决此问题,在一组日志文件被压缩后,客户端将等待此时间段,然后再清理旧文件。
es.spark.sql.streaming.sink.log.deletion(默认值为 true
确定是否应删除不再需要的旧日志。在每个批处理提交后,客户端将检查是否有任何已压缩且可以安全删除的提交日志。如果设置为 false,则日志将跳过此清理步骤,为每个批处理留下一个提交文件。
es.spark.sql.streaming.sink.log.compactInterval(默认值为 10
设置在压缩日志文件之前要处理的批处理数量。默认情况下,每 10 个批处理,提交日志将被压缩成一个包含所有先前已提交批处理 ID 的单个文件。

Spark 结构化流类型转换

编辑

结构化流使用与 Spark SQL 集成完全相同的类型转换规则。

在处理多值/数组字段时,请参阅部分,特别是这些配置选项。

如果使用自动索引创建,请查看部分以获取更多信息。

elasticsearch-hadoop 会自动将 Spark 内置类型转换为 Elasticsearch 类型,如下表所示

虽然 Spark SQL 的 DataTypes 在 Scala 和 Java 中都有等价类型,因此 RDD 转换可以适用,但语义略有不同——特别是对于 java.sql 类型,因为 Spark SQL 处理它们的方式有所不同。

表 9. Spark SQL 1.3+ 转换表

Spark SQL DataType Elasticsearch 类型

null

null

ByteType

byte

ShortType

short

IntegerType

int

LongType

long

FloatType

float

DoubleType

double

StringType

string

BinaryType

string(BASE64)

BooleanType

boolean

DateType

datestring 格式)

TimestampType

long(Unix 时间戳)

ArrayType

array

MapType

对象

StructType

对象

使用 Map/Reduce 层

编辑

使用 Spark 与 Elasticsearch 的另一种方法是通过 Map/Reduce 层,即利用 elasticsearch-hadoop 中专用的 Input/OuputFormat。但是,除非您坚持使用 elasticsearch-hadoop 2.0,否则我们强烈建议使用原生集成,因为它提供了更好的性能和灵活性。

配置

编辑

通过 elasticsearch-hadoop,Spark 可以通过其专用的 InputFormat 与 Elasticsearch 集成,在写入的情况下,通过 OutputFormat 集成。这些在Map/Reduce章节中有详细描述,因此请参阅该章节以获取深入的解释。

简而言之,需要使用目标 Elasticsearch 集群和索引设置一个基本的 Hadoop Configuration 对象,可能还需要一个查询,然后就可以开始了。

从Spark的角度来看,唯一需要做的就是设置序列化 - Spark默认依赖于Java序列化,这虽然方便,但效率相当低下。这就是Hadoop本身引入自己的序列化机制及其类型(即Writable)的原因。因此,InputFormatOutputFormat需要返回Writable,而Spark默认情况下并不理解这些类型。好消息是,可以轻松启用不同的序列化方式(例如Kryo),它可以自动处理转换,并且效率也相当高。

SparkConf sc = new SparkConf(); //.setMaster("local");
sc.set("spark.serializer", KryoSerializer.class.getName()); 

// needed only when using the Java API
JavaSparkContext jsc = new JavaSparkContext(sc);

使用Spark启用Kryo序列化支持

或者如果您更喜欢Scala

val sc = new SparkConf(...)
sc.set("spark.serializer", classOf[KryoSerializer].getName) 

使用Spark启用Kryo序列化支持

请注意,Kryo序列化用作处理Writable类型的变通方法;可以选择直接转换类型(从WritableSerializable类型) - 这也很好,但对于入门来说,上面的一行代码似乎是最有效的。

从Elasticsearch读取数据

编辑

要读取数据,只需传入org.elasticsearch.hadoop.mr.EsInputFormat类 - 因为它支持的Map/Reduce API,您可以自由地在SparkContexthadoopRDD(我们推荐出于简洁性考虑)或newAPIHadoopRDD上使用任何一种方法。无论您选择哪种方法,请坚持使用它,以避免将来出现混淆和问题。

(org.apache.hadoop.mapred) API
编辑
JobConf conf = new JobConf();                             
conf.set("es.resource", "radio/artists");                 
conf.set("es.query", "?q=me*");                           

JavaPairRDD esRDD = jsc.hadoopRDD(conf, EsInputFormat.class,
                          Text.class, MapWritable.class); 
long docCount = esRDD.count();

创建Hadoop对象(使用旧API)

配置数据源(索引)

设置查询(可选)

通过EsInputFormat在Elasticsearch之上创建一个Spark RDD - 键表示文档ID,值表示文档本身

Scala版本如下

val conf = new JobConf()                                   
conf.set("es.resource", "radio/artists")                   
conf.set("es.query", "?q=me*")                             
val esRDD = sc.hadoopRDD(conf,
                classOf[EsInputFormat[Text, MapWritable]], 
                classOf[Text], classOf[MapWritable]))
val docCount = esRDD.count();

创建Hadoop对象(使用旧API)

配置数据源(索引)

设置查询(可选)

通过EsInputFormat在Elasticsearch之上创建一个Spark RDD

(org.apache.hadoop.mapreduce) API
编辑

正如预期的那样,mapreduce API版本非常相似 - 将hadoopRDD替换为newAPIHadoopRDD,并将JobConf替换为Configuration。就是这样。

Configuration conf = new Configuration();       
conf.set("es.resource", "radio/artists");       
conf.set("es.query", "?q=me*");                 

JavaPairRDD esRDD = jsc.newAPIHadoopRDD(conf, EsInputFormat.class,
                Text.class, MapWritable.class); 
long docCount = esRDD.count();

创建Hadoop对象(使用新API)

配置数据源(索引)

设置查询(可选)

通过EsInputFormat在Elasticsearch之上创建一个Spark RDD - 键表示文档ID,值表示文档本身

Scala版本如下

val conf = new Configuration()                             
conf.set("es.resource", "radio/artists")                   
conf.set("es.query", "?q=me*")                             
val esRDD = sc.newAPIHadoopRDD(conf,
                classOf[EsInputFormat[Text, MapWritable]], 
                classOf[Text], classOf[MapWritable]))
val docCount = esRDD.count();

创建Hadoop对象(使用新API)

配置数据源(索引)

设置查询(可选)

通过EsInputFormat在Elasticsearch之上创建一个Spark RDD

从PySpark使用连接器

编辑

由于其Map/Reduce层,elasticsearch-hadoop也可以从PySpark中使用,用于读写Elasticsearch中的数据。为此,以下是Spark文档中的一个代码片段(确保切换到Python代码片段)

$ ./bin/pyspark --driver-class-path=/path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}   # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
    "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first()         # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

此外,SQL加载器也可以使用

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()