在Hadoop中,MapReduce作业通常需要对大量数据进行处理,因此需要读取和写入数据。为了使开发者能够方便地处理不同类型格式的数据(如文本、CSV、JSON等),Hadoop提供了一系列的InputFormat类,其中每个类用于处理特定格式的数据。
InputFormat定义了每个InputSplit的边界以及它们由哪个Mapper处理,同时还定义了如何将输入数据转换为键-值对(key-value pairs)。在此过程中,Mapper必须根据特定格式解析数据。接下来我们以TextInputFormat为例,简单地介绍一下InputFormat类的源码实现。
TextInputFormat是一种将文本数据作为输入的InputFormat类。其主要的目的是读取文本文件,并将每一行数据转化为一个键值对。它使用了LineRecordReader来实现每行数据的读取。具体的实现方式如下:
首先,定义了一个createRecordReader方法。该方法用于创建LineRecordReader对象,该对象将用于分析输入文件并返回每个记录:
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
String delimiter = context.getConfiguration().get(FORMAT_DELIMITER_KEY, DEFAULT_DELIMITER);
byte[] recordDelimiterBytes = null;
if (null != delimiter)
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
return new LineRecordReader(recordDelimiterBytes);
}
该方法的返回值为一个RecordReader对象,其中模板参数的LongWritable表示键值对的键类型(即行的偏移量),Text表示值类型(即文件中的行)。
在createRecordReader方法中,首先通过context.getConfiguration()方法获取了作业配置信息。通过get()方法获取了FORMAT_DELIMITER_KEY的值(即“textinputformat.record.delimiter”配置项的值),如果该项没有配置则返回默认值“
”。接下来将delimiter转成UTF-8的字节数组,并存储在变量recordDelimiterBytes中。最后创建了一个LineRecordReader对象,并将recordDelimiterBytes作为参数传递给构造函数。
接下来,TextInputFormat还需要实现InputFormat接口的两个方法:getSplits和getRecordReader。它们的实现方式如下:
/**
* Logically splits the set of input files for the job, splits N lines
* of the input as one split.
*
* @see FileInputFormat#getSplits(JobContext)
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
for (FileStatus status : listStatus(job)) {
splits.addAll(getSplitsForFile(status, job.getConfiguration()));
}
return splits;
}
该方法中,先通过listStatus(job)方法获取了所有输入文件。接着调用了getSplitsForFile(status, job.getConfiguration())方法,该方法使用RecordReader读取数据并对数据进行处理,最后返回了一个List类型的splits,表示输入文件被划分成的若干个输入分片。
/**
* Generate the list of files and make them into FileSplits.
*/
public List getSplitsForFile(FileStatus status, Configuration conf)
throws IOException {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(conf));
long maxSize = getMaxSplitSize(conf);
// generate splits
List splits = new ArrayList();
Path fileName = status.getPath();
if (status.isDirectory()) {
for (FileStatus child : fs.listStatus(fileName)) {
splits.addAll(getSplitsForFile(child, conf));
}
} else {
long length = status.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(status, 0, length);
if ((length != 0) && isSplitable(fs, fileName)) {
long blockSize = status.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length - bytes