Map/Reduce 集成

编辑

对于低级别或对性能敏感的环境,elasticsearch-hadoop 提供了专用的 InputFormatOutputFormat 实现,可以读取和写入 Elasticsearch 的数据。在 Map/Reduce 中,MapperReducer 读取和写入 Writable 对象,这是一个针对序列化进行了优化的 Hadoop 特定接口。因此,elasticsearch-hadoop InputFormatOutputFormat 将返回并期望 MapWritable 对象;每个被读取或写入的文档都使用一个映射。只要其对象也是 Writable,映射本身就可以具有任何类型的内部结构——它可以在其 Writable 表示中保存嵌套映射、数字或字符串。elasticsearch-hadoop 在内部会自动将 WritableMap 转换为 JSON 文档,反之亦然,因此您无需处理低级别解析或与 JSON 之间的转换。此外,如果发送到 Elasticsearch 的数据已经是 JSON 格式,则可以直接流式传输,而无需转换为 Writable 对象。阅读本章的其余部分以了解更多信息。

安装

编辑

为了使用 elasticsearch-hadoop,需要使 jar 包 可用于作业类路径。该 jar 包大小约为 250kB,并且没有任何依赖项,可以将其捆绑在作业存档中,也可以通过 CLI 通用选项(如果您的 jar 包实现了 Tool 接口)手动或通过 CLI 进行捆绑,通过 Hadoop 的 DistributedCache 分发,或者通过手动配置集群来提供。

以上所有选项仅影响在分布式节点上运行的代码。如果启动 Hadoop 作业的代码引用 elasticsearch-hadoop,请确保在 HADOOP_CLASSPATH 中包含 JAR 包:HADOOP_CLASSPATH="<冒号分隔的 jar 包路径,包括 elasticsearch-hadoop>"

CLI 示例。

$ bin/hadoop jar myJar.jar -libjars elasticsearch-hadoop.jar

配置

编辑

在 Map/Reduce 作业中使用 elasticsearch-hadoop 时,可以使用 Hadoop 的 Configuration 对象通过将各种选项设置为上述对象上的属性来配置 elasticsearch-hadoop。通常,您需要设置 Elasticsearch 主机和端口(假设它没有在默认的 localhost:9200 上运行)、目标索引/类型以及潜在的查询,例如

Configuration conf = new Configuration();
conf.set("es.nodes", "es-server:9200");    
conf.set("es.resource", "radio/artists");  
...

elasticsearch-hadoop 将连接到的 Elasticsearch 集群中的一个节点。默认情况下,elasticsearch-hadoop 将检测集群中的其余节点。

elasticsearch-hadoop 将用于读取和写入数据的 resource(索引/类型)。

在构建 Hadoop 作业时只需使用配置对象即可。

将数据写入 Elasticsearch

编辑

使用 elasticsearch-hadoop,Map/Reduce 作业可以将数据写入 Elasticsearch,使其可以通过 索引 进行搜索。elasticsearch-hadoop 支持(所谓的)旧版新版 Hadoop API。

EsOutputFormat 期望一个表示文档值的 Map<Writable, Writable>,该值在内部转换为 JSON 文档并在 Elasticsearch 中建立索引。Hadoop OutputFormat 要求实现期望一个键和一个值,但是,由于 Elasticsearch 只需要文档(即值),因此 EsOutputFormat 会忽略键。

旧版 (org.apache.hadoop.mapred) API

编辑

要将数据写入 ES,请在您的作业中使用 org.elasticsearch.hadoop.mr.EsOutputFormat 以及相关的配置 属性

JobConf conf = new JobConf();
conf.setSpeculativeExecution(false);           
conf.set("es.nodes", "es-server:9200");
conf.set("es.resource", "radio/artists");      
conf.setOutputFormat(EsOutputFormat.class);    
conf.setMapOutputValueClass(MapWritable.class);
conf.setMapperClass(MyMapper.class);
...
JobClient.runJob(conf);

禁用推测执行

目标索引/类型

专用的 OutputFormat

指定 mapper 输出类 (MapWritable)

Mapper 实现可以使用 EsOutputFormat,如下所示

public class MyMapper extends MapReduceBase implements Mapper {
 @Override
 public void map(Object key, Object value, OutputCollector output,
                    Reporter reporter) throws IOException {
   // create the MapWritable object
   MapWritable doc = new MapWritable();
   ...
   // write the result to the output collector
   // one can pass whatever value to the key; EsOutputFormat ignores it
   output.collect(NullWritable.get(), map);
 }}

对于需要指定文档的 ID(或其他元数据字段,如 ttltimestamp)的情况,可以通过设置相应的 映射(即 es.mapping.id)来实现。因此,假设文档包含一个名为 radioId 的字段,该字段是唯一的并且适合用作标识符,则可以按如下方式更新作业配置

JobConf conf = new JobConf();
conf.set("es.mapping.id", "radioId");

在运行时,elasticsearch-hadoop 将从每个文档中提取值,并在批量调用期间相应地使用它。

将现有 JSON 写入 Elasticsearch

编辑

对于作业输入数据已经是 JSON 的情况,elasticsearch-hadoop 允许无需应用任何转换即可直接建立索引;数据按原样获取并直接发送到 Elasticsearch。在这种情况下,需要通过设置 es.input.json 参数来指示 json 输入。因此,在这种情况下,elasticsearch-hadoop 期望 TextBytesWritable(首选,因为它不需要 String 转换)对象作为输出;如果未使用这些类型,库将简单地回退到目标对象的 toString 表示形式。

表 1. 用于 JSON 表示的 Writable

Writable 注释

BytesWritable

当 JSON 数据表示为 byte[] 或类似类型时,请使用此方法

Text

如果 JSON 数据表示为 String,请使用此方法

其他任何内容

确保 toString() 返回所需的 JSON 文档

确保数据已正确编码为 UTF-8。作业输出被视为发送到 Elasticsearch 的文档的最终形式。

JobConf conf = new JobConf();
conf.set("es.input.json", "yes");        
conf.setMapOutputValueClass(Text.class); 
...
JobClient.runJob(conf);

指示 EsOutputFormat 的输入类型为 JSON。

设置正确的输出类型(在本例中为 Text

Mapper 实现如下所示

public class MyMapper extends MapReduceBase implements Mapper {
 @Override
 public void map(Object key, Object value, OutputCollector output,
                    Reporter reporter) throws IOException {
   // assuming the document is a String called 'source'
   String source =  ...
   Text jsonDoc = new Text(source);
   // send the doc directly
   output.collect(NullWritable.get(), jsonDoc);
 }}

写入动态/多资源

编辑

对于需要根据数据内容在不同存储桶下建立索引的数据的情况,可以使用 es.resource.write 字段,该字段接受在运行时从文档内容解析的模式。按照前面提到的 媒体示例,可以将其配置如下

JobConf conf = new JobConf();
conf.set("es.resource.write","my-collection-{media-type}/doc");

如果使用 Writable 对象,对于每个 MapWritable,elasticsearch-hadoop 将提取 media-type 键下的值,并将其用作 Elasticsearch 索引后缀。如果使用原始 JSON,则 elasticsearch-hadoop 将解析文档,提取字段 media-type 并相应地使用其值。

新版 (org.apache.hadoop.mapreduce) API

编辑

使用新版 API 非常相似——事实上,使用了完全相同的类 (org.elasticsearch.hadoop.mr.EsOutputFormat)

Configuration conf = new Configuration();
conf.setBoolean("mapred.map.tasks.speculative.execution", false);    
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); 
conf.set("es.nodes", "es-server:9200");
conf.set("es.resource", "radio/artists");                            
Job job = new Job(conf);
job.setOutputFormatClass(EsOutputFormat.class);
job.setMapOutputValueClass(MapWritable.class);                       
...
job.waitForCompletion(true);

禁用 mapper 推测执行

禁用 reducer 推测执行

目标索引/类型

指定 Mapper 值输出类型(在本例中为 MapWritable

Mapper 实例也一样

public class SomeMapper extends Mapper {
 @Override
 protected void map(Object key, Object value, Context context)
        throws IOException, InterruptedException {
   // create the MapWritable object
   MapWritable doc = new MapWritable();
   ...
   context.write(NullWritable.get(), doc);
 }}

指定 ID 或其他文档 元数据 也同样简单

Configuration conf = new Configuration();
conf.set("es.mapping.id", "radioId");

将现有 JSON 写入 Elasticsearch

编辑

和以前一样,在直接处理 JSON 时,在新 API 下,配置如下所示

Configuration conf = new Configuration();
conf.set("es.input.json", "yes");                 
Job job = new Job(conf);
job.setMapOutputValueClass(BytesWritable.class); 
...
job.waitForCompletion(true);

指示 EsOutputFormat 的输入类型为 JSON。

设置输出类型,在本例中为 BytesWritable

public class SomeMapper extends Mapper {
 @Override
 protected void map(Object key, Object value, Context context)
        throws IOException, InterruptedException {
   // assuming the document is stored as bytes
   byte[] source =  ...
   BytesWritable jsonDoc = new BytesWritable(source);
   // send the doc directly
   context.write(NullWritable.get(), jsonDoc);
 }}

写入动态/多资源

编辑

正如预期的那样,在这种情况下,旧版新版 API 之间的差异最小(可以认为是不存在的)

Configuration conf = new Configuration();
conf.set("es.resource.write","my-collection-{media-type}/doc");
...

从 Elasticsearch 读取数据

编辑

类似地,要从 Elasticsearch 读取数据,需要使用 org.elasticsearch.hadoop.mr.EsInputFormat 类。虽然它可以读取整个索引,但使用查询更方便——elasticsearch-hadoop 将自动实时执行查询并将结果反馈回 Hadoop。由于查询针对实际数据执行,因此这充当数据集的实时视图。

与它的对应类 (EsOutputFormat) 一样,EsInputFormat 为 Elasticsearch 返回的每个 JSON 文档返回一个 Map<Writable, Writable>。由于 InputFormat 需要返回键和值,因此 EsInputFormat 将返回文档 ID(在 Elasticsearch 内)作为键(通常被忽略)以及文档/映射作为值。

如果需要保留Elasticsearch返回的文档结构,请考虑使用org.elasticsearch.hadoop.mr.LinkedMapWritable。此类扩展了Hadoop的MapWritable(因此可以轻松替换它)并保留插入顺序;也就是说,在迭代映射时,条目将按插入顺序返回(与MapWritable不维护插入顺序相反)。但是,由于Hadoop的工作方式,需要将LinkedMapWritable指定为作业映射输出值(而不是MapWritable)。

旧版 (org.apache.hadoop.mapred) API

编辑

根据我们上面关于电台艺术家的示例,要获取所有以“me”开头的艺术家,可以使用以下代码片段

JobConf conf = new JobConf();
conf.set("es.resource", "radio/artists");       
conf.set("es.query", "?q=me*");                 
conf.setInputFormat(EsInputFormat.class);       
conf.setMapOutputKeyClass(Text.class);          
conf.setMapOutputValueClass(MapWritable.class); 

...
JobClient.runJob(conf);

目标索引/类型

查询

专用InputFormat

Text作为键类(包含文档ID)

MapWritable或elasticsearch-hadoop的LinkedMapWritable(保留插入顺序)作为值类(包含文档结构)

使用EsInputFormatMapper可能如下所示

public class MyMapper extends MapReduceBase implements Mapper {
 @Override
 public void map(Object key, Object value, OutputCollector output,
                    Reporter reporter) throws IOException {
   Text docId = (Text) key;
   MapWritable doc = (MapWritable) value;      
   ...
 }}

LinkedMapWritableMapWritable类型兼容,因此强制转换对两者都有效

随意使用Java 5泛型来避免上述强制转换。为清晰起见,本章中的示例不包含泛型。

新版 (org.apache.hadoop.mapreduce) API

编辑

正如预期的那样,mapreduce API版本非常相似

Configuration conf = new Configuration();
conf.set("es.resource", "radio/artists/");            
conf.set("es.query", "?q=me*");                       
Job job = new Job(conf);
job.setInputFormatClass(EsInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MapWritable.class);        
...

job.waitForCompletion(true);

目标索引/类型

查询

MapWritable或elasticsearch-hadoop的LinkedMapWritable(保留插入顺序)作为值类(包含文档结构)

以及Mapper实现

public class SomeMapper extends Mapper {
 @Override
 protected void map(Object key, Object value, Context context)
        throws IOException, InterruptedException {
   Text docId = (Text) key;
   MapWritable doc = (MapWritable) value;             
   ...
 }}

LinkedMapWritableMapWritable类型兼容,因此强制转换对两者都有效

以JSON格式从Elasticsearch读取数据

编辑

如果需要Elasticsearch的结果为JSON格式(通常是为了将其发送到其他系统),则可以指示elasticsearch-hadoop按原样返回数据。通过将es.output.json设置为true,连接器将解析来自Elasticsearch的响应,识别文档,并在不转换它们的情况下将其内容作为Text对象返回给用户。

Configuration conf = new Configuration();
conf.set("es.resource", "source/category");
conf.set("es.output.json", "true");

使用不同的索引进行读写

编辑

有时,需要从一个Elasticsearch资源读取数据,对其进行处理,然后将其写入同一个作业中的另一个资源。es.resource设置不够用,因为它暗示了相同的资源既是源又是目标。在这种情况下,应该使用es.resource.reades.resource.write来区分这两个资源(下面的示例使用的是mapreduce API)。

Configuration conf = new Configuration();
conf.set("es.resource.read", "source/category");
conf.set("es.resource.write", "sink/group");

类型转换

编辑

如果使用自动索引创建,请查看此处部分以了解更多信息。

elasticsearch-hadoop会自动将Hadoop内置的Writable类型转换为Elasticsearch字段类型(反之亦然),如下表所示。

表2.Writable转换表

Writable Elasticsearch类型

null

null

NullWritable

null

BooleanWritable

boolean

Text

string

ByteWritable

byte

IntWritable

int

VInt

int

LongWritable

long

VLongWritable

long

BytesWritable

binary

DoubleWritable

double

FloatWritable

float

MD5Writable

string

ArrayWritable

array

AbstractMapWritable

map

ShortWritable

short

值得一提的是,Elasticsearch中仅有的丰富数据类型,例如GeoPointGeoShape,可以通过将其结构转换为上表中可用的基本类型来支持。例如,根据其存储方式,geo_point可能会作为Text(基本上是String)或ArrayWritable返回。