如何在Spark中为输入文件定义多个自定义分隔符?

3

在通过Spark读取文件时,默认的输入文件分隔符是换行符(\n)。可以使用"textinputformat.record.delimiter"属性定义自定义分隔符。

但是,是否可以为同一个文件指定多个分隔符?

假设文件具有以下内容:

COMMENT,A,B,C
COMMENT,D,E,
F
LIKE,I,H,G
COMMENT,J,K,
L
COMMENT,M,N,O

我希望使用COMMENT和LIKE作为分隔符而不是换行符读取此文件。

尽管如此,如果在Spark中不允许使用多个分隔符,我已经想出了一种替代方法。

val ss = SparkSession.builder().appName("SentimentAnalysis").master("local[*]").getOrCreate()
val sc = ss.sparkContext
sc.hadoopConfiguration.set("textinputformat.record.delimiter", "COMMENT")
val rdd = sc.textFile("<filepath>")
val finalRdd = rdd.flatmap(f=>f.split("LIKE"))

但是,我认为拥有多个自定义分隔符会更好。在Spark中是否可能实现?还是我必须使用上述替代方案?

我认为只有通过创建自定义的TextInputFormat才能实现这一点。 - puhlen
创建自定义的TextInputFormat将有助于一次指定单个类型的分隔符。它能用于定义多个分隔符吗?您能否提供您所想的示例? - Vijay
你可以定义任何你想要使用的输入格式,但是你需要自己编写代码。我之前做过一次,很难找到一个好的教程,我在做的时候使用了这个答案 https://dev59.com/8V4c5IYBdhLWcg3wx8tU#27564697 作为指导。 - puhlen
谢谢@puhlen。您提供的帖子确实帮助我编写了具有两个分隔符的自定义文本输入格式。我会在这里发布答案。再次感谢您 :) - Vijay
1个回答

1
通过创建一个自定义的TextInputFormat类,可以解决上述问题,该类可根据两种分隔符字符串进行拆分。评论中@puhlen提供的帖子非常有帮助。以下是我使用的代码片段:
class CustomInputFormat extends TextInputFormat {
  override def createRecordReader(inputSplit: InputSplit,  taskAttemptContext: TaskAttemptContext): RecordReader[LongWritable, Text] = {
    return new ParagraphRecordReader();
  }
}
class ParagraphRecordReader extends RecordReader[LongWritable, Text] {
  var end: Long = 0L;
  var stillInChunk = true;

  var key = new LongWritable();
  var value = new Text();

  var fsin: FSDataInputStream = null;
  val buffer = new DataOutputBuffer();
  val tempBuffer1 = MutableList[Int]();
  val tempBuffer2 = MutableList[Int]();

  val endTag1 = "COMMENT".getBytes();
  val endTag2 = "LIKE".getBytes();

  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  override def initialize(inputSplit: org.apache.hadoop.mapreduce.InputSplit, taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext) {
    val split = inputSplit.asInstanceOf[FileSplit];
    val conf = taskAttemptContext.getConfiguration();
    val path = split.getPath();
    val fs = path.getFileSystem(conf);

    fsin = fs.open(path);
    val start = split.getStart();
    end = split.getStart() + split.getLength();
    fsin.seek(start);

    if (start != 0) {
      readUntilMatch(endTag1, endTag2, false);
    }
  }

  @throws(classOf[IOException])
  override def nextKeyValue(): Boolean = {
    if (!stillInChunk) return false;

    val status = readUntilMatch(endTag1, endTag2, true);

    value = new Text();
    value.set(buffer.getData(), 0, buffer.getLength());
    key = new LongWritable(fsin.getPos());
    buffer.reset();

    if (!status) {
      stillInChunk = false;
    }

    return true;
  }

  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  override def getCurrentKey(): LongWritable = {
    return key;

  }


  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  override def getCurrentValue(): Text = {
    return value;
  }

  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  override def getProgress(): Float = {
    return 0;
  }

  @throws(classOf[IOException])
  override def close() {
    fsin.close();
  }

  @throws(classOf[IOException])
  def readUntilMatch(match1: Array[Byte], match2: Array[Byte], withinBlock: Boolean): Boolean = {
    var i = 0;
    var j = 0;
    while (true) {
      val b = fsin.read();
      if (b == -1) return false;

      if (b == match1(i)) {
        tempBuffer1.+=(b)
        i = i + 1;
        if (i >= match1.length) {
          tempBuffer1.clear()
          return fsin.getPos() < end;
        }
      } else if (b == match2(j)) {
        tempBuffer2.+=(b)
        j = j + 1;
        if (j >= match2.length) {
          tempBuffer2.clear()
          return fsin.getPos() < end;
        }
      } else {
        if (tempBuffer1.size != 0)
          tempBuffer1.foreach { x => if (withinBlock) buffer.write(x) }
        else if (tempBuffer2.size != 0)
          tempBuffer2.foreach { x => if (withinBlock) buffer.write(x) }
        tempBuffer1.clear()
        tempBuffer2.clear()
        if (withinBlock) buffer.write(b);
        i = 0;
        j = 0;
      }
    }
    return false;
  }

使用以下类从文件系统读取文件时,您的文件将按照所需的两个分隔符进行读取。 :)
val rdd = sc.newAPIHadoopFile("<filepath>", classOf[ParagraphInputFormat], classOf[LongWritable], classOf[Text], sc.hadoopConfiguration)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接