如何在PySpark UDF中解决类似匈牙利算法/线性求和分配的任务问题(包括特殊情况)

10

我有一个任务问题,并希望问问SO社区如何最好地为我的spark dataframe实现此任务(利用spark 3.1+)。我将首先描述问题,然后再进行实现。

这就是问题:我最多有N个任务和N个个体(在此问题的情况下,N=10)。每个个体执行每个任务的成本不同,其中最小成本为$0,最大成本为$10。这是一种具有某些注意事项的匈牙利算法问题。

  1. 有些情况下任务少于10个和/或个体少于10个,有人未被分配任务(或任务未被指派给任何人)是可以接受的。
  2. [更复杂的边缘情况/我遇到困难的情况] - 列表中可能会有一个标记为multiTask=True的任务(不能有多个multiTask,也可能没有)。如果一个工作人员对多任务的成本低于x,他将自动分配到多任务,并且多任务在优化期间被视为已分配。
    • 我将分享一些例子。在这个例子中,要分配给多任务的x值为1。
      • 如果10个人中有1个人在多任务上的成本为0.25,则他将被分配到多任务,然后其他9个人将被分配到其他9个任务
      • 如果10个人中有2个人在多任务上的成本<1,则两个人都会被分配到多任务,然后其他8个人将被分配到剩余的9个任务中的8个。1项任务将不会分配给任何人。
      • 如果所有10个工作人员在多任务上的成本<1,则所有工作人员都将被分配到多任务上。这很罕见,但可能发生。
      • 如果没有工作人员对多任务的成本<1,则在优化期间只会将多任务分配给一个人以最小化成本。

这是spark dataframe的样子。 注意: 我展示了一个简单的例子,其中N=3(3个任务,3个个体)。

from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([
  Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=220, cost=1.50, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=110, cost=2.90, isMultiTask=True),
  Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=190, cost=0.80, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=220, cost=1.80, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=110, cost=0.90, isMultiTask=True),
  Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=190, cost=9.99, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=220, cost=1.20, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=110, cost=0.25, isMultiTask=True),
  Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=190, cost=4.99, isMultiTask=False)
])

df = spark.createDataFrame(rdd)

你会看到有一个日期/位置,因为我需要解决每个日期/位置分组的作业问题。

我原本计划通过为每个工人和任务分配一个“索引”,使用 dense_rank(),然后使用 pandas UDF 填充基于这些索引的 N x N 的 numpy 数组,并调用 linear_sum_assignment 函数来解决问题。但是,由于我提出的有关多任务的第二种情况,我不认为这个计划能够奏效。

worker_order_window = Window.partitionBy("date", "locationId").orderBy("workerId")
task_order_window = Window.partitionBy("date", "locationId").orderBy("taskId")

# get the dense_rank because will use this to assign a worker ID an index for the np array for linear_sum_assignment
# dense_rank - 1 as arrays are 0 indexed
df = df.withColumn("worker_idx", dense_rank().over(worker_order_window) - 1) 
df = df.withColumn("task_idx", dense_rank().over(task_order_window) - 1)


def linear_assignment_udf(pandas_df: pd.DataFrame) -> pd.DataFrame:
  df_dict = pandas_df.to_dict('records')
  # in case there are less than N rows/columns
  N = max(pandas_df.shape[0], pandas_df.shape[1])
  arr = np.zeros((N,N))
  for row in df_dict: 
    # worker_idx will be the row number, task idx will be the col number
    worker_idx = row.get('worker_idx')
    task_idx = row.get('task_idx')
    arr[worker_idx][task_idx] = row.get('cost')
  rids, cids = linear_sum_assignment(n)
  
  return_list = []
  # now want to return a dataframe that says which task_idx a worker has 
  for r, c in zip(rids, cids):
    for d in df_dict: 
      if d.get('worker_idx') == r:
        d['task_assignment'] = c
        return_list.append(d)
  return pd.DataFrame(return_list)
      
  
  
schema = StructType.fromJson(df.schema.jsonValue()).add('task_assignment', 'integer')
df = df.groupBy("date", "locationId").applyInPandas(linear_assignment_udf, schema)

df = df.withColumn("isAssigned", when(col("task_assignment") == col("task_idx"), True).otherwise(False))

正如您所看到的,这种情况根本没有涉及到多任务。我希望以最有效的方式解决这个问题,因此我不想被绑定在pandas udf或scipy上。

1个回答

1
我不了解你正在使用的库,所以无法帮助你编写代码,但我认为你应该分两步进行:
  1. 如果需要将工人分配到多任务中,则将其分配到多任务中。如果有人被分配到此任务,请勿在成本矩阵中包含此任务。
  2. 像往常一样使用匈牙利算法(或其他算法)将工人分配到任务中。
基本的匈牙利算法仅适用于方形成本矩阵,并且通过在成本矩阵中填充零来正确处理了它,但是有些算法的修改可以处理矩形矩阵。您可能想查看是否有这些替代算法可供使用,因为它可能会更快。

1
是的,那就是我计划采取的方法。然而,我遇到的问题是如何高效地实现它,因为如果我事先为每个人分配索引,然后必须删除分配给多任务和多任务(可能)的人,他们的行/列将变为0。 - Lauren Leder
我无法事先分配索引,只能在看到是否有人被分配到多任务后再进行分配,但这感觉需要很多循环。 - Lauren Leder
匈牙利算法的时间复杂度为O(n^3)或O(n^4),具体取决于实现方式。在该算法之外增加一个或两个额外的循环以减小n的大小是一个不错的权衡。 - Yay295

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接