Map/Reduce 集成
编辑Map/Reduce 集成
编辑对于低级别或对性能敏感的环境,elasticsearch-hadoop 提供了专用的 InputFormat
和 OutputFormat
实现,可以读取和写入 Elasticsearch 的数据。在 Map/Reduce 中,Mapper
和 Reducer
读取和写入 Writable
对象,这是一个针对序列化进行了优化的 Hadoop 特定接口。因此,elasticsearch-hadoop InputFormat
和 OutputFormat
将返回并期望 MapWritable
对象;每个被读取或写入的文档都使用一个映射。只要其对象也是 Writable
,映射本身就可以具有任何类型的内部结构——它可以在其 Writable
表示中保存嵌套映射、数字或字符串。elasticsearch-hadoop 在内部会自动将 Writable
的 Map
转换为 JSON 文档,反之亦然,因此您无需处理低级别解析或与 JSON 之间的转换。此外,如果发送到 Elasticsearch 的数据已经是 JSON 格式,则可以直接流式传输,而无需转换为 Writable
对象。阅读本章的其余部分以了解更多信息。
安装
编辑为了使用 elasticsearch-hadoop,需要使 jar 包 可用于作业类路径。该 jar 包大小约为 250kB
,并且没有任何依赖项,可以将其捆绑在作业存档中,也可以通过 CLI 通用选项(如果您的 jar 包实现了 Tool 接口)手动或通过 CLI 进行捆绑,通过 Hadoop 的 DistributedCache 分发,或者通过手动配置集群来提供。
以上所有选项仅影响在分布式节点上运行的代码。如果启动 Hadoop 作业的代码引用 elasticsearch-hadoop,请确保在 HADOOP_CLASSPATH
中包含 JAR 包:HADOOP_CLASSPATH="<冒号分隔的 jar 包路径,包括 elasticsearch-hadoop>"
CLI 示例。
$ bin/hadoop jar myJar.jar -libjars elasticsearch-hadoop.jar
配置
编辑在 Map/Reduce 作业中使用 elasticsearch-hadoop 时,可以使用 Hadoop 的 Configuration
对象通过将各种选项设置为上述对象上的属性来配置 elasticsearch-hadoop。通常,您需要设置 Elasticsearch 主机和端口(假设它没有在默认的 localhost:9200
上运行)、目标索引/类型以及潜在的查询,例如
Configuration conf = new Configuration(); conf.set("es.nodes", "es-server:9200"); conf.set("es.resource", "radio/artists"); ...
elasticsearch-hadoop 将连接到的 Elasticsearch 集群中的一个节点。默认情况下,elasticsearch-hadoop 将检测集群中的其余节点。 |
|
elasticsearch-hadoop 将用于读取和写入数据的 |
在构建 Hadoop 作业时只需使用配置对象即可。
将数据写入 Elasticsearch
编辑使用 elasticsearch-hadoop,Map/Reduce 作业可以将数据写入 Elasticsearch,使其可以通过 索引 进行搜索。elasticsearch-hadoop 支持(所谓的)旧版 和 新版 Hadoop API。
EsOutputFormat
期望一个表示文档值的 Map<Writable, Writable>
,该值在内部转换为 JSON 文档并在 Elasticsearch 中建立索引。Hadoop OutputFormat
要求实现期望一个键和一个值,但是,由于 Elasticsearch 只需要文档(即值),因此 EsOutputFormat
会忽略键。
旧版 (org.apache.hadoop.mapred
) API
编辑要将数据写入 ES,请在您的作业中使用 org.elasticsearch.hadoop.mr.EsOutputFormat
以及相关的配置 属性
JobConf conf = new JobConf(); conf.setSpeculativeExecution(false); conf.set("es.nodes", "es-server:9200"); conf.set("es.resource", "radio/artists"); conf.setOutputFormat(EsOutputFormat.class); conf.setMapOutputValueClass(MapWritable.class); conf.setMapperClass(MyMapper.class); ... JobClient.runJob(conf);
Mapper
实现可以使用 EsOutputFormat
,如下所示
public class MyMapper extends MapReduceBase implements Mapper { @Override public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { // create the MapWritable object MapWritable doc = new MapWritable(); ... // write the result to the output collector // one can pass whatever value to the key; EsOutputFormat ignores it output.collect(NullWritable.get(), map); }}
对于需要指定文档的 ID(或其他元数据字段,如 ttl
或 timestamp
)的情况,可以通过设置相应的 映射(即 es.mapping.id
)来实现。因此,假设文档包含一个名为 radioId
的字段,该字段是唯一的并且适合用作标识符,则可以按如下方式更新作业配置
JobConf conf = new JobConf(); conf.set("es.mapping.id", "radioId");
在运行时,elasticsearch-hadoop 将从每个文档中提取值,并在批量调用期间相应地使用它。
将现有 JSON 写入 Elasticsearch
编辑对于作业输入数据已经是 JSON 的情况,elasticsearch-hadoop 允许无需应用任何转换即可直接建立索引;数据按原样获取并直接发送到 Elasticsearch。在这种情况下,需要通过设置 es.input.json
参数来指示 json 输入。因此,在这种情况下,elasticsearch-hadoop 期望 Text
或 BytesWritable
(首选,因为它不需要 String
转换)对象作为输出;如果未使用这些类型,库将简单地回退到目标对象的 toString
表示形式。
表 1. 用于 JSON 表示的 Writable
Writable |
注释 |
---|---|
|
当 JSON 数据表示为 |
|
如果 JSON 数据表示为 |
其他任何内容 |
确保 |
确保数据已正确编码为 UTF-8
。作业输出被视为发送到 Elasticsearch 的文档的最终形式。
JobConf conf = new JobConf(); conf.set("es.input.json", "yes"); conf.setMapOutputValueClass(Text.class); ... JobClient.runJob(conf);
Mapper
实现如下所示
public class MyMapper extends MapReduceBase implements Mapper { @Override public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { // assuming the document is a String called 'source' String source = ... Text jsonDoc = new Text(source); // send the doc directly output.collect(NullWritable.get(), jsonDoc); }}
写入动态/多资源
编辑对于需要根据数据内容在不同存储桶下建立索引的数据的情况,可以使用 es.resource.write
字段,该字段接受在运行时从文档内容解析的模式。按照前面提到的 媒体示例,可以将其配置如下
JobConf conf = new JobConf(); conf.set("es.resource.write","my-collection-{media-type}/doc");
如果使用 Writable
对象,对于每个 MapWritable
,elasticsearch-hadoop 将提取 media-type
键下的值,并将其用作 Elasticsearch 索引后缀。如果使用原始 JSON,则 elasticsearch-hadoop 将解析文档,提取字段 media-type
并相应地使用其值。
新版 (org.apache.hadoop.mapreduce
) API
编辑使用新版 API 非常相似——事实上,使用了完全相同的类 (org.elasticsearch.hadoop.mr.EsOutputFormat
)
Configuration conf = new Configuration(); conf.setBoolean("mapred.map.tasks.speculative.execution", false); conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); conf.set("es.nodes", "es-server:9200"); conf.set("es.resource", "radio/artists"); Job job = new Job(conf); job.setOutputFormatClass(EsOutputFormat.class); job.setMapOutputValueClass(MapWritable.class); ... job.waitForCompletion(true);
Mapper
实例也一样
public class SomeMapper extends Mapper { @Override protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { // create the MapWritable object MapWritable doc = new MapWritable(); ... context.write(NullWritable.get(), doc); }}
指定 ID 或其他文档 元数据 也同样简单
Configuration conf = new Configuration(); conf.set("es.mapping.id", "radioId");
将现有 JSON 写入 Elasticsearch
编辑和以前一样,在直接处理 JSON 时,在新 API 下,配置如下所示
Configuration conf = new Configuration(); conf.set("es.input.json", "yes"); Job job = new Job(conf); job.setMapOutputValueClass(BytesWritable.class); ... job.waitForCompletion(true);
public class SomeMapper extends Mapper { @Override protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { // assuming the document is stored as bytes byte[] source = ... BytesWritable jsonDoc = new BytesWritable(source); // send the doc directly context.write(NullWritable.get(), jsonDoc); }}
写入动态/多资源
编辑正如预期的那样,在这种情况下,旧版
和新版
API 之间的差异最小(可以认为是不存在的)
Configuration conf = new Configuration(); conf.set("es.resource.write","my-collection-{media-type}/doc"); ...
从 Elasticsearch 读取数据
编辑类似地,要从 Elasticsearch 读取数据,需要使用 org.elasticsearch.hadoop.mr.EsInputFormat
类。虽然它可以读取整个索引,但使用查询更方便——elasticsearch-hadoop 将自动实时执行查询并将结果反馈回 Hadoop。由于查询针对实际数据执行,因此这充当数据集的实时视图。
与它的对应类 (EsOutputFormat
) 一样,EsInputFormat
为 Elasticsearch 返回的每个 JSON 文档返回一个 Map<Writable, Writable>
。由于 InputFormat
需要返回键和值,因此 EsInputFormat
将返回文档 ID(在 Elasticsearch 内)作为键(通常被忽略)以及文档/映射作为值。
如果需要保留Elasticsearch返回的文档结构,请考虑使用org.elasticsearch.hadoop.mr.LinkedMapWritable
。此类扩展了Hadoop的MapWritable
(因此可以轻松替换它)并保留插入顺序;也就是说,在迭代映射时,条目将按插入顺序返回(与MapWritable
不维护插入顺序相反)。但是,由于Hadoop的工作方式,需要将LinkedMapWritable
指定为作业映射输出值(而不是MapWritable
)。
旧版 (org.apache.hadoop.mapred
) API
编辑根据我们上面关于电台艺术家的示例,要获取所有以“me”开头的艺术家,可以使用以下代码片段
JobConf conf = new JobConf(); conf.set("es.resource", "radio/artists"); conf.set("es.query", "?q=me*"); conf.setInputFormat(EsInputFormat.class); conf.setMapOutputKeyClass(Text.class); conf.setMapOutputValueClass(MapWritable.class); ... JobClient.runJob(conf);
目标索引/类型 |
|
查询 |
|
专用 |
|
|
|
|
使用EsInputFormat
的Mapper
可能如下所示
public class MyMapper extends MapReduceBase implements Mapper { @Override public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { Text docId = (Text) key; MapWritable doc = (MapWritable) value; ... }}
随意使用Java 5泛型来避免上述强制转换。为清晰起见,本章中的示例不包含泛型。
新版 (org.apache.hadoop.mapreduce
) API
编辑正如预期的那样,mapreduce
API版本非常相似
Configuration conf = new Configuration(); conf.set("es.resource", "radio/artists/"); conf.set("es.query", "?q=me*"); Job job = new Job(conf); job.setInputFormatClass(EsInputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(MapWritable.class); ... job.waitForCompletion(true);
以及Mapper
实现
public class SomeMapper extends Mapper { @Override protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { Text docId = (Text) key; MapWritable doc = (MapWritable) value; ... }}
以JSON格式从Elasticsearch读取数据
编辑如果需要Elasticsearch的结果为JSON格式(通常是为了将其发送到其他系统),则可以指示elasticsearch-hadoop按原样返回数据。通过将es.output.json
设置为true
,连接器将解析来自Elasticsearch的响应,识别文档,并在不转换它们的情况下将其内容作为Text
对象返回给用户。
Configuration conf = new Configuration(); conf.set("es.resource", "source/category"); conf.set("es.output.json", "true");
使用不同的索引进行读写
编辑有时,需要从一个Elasticsearch资源读取数据,对其进行处理,然后将其写入同一个作业中的另一个资源。es.resource
设置不够用,因为它暗示了相同的资源既是源又是目标。在这种情况下,应该使用es.resource.read
和es.resource.write
来区分这两个资源(下面的示例使用的是mapreduce API)。
Configuration conf = new Configuration(); conf.set("es.resource.read", "source/category"); conf.set("es.resource.write", "sink/group");
类型转换
编辑如果使用自动索引创建,请查看此处部分以了解更多信息。
elasticsearch-hadoop会自动将Hadoop内置的Writable
类型转换为Elasticsearch字段类型(反之亦然),如下表所示。
表2.Writable
转换表
Writable |
Elasticsearch类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
值得一提的是,Elasticsearch中仅有的丰富数据类型,例如GeoPoint
或GeoShape
,可以通过将其结构转换为上表中可用的基本类型来支持。例如,根据其存储方式,geo_point
可能会作为Text
(基本上是String
)或ArrayWritable
返回。