Create Scalding Source like TextLine that combines multiple files into single mappers
我们有许多需要合并的小文件。在 Scalding 中,您可以使用 TextLine 将文件读取为文本行。问题是我们每个文件有 1 个映射器,但我们想组合多个文件,以便它们由 1 个映射器处理。
我知道我们需要将输入格式更改为 CombineFileInputFormat 的实现,这可能涉及使用级联 CombinedHfs。我们无法弄清楚如何做到这一点,但应该只需要几行代码来定义我们自己的名为 CombineTextLine.
的 Scalding 源
非常感谢任何可以提供代码的人。
作为一个附带问题,我们有一些在 s3 中的数据,如果给定的解决方案适用于 s3 文件,那就太好了 – 我想这取决于 CombineFileInputFormat 或 CombinedHfs 是否适用于 s3。
你在你的问题中得到了这个想法,所以这可能是你的解决方案。
创建您自己的输入格式,扩展 CombineFileInputFormat 并使用您自己的自定义 RecordReader。我正在向您展示 Java 代码,但如果您愿意,您可以轻松地将其转换为 scala。
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LineRecordReader; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.CombineFileInputFormat; import org.apache.hadoop.mapred.lib.CombineFileRecordReader; import org.apache.hadoop.mapred.lib.CombineFileSplit; public class CombinedInputFormat<K, V> extends CombineFileInputFormat<K, V> { public static class MyKeyValueLineRecordReader implements RecordReader<LongWritable,Text> { public MyKeyValueLineRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx) throws IOException { @Override @Override @Override @Override @Override @Override @Override } |
然后你需要扩展 TextLine 类,让它使用你自己定义的输入格式(Scala 代码从现在开始)。
1
2 3 4 5 6 7 8 9 10 11 12 13 14 |
import cascading.scheme.hadoop.TextLine
import cascading.flow.FlowProcess import org.apache.hadoop.mapred.{OutputCollector, RecordReader, JobConf} import cascading.tap.Tap import com.twitter.scalding.{FixedPathSource, TextLineScheme} import cascading.scheme.Scheme class CombineFileTextLine extends TextLine{ override def sourceConfInit(flowProcess: FlowProcess[JobConf], tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], conf: JobConf) { |
为您的组合输入创建一个方案。
1
2 3 4 |
最后,创建你的源类:
1
|
如果您想使用单个路径而不是多个路径,则对源类的更改是微不足道的。
希望对你有帮助。
- 太好了,谢谢!!为什么默认情况下这不在 Scalding 中!无论如何,我尝试稍微编辑它以将自定义 InputFormat 名称更改为 CombinedInputFormat 因为它与方案 CombineFileTextLine TBH 具有相同的名称,当我第一次看到这个时,我感到很困惑。此外,我将添加导入,因为这也可能会导致问题(特别是 TextLine)。
这应该可以解决问题,伙计? – https://wiki.apache.org/hadoop/HowManyMapsAndReduces
- 谢谢,但这只会增加映射器的数量,而不是减少它,特别是当有很多小文件时。 “这可以用来增加map任务的数量,但不会设置低于Hadoop通过拆分输入数据确定的数量。”
来源:https://www.codenong.com/23917404/