Map/Reduce 集成
编辑Map/Reduce 集成编辑
对于低级或性能敏感的环境,elasticsearch-hadoop 提供了专用的 InputFormat
和 OutputFormat
实现,可以读取和写入 Elasticsearch 的数据。在 Map/Reduce 中,Mapper
和 Reducer
正在读取和写入 Writable
对象,这是一种针对序列化优化的 Hadoop 特定接口。因此,elasticsearch-hadoop InputFormat
和 OutputFormat
将返回并期望 MapWritable
对象;每个要读取或写入的文档都使用一个映射。映射本身可以具有任何类型的内部结构,只要其对象也是 Writable
- 它可以在其 Writable
表示中保存嵌套映射、数字或字符串。在内部,elasticsearch-hadoop 会自动将 Writable
的 Map
转换为 JSON 文档,反之亦然,因此您无需处理低级解析或与 JSON 之间的转换。此外,如果发送到 Elasticsearch 的数据已经是 JSON 格式,则可以直接流式传输,而无需转换为 Writable
对象。阅读本章的其余部分以了解更多信息。
安装编辑
为了使用 elasticsearch-hadoop,需要将 jar 提供给作业类路径。在约 250kB
且没有任何依赖项的情况下,jar 可以捆绑在作业存档中,手动或通过 CLI 通用选项(如果您的 jar 实现了 Tool 接口),通过 Hadoop 的 分布式缓存 分发,或者通过手动配置集群来提供。
以上所有选项仅影响在分布式节点上运行的代码。如果启动 Hadoop 作业的代码引用了 elasticsearch-hadoop,请确保在 HADOOP_CLASSPATH
中包含 JAR:HADOOP_CLASSPATH="<colon-separated-paths-to-your-jars-including-elasticsearch-hadoop>"
CLI 示例。
$ bin/hadoop jar myJar.jar -libjars elasticsearch-hadoop.jar
配置编辑
在 Map/Reduce 作业中使用 elasticsearch-hadoop 时,可以使用 Hadoop 的 Configuration
对象通过将各种选项设置为上述对象上的属性来配置 elasticsearch-hadoop。通常,您将设置 Elasticsearch 主机和端口(假设它没有在默认的 localhost:9200
上运行)、目标索引/类型,以及查询,例如
Configuration conf = new Configuration(); conf.set("es.nodes", "es-server:9200"); conf.set("es.resource", "radio/artists"); ...
Elasticsearch 集群中的一个节点,elasticsearch-hadoop 将连接到该节点。默认情况下,elasticsearch-hadoop 将检测集群中的其他节点。 |
|
elasticsearch-hadoop 将用于读取和写入数据的 |
在构建 Hadoop 作业时,只需使用配置对象,您就完成了所有设置。
将数据写入 Elasticsearch编辑
使用 elasticsearch-hadoop,Map/Reduce 作业可以将数据写入 Elasticsearch,使其可以通过 索引 进行搜索。elasticsearch-hadoop 支持(所谓的)旧 和 新 Hadoop API。
EsOutputFormat
期望一个 Map<Writable, Writable>
,表示一个文档值,该值在内部转换为 JSON 文档并在 Elasticsearch 中索引。Hadoop OutputFormat
要求实现期望一个键和一个值,但是,由于 Elasticsearch 仅需要文档(即值),因此 EsOutputFormat
会忽略键。
旧 (org.apache.hadoop.mapred
) API编辑
要将数据写入 ES,请在您的作业中使用 org.elasticsearch.hadoop.mr.EsOutputFormat
以及相关的配置 属性
JobConf conf = new JobConf(); conf.setSpeculativeExecution(false); conf.set("es.nodes", "es-server:9200"); conf.set("es.resource", "radio/artists"); conf.setOutputFormat(EsOutputFormat.class); conf.setMapOutputValueClass(MapWritable.class); conf.setMapperClass(MyMapper.class); ... JobClient.runJob(conf);
Mapper
实现可以使用 EsOutputFormat
,如下所示
public class MyMapper extends MapReduceBase implements Mapper { @Override public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { // create the MapWritable object MapWritable doc = new MapWritable(); ... // write the result to the output collector // one can pass whatever value to the key; EsOutputFormat ignores it output.collect(NullWritable.get(), map); }}
对于需要指定文档 ID(或其他元数据字段,如 ttl
或 timestamp
)的情况,可以通过设置相应的 映射(即 es.mapping.id
)来实现。因此,假设文档包含一个名为 radioId
的字段,该字段是唯一的,适合用作标识符,您可以按如下方式更新作业配置
JobConf conf = new JobConf(); conf.set("es.mapping.id", "radioId");
在运行时,elasticsearch-hadoop 将从每个文档中提取值,并在批量调用期间相应地使用它。
将现有 JSON 写入 Elasticsearch编辑
对于作业输入数据已经是 JSON 的情况,elasticsearch-hadoop 允许直接索引而无需应用任何转换;数据按原样获取并直接发送到 Elasticsearch。在这种情况下,需要通过设置 es.input.json
参数来指示 json 输入。因此,在这种情况下,elasticsearch-hadoop 期望 Text
或 BytesWritable
(首选,因为它不需要 String
转换)对象作为输出;如果未使用这些类型,库将简单地回退到目标对象的 toString
表示形式。
表 1. 用于 JSON 表示的 Writable
Writable |
注释 |
---|---|
|
当 JSON 数据表示为 |
|
如果 JSON 数据表示为 |
其他任何内容 |
确保 |
确保数据已正确编码,使用 UTF-8
。作业输出被视为发送到 Elasticsearch 的文档的最终形式。
JobConf conf = new JobConf(); conf.set("es.input.json", "yes"); conf.setMapOutputValueClass(Text.class); ... JobClient.runJob(conf);
Mapper
实现变为
public class MyMapper extends MapReduceBase implements Mapper { @Override public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { // assuming the document is a String called 'source' String source = ... Text jsonDoc = new Text(source); // send the doc directly output.collect(NullWritable.get(), jsonDoc); }}
写入动态/多资源编辑
对于需要将数据索引到不同桶(基于数据内容)的情况,可以使用 es.resource.write
字段,该字段接受在运行时从文档内容解析的模式。按照上述 媒体示例,可以将其配置如下
JobConf conf = new JobConf(); conf.set("es.resource.write","my-collection-{media-type}/doc");
如果使用 Writable
对象,对于每个 MapWritable
,elasticsearch-hadoop 将提取 media-type
键下的值,并将其用作 Elasticsearch 索引后缀。如果使用原始 JSON,则 elasticsearch-hadoop 将解析文档,提取字段 media-type
并相应地使用其值。
新 (org.apache.hadoop.mapreduce
) API编辑
使用新 API 非常相似 - 事实上,使用了完全相同的类 (org.elasticsearch.hadoop.mr.EsOutputFormat
)
Configuration conf = new Configuration(); conf.setBoolean("mapred.map.tasks.speculative.execution", false); conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); conf.set("es.nodes", "es-server:9200"); conf.set("es.resource", "radio/artists"); Job job = new Job(conf); job.setOutputFormatClass(EsOutputFormat.class); job.setMapOutputValueClass(MapWritable.class); ... job.waitForCompletion(true);
对于 Mapper
实例也是如此
public class SomeMapper extends Mapper { @Override protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { // create the MapWritable object MapWritable doc = new MapWritable(); ... context.write(NullWritable.get(), doc); }}
指定 ID 或其他文档 元数据 也很容易
Configuration conf = new Configuration(); conf.set("es.mapping.id", "radioId");
将现有 JSON 写入 Elasticsearch编辑
如前所述,在处理 JSON 时,在新 API 下,配置如下所示
Configuration conf = new Configuration(); conf.set("es.input.json", "yes"); Job job = new Job(conf); job.setMapOutputValueClass(BytesWritable.class); ... job.waitForCompletion(true);
public class SomeMapper extends Mapper { @Override protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { // assuming the document is stored as bytes byte[] source = ... BytesWritable jsonDoc = new BytesWritable(source); // send the doc directly context.write(NullWritable.get(), jsonDoc); }}
写入动态/多资源编辑
正如预期的那样,旧
和 新
API 之间的差异在这种情况下也是最小的(可以理解为不存在)
Configuration conf = new Configuration(); conf.set("es.resource.write","my-collection-{media-type}/doc"); ...
从 Elasticsearch 读取数据编辑
类似地,要从 Elasticsearch 读取数据,需要使用 org.elasticsearch.hadoop.mr.EsInputFormat
类。虽然它可以读取整个索引,但使用查询更方便 - elasticsearch-hadoop 会自动实时执行查询并返回结果到 Hadoop。由于查询针对真实数据执行,因此它充当数据集的实时视图。
就像它的对应类 (EsOutputFormat
) 一样,EsInputFormat
为 Elasticsearch 返回的每个 JSON 文档返回一个 Map<Writable, Writable>
。由于 InputFormat
要求返回键和值,EsInputFormat
将返回文档 ID(在 Elasticsearch 中)作为键(通常被忽略)以及文档/映射作为值。
如果需要保留从 Elasticsearch 返回的文档结构,请考虑使用 org.elasticsearch.hadoop.mr.LinkedMapWritable
。该类扩展了 Hadoop 的 MapWritable
(因此可以轻松替换它)并保留插入顺序;也就是说,在迭代映射时,条目将按插入顺序返回(与不维护插入顺序的 MapWritable
相反)。但是,由于 Hadoop 的工作方式,需要将 LinkedMapWritable
指定为作业映射输出值(而不是 MapWritable
)。
旧 (org.apache.hadoop.mapred
) APIedit
以下我们上面关于电台艺人的示例,要获取所有以“me”开头的艺人,可以使用以下代码片段
JobConf conf = new JobConf(); conf.set("es.resource", "radio/artists"); conf.set("es.query", "?q=me*"); conf.setInputFormat(EsInputFormat.class); conf.setMapOutputKeyClass(Text.class); conf.setMapOutputValueClass(MapWritable.class); ... JobClient.runJob(conf);
目标索引/类型 |
|
查询 |
|
专用 |
|
|
|
|
使用 EsInputFormat
的 Mapper
可能如下所示
public class MyMapper extends MapReduceBase implements Mapper { @Override public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { Text docId = (Text) key; MapWritable doc = (MapWritable) value; ... }}
随意使用 Java 5 泛型来避免上面的强制转换。为了清晰和可读性,本章中的示例不包括泛型。
新 (org.apache.hadoop.mapreduce
) APIedit
正如预期的那样,mapreduce
API 版本非常相似
Configuration conf = new Configuration(); conf.set("es.resource", "radio/artists/"); conf.set("es.query", "?q=me*"); Job job = new Job(conf); job.setInputFormatClass(EsInputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(MapWritable.class); ... job.waitForCompletion(true);
以及 Mapper
实现
public class SomeMapper extends Mapper { @Override protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { Text docId = (Text) key; MapWritable doc = (MapWritable) value; ... }}
以 JSON 格式从 Elasticsearch 读取数据edit
在需要以 JSON 格式从 Elasticsearch 获取结果(通常是将其发送到其他系统)的情况下,可以指示 elasticsearch-hadoop 按原样返回数据。通过将 es.output.json
设置为 true
,连接器将解析来自 Elasticsearch 的响应,识别文档,并在不转换它们的情况下将其内容作为 Text
对象返回给用户
Configuration conf = new Configuration(); conf.set("es.resource", "source/category"); conf.set("es.output.json", "true");
在同一个作业中使用不同的索引进行读写edit
有时,需要从一个 Elasticsearch 资源读取数据,对其进行处理,然后将其写入同一个作业中的另一个资源。 es.resource
设置不够,因为它意味着同一个资源既是源又是目标。在这种情况下,应该使用 es.resource.read
和 es.resource.write
来区分这两个资源(下面的示例使用 mapreduce API)
Configuration conf = new Configuration(); conf.set("es.resource.read", "source/category"); conf.set("es.resource.write", "sink/group");
类型转换edit
如果使用自动索引创建,请查看 此 部分以获取更多信息。
elasticsearch-hadoop 会自动将 Hadoop 内置的 Writable
类型转换为 Elasticsearch 字段类型(反之亦然),如下表所示
表 2. Writable
转换表
Writable |
Elasticsearch 类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
值得一提的是,仅在 Elasticsearch 中可用的丰富数据类型,例如 GeoPoint
或 GeoShape
通过将其结构转换为上表中可用的基元来支持。例如,基于其存储,geo_point
可能作为 Text
(基本上是 String
)或 ArrayWritable
返回。