博客
关于我
【MapReduce】基础案例 ---- 自定义OutputFormat <根据内容输出到指定文件目录中>
阅读量:318 次
发布时间:2019-03-04

本文共 4040 字,大约阅读时间需要 13 分钟。

MR输出格式实现

在MapReduce(MR)框架中,OutputFormat是输出的基类,所有实现该接口的类都可以作为MR任务的输出格式。不同输出格式有不同的适用场景和特点。

常见的OutputFormat实现类

以下是几种常见的OutputFormat实现及其特点:

1. 文本输出(TestOutputFormat)

TestOutputFormat是默认的输出格式,它将每条记录写为文本行。其特点如下:

  • 支持任意键值类型,默认会调用toString()方法将键值转换为字符串
  • 输出格式紧凑,便于压缩和处理

2. SequenceFileOutputFormat

SequenceFileOutputFormat是一种紧凑的输出格式,通常用于作为后续MR任务的输入。其特点如下:

  • 输出格式紧凑,适合压缩
  • 适合作为后续MR任务的输入

3. 自定义OutputFormat

根据具体需求,可以实现自定义的OutputFormat。以下是自定义OutputFormat的实现步骤:

  • 继承自FileOutputFormat,并设置通用类型参数
  • 重写RecordWriter方法,创建自定义输出流
  • 根据需求定义输出规则,例如筛选输出
  • 确保正确关闭IO流

自定义OutputFormat示例

以下是一个自定义FilterOutputFormat的实现示例:

public class FilterOutputFormat extends FileOutputFormat {      @Override      public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {          return new FilterRecordWriter(job);      }  }

FilterRecordWriter的实现如下:

public class FilterRecordWriter extends RecordWriter {      private FSDataOutputStream fosAtguigu;      private FSDataOutputStream fosOther;      public FilterRecordWriter(TaskAttemptContext job) throws IOException {          try {              FileSystem fs = FileSystem.get(job.getConfiguration());              Path pathAtguigu = new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\OutputFormat数据输出\\atguigu.log");              fosAtguigu = fs.create(pathAtguigu);              Path pathOther = new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\OutputFormat数据输出\\other.log");              fosOther = fs.create(pathOther);          } catch (Exception e) {              e.printStackTrace();          }      }      @Override      public void write(Text key, NullWritable value) throws IOException, InterruptedException {          String keyStr = key.toString();          if (keyStr.contains("atguigu")) {              fosAtguigu.write(keyStr.getBytes());          } else {              fosOther.write(keyStr.getBytes());          }      }      @Override      public void close(TaskAttemptContext context) throws IOException, InterruptedException {          IOUtils.closeStream(fosAtguigu);          IOUtils.closeStream(fosOther);      }  }

Mapper阶段

在Mapper阶段,主要任务是读取数据并输出。以下是FilterMapper的实现:

public class FilterMapper extends Mapper {      @Override      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {          context.write(value, NullWritable.get());      }  }

Reducer阶段

在Reducer阶段,主要任务是将读取的数据进行聚合并输出。以下是FilterReducer的实现:

public class FilterReducer extends Reducer, Text, NullWritable> {      @Override      protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {          String line = key.toString() + "\r\n";          Text k = new Text();          k.set(line);          for (NullWritable value : values) {              context.write(k, NullWritable.get());          }      }  }

Driver阶段

Driver阶段负责配置任务并提交Job。以下是FilterDriver的实现:

public class FilterDriver {      public static void main(String[] args) throws IOException, InterruptedException {          Configuration conf = new Configuration();          Job job = Job.getInstance(conf);          job.setMapperClass(FilterMapper.class);          job.setReducerClass(FilterReducer.class);          job.setJarByClass(FilterDriver.class);          job.setMapOutputKeyClass(Text.class);          job.setMapOutputValueClass(NullWritable.class);          job.setOutputKeyClass(Text.class);          job.setOutputValueClass(NullWritable.class);          job.setOutputFormatClass(FilterOutputFormat.class);          FileInputFormat.setInputPaths(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\OutputFormat数据输出\\log.txt"));          FileOutputFormat.setOutputPath(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Filteroutput"));          boolean result = job.waitForCompletion(true);          System.exit(result ? 0 : 1);      }  }

注意事项

在配置Job时,虽然我们自定义了OutputFormat,但由于FileOutputFormat会默认输出_SUCCESS文件,因此需要手动指定输出目录。

转载地址:http://vueq.baihongyu.com/

你可能感兴趣的文章
OpenCV中的监督学习
查看>>
opencv中读写视频
查看>>
opencv之cv2.findContours和drawContours(python)
查看>>
opencv之namedWindow,imshow出现两个窗口
查看>>
opencv之模糊处理
查看>>
Opencv介绍及opencv3.0在 vs2010上的配置
查看>>
OpenCV使用霍夫变换检测图像中的形状
查看>>
opencv保存图片路径包含中文乱码解决方案
查看>>
OpenCV保证输入图像为三通道
查看>>
OpenCV入门教程(非常详细)从零基础入门到精通,看完这一篇就够了
查看>>
opencv图像分割2-GMM
查看>>
opencv图像分割3-分水岭方法
查看>>
opencv图像切割1-KMeans方法
查看>>
OpenCV图像处理篇之阈值操作函数
查看>>
opencv图像特征融合-seamlessClone
查看>>
OpenCV图像的深浅拷贝
查看>>
OpenCV学习(13) 细化算法(1)(转)
查看>>
OpenCV学习笔记(27)KAZE 算法原理与源码分析(一)非线性扩散滤波
查看>>
OpenCV学堂 | CV开发者必须懂的9种距离度量方法,内含欧氏距离、切比雪夫距离等(建议收藏)
查看>>
OpenCV学堂 | OpenCV案例 | 基于轮廓分析对象提取
查看>>