Map/Reduce 集成编辑

对于低级或性能敏感的环境,elasticsearch-hadoop 提供了专用的 InputFormatOutputFormat 实现,可以读取和写入 Elasticsearch 的数据。在 Map/Reduce 中,MapperReducer 正在读取和写入 Writable 对象,这是一种针对序列化优化的 Hadoop 特定接口。因此,elasticsearch-hadoop InputFormatOutputFormat 将返回并期望 MapWritable 对象;每个要读取或写入的文档都使用一个映射。映射本身可以具有任何类型的内部结构,只要其对象也是 Writable - 它可以在其 Writable 表示中保存嵌套映射、数字或字符串。在内部,elasticsearch-hadoop 会自动将 WritableMap 转换为 JSON 文档,反之亦然,因此您无需处理低级解析或与 JSON 之间的转换。此外,如果发送到 Elasticsearch 的数据已经是 JSON 格式,则可以直接流式传输,而无需转换为 Writable 对象。阅读本章的其余部分以了解更多信息。

安装编辑

为了使用 elasticsearch-hadoop,需要将 jar 提供给作业类路径。在约 250kB 且没有任何依赖项的情况下,jar 可以捆绑在作业存档中,手动或通过 CLI 通用选项(如果您的 jar 实现了 Tool 接口),通过 Hadoop 的 分布式缓存 分发,或者通过手动配置集群来提供。

以上所有选项仅影响在分布式节点上运行的代码。如果启动 Hadoop 作业的代码引用了 elasticsearch-hadoop,请确保在 HADOOP_CLASSPATH 中包含 JAR:HADOOP_CLASSPATH="<colon-separated-paths-to-your-jars-including-elasticsearch-hadoop>"

CLI 示例。

$ bin/hadoop jar myJar.jar -libjars elasticsearch-hadoop.jar

配置编辑

在 Map/Reduce 作业中使用 elasticsearch-hadoop 时,可以使用 Hadoop 的 Configuration 对象通过将各种选项设置为上述对象上的属性来配置 elasticsearch-hadoop。通常,您将设置 Elasticsearch 主机和端口(假设它没有在默认的 localhost:9200 上运行)、目标索引/类型,以及查询,例如

Configuration conf = new Configuration();
conf.set("es.nodes", "es-server:9200");    
conf.set("es.resource", "radio/artists");  
...

Elasticsearch 集群中的一个节点,elasticsearch-hadoop 将连接到该节点。默认情况下,elasticsearch-hadoop 将检测集群中的其他节点。

elasticsearch-hadoop 将用于读取和写入数据的 resource(索引/类型)。

在构建 Hadoop 作业时,只需使用配置对象,您就完成了所有设置。

将数据写入 Elasticsearch编辑

使用 elasticsearch-hadoop,Map/Reduce 作业可以将数据写入 Elasticsearch,使其可以通过 索引 进行搜索。elasticsearch-hadoop 支持(所谓的) Hadoop API。

EsOutputFormat 期望一个 Map<Writable, Writable>,表示一个文档值,该值在内部转换为 JSON 文档并在 Elasticsearch 中索引。Hadoop OutputFormat 要求实现期望一个键和一个值,但是,由于 Elasticsearch 仅需要文档(即值),因此 EsOutputFormat 会忽略键。

(org.apache.hadoop.mapred) API编辑

要将数据写入 ES,请在您的作业中使用 org.elasticsearch.hadoop.mr.EsOutputFormat 以及相关的配置 属性

JobConf conf = new JobConf();
conf.setSpeculativeExecution(false);           
conf.set("es.nodes", "es-server:9200");
conf.set("es.resource", "radio/artists");      
conf.setOutputFormat(EsOutputFormat.class);    
conf.setMapOutputValueClass(MapWritable.class);
conf.setMapperClass(MyMapper.class);
...
JobClient.runJob(conf);

禁用推测性执行

目标索引/类型

专用的 OutputFormat

指定映射器输出类 (MapWritable)

Mapper 实现可以使用 EsOutputFormat,如下所示

public class MyMapper extends MapReduceBase implements Mapper {
 @Override
 public void map(Object key, Object value, OutputCollector output,
                    Reporter reporter) throws IOException {
   // create the MapWritable object
   MapWritable doc = new MapWritable();
   ...
   // write the result to the output collector
   // one can pass whatever value to the key; EsOutputFormat ignores it
   output.collect(NullWritable.get(), map);
 }}

对于需要指定文档 ID(或其他元数据字段,如 ttltimestamp)的情况,可以通过设置相应的 映射(即 es.mapping.id)来实现。因此,假设文档包含一个名为 radioId 的字段,该字段是唯一的,适合用作标识符,您可以按如下方式更新作业配置

JobConf conf = new JobConf();
conf.set("es.mapping.id", "radioId");

在运行时,elasticsearch-hadoop 将从每个文档中提取值,并在批量调用期间相应地使用它。

将现有 JSON 写入 Elasticsearch编辑

对于作业输入数据已经是 JSON 的情况,elasticsearch-hadoop 允许直接索引而无需应用任何转换;数据按原样获取并直接发送到 Elasticsearch。在这种情况下,需要通过设置 es.input.json 参数来指示 json 输入。因此,在这种情况下,elasticsearch-hadoop 期望 TextBytesWritable(首选,因为它不需要 String 转换)对象作为输出;如果未使用这些类型,库将简单地回退到目标对象的 toString 表示形式。

表 1. 用于 JSON 表示的 Writable

Writable 注释

BytesWritable

当 JSON 数据表示为 byte[] 或类似数据时,请使用此方法

Text

如果 JSON 数据表示为 String,请使用此方法

其他任何内容

确保 toString() 返回所需的 JSON 文档

确保数据已正确编码,使用 UTF-8。作业输出被视为发送到 Elasticsearch 的文档的最终形式。

JobConf conf = new JobConf();
conf.set("es.input.json", "yes");        
conf.setMapOutputValueClass(Text.class); 
...
JobClient.runJob(conf);

指示 EsOutputFormat 的输入类型为 JSON。

设置正确的输出类型(在本例中为 Text

Mapper 实现变为

public class MyMapper extends MapReduceBase implements Mapper {
 @Override
 public void map(Object key, Object value, OutputCollector output,
                    Reporter reporter) throws IOException {
   // assuming the document is a String called 'source'
   String source =  ...
   Text jsonDoc = new Text(source);
   // send the doc directly
   output.collect(NullWritable.get(), jsonDoc);
 }}

写入动态/多资源编辑

对于需要将数据索引到不同桶(基于数据内容)的情况,可以使用 es.resource.write 字段,该字段接受在运行时从文档内容解析的模式。按照上述 媒体示例,可以将其配置如下

JobConf conf = new JobConf();
conf.set("es.resource.write","my-collection-{media-type}/doc");

如果使用 Writable 对象,对于每个 MapWritable,elasticsearch-hadoop 将提取 media-type 键下的值,并将其用作 Elasticsearch 索引后缀。如果使用原始 JSON,则 elasticsearch-hadoop 将解析文档,提取字段 media-type 并相应地使用其值。

(org.apache.hadoop.mapreduce) API编辑

使用 API 非常相似 - 事实上,使用了完全相同的类 (org.elasticsearch.hadoop.mr.EsOutputFormat)

Configuration conf = new Configuration();
conf.setBoolean("mapred.map.tasks.speculative.execution", false);    
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); 
conf.set("es.nodes", "es-server:9200");
conf.set("es.resource", "radio/artists");                            
Job job = new Job(conf);
job.setOutputFormatClass(EsOutputFormat.class);
job.setMapOutputValueClass(MapWritable.class);                       
...
job.waitForCompletion(true);

禁用映射器推测性执行

禁用化简器推测性执行

目标索引/类型

指定 Mapper 值输出类型(在本例中为 MapWritable

对于 Mapper 实例也是如此

public class SomeMapper extends Mapper {
 @Override
 protected void map(Object key, Object value, Context context)
        throws IOException, InterruptedException {
   // create the MapWritable object
   MapWritable doc = new MapWritable();
   ...
   context.write(NullWritable.get(), doc);
 }}

指定 ID 或其他文档 元数据 也很容易

Configuration conf = new Configuration();
conf.set("es.mapping.id", "radioId");

将现有 JSON 写入 Elasticsearch编辑

如前所述,在处理 JSON 时,在 API 下,配置如下所示

Configuration conf = new Configuration();
conf.set("es.input.json", "yes");                 
Job job = new Job(conf);
job.setMapOutputValueClass(BytesWritable.class); 
...
job.waitForCompletion(true);

指示 EsOutputFormat 的输入类型为 JSON。

设置输出类型,在本例中为 BytesWritable

public class SomeMapper extends Mapper {
 @Override
 protected void map(Object key, Object value, Context context)
        throws IOException, InterruptedException {
   // assuming the document is stored as bytes
   byte[] source =  ...
   BytesWritable jsonDoc = new BytesWritable(source);
   // send the doc directly
   context.write(NullWritable.get(), jsonDoc);
 }}

写入动态/多资源编辑

正如预期的那样, API 之间的差异在这种情况下也是最小的(可以理解为不存在)

Configuration conf = new Configuration();
conf.set("es.resource.write","my-collection-{media-type}/doc");
...

从 Elasticsearch 读取数据编辑

类似地,要从 Elasticsearch 读取数据,需要使用 org.elasticsearch.hadoop.mr.EsInputFormat 类。虽然它可以读取整个索引,但使用查询更方便 - elasticsearch-hadoop 会自动实时执行查询并返回结果到 Hadoop。由于查询针对真实数据执行,因此它充当数据集的实时视图。

就像它的对应类 (EsOutputFormat) 一样,EsInputFormat 为 Elasticsearch 返回的每个 JSON 文档返回一个 Map<Writable, Writable>。由于 InputFormat 要求返回键和值,EsInputFormat 将返回文档 ID(在 Elasticsearch 中)作为键(通常被忽略)以及文档/映射作为值。

如果需要保留从 Elasticsearch 返回的文档结构,请考虑使用 org.elasticsearch.hadoop.mr.LinkedMapWritable。该类扩展了 Hadoop 的 MapWritable(因此可以轻松替换它)并保留插入顺序;也就是说,在迭代映射时,条目将按插入顺序返回(与不维护插入顺序的 MapWritable 相反)。但是,由于 Hadoop 的工作方式,需要将 LinkedMapWritable 指定为作业映射输出值(而不是 MapWritable)。

(org.apache.hadoop.mapred) APIedit

以下我们上面关于电台艺人的示例,要获取所有以“me”开头的艺人,可以使用以下代码片段

JobConf conf = new JobConf();
conf.set("es.resource", "radio/artists");       
conf.set("es.query", "?q=me*");                 
conf.setInputFormat(EsInputFormat.class);       
conf.setMapOutputKeyClass(Text.class);          
conf.setMapOutputValueClass(MapWritable.class); 

...
JobClient.runJob(conf);

目标索引/类型

查询

专用 InputFormat

Text 作为键类(包含文档 ID)

MapWritable 或 elasticsearch-hadoop 的 LinkedMapWritable(保留插入顺序)作为值类(包含文档结构)

使用 EsInputFormatMapper 可能如下所示

public class MyMapper extends MapReduceBase implements Mapper {
 @Override
 public void map(Object key, Object value, OutputCollector output,
                    Reporter reporter) throws IOException {
   Text docId = (Text) key;
   MapWritable doc = (MapWritable) value;      
   ...
 }}

LinkedMapWritableMapWritable 类型兼容,因此强制转换对两者都有效

随意使用 Java 5 泛型来避免上面的强制转换。为了清晰和可读性,本章中的示例不包括泛型。

(org.apache.hadoop.mapreduce) APIedit

正如预期的那样,mapreduce API 版本非常相似

Configuration conf = new Configuration();
conf.set("es.resource", "radio/artists/");            
conf.set("es.query", "?q=me*");                       
Job job = new Job(conf);
job.setInputFormatClass(EsInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MapWritable.class);        
...

job.waitForCompletion(true);

目标索引/类型

查询

MapWritable 或 elasticsearch-hadoop 的 LinkedMapWritable(保留插入顺序)作为值类(包含文档结构)

以及 Mapper 实现

public class SomeMapper extends Mapper {
 @Override
 protected void map(Object key, Object value, Context context)
        throws IOException, InterruptedException {
   Text docId = (Text) key;
   MapWritable doc = (MapWritable) value;             
   ...
 }}

LinkedMapWritableMapWritable 类型兼容,因此强制转换对两者都有效

以 JSON 格式从 Elasticsearch 读取数据edit

在需要以 JSON 格式从 Elasticsearch 获取结果(通常是将其发送到其他系统)的情况下,可以指示 elasticsearch-hadoop 按原样返回数据。通过将 es.output.json 设置为 true,连接器将解析来自 Elasticsearch 的响应,识别文档,并在不转换它们的情况下将其内容作为 Text 对象返回给用户

Configuration conf = new Configuration();
conf.set("es.resource", "source/category");
conf.set("es.output.json", "true");

在同一个作业中使用不同的索引进行读写edit

有时,需要从一个 Elasticsearch 资源读取数据,对其进行处理,然后将其写入同一个作业中的另一个资源。 es.resource 设置不够,因为它意味着同一个资源既是源又是目标。在这种情况下,应该使用 es.resource.reades.resource.write 来区分这两个资源(下面的示例使用 mapreduce API)

Configuration conf = new Configuration();
conf.set("es.resource.read", "source/category");
conf.set("es.resource.write", "sink/group");

类型转换edit

如果使用自动索引创建,请查看 部分以获取更多信息。

elasticsearch-hadoop 会自动将 Hadoop 内置的 Writable 类型转换为 Elasticsearch 字段类型(反之亦然),如下表所示

表 2. Writable 转换表

Writable Elasticsearch 类型

null

null

NullWritable

null

BooleanWritable

boolean

Text

string

ByteWritable

byte

IntWritable

int

VInt

int

LongWritable

long

VLongWritable

long

BytesWritable

binary

DoubleWritable

double

FloatWritable

float

MD5Writable

string

ArrayWritable

array

AbstractMapWritable

map

ShortWritable

short

值得一提的是,仅在 Elasticsearch 中可用的丰富数据类型,例如 GeoPointGeoShape 通过将其结构转换为上表中可用的基元来支持。例如,基于其存储,geo_point 可能作为 Text(基本上是 String)或 ArrayWritable 返回。