一个专注于大数据技术架构与应用分享的技术博客

MapReduce数据输入中InputFormat类源码解析

在Hadoop中,MapReduce作业通常需要对大量数据进行处理,因此需要读取和写入数据。为了使开发者能够方便地处理不同类型格式的数据(如文本、CSV、JSON等),Hadoop提供了一系列的InputFormat类,其中每个类用于处理特定格式的数据。

InputFormat定义了每个InputSplit的边界以及它们由哪个Mapper处理,同时还定义了如何将输入数据转换为键-值对(key-value pairs)。在此过程中,Mapper必须根据特定格式解析数据。接下来我们以TextInputFormat为例,简单地介绍一下InputFormat类的源码实现。

TextInputFormat是一种将文本数据作为输入的InputFormat类。其主要的目的是读取文本文件,并将每一行数据转化为一个键值对。它使用了LineRecordReader来实现每行数据的读取。具体的实现方式如下:

首先,定义了一个createRecordReader方法。该方法用于创建LineRecordReader对象,该对象将用于分析输入文件并返回每个记录:

  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(FORMAT_DELIMITER_KEY, DEFAULT_DELIMITER);
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }

该方法的返回值为一个RecordReader对象,其中模板参数的LongWritable表示键值对的键类型(即行的偏移量),Text表示值类型(即文件中的行)。

在createRecordReader方法中,首先通过context.getConfiguration()方法获取了作业配置信息。通过get()方法获取了FORMAT_DELIMITER_KEY的值(即“textinputformat.record.delimiter”配置项的值),如果该项没有配置则返回默认值“
”。接下来将delimiter转成UTF-8的字节数组,并存储在变量recordDelimiterBytes中。最后创建了一个LineRecordReader对象,并将recordDelimiterBytes作为参数传递给构造函数。

接下来,TextInputFormat还需要实现InputFormat接口的两个方法:getSplits和getRecordReader。它们的实现方式如下:

  /**
   * Logically splits the set of input files for the job, splits N lines
   * of the input as one split.
   *
   * @see FileInputFormat#getSplits(JobContext)
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (FileStatus status : listStatus(job)) {
      splits.addAll(getSplitsForFile(status, job.getConfiguration()));
    }
    return splits;
  }

该方法中,先通过listStatus(job)方法获取了所有输入文件。接着调用了getSplitsForFile(status, job.getConfiguration())方法,该方法使用RecordReader读取数据并对数据进行处理,最后返回了一个List类型的splits,表示输入文件被划分成的若干个输入分片。


  /**
   * Generate the list of files and make them into FileSplits.
   */
  public List getSplitsForFile(FileStatus status, Configuration conf) 
      throws IOException {
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(conf));
    long maxSize = getMaxSplitSize(conf);

    // generate splits
    List splits = new ArrayList();
    Path fileName = status.getPath();
    if (status.isDirectory()) {
      for (FileStatus child : fs.listStatus(fileName)) {
        splits.addAll(getSplitsForFile(child, conf));
      }
    } else {
      long length = status.getLen();
      BlockLocation[] blkLocations = fs.getFileBlockLocations(status, 0, length);
      if ((length != 0) && isSplitable(fs, fileName)) {
        long blockSize = status.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length - bytes
赞(0)
版权声明:本文采用知识共享 署名4.0国际许可协议 [BY-NC-SA] 进行授权
文章名称:《MapReduce数据输入中InputFormat类源码解析》
文章链接:https://macsishu.com/graphs-of-datinput-inputformat-class-source-code-parsing
本站资源仅供个人学习交流,请于下载后24小时内删除,不允许用于商业用途,否则法律问题自行承担。