关于scala:创建像TextLine这样的Scalding Source,将多个文件组合成单个映射器 | 珊瑚贝

Create Scalding Source like TextLine that combines multiple files into single mappers


我们有许多需要合并的小文件。在 Scalding 中,您可以使用 TextLine 将文件读取为文本行。问题是我们每个文件有 1 个映射器,但我们想组合多个文件,以便它们由 1 个映射器处理。

我知道我们需要将输入格式更改为 CombineFileInputFormat 的实现,这可能涉及使用级联 CombinedHfs。我们无法弄清楚如何做到这一点,但应该只需要几行代码来定义我们自己的名为 CombineTextLine.

的 Scalding 源

非常感谢任何可以提供代码的人。

作为一个附带问题,我们有一些在 s3 中的数据,如果给定的解决方案适用于 s3 文件,那就太好了 – 我想这取决于 CombineFileInputFormat 或 CombinedHfs 是否适用于 s3。


你在你的问题中得到了这个想法,所以这可能是你的解决方案。

创建您自己的输入格式,扩展 CombineFileInputFormat 并使用您自己的自定义 RecordReader。我正在向您展示 Java 代码,但如果您愿意,您可以轻松地将其转换为 scala。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class CombinedInputFormat<K, V> extends CombineFileInputFormat<K, V> {

    public static class MyKeyValueLineRecordReader implements RecordReader<LongWritable,Text> {
        private final RecordReader<LongWritable,Text> delegate;

        public MyKeyValueLineRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx) throws IOException {
            FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx), split.getLength(idx), split.getLocations());
            delegate = new LineRecordReader(conf, fileSplit);
        }

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            return delegate.next(key, value);
        }

        @Override
        public LongWritable createKey() {
            return delegate.createKey();
        }

        @Override
        public Text createValue() {
            return delegate.createValue();
        }

        @Override
        public long getPos() throws IOException {
            return delegate.getPos();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }

        @Override
        public float getProgress() throws IOException {
            return delegate.getProgress();
        }
    }

    @Override
    public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
        return new CombineFileRecordReader(job, (CombineFileSplit) split, reporter, (Class) MyKeyValueLineRecordReader.class);
    }

}

然后你需要扩展 TextLine 类,让它使用你自己定义的输入格式(Scala 代码从现在开始)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import cascading.scheme.hadoop.TextLine
import cascading.flow.FlowProcess
import org.apache.hadoop.mapred.{OutputCollector, RecordReader, JobConf}
import cascading.tap.Tap
import com.twitter.scalding.{FixedPathSource, TextLineScheme}
import cascading.scheme.Scheme

class CombineFileTextLine extends TextLine{

  override def sourceConfInit(flowProcess: FlowProcess[JobConf], tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], conf: JobConf) {
    super.sourceConfInit(flowProcess, tap, conf)
    conf.setInputFormat(classOf[CombinedInputFormat[String, String]])
  }
}

为您的组合输入创建一个方案。

1
2
3
4
trait CombineFileTextLineScheme extends TextLineScheme{

  override def hdfsScheme = new CombineFileTextLine().asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
}

最后,创建你的源类:

1
case class CombineFileMultipleTextLine(p : String*) extends  FixedPathSource(p :_*) with CombineFileTextLineScheme

如果您想使用单个路径而不是多个路径,则对源类的更改是微不足道的。

希望对你有帮助。

  • 太好了,谢谢!!为什么默认情况下这不在 Scalding 中!无论如何,我尝试稍微编辑它以将自定义 InputFormat 名称更改为 CombinedInputFormat 因为它与方案 CombineFileTextLine TBH 具有相同的名称,当我第一次看到这个时,我感到很困惑。此外,我将添加导入,因为这也可能会导致问题(特别是 TextLine)。


这应该可以解决问题,伙计? – https://wiki.apache.org/hadoop/HowManyMapsAndReduces

  • 谢谢,但这只会增加映射器的数量,而不是减少它,特别是当有很多小文件时。 “这可以用来增加map任务的数量,但不会设置低于Hadoop通过拆分输入数据确定的数量。”


来源:https://www.codenong.com/23917404/

微信公众号
手机浏览(小程序)

Warning: get_headers(): SSL operation failed with code 1. OpenSSL Error messages: error:14090086:SSL routines:ssl3_get_server_certificate:certificate verify failed in /mydata/web/wwwshanhubei/web/wp-content/themes/shanhuke/single.php on line 57

Warning: get_headers(): Failed to enable crypto in /mydata/web/wwwshanhubei/web/wp-content/themes/shanhuke/single.php on line 57

Warning: get_headers(https://static.shanhubei.com/qrcode/qrcode_viewid_9296.jpg): failed to open stream: operation failed in /mydata/web/wwwshanhubei/web/wp-content/themes/shanhuke/single.php on line 57
0
分享到:
没有账号? 忘记密码?