如何在Apache Spark中使用Scala或Python运行多线程作业?

4
我正在面对与Spark中并发相关的问题,这阻止了我在生产中使用它,但我知道有一种方法可以解决。我正在尝试在订单历史记录上为10亿个产品中的700万个用户运行Spark ALS。首先,我正在获取一个不同用户列表,然后在这些用户上运行循环以获取推荐结果,这是非常缓慢的过程,需要数天才能为所有用户获取推荐结果。我尝试使用笛卡尔积获取所有用户和产品的推荐结果,但是再次将其提供给Elasticsearch时,我必须为每个用户过滤和排序记录,然后才能将其提供给其他API进行消费。
因此,请建议一个在这种情况下非常可扩展且可用于实时推荐的解决方案。
以下是我的Scala代码片段,它将帮助您了解我当前如何解决这个问题:
  //    buy_values -> RDD with Rating(<int user_id>, <int product_id>, <double rating>)
  def recommend_for_user(user: Int): Unit = {
      println("Recommendations for User ID: " + user);
      // Product IDs which are not bought by user 
      val candidates = buys_values
        .filter(x => x("customer_id").toString.toInt != user)
        .map(x => x("product_id").toString.toInt)
        .distinct().map((user, _))
      // find 30 products with top rating
      val recommendations = bestModel.get
        .predict(candidates)
        .takeOrdered(30)(Ordering[Double].reverse.on(x => x.rating))

      var i = 1
      var ESMap = Map[String, String]()
      recommendations.foreach { r =>
        ESMap += r.product.toString -> bitem_ids.value(r.product)
      }
      //  push to elasticsearch with user as id
      client.execute {
        index into "recommendation" / "items" id user fields ESMap
      }.await
      // remove candidate RDD from memory
      candidates.unpersist()
  }
  // iterate on each user to get recommendations for the user [slow process]
  user_ids.foreach(recommend_for_user)

你的并发问题在哪个层面上? - eliasah
Spark和Scala实现并行化的一种方式是使用并行集合。从您的代码中,我无法确定您是否正在使用可以并行化的数据结构,但也许有一种方法可以做到这一点。例如,buy_values是什么样的数据结构?由于您正在过滤和映射其组件,因此它似乎必须是某种集合,并且可能可以使用parbuy_values = sc.parallelize(buy_values)进行并行化。请参见https://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds。 - user4322779
buy_values是Spark RDD,但user_ids是一个列表。遍历user_ids需要时间。因此,这段代码给我每秒3个用户的推荐,速度很慢。如果我可以并行处理多个用户,无论是在同一Spark上下文中使用多线程还是通过其他方式,我们就可以使其扩展。 - Suraj
2个回答

2
很明显,你的程序瓶颈在于对“candidates”进行搜索。考虑到Spark的架构,这严重限制了你的并行能力,并增加了每个用户启动Spark作业的大量开销。
假设典型场景是有700万个用户和10亿个产品,大部分时间你会预测除用户已购买的几个产品之外的整个产品范围。至少在我看来,一个重要问题是为什么要费心筛选。即使你推荐已经购买过的产品,真的有害吗?
除非你有非常严格的要求,否则我建议忽略这个问题,直接使用MatrixFactorizationModel.recommendProductsForUsers,它基本上可以为你完成所有工作,除了数据导出。之后,你可以执行批量导出,就可以了。
现在假设你有一个明确的无重复策略。在假定一个典型用户只购买了相对较少的产品的情况下,你可以开始获取每个用户的产品集合:
val userProdSet = buy_values
    .map{case (user, product, _) => (user, product)} 
    .aggregateByKey(Set.empty[Int])((s, e) => s + e, (s1, s2) => s1 ++ s2)

接下来,您可以简单地将userProdSet映射以获得预测:
// Number of predictions for each user
val nPred = 30;

userProdSet.map{case (user, prodSet) => {
    val recommended = model
         // Find recommendations for user
        .recommendProducts(_, nPred + prodSet.size))
        // Filter to remove already purchased 
        .filter(rating => !prodSet.contains(rating.product))
        // Sort and limit
        .sortBy(_.rating)
        .reverse
        .take(nPred)
    (user, recommended)
}}

您可以进一步改进,通过使用可变集合进行聚合,并通过广播模型来实现,但这只是一个概念性的想法。
如果在user_ids中的用户数量小于整个集合(buy_values)中的用户数量,则可以简单地过滤userProdSet以仅保留用户的子集。

0

1.4版本新增了recommendAll功能,可以生成所有推荐结果并通过kv存储进行服务。


你能指出文档/代码在哪里吗?有一个私有方法recommendForAllrecommendProductsForUsers使用,我已经提到过了,但我还没有看到recommendAll - zero323
每当重新运行ALS(例如每小时)时,请使用recommendProductsForUsers,并将推荐上传到服务后端。我假设您正在使用类似solr / elasticsearch或hbase / cassandra的文档数据存储来存储用于服务的推荐...看起来您正在使用elasticsearch来提供结果...前k个结果已经排序,所以您不应该进行任何排序...您只需要保留用户点击了哪个项目的标志,然后推荐就会过时...您应该过滤掉过时的推荐,直到下一个版本的ALS运行。 - Debasish Das

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接