Map/Reduce 集成

编辑

对于底层或性能敏感的环境,elasticsearch-hadoop 提供了专用的 InputFormatOutputFormat 实现,可以读取和写入 Elasticsearch 的数据。在 Map/Reduce 中,MapperReducer 读取和写入 Writable 对象,这是一个针对序列化优化的 Hadoop 特定接口。因此,elasticsearch-hadoop 的 InputFormatOutputFormat 将返回和期望 MapWritable 对象;对于每个读取或写入的文档,使用一个 Map。只要其对象也是 Writable,map 本身可以具有任何类型的内部结构 - 它可以在其 Writable 表示形式中保存嵌套的 map、数字或字符串。在内部,elasticsearch-hadoop 会自动将 WritableMap 转换为 JSON 文档,反之亦然,因此您不必处理低级的 JSON 解析或转换。此外,如果发送到 Elasticsearch 的数据已经采用 JSON 格式,则可以直接流式传输,而无需转换为 Writable 对象。阅读本章的其余部分以了解更多信息。

安装

编辑

为了使用 elasticsearch-hadoop,需要将 jar 文件添加到作业的类路径中。由于大小约为 ~250kB 且没有任何依赖项,因此可以将 jar 文件捆绑到作业归档文件中,手动添加或通过 CLI 通用选项(如果您的 jar 文件实现了 Tool 接口),通过 Hadoop 的 分布式缓存 进行分发,或者通过手动配置集群使其可用。

以上所有选项都影响在分布式节点上运行的代码。如果启动 Hadoop 作业的代码引用了 elasticsearch-hadoop,请确保将 JAR 文件包含在 HADOOP_CLASSPATH 中:HADOOP_CLASSPATH="<冒号分隔的 jar 文件路径,包括 elasticsearch-hadoop>"

CLI 示例。

$ bin/hadoop jar myJar.jar -libjars elasticsearch-hadoop.jar

配置

编辑

在 Map/Reduce 作业中使用 elasticsearch-hadoop 时,可以使用 Hadoop 的 Configuration 对象来配置 elasticsearch-hadoop,方法是将各种选项设置为上述对象上的属性。通常,需要设置 Elasticsearch 的主机和端口(假设它不是在默认的 localhost:9200 上运行)、目标索引/类型以及可能的查询,例如

Configuration conf = new Configuration();
conf.set("es.nodes", "es-server:9200");    
conf.set("es.resource", "radio/artists");  
...

elasticsearch-hadoop 将要连接的 Elasticsearch 集群中的一个节点。默认情况下,elasticsearch-hadoop 将检测集群中的其余节点。

elasticsearch-hadoop 将用于读取和写入数据的 resource(索引/类型)。

只需在构建 Hadoop 作业时使用配置对象即可。

将数据写入 Elasticsearch

编辑

通过 elasticsearch-hadoop,Map/Reduce 作业可以将数据写入 Elasticsearch,使其可以通过 索引 进行搜索。elasticsearch-hadoop 同时支持(所谓的) 的 Hadoop API。

EsOutputFormat 期望一个 Map<Writable, Writable>,表示一个文档值,该值在内部转换为 JSON 文档并在 Elasticsearch 中建立索引。Hadoop 的 OutputFormat 要求实现期望一个键和一个值,但是,由于对于 Elasticsearch 仅需要文档(即值),因此 EsOutputFormat 会忽略键。

的 (org.apache.hadoop.mapred) API

编辑

要将数据写入 ES,请在您的作业中使用 org.elasticsearch.hadoop.mr.EsOutputFormat 以及相关的配置属性

JobConf conf = new JobConf();
conf.setSpeculativeExecution(false);           
conf.set("es.nodes", "es-server:9200");
conf.set("es.resource", "radio/artists");      
conf.setOutputFormat(EsOutputFormat.class);    
conf.setMapOutputValueClass(MapWritable.class);
conf.setMapperClass(MyMapper.class);
...
JobClient.runJob(conf);

禁用推测执行

目标索引/类型

专用的 OutputFormat

指定 mapper 输出类 (MapWritable)

Mapper 实现可以如下使用 EsOutputFormat

public class MyMapper extends MapReduceBase implements Mapper {
 @Override
 public void map(Object key, Object value, OutputCollector output,
                    Reporter reporter) throws IOException {
   // create the MapWritable object
   MapWritable doc = new MapWritable();
   ...
   // write the result to the output collector
   // one can pass whatever value to the key; EsOutputFormat ignores it
   output.collect(NullWritable.get(), map);
 }}

对于需要指定文档的 ID(或其他元数据字段,如 ttltimestamp)的情况,可以通过设置适当的映射,即 es.mapping.id 来实现。因此,假设文档包含一个名为 radioId 的字段,该字段是唯一的并且适合作为标识符,则可以如下更新作业配置

JobConf conf = new JobConf();
conf.set("es.mapping.id", "radioId");

在运行时,elasticsearch-hadoop 将从每个文档中提取值,并在批量调用期间相应地使用它。

将现有 JSON 写入 Elasticsearch

编辑

对于作业输入数据已经采用 JSON 格式的情况,elasticsearch-hadoop 允许直接索引而无需应用任何转换;数据将按原样获取并直接发送到 Elasticsearch。在这种情况下,需要通过设置 es.input.json 参数来指示 JSON 输入。因此,在这种情况下,elasticsearch-hadoop 期望输出 TextBytesWritable(首选,因为它不需要 String 转换)对象;如果未使用这些类型,则该库将简单地回退到目标对象的 toString 表示形式。

表 1. 用于 JSON 表示的 Writable

Writable 注释

BytesWritable

当 JSON 数据表示为 byte[] 或类似形式时使用此方法

Text

如果 JSON 数据表示为 String,则使用此方法

任何其他

确保 toString() 返回所需的 JSON 文档

确保数据已正确编码为 UTF-8。作业输出被认为是发送到 Elasticsearch 的文档的最终形式。

JobConf conf = new JobConf();
conf.set("es.input.json", "yes");        
conf.setMapOutputValueClass(Text.class); 
...
JobClient.runJob(conf);

指示 EsOutputFormat 的输入类型为 JSON。

设置正确的输出类型(本例中为 Text

Mapper 实现变为

public class MyMapper extends MapReduceBase implements Mapper {
 @Override
 public void map(Object key, Object value, OutputCollector output,
                    Reporter reporter) throws IOException {
   // assuming the document is a String called 'source'
   String source =  ...
   Text jsonDoc = new Text(source);
   // send the doc directly
   output.collect(NullWritable.get(), jsonDoc);
 }}

写入动态/多资源

编辑

对于需要根据数据内容在不同存储桶下索引写入 Elasticsearch 的数据的情况,可以使用 es.resource.write 字段,该字段接受在运行时从文档内容解析的模式。按照上述媒体示例,可以如下配置

JobConf conf = new JobConf();
conf.set("es.resource.write","my-collection-{media-type}/doc");

如果使用 Writable 对象,则对于每个 MapWritable,elasticsearch-hadoop 将提取 media-type 键下的值,并将其用作 Elasticsearch 索引后缀。如果使用原始 JSON,则 elasticsearch-hadoop 将解析文档,提取 media-type 字段,并相应地使用其值。

的 (org.apache.hadoop.mapreduce) API

编辑

使用的 API 非常相似 - 实际上,使用的是完全相同的类 (org.elasticsearch.hadoop.mr.EsOutputFormat)

Configuration conf = new Configuration();
conf.setBoolean("mapred.map.tasks.speculative.execution", false);    
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); 
conf.set("es.nodes", "es-server:9200");
conf.set("es.resource", "radio/artists");                            
Job job = new Job(conf);
job.setOutputFormatClass(EsOutputFormat.class);
job.setMapOutputValueClass(MapWritable.class);                       
...
job.waitForCompletion(true);

禁用 mapper 推测执行

禁用 reducer 推测执行

目标索引/类型

指定 Mapper 值输出类型(本例中为 MapWritable

Mapper 实例也是如此

public class SomeMapper extends Mapper {
 @Override
 protected void map(Object key, Object value, Context context)
        throws IOException, InterruptedException {
   // create the MapWritable object
   MapWritable doc = new MapWritable();
   ...
   context.write(NullWritable.get(), doc);
 }}

指定 ID 或其他文档元数据同样容易

Configuration conf = new Configuration();
conf.set("es.mapping.id", "radioId");

将现有 JSON 写入 Elasticsearch

编辑

如前所述,当直接处理 JSON 时,在的 API 下,配置如下所示

Configuration conf = new Configuration();
conf.set("es.input.json", "yes");                 
Job job = new Job(conf);
job.setMapOutputValueClass(BytesWritable.class); 
...
job.waitForCompletion(true);

指示 EsOutputFormat 的输入类型为 JSON。

设置输出类型,在此示例中为 BytesWritable

public class SomeMapper extends Mapper {
 @Override
 protected void map(Object key, Object value, Context context)
        throws IOException, InterruptedException {
   // assuming the document is stored as bytes
   byte[] source =  ...
   BytesWritable jsonDoc = new BytesWritable(source);
   // send the doc directly
   context.write(NullWritable.get(), jsonDoc);
 }}

写入动态/多资源

编辑

正如预期的那样,在这种情况下,API 之间的差异最小(可以视为不存在)

Configuration conf = new Configuration();
conf.set("es.resource.write","my-collection-{media-type}/doc");
...

从 Elasticsearch 读取数据

编辑

以类似的方式,要从 Elasticsearch 读取数据,需要使用 org.elasticsearch.hadoop.mr.EsInputFormat 类。虽然它可以读取整个索引,但使用查询会更方便 - elasticsearch-hadoop 会自动实时执行查询,并将结果返回给 Hadoop。由于查询是针对真实数据执行的,因此这充当数据集的实时视图。

与其对应项 (EsOutputFormat) 类似,EsInputFormat 为 Elasticsearch 返回的每个 JSON 文档返回 Map<Writable, Writable>。由于 InputFormat 需要同时返回键和值,因此 EsInputFormat 将返回文档 ID(在 Elasticsearch 中)作为键(通常被忽略),并将文档/map 作为值。

如果需要保留从 Elasticsearch 返回的文档结构,请考虑使用 org.elasticsearch.hadoop.mr.LinkedMapWritable。该类扩展了 Hadoop 的 MapWritable(因此可以轻松替换它)并保留插入顺序;也就是说,当迭代 map 时,将按插入顺序返回条目(而不是维护此顺序的 MapWritable)。但是,由于 Hadoop 的工作方式,需要将 LinkedMapWritable 指定为作业 map 输出值(而不是 MapWritable)。

的 (org.apache.hadoop.mapred) API

编辑

按照我们上面关于广播艺术家的示例,要获取所有以 me 开头的艺术家,可以使用以下代码片段

JobConf conf = new JobConf();
conf.set("es.resource", "radio/artists");       
conf.set("es.query", "?q=me*");                 
conf.setInputFormat(EsInputFormat.class);       
conf.setMapOutputKeyClass(Text.class);          
conf.setMapOutputValueClass(MapWritable.class); 

...
JobClient.runJob(conf);

目标索引/类型

查询

专用的 InputFormat

Text 作为键类(包含文档 ID)

使用 MapWritable 或 elasticsearch-hadoop 的 LinkedMapWritable(用于保留插入顺序)作为值类(包含文档结构)。

一个使用 EsInputFormatMapper 可能如下所示:

public class MyMapper extends MapReduceBase implements Mapper {
 @Override
 public void map(Object key, Object value, OutputCollector output,
                    Reporter reporter) throws IOException {
   Text docId = (Text) key;
   MapWritable doc = (MapWritable) value;      
   ...
 }}

LinkedMapWritableMapWritable 类型兼容,因此类型转换对两者都适用。

可以随意使用 Java 5 泛型来避免上述类型转换。为了清晰和可读性,本章中的示例不包含泛型。

的 (org.apache.hadoop.mapreduce) API

编辑

正如预期的那样,mapreduce API 版本非常相似。

Configuration conf = new Configuration();
conf.set("es.resource", "radio/artists/");            
conf.set("es.query", "?q=me*");                       
Job job = new Job(conf);
job.setInputFormatClass(EsInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MapWritable.class);        
...

job.waitForCompletion(true);

目标索引/类型

查询

使用 MapWritable 或 elasticsearch-hadoop 的 LinkedMapWritable(用于保留插入顺序)作为值类(包含文档结构)。

以及 Mapper 的实现。

public class SomeMapper extends Mapper {
 @Override
 protected void map(Object key, Object value, Context context)
        throws IOException, InterruptedException {
   Text docId = (Text) key;
   MapWritable doc = (MapWritable) value;             
   ...
 }}

LinkedMapWritableMapWritable 类型兼容,因此类型转换对两者都适用。

以 JSON 格式从 Elasticsearch 读取

编辑

如果 Elasticsearch 的结果需要为 JSON 格式(通常是为了通过网络发送到其他系统),则可以指示 elasticsearch-hadoop 按原样返回数据。 通过将 es.output.json 设置为 true,连接器将解析 Elasticsearch 的响应,识别文档,并且不进行转换,将其内容作为 Text 对象返回给用户。

Configuration conf = new Configuration();
conf.set("es.resource", "source/category");
conf.set("es.output.json", "true");

为读取和写入使用不同的索引

编辑

有时,需要在同一个作业中从一个 Elasticsearch 资源读取数据,处理数据,然后将其写回到另一个资源。 es.resource 设置是不够的,因为它表示相同的资源既作为源又作为目标。 在这种情况下,应使用 es.resource.reades.resource.write 来区分这两个资源(下面的示例使用 mapreduce API)。

Configuration conf = new Configuration();
conf.set("es.resource.read", "source/category");
conf.set("es.resource.write", "sink/group");

类型转换

编辑

如果使用自动索引创建,请查看 部分以获取更多信息。

elasticsearch-hadoop 会自动将 Hadoop 内置的 Writable 类型转换为 Elasticsearch 字段类型(以及反向转换),如下表所示:

表 2. Writable 转换表

Writable Elasticsearch 类型

null

null

NullWritable

null

BooleanWritable

boolean

Text

string

ByteWritable

byte

IntWritable

int

VInt

int

LongWritable

long

VLongWritable

long

BytesWritable

binary

DoubleWritable

double

FloatWritable

float

MD5Writable

string

ArrayWritable

array

AbstractMapWritable

map

ShortWritable

short

值得一提的是,只有 Elasticsearch 中可用的丰富数据类型,例如 GeoPointGeoShape,可以通过将其结构转换为上表中可用的基本类型来支持。 例如,根据其存储,geo_point 可能作为 Text(基本上是 String)或 ArrayWritable 返回。