Scala Spark中的RDD过滤器

8

我有一个数据集,想要提取那些 (review/time) 在 x 和 y 之间的 (review/text),例如(1183334400 < time < 1185926400),

以下是我的部分数据:

product/productId: B000278ADA
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
product/price: 46.34
review/userId: A17KXW1PCUAIIN
review/profileName: Mark Anthony "Mark"
review/helpfulness: 4/4
review/score: 5.0
review/time: 1174435200
review/summary: Jobst UltraSheer Knee High Stockings
review/text: Does a very good job of relieving fatigue.

product/productId: B000278ADB
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
product/price: 46.34
review/userId: A9Q3932GX4FX8
review/profileName: Trina Wehle
review/helpfulness: 1/1
review/score: 3.0
review/time: 1352505600
review/summary: Delivery was very long wait.....
review/text: It took almost 3 weeks to recieve the two pairs of stockings .

product/productId: B000278ADB
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
product/price: 46.34
review/userId: AUIZ1GNBTG5OB
review/profileName: dgodoy
review/helpfulness: 1/1
review/score: 2.0
review/time: 1287014400
review/summary: sizes recomended in the size chart are not real
review/text: sizes are much smaller than what is recomended in the chart. I tried to put it and sheer it!.

我的Spark-Scala代码:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object test1 {
  def main(args: Array[String]): Unit = {
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local")
    val sc = new SparkContext(conf1)
    val conf: Configuration = new Configuration
    conf.set("textinputformat.record.delimiter", "product/title:")
    val input1=sc.newAPIHadoopFile("data/Electronics.txt",     classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
    val lines = input1.map { text => text._2}
    val filt = lines.filter(text=>(text.toString.contains(tt => tt in (startdate until enddate))))
    filt.saveAsTextFile("data/filter1")
  }
}

但是我的代码运行不好,

我该如何筛选这些行?


1
我在您的输入文件中没有看到分隔符字符串“product / productId:”。 - Nikita
1
你期望什么输出结果?你遇到了什么问题? - maasg
1个回答

11

这其实很简单。试试这个方法:

object test1 
{
  def main(args: Array[String]): Unit = 
  {
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local")
    val sc = new SparkContext(conf1)

    def extractDateAndCompare(line: String): Boolean=
    {
        val from = line.indexOf("/time: ") + 7
        val to = line.indexOf("review/text: ") -1
        val date = line.substring(from, to).toLong
        date > startDate && date < endDate
    }

    sc.textFile("data/Electronics.txt")
        .filter(extractDateAndCompare)
        .saveAsTextFile("data/filter1")
  }
}

我通常发现那些中间辅助方法可以使事情更清晰。当然,这假设边界日期在某处被定义并且输入文件包含格式问题。我故意这样做是为了保持简单, 但是如果你遇到错误,添加try、返回Option子句和使用flatMap()可以帮助你避免错误。

另外,您的原始文本有点繁琐,您可能需要探索Json、TSV文件或其他一些更简单的格式。


注意,我是从零开始编写的,可能会有一些索引等细节方面的小问题。但我希望你能理解我的意思。 - Daniel Langdon
亲爱的丹尼尔,我这里有一个1千兆字节的评论数据集,以下是我的数据集示例:产品/产品ID:B000278ADA 产品/标题:Jobst Ultr 产品/价格:46.34 评论/用户ID:A1ZJAH4 评论/用户名:jud doolitt 评论/帮助度:0/0 评论/评分:5.0 评论/时间:1359936000 评论/摘要:一站式购物 评论/内容:能够找到你想要的东西真是太棒了。我想提取在某个时间段内的评论内容,例如我想提取2002年的评论内容。为此,我编写了上述代码,将每条完整的评论数据视为RDD的记录。 - Esmaeil zahedi
哦,我看到你更新了示例文本。这意味着每个“记录”会生成多行? - Daniel Langdon
你的代码在那些行中确实是有意义的。但是,一旦你有了一个RDD[String],这个解决方案应该可以工作,那么你到底遇到了什么问题? - Daniel Langdon
如果它解决了你的问题,你应该接受它,这有助于寻找未答复问题的人。欢迎来到本网站! - Daniel Langdon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接