Map/Reduce 集成
编辑Map/Reduce 集成
编辑对于底层或性能敏感的环境,elasticsearch-hadoop 提供了专用的 InputFormat
和 OutputFormat
实现,可以读取和写入 Elasticsearch 的数据。在 Map/Reduce 中,Mapper
和 Reducer
读取和写入 Writable
对象,这是一个针对序列化优化的 Hadoop 特定接口。因此,elasticsearch-hadoop 的 InputFormat
和 OutputFormat
将返回和期望 MapWritable
对象;对于每个读取或写入的文档,使用一个 Map。只要其对象也是 Writable
,map 本身可以具有任何类型的内部结构 - 它可以在其 Writable
表示形式中保存嵌套的 map、数字或字符串。在内部,elasticsearch-hadoop 会自动将 Writable
的 Map
转换为 JSON 文档,反之亦然,因此您不必处理低级的 JSON 解析或转换。此外,如果发送到 Elasticsearch 的数据已经采用 JSON 格式,则可以直接流式传输,而无需转换为 Writable
对象。阅读本章的其余部分以了解更多信息。
安装
编辑为了使用 elasticsearch-hadoop,需要将 jar 文件添加到作业的类路径中。由于大小约为 ~250kB
且没有任何依赖项,因此可以将 jar 文件捆绑到作业归档文件中,手动添加或通过 CLI 通用选项(如果您的 jar 文件实现了 Tool 接口),通过 Hadoop 的 分布式缓存 进行分发,或者通过手动配置集群使其可用。
以上所有选项都仅影响在分布式节点上运行的代码。如果启动 Hadoop 作业的代码引用了 elasticsearch-hadoop,请确保将 JAR 文件包含在 HADOOP_CLASSPATH
中:HADOOP_CLASSPATH="<冒号分隔的 jar 文件路径,包括 elasticsearch-hadoop>"
CLI 示例。
$ bin/hadoop jar myJar.jar -libjars elasticsearch-hadoop.jar
配置
编辑在 Map/Reduce 作业中使用 elasticsearch-hadoop 时,可以使用 Hadoop 的 Configuration
对象来配置 elasticsearch-hadoop,方法是将各种选项设置为上述对象上的属性。通常,需要设置 Elasticsearch 的主机和端口(假设它不是在默认的 localhost:9200
上运行)、目标索引/类型以及可能的查询,例如
Configuration conf = new Configuration(); conf.set("es.nodes", "es-server:9200"); conf.set("es.resource", "radio/artists"); ...
elasticsearch-hadoop 将要连接的 Elasticsearch 集群中的一个节点。默认情况下,elasticsearch-hadoop 将检测集群中的其余节点。 |
|
elasticsearch-hadoop 将用于读取和写入数据的 |
只需在构建 Hadoop 作业时使用配置对象即可。
将数据写入 Elasticsearch
编辑通过 elasticsearch-hadoop,Map/Reduce 作业可以将数据写入 Elasticsearch,使其可以通过 索引 进行搜索。elasticsearch-hadoop 同时支持(所谓的)旧 和 新 的 Hadoop API。
EsOutputFormat
期望一个 Map<Writable, Writable>
,表示一个文档值,该值在内部转换为 JSON 文档并在 Elasticsearch 中建立索引。Hadoop 的 OutputFormat
要求实现期望一个键和一个值,但是,由于对于 Elasticsearch 仅需要文档(即值),因此 EsOutputFormat
会忽略键。
旧的 (org.apache.hadoop.mapred
) API
编辑要将数据写入 ES,请在您的作业中使用 org.elasticsearch.hadoop.mr.EsOutputFormat
以及相关的配置属性
JobConf conf = new JobConf(); conf.setSpeculativeExecution(false); conf.set("es.nodes", "es-server:9200"); conf.set("es.resource", "radio/artists"); conf.setOutputFormat(EsOutputFormat.class); conf.setMapOutputValueClass(MapWritable.class); conf.setMapperClass(MyMapper.class); ... JobClient.runJob(conf);
Mapper
实现可以如下使用 EsOutputFormat
public class MyMapper extends MapReduceBase implements Mapper { @Override public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { // create the MapWritable object MapWritable doc = new MapWritable(); ... // write the result to the output collector // one can pass whatever value to the key; EsOutputFormat ignores it output.collect(NullWritable.get(), map); }}
对于需要指定文档的 ID(或其他元数据字段,如 ttl
或 timestamp
)的情况,可以通过设置适当的映射,即 es.mapping.id
来实现。因此,假设文档包含一个名为 radioId
的字段,该字段是唯一的并且适合作为标识符,则可以如下更新作业配置
JobConf conf = new JobConf(); conf.set("es.mapping.id", "radioId");
在运行时,elasticsearch-hadoop 将从每个文档中提取值,并在批量调用期间相应地使用它。
将现有 JSON 写入 Elasticsearch
编辑对于作业输入数据已经采用 JSON 格式的情况,elasticsearch-hadoop 允许直接索引而无需应用任何转换;数据将按原样获取并直接发送到 Elasticsearch。在这种情况下,需要通过设置 es.input.json
参数来指示 JSON 输入。因此,在这种情况下,elasticsearch-hadoop 期望输出 Text
或 BytesWritable
(首选,因为它不需要 String
转换)对象;如果未使用这些类型,则该库将简单地回退到目标对象的 toString
表示形式。
表 1. 用于 JSON 表示的 Writable
Writable |
注释 |
---|---|
|
当 JSON 数据表示为 |
|
如果 JSON 数据表示为 |
任何其他 |
确保 |
确保数据已正确编码为 UTF-8
。作业输出被认为是发送到 Elasticsearch 的文档的最终形式。
JobConf conf = new JobConf(); conf.set("es.input.json", "yes"); conf.setMapOutputValueClass(Text.class); ... JobClient.runJob(conf);
Mapper
实现变为
public class MyMapper extends MapReduceBase implements Mapper { @Override public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { // assuming the document is a String called 'source' String source = ... Text jsonDoc = new Text(source); // send the doc directly output.collect(NullWritable.get(), jsonDoc); }}
写入动态/多资源
编辑对于需要根据数据内容在不同存储桶下索引写入 Elasticsearch 的数据的情况,可以使用 es.resource.write
字段,该字段接受在运行时从文档内容解析的模式。按照上述媒体示例,可以如下配置
JobConf conf = new JobConf(); conf.set("es.resource.write","my-collection-{media-type}/doc");
如果使用 Writable
对象,则对于每个 MapWritable
,elasticsearch-hadoop 将提取 media-type
键下的值,并将其用作 Elasticsearch 索引后缀。如果使用原始 JSON,则 elasticsearch-hadoop 将解析文档,提取 media-type
字段,并相应地使用其值。
新的 (org.apache.hadoop.mapreduce
) API
编辑使用新的 API 非常相似 - 实际上,使用的是完全相同的类 (org.elasticsearch.hadoop.mr.EsOutputFormat
)
Configuration conf = new Configuration(); conf.setBoolean("mapred.map.tasks.speculative.execution", false); conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); conf.set("es.nodes", "es-server:9200"); conf.set("es.resource", "radio/artists"); Job job = new Job(conf); job.setOutputFormatClass(EsOutputFormat.class); job.setMapOutputValueClass(MapWritable.class); ... job.waitForCompletion(true);
Mapper
实例也是如此
public class SomeMapper extends Mapper { @Override protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { // create the MapWritable object MapWritable doc = new MapWritable(); ... context.write(NullWritable.get(), doc); }}
指定 ID 或其他文档元数据同样容易
Configuration conf = new Configuration(); conf.set("es.mapping.id", "radioId");
将现有 JSON 写入 Elasticsearch
编辑如前所述,当直接处理 JSON 时,在新的 API 下,配置如下所示
Configuration conf = new Configuration(); conf.set("es.input.json", "yes"); Job job = new Job(conf); job.setMapOutputValueClass(BytesWritable.class); ... job.waitForCompletion(true);
public class SomeMapper extends Mapper { @Override protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { // assuming the document is stored as bytes byte[] source = ... BytesWritable jsonDoc = new BytesWritable(source); // send the doc directly context.write(NullWritable.get(), jsonDoc); }}
写入动态/多资源
编辑正如预期的那样,在这种情况下,旧
和新
API 之间的差异最小(可以视为不存在)
Configuration conf = new Configuration(); conf.set("es.resource.write","my-collection-{media-type}/doc"); ...
从 Elasticsearch 读取数据
编辑以类似的方式,要从 Elasticsearch 读取数据,需要使用 org.elasticsearch.hadoop.mr.EsInputFormat
类。虽然它可以读取整个索引,但使用查询会更方便 - elasticsearch-hadoop 会自动实时执行查询,并将结果返回给 Hadoop。由于查询是针对真实数据执行的,因此这充当数据集的实时视图。
与其对应项 (EsOutputFormat
) 类似,EsInputFormat
为 Elasticsearch 返回的每个 JSON 文档返回 Map<Writable, Writable>
。由于 InputFormat
需要同时返回键和值,因此 EsInputFormat
将返回文档 ID(在 Elasticsearch 中)作为键(通常被忽略),并将文档/map 作为值。
如果需要保留从 Elasticsearch 返回的文档结构,请考虑使用 org.elasticsearch.hadoop.mr.LinkedMapWritable
。该类扩展了 Hadoop 的 MapWritable
(因此可以轻松替换它)并保留插入顺序;也就是说,当迭代 map 时,将按插入顺序返回条目(而不是不维护此顺序的 MapWritable
)。但是,由于 Hadoop 的工作方式,需要将 LinkedMapWritable
指定为作业 map 输出值(而不是 MapWritable
)。
旧的 (org.apache.hadoop.mapred
) API
编辑按照我们上面关于广播艺术家的示例,要获取所有以 me 开头的艺术家,可以使用以下代码片段
JobConf conf = new JobConf(); conf.set("es.resource", "radio/artists"); conf.set("es.query", "?q=me*"); conf.setInputFormat(EsInputFormat.class); conf.setMapOutputKeyClass(Text.class); conf.setMapOutputValueClass(MapWritable.class); ... JobClient.runJob(conf);
目标索引/类型 |
|
查询 |
|
专用的 |
|
|
|
使用 |
一个使用 EsInputFormat
的 Mapper
可能如下所示:
public class MyMapper extends MapReduceBase implements Mapper { @Override public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { Text docId = (Text) key; MapWritable doc = (MapWritable) value; ... }}
可以随意使用 Java 5 泛型来避免上述类型转换。为了清晰和可读性,本章中的示例不包含泛型。
新 的 (org.apache.hadoop.mapreduce
) API
编辑正如预期的那样,mapreduce
API 版本非常相似。
Configuration conf = new Configuration(); conf.set("es.resource", "radio/artists/"); conf.set("es.query", "?q=me*"); Job job = new Job(conf); job.setInputFormatClass(EsInputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(MapWritable.class); ... job.waitForCompletion(true);
以及 Mapper
的实现。
public class SomeMapper extends Mapper { @Override protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { Text docId = (Text) key; MapWritable doc = (MapWritable) value; ... }}
以 JSON 格式从 Elasticsearch 读取
编辑如果 Elasticsearch 的结果需要为 JSON 格式(通常是为了通过网络发送到其他系统),则可以指示 elasticsearch-hadoop 按原样返回数据。 通过将 es.output.json
设置为 true
,连接器将解析 Elasticsearch 的响应,识别文档,并且不进行转换,将其内容作为 Text
对象返回给用户。
Configuration conf = new Configuration(); conf.set("es.resource", "source/category"); conf.set("es.output.json", "true");
为读取和写入使用不同的索引
编辑有时,需要在同一个作业中从一个 Elasticsearch 资源读取数据,处理数据,然后将其写回到另一个资源。 es.resource
设置是不够的,因为它表示相同的资源既作为源又作为目标。 在这种情况下,应使用 es.resource.read
和 es.resource.write
来区分这两个资源(下面的示例使用 mapreduce API)。
Configuration conf = new Configuration(); conf.set("es.resource.read", "source/category"); conf.set("es.resource.write", "sink/group");
类型转换
编辑如果使用自动索引创建,请查看 此 部分以获取更多信息。
elasticsearch-hadoop 会自动将 Hadoop 内置的 Writable
类型转换为 Elasticsearch 字段类型(以及反向转换),如下表所示:
表 2. Writable
转换表
Writable |
Elasticsearch 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
值得一提的是,只有 Elasticsearch 中可用的丰富数据类型,例如 GeoPoint
或 GeoShape
,可以通过将其结构转换为上表中可用的基本类型来支持。 例如,根据其存储,geo_point
可能作为 Text
(基本上是 String
)或 ArrayWritable
返回。