带有子集匹配维度的区间树?

21

这是一个关于相对复杂问题的算法问题。其基础如下:

基于 可用时间段预订时间段 的排班系统。时间段具有某些标准,我们称之为标签。如果可用时间段的标签集是预订时间段的超集,则预订将与可用时间段匹配。

以具体情况为例,考虑以下场景:

11:00  12:00  13:00
+--------+
| A, B   |
+--------+
       +--------+
       | C, D   |
       +--------+
在11:00到12:30之间,可以预订标签AB,12:00到13:30提供CD,大约在12:00到12:30之间有重叠。
11:00  12:00  13:00
+--------+
| A, B   |
+--------+
       +--------+
       | C, D   |
       +--------+
  xxxxxx
  x A  x
  xxxxxx

这里已经为 A 进行了预订,因此在 11:15-ish 至 12:00-ish 之间不能进行其他 AB 的预订。

简而言之,没有可用时段的具体限制:

  • 可用时段可以包含任意数量的标签
  • 任意数量的时段可以重叠在任何时间
  • 时段长度是任意的
  • 预订可以包含任意数量的标签

该系统中唯一需要遵守的规则是:

  • 添加预订时,至少有一个剩余的可用时段必须与预订中的所有标签匹配

为了澄清:当同时存在两个带有标签A的可用时段时,在那个时间可以进行两个A的预订,但不能再多了。

我已经使用修改过的区间树实现了这个功能; 简单概述如下:

  • 所有可用时段都添加到区间树中(重复/重叠保留)
  • 遍历所有已预定的时段并:
    • 从树中查询与预订时间匹配的所有可用时段
    • 切割其中与预订标签匹配的第一个时段,并将该切片从树中删除

该过程结束后,剩下的是可用时段的剩余切片,我可以查询特定时间是否可以进行新的预订并添加它。

数据结构看起来像这样:

{
  type: 'available', 
  begin: 1497857244, 
  end: 1497858244, 
  tags: [{ foo: 'bar' }, { baz: 42 }]
}
{
  type: 'reserved', 
  begin: 1497857345, 
  end: 1497857210, 
  tags: [{ foo: 'bar' }]
}

标签本身就是键值对象,它们的列表称为“标签集”。如果有帮助可以对它们进行序列化;到目前为止我使用的是Python set类型,这使得比较非常容易。时间槽的开始/结束时间是树内的UNIX时间戳。我不特别依赖这些具体的数据结构,如果有用的话可以重构。


我面临的问题是,这并不是完全无错的;每隔一段时间会有一个预订会偷偷溜进系统,与其他预订发生冲突,我还不能确定它究竟是如何发生的。当标记以复杂的方式重叠时,最佳分配需要计算,以使所有预订尽可能适合可用插槽;实际上,目前在重叠情况下,预订如何匹配可用插槽是不确定性的。

我想知道的是:间隔树大多数情况下很适合这个目的,但我当前的系统将标签集匹配作为附加维度添加的方法很笨拙;是否有一种数据结构或算法可以优雅地处理这个问题?

必须支持的操作:

  1. 查询满足特定标签集的可用插槽(考虑到可能会减少可用性但本身不属于该标签集的预订;例如,在上面的示例中查询B的可用性)。
  2. 确保不能将没有匹配的可用插槽的预订添加到系统中。

我已经澄清了所使用的数据结构。是的,一个预订只需要匹配可用时间段的标签和时间,就可以被接受。 - deceze
顺便问一下,你说的是计算预订的最佳分配,但是“先到先得”不是预订的惯用语吗?你能举个例子说明之前的预订需要更改吗? - Florian Castellane
1
@Florian 拿苹果商店 Genius Bar 预约来举例:你只是预约了一个时间段,可以和 某个人 谈论你的 iPhone,不管你得到哪个具体的时间段或者人。只要在那个时间段内有一个有关 iPhone 的知识人士能够和你交流,但是会有多个人,具体是哪一个将在那时候确定。 - deceze
@Florian 目前我正在按需进行所有这些计算,树没有被持久化在任何地方。我从数据存储中提取了两种类型的所有相关插槽,构建树形结构,切割树形结构,然后从中获取“剩余”的可用插槽或检查是否可以添加新的预订。- 我正在考虑将其更改为持久性内存树,因此您可以自由地提出使用任一方法的解决方案。 - deceze
@AakashM 是的,时间间隔是非零的。实际上,它们至少为5分钟,但从理论上讲,它们可以小到1秒(尽管这没有实际意义)。您的建议听起来很有趣,但您需要详细说明一下;我无法从简短的评论中完全理解... :) - deceze
显示剩余5条评论
3个回答

8
你的问题可以使用 约束编程 来解决。在Python中,可以使用 python-constraint 库来实现。
首先,我们需要一种方法来检查两个时间段是否与彼此一致。这是一个函数,如果两个时间段共享一个标签并且它们的时间框重叠,则返回true。在Python中,可以使用以下函数来实现。
def checkNoOverlap(slot1, slot2):
    shareTags = False
    for tag in slot1['tags']:
        if tag in slot2['tags']:
            shareTags = True
            break    
    if not shareTags: return True
    return not (slot2['begin'] <= slot1['begin'] <= slot2['end'] or 
                slot2['begin'] <= slot1['end'] <= slot2['end'])

我不确定您是否希望标签完全相同(如{foo:bar}等于{foo:bar})还是仅限于键(如{foo:bar}等于{foo:qux}),但您可以在上面的函数中更改它。
一致性检查
我们可以使用python-constraint模块来实现您请求的两种功能之一。
第二个功能最简单。为了实现这一点,我们可以使用函数isConsistent(set),它以提供的数据结构中插槽的列表作为输入。然后,该函数将所有插槽提供给python-constraint,并检查插槽列表是否一致(没有两个不应重叠的插槽重叠),并返回一致性。
def isConsistent(set):
        #initialize python-constraint context
        problem = Problem()
        #add all slots the context as variables with a singleton domain
        for i in range(len(set)):
            problem.addVariable(i, [set[i]])        
        #add a constraint for each possible pair of slots
        for i in range(len(set)):
            for j in range(len(set)):
                #we don't want slots to be checked against themselves
                if i == j:
                    continue
                #this constraint uses the checkNoOverlap function
                problem.addConstraint(lambda a,b: checkNoOverlap(a, b), (i, j))
        # getSolutions returns all the possible combinations of domain elements
        # because all domains are singleton, this either returns a list with length 1 (consistent) or 0 (inconsistent)
        return not len(problem.getSolutions()) == 0

这个函数可以在用户想要添加预订时间段时调用。输入的时间段可以添加到已存在的时间段列表中,并检查一致性。如果是一致的,新的时间段就可以被预定。否则,新的时间段会重叠,应该被拒绝。

查找可用时间段

这个问题有点棘手。我们可以使用与上面相同的功能,但需要进行一些重要的更改。我们现在不想将新的时间段与现有的时间段一起添加,而是想将所有可能的时间段添加到已存在的时间段中。然后,我们可以检查所有这些可能的时间段与预订时间段的一致性,并向约束系统请求一致的组合。
由于如果没有任何限制,可能的时间段数量将是无限的,因此我们首先需要为程序声明一些参数:
MIN = 149780000 #available time slots can never start earlier then this time
MAX = 149790000 #available time slots can never start later then this time
GRANULARITY = 1*60 #possible time slots are always at least one minut different from each other

我们现在可以继续主函数。它看起来很像一致性检查,但是与用户的新插槽不同,我们现在添加一个变量以发现所有可用的插槽。
def availableSlots(tags, set):
    #same as above
    problem = Problem()
    for i in range(len(set)):
        problem.addVariable(i, [set[i]])
    #add an extra variable for the available slot is added, with a domain of all possible slots
    problem.addVariable(len(set), generatePossibleSlots(MIN, MAX, GRANULARITY, tags))
    for i in range(len(set) +1):
        for j in range(len(set) +1):
            if i == j:
                continue
            problem.addConstraint(lambda a, b: checkNoOverlap(a, b), (i, j))
    #extract the available time slots from the solution for clean output
    return filterAvailableSlots(problem.getSolutions())

我使用一些辅助函数来使代码更加清晰。它们在这里被包含。

def filterAvailableSlots(possibleCombinations):
    result = []
    for slots in possibleCombinations:
        for key, slot in slots.items():
            if slot['type'] == 'available':
                result.append(slot)

    return result

def generatePossibleSlots(min, max, granularity, tags):
    possibilities = []
    for i in range(min, max - 1, granularity):
        for j in range(i + 1, max, granularity):
            possibleSlot = {
                              'type': 'available',
                              'begin': i,
                              'end': j,
                              'tags': tags
            }
            possibilities.append(possibleSlot)
    return tuple(possibilities)

你现在可以使用函数 getAvailableSlots(tags, set) 来获取你想要的可用时间段标签和已经预定的时间段集合。请注意,此函数确实返回所有一致可能的时间段,因此不会做出找到最大长度或其他优化的努力。
希望这可以帮到你!(我在我的pycharm中按照你的描述使它工作了)

问题:您似乎在比较方法中没有区分可用和保留的插槽...我是否应该将所有插槽(包括可用和保留的)无差别地输入到“问题”中?那么,“checkNoOverlap”应该做出一些让步,因为可用插槽也可能会“重叠”,对吗? - deceze
那么您也想在预定义的可用时段中安排新的预订,并检查它们是否与其他已预订的时段重叠,并且是否包含在该标签的可用时段内? - Arne Van Den Kerchove
是的。首先需要定义可用时段,然后可以在有空余时段的地方添加预订,考虑到已经存在的预订会“减少”可用性。因此,我需要获取减去现有预订后仍然可用的时段,并且(由于其必然)在尝试添加新预订时对整个集合进行一致性检查。 - deceze
1
差不多就是这样,是的。谢谢你澄清现有的示例并不完全涵盖它。我会继续尝试这个。当然,如果您能提供更新的示例,我会很高兴的。:o) - deceze
也许问题没有表述清楚:“可用时间段”表示一个人在特定时间范围内的可用性;该人可以在该时间范围内进行多个预约,只要它们不重叠。即,从11:00到13:00的单个可用时间段可以安排八个15分钟的预约。我认为你的方法没有考虑到这一点,对吗? - deceze
显示剩余5条评论

4
这里有一个解决方案,我会在下面包含所有代码。

1. 创建插槽表和预订表

example tables

2. 创建预订与时间段的矩阵

根据预订与时间段的组合是否可行,填充该矩阵中的 true 或 false 值。

example boolean combinations matrix

3. 找出最佳映射,以允许最多的预订-时间段组合

注意:我的当前解决方案在处理非常大的数组时缩放效果较差,因为它涉及循环遍历大小为时间段数量的列表的所有可能排列。我已发布另一个问题,看看是否有人能找到更好的方法。但是,这个解决方案是准确的,并且可以进行优化。

enter image description here

Python代码来源

第一部分

from IPython.display import display
import pandas as pd
import datetime

available_data = [
    ['SlotA', datetime.time(11, 0, 0), datetime.time(12, 30, 0), set(list('ABD'))],
    ['SlotB',datetime.time(12, 0, 0), datetime.time(13, 30, 0), set(list('C'))],
    ['SlotC',datetime.time(12, 0, 0), datetime.time(13, 30, 0), set(list('ABCD'))],
    ['SlotD',datetime.time(12, 0, 0), datetime.time(13, 30, 0), set(list('AD'))],
]

reservation_data = [
    ['ReservationA', datetime.time(11, 15, 0), datetime.time(12, 15, 0), set(list('AD'))],
    ['ReservationB', datetime.time(11, 15, 0), datetime.time(12, 15, 0), set(list('A'))],
    ['ReservationC', datetime.time(12, 0, 0), datetime.time(12, 15, 0), set(list('C'))],
    ['ReservationD', datetime.time(12, 0, 0), datetime.time(12, 15, 0), set(list('C'))],
    ['ReservationE', datetime.time(12, 0, 0), datetime.time(12, 15, 0), set(list('D'))]
]

reservations = pd.DataFrame(data=reservation_data, columns=['reservations', 'begin', 'end', 'tags']).set_index('reservations')
slots = pd.DataFrame(data=available_data, columns=['slots', 'begin', 'end', 'tags']).set_index('slots')

display(slots)
display(reservations)

第二部分

def is_possible_combination(r):
    return (r['begin'] >= slots['begin']) & (r['end'] <= slots['end']) & (r['tags'] <= slots['tags'])

solution_matrix = reservations.apply(is_possible_combination, axis=1).astype(int)
display(solution_matrix)

第三部分

import numpy as np
from itertools import permutations

# add dummy columns to make the matrix square if it is not
sqr_matrix = solution_matrix
if sqr_matrix.shape[0] > sqr_matrix.shape[1]:
    # uhoh, there are more reservations than slots... this can't be good
    for i in range(sqr_matrix.shape[0] - sqr_matrix.shape[1]):
        sqr_matrix.loc[:,'FakeSlot' + str(i)] = [1] * sqr_matrix.shape[0]
elif sqr_matrix.shape[0] < sqr_matrix.shape[1]:
    # there are more slots than customers, why doesn't anyone like us?
    for i in range(sqr_matrix.shape[0] - sqr_matrix.shape[1]):
        sqr_matrix.loc['FakeCustomer' + str(i)] = [1] * sqr_matrix.shape[1]

# we only want the values now
A = solution_matrix.values.astype(int)

# make an identity matrix (the perfect map)
imatrix = np.diag([1]*A.shape[0])

# randomly swap columns on the identity matrix until they match. 
n = A.shape[0]

# this will hold the map that works the best
best_map_so_far = np.zeros([1,1])

for column_order in permutations(range(n)):
    # this is an identity matrix with the columns swapped according to the permutation
    imatrix = np.zeros(A.shape)
    for row, column in enumerate(column_order):
        imatrix[row,column] = 1

    # is this map better than the previous best?
    if sum(sum(imatrix * A)) > sum(sum(best_map_so_far)):
        best_map_so_far = imatrix

    # could it be? a perfect map??
    if sum(sum(imatrix * A)) == n:
        break

if sum(sum(imatrix * A)) != n:
    print('a perfect map was not found')

output = pd.DataFrame(A*imatrix, columns=solution_matrix.columns, index=solution_matrix.index, dtype=int)
display(output)

测试所有组合的优化可能通过Arne的约束编程实现...?感谢您的回答,我会深入研究并看看它会带我去哪里。 - deceze
没错,那可以行得通。虽然很难解释,但你可以根据可能的矩阵组合来限制组合方式(因为你只关心其中特定元素的值为1)。但是使用约束库实际上可能更难编写代码,我稍后会尝试想出一些办法。 - tinker tailor
是的,我也很难用Arne的答案表达所有适用的约束条件,所以我欢迎这种替代方法。一旦有时间,我会试着去操作它。 - deceze

0

Arnetinker提供的建议都很有帮助,但并不是最终有效的方法。我想出了一种混合方法,解决了这个问题。

主要问题在于它是一个三维问题,同时解决所有维度很困难。它不仅涉及匹配时间重叠或标签重叠,而且涉及将时间片段与标签重叠匹配。根据时间和标签将插槽与其他插槽进行匹配非常简单,但是将已经匹配的可用插槽与另一个时间的另一个预订进行匹配就会变得相当复杂。这意味着一个可用插槽可以覆盖两个不同时间的预订:

+---------+
| A, B    |
+---------+
xxxxx xxxxx
x A x x A x
xxxxx xxxxx

试图将这个问题纳入基于约束的编程中需要一个极其复杂的约束关系,几乎难以管理。我的解决方案是简化问题...

去掉一个维度

不是一次性解决所有维度,而是大大简化问题,主要是通过大量减少时间维度来实现的。我使用了现有的区间树,并根据需要对其进行切片:

def __init__(self, slots):
    self.tree = IntervalTree(slots)

def timeslot_is_available(self, start: datetime, end: datetime, attributes: set):
    candidate = Slot(start.timestamp(), end.timestamp(), dict(type=SlotType.RESERVED, attributes=attributes))
    slots = list(self.tree[start.timestamp():end.timestamp()])
    return self.model_is_consistent(slots + [candidate])

为了查询特定时间是否有可用的插槽,我只取那个特定时间相关的插槽(self.tree[..:..]),这将计算复杂度降低到一个局部子集:

  |      |             +-+ = availability
+-|------|-+           xxx = reservation
  |  +---|------+
xx|x  xxx|x
  |  xxxx|
  |      |

然后我确认在那个狭窄的范围内的一致性:

@staticmethod
def model_is_consistent(slots):
    def can_handle(r):
        return lambda a: r.attributes <= a.attributes and a.contains_interval(r)

    av = [s for s in slots if s.type == SlotType.AVAILABLE]
    rs = [s for s in slots if s.type == SlotType.RESERVED]

    p = Problem()
    p.addConstraint(AllDifferentConstraint())
    p.addVariables(range(len(rs)), av)

    for i, r in enumerate(rs):
        p.addConstraint(can_handle(r), (i,))

    return p.getSolution() is not None

(我在这里省略了一些优化和其他代码。)

这部分是 Arne 和 tinker 建议的混合方法。它使用基于约束的编程来查找匹配的插槽,使用 tinker 建议的矩阵算法。基本上:如果在解决此问题的任何解决方案中,所有预订都可以分配到不同的可用插槽中,那么这个时间片就处于一致状态。由于我传入了所需的预订插槽,如果模型仍然包括该插槽并且保持一致,这意味着安全地预订该插槽。

如果在这个狭窄的窗口内有两个短预订可以分配给同一个可用性,那么这仍然存在问题,但这种情况的可能性很低,结果只是对可用性查询的误报;假阳性会更加棘手。

查找可用插槽

查找所有可用时间段是一个更复杂的问题,因此需要进行一些简化。首先,只能查询特定标签集合的可用性模型(没有“给我所有全局可用的时间段”),其次只能以特定粒度(所需时间段长度)查询。这非常适合我的特定用例,我只需要为用户提供他们可以预订的时间段列表,如9:15-9:30,9:30-9:45等。这通过重用上面的代码使算法非常简单:

def free_slots(self, start: datetime, end: datetime, attributes: set, granularity: timedelta):
    slots = []
    while start < end:
        slot_end = start + granularity
        if self.timeslot_is_available(start, slot_end, attributes):
            slots.append((start, slot_end))
        start += granularity

    return slots

换句话说,在给定的时间间隔内,它只是遍历所有可能的插槽,并逐字检查该插槽是否可用。这是一种有点蛮力的解决方案,但完全可以正常工作。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接