我已经开始实现我的解决方案。
为了简化节点内部缓存并简化CPU负载平衡,我在每个数据库集群(“Cassandra节点”)上使用Cassandra数据库来运行聚合作业(以前我是手动聚合本地数据库结果集)。我将单个Cassandra数据库用于关系型、Cassandra和MongoDB数据(缺点是一些关系查询在Cassandra上运行较慢,但这可以通过单个统一的聚合数据库比单独的关系和非关系聚合数据库更易于维护来弥补)。由于缓存使此算法不必要,我也不再按十分钟时段聚合作业。
节点中的每台机器都引用称为Cassandra_Cache_[MachineID]的Cassandra columnfamily,该列族用于存储它发送到Cassandra节点的key_ids和column_ids。Cassandra_Cache列族由一个Table列、一个Primary_Key列、一个Column_ID列、一个Last_Modified_Timestamp列、一个Last_Used_Timestamp列组成,并且由Table|Primary_Key|Column_ID组成的复合键。Last_Modified_Timestamp列表示源数据库中数据的last_modified时间戳,而Last_Used_Timestamp列表示数据最后被聚合作业使用/读取的时间戳。当Cassandra节点请求来自机器的数据时,机器计算结果集,然后取结果集和其Cassandra_Cache中的table|key|columns的差集,并且与其Cassandra_Cache中行的Last_Modified_Timestamp相同(如果时间戳不匹配,则缓存数据已过期并随新的Last_Modified_Timestamp一起更新)。本地机器然后将差集发送到Cassandra节点,并使用差集更新其Cassandra_Cache,并更新用于组成结果集的每个缓存数据的Last_Used_Timestamp。(维护每个table|key|column的单独时间戳的更简单的替代方法是维护每个table|key的时间戳,但这不够精确,table|key|column时间戳不是非常复杂。)在Cassandra_Caches之间保持Last_Used_Timestamps同步只需要本地机器和远程节点发送与每个作业关联的Last_Used_Timestamp,因为作业内的所有数据都使用相同的Last_Used_Timestamp。
Cassandra节点使用来自节点内部和其他节点的新数据更新其结果集。Cassandra节点还维护一个列族,该列族存储每个机器的Cassandra_Cache中的相同数据(除了Last_Modified_Timestamp,它仅在本地机器上需要以确定数据何时过期),以及指示数据来自节点内部还是来自另一个节点的源ID - 该ID区分不同的节点,但不区分本地节点内的不同机器。(另一种选择是使用统一的Cassandra_Cache而不是为每台机器使用一个Cassandra_Cache再加上一个节点的另一个Cassandra_Cache,但我决定增加的复杂性不值得空间节省。)
每个Cassandra节点还维护一个Federated_Cassandra_Cache,其中包括已从本地节点发送到其他两个节点的{数据库、表、主键、列ID、最后使用时间戳}元组。
当作业通过管道时,每个Cassandra节点都会使用本地结果集更新其节点内部缓存,并完成可以在本地执行的子作业(例如,在多个节点之间汇总数据的作业中,每个节点对其节点内部数据进行汇总,以最小化需要共同定位的数据量)-如果子作业仅使用节点内部数据,则可以在本地执行。然后管理节点确定在哪个节点上执行其余的作业:每个Cassandra节点可以通过获取其结果集和根据其Federated_Cassandra_Cache缓存的结果集子集之间的差集来本地计算将其结果集发送到另一个节点的成本,并且管理节点将最小化成本方程["从NodeX传输结果集的成本" + "从NodeY传输结果集的成本"]。例如,将Node1的结果集传输到{Node2、Node3}的成本为{3、5},将Node2的结果集传输到{Node1、Node3}的成本为{2、2},将Node3的结果集传输到{Node1、Node2}的成本为{4、3},因此作业在Node1上运行,成本为“6”。
我对每个Cassandra节点使用LRU清除策略;我最初使用最旧的优先清除策略,因为它更容易实现并且需要较少的写入到Last_Used_Timestamp列(每次数据更新一次而不是每次数据读取一次),但是LRU策略的实现并不过于复杂,Last_Used_Timestamp写入也没有创建瓶颈。当Cassandra节点达到20%的可用空间时,它会清除数据直到达到30%的可用空间,因此每次清除大约是总空间的10%大小。节点维护两个时间戳:上次清除的节点内部数据的时间戳和上次清除的节点间/联合数据的时间戳;由于相对于节点内部通信而言节点间通信的延迟增加,清除策略的目标是将75%的缓存数据作为节点间数据,将25%的缓存数据作为节点内部数据,这可以通过让每次清除的25%为节点间数据,75%为节点内部数据来快速近似。清除的工作如下:
while(evicted_local_data_size < 7.5% of total space available) {
evict local data with Last_Modified_Timestamp <
(last_evicted_local_timestamp += 1 hour)
update evicted_local_data_size with evicted data
}
while(evicted_federated_data_size < 2.5% of total space available) {
evict federated data with Last_Modified_Timestamp <
(last_evicted_federated_timestamp += 1 hour)
update evicted_federated_data_size with evicted data
}
只有在节点内的机器和其他节点发送了驱逐确认后,驱逐数据才会被永久删除。
然后,Cassandra节点会向其节点内的机器发送通知,指示新的last_evicted_local_timestamp是什么。本地机器更新它们的Cassandra_Caches以反映新的时间戳,并在此完成后向Cassandra节点发送通知;当Cassandra节点收到所有本地机器的通知时,它就会永久删除已驱逐的本地数据。Cassandra节点还向远程节点发送通知,其中包含新的last_evicted_federated_timestamp;其他节点会更新其Federated_Cassandra_Caches以反映新的时间戳,当Cassandra节点从每个节点接收到通知时,它就会永久删除被驱逐的联邦数据(Cassandra节点跟踪数据来自哪个节点,因此在从NodeX接收到一个清除确认之前,它可以永久删除已驱逐的NodeX数据,而不必等待从NodeY接收到一个清除确认)。在所有机器/节点发送通知之前,如果Cassandra节点从未清空其旧数据的机器/节点接收到结果集,则Cassandra节点将在其查询中使用缓存的驱逐数据。例如,Cassandra节点可能有一个已经被驱逐的本地Table|Primary_Key|Column_ID数据,同时本地机器(尚未处理驱逐请求)未在其结果集中包含Table|Primary_Key|Column_ID数据,因为它认为Cassandra节点已经在其缓存中拥有此数据;Cassandra节点从本地机器接收到结果集,并且由于本地机器还没有确认清除请求,因此Cassandra节点将包含缓存的驱逐数据在其自己的结果集中。