分布式数据库聚合作业中优化网络带宽

8
我有一个分布式/联邦数据库,结构如下:
1. 数据库分布在三个地理位置(“节点”)上。 2. 每个节点都有多个集群化的数据库。 3. 关系数据库包括PostgreSQL、MySQL、Oracle和MS SQL Server;非关系型数据库为MongoDB或Cassandra。 4. 通过RabbitMQ实现每个节点内部和节点联邦之间的松耦合,每个节点运行一个RabbitMQ代理。
我正在实现一个只读的节点间聚合作业系统,针对跨越节点联邦的作业(即非本地节点的作业)。这些作业仅执行“获取”查询,不修改数据库。 (如果作业的结果要进入一个或多个数据库,则通过一个单独的作业完成,该作业不是我试图优化的节点间作业系统的一部分。)我的目标是尽量减少这些作业所需的网络带宽(首先是最小化节点间/广域网带宽,然后是最小化节点内/局域网带宽);我假设每个WAN链接的成本相同,并且每个LAN链接的成本也相同。作业没有特别时间敏感性。我在节点内执行一些CPU负载平衡,但不在节点之间执行。
与本地集群或特定数据库的数据库写入相比,用于聚合作业的传输数据量很小,因此完全分布数据库于整个联邦中并不实际。
我用于最小化网络带宽的基本算法是:
1. 给定一个在联邦各处分布的数据集上运行的作业,管理节点向每个其他节点发送包含相关数据库查询的消息。 2. 每个节点运行其自己的查询、压缩它们、将它们缓存,并将它们的压缩大小发送给管理节点。 3. 管理员移动到包含最多数据的节点(具体来说,是具有最多数据且有空闲核心的集群内机器),从其他两个节点和集群内的其他机器请求剩余的数据,然后运行作业。
尽可能地使用分治方法来最小化需要的数据共同位置。例如,如果作业需要计算整个联邦销售额的总和,则每个节点本地计算其销售总额,然后在管理节点进行聚合(而不是将所有未处理的销售数据复制到管理节点)。但是,有时需要数据共同位置(例如,在位于不同节点的两个表之间执行连接时)。
为了优化这个系统,我首先对作业进行了聚合,并在十分钟时间周期内运行聚合作业(所有机器都运行着NTP,因此我可以相当确定“每十分钟”在每个节点上的意思相同)。目标是让两个作业能够共享相同的数据,从而降低传输数据的总体成本。
  1. 给定两个查询相同表的作业,我生成每个作业的结果集,然后取两个结果集的交集。
  2. 如果两个作业都计划在同一个节点上运行,则网络传输成本被计算为两个结果集的总和减去两个结果集的交集。
  3. 这两个结果集存储到PostgreSQL临时表(在关系数据的情况下)或者存储到所选节点上的临时Cassandra列族/ MongoDB集合(在nosql数据的情况下)。然后针对合并后的结果集执行原始查询,并将数据传递给各个作业。(此步骤仅在合并后的结果集上执行;单个结果集数据会直接传递给其作业而不必先存储在临时表/列族/集合中。)

这样可以提高网络带宽利用率,但我想知道是否有一个框架/库/算法可以进一步改进这个过程。我考虑的一个选项是在节点上缓存结果集,并在确定网络带宽时考虑这些缓存的结果集(即尝试在当前预定的共同定位的作业集之外重用结果集,以便例如在一个10分钟的周期内运行的作业可以使用以前10分钟结果集的缓存结果集),但除非作业使用完全相同的结果集(即除非它们使用相同的where子句),否则我不知道有什么通用算法能够填补结果集中缺少的部分(例如,如果结果集使用"where N>3"子句,而另一个作业需要带有"where N>0"子句的结果集,则我该使用什么算法来确定需要将原始结果集与带有"where N>0 AND N<=3"子句的结果集求并集)- 我可以尝试编写自己的算法来实现这一点,但结果可能会是一个错误且无用的混乱。我还需要确定缓存数据何时过期-最简单的方法是将缓存数据的时间戳与源表上的最后修改时间戳进行比较,并在时间戳更改时替换所有数据,但理想情况下,我希望能够基于每行或每个块的时间戳仅更新已更改的值。


是否将表格完全分发到每个站点,而不是尝试处理部分where子句中的片段,会更容易一些呢?磁盘空间很便宜,但这取决于底层数据变化的频率和谓词的狭窄程度,以确定这是否会减少网络流量。 - rlb
@rlb 问题在于每个集群内都有大量的写入活动,因此完全分布式的表意味着即使不需要,这些写入活动也需要传播到每个集群。例如,一个数据库是一个金融数据库,包含股票价格,这意味着有很多数据库写入。联合作业可能每小时只需要这些数据的快照,这相当于传播每次股票更新数据所需的网络带宽的一小部分。 - Zim-Zam O'Pootertoot
好的,我明白音量问题。您是否可以控制通过网络传输的内容?我们将结果集从逐行转换为逐列,压缩率大大提高,因此这可能是一个简单的胜利,风险较低,但不完全符合您的要求。我会在办公室里找一些与您实际问题相关的东西,但大多数情况下,我们都致力于优化分布式连接,正如您所提到的,如果没有完美地完成,可能会出现问题。 - rlb
@rlb 我可以控制通过RabbitMQ传输的内容,因此如果重新打包结果集有助于在另一个节点上缓存它们,我可以这样做。 - Zim-Zam O'Pootertoot
1个回答

4
我已经开始实现我的解决方案。
为了简化节点内部缓存并简化CPU负载平衡,我在每个数据库集群(“Cassandra节点”)上使用Cassandra数据库来运行聚合作业(以前我是手动聚合本地数据库结果集)。我将单个Cassandra数据库用于关系型、Cassandra和MongoDB数据(缺点是一些关系查询在Cassandra上运行较慢,但这可以通过单个统一的聚合数据库比单独的关系和非关系聚合数据库更易于维护来弥补)。由于缓存使此算法不必要,我也不再按十分钟时段聚合作业。
节点中的每台机器都引用称为Cassandra_Cache_[MachineID]的Cassandra columnfamily,该列族用于存储它发送到Cassandra节点的key_ids和column_ids。Cassandra_Cache列族由一个Table列、一个Primary_Key列、一个Column_ID列、一个Last_Modified_Timestamp列、一个Last_Used_Timestamp列组成,并且由Table|Primary_Key|Column_ID组成的复合键。Last_Modified_Timestamp列表示源数据库中数据的last_modified时间戳,而Last_Used_Timestamp列表示数据最后被聚合作业使用/读取的时间戳。当Cassandra节点请求来自机器的数据时,机器计算结果集,然后取结果集和其Cassandra_Cache中的table|key|columns的差集,并且与其Cassandra_Cache中行的Last_Modified_Timestamp相同(如果时间戳不匹配,则缓存数据已过期并随新的Last_Modified_Timestamp一起更新)。本地机器然后将差集发送到Cassandra节点,并使用差集更新其Cassandra_Cache,并更新用于组成结果集的每个缓存数据的Last_Used_Timestamp。(维护每个table|key|column的单独时间戳的更简单的替代方法是维护每个table|key的时间戳,但这不够精确,table|key|column时间戳不是非常复杂。)在Cassandra_Caches之间保持Last_Used_Timestamps同步只需要本地机器和远程节点发送与每个作业关联的Last_Used_Timestamp,因为作业内的所有数据都使用相同的Last_Used_Timestamp。
Cassandra节点使用来自节点内部和其他节点的新数据更新其结果集。Cassandra节点还维护一个列族,该列族存储每个机器的Cassandra_Cache中的相同数据(除了Last_Modified_Timestamp,它仅在本地机器上需要以确定数据何时过期),以及指示数据来自节点内部还是来自另一个节点的源ID - 该ID区分不同的节点,但不区分本地节点内的不同机器。(另一种选择是使用统一的Cassandra_Cache而不是为每台机器使用一个Cassandra_Cache再加上一个节点的另一个Cassandra_Cache,但我决定增加的复杂性不值得空间节省。)
每个Cassandra节点还维护一个Federated_Cassandra_Cache,其中包括已从本地节点发送到其他两个节点的{数据库、表、主键、列ID、最后使用时间戳}元组。
当作业通过管道时,每个Cassandra节点都会使用本地结果集更新其节点内部缓存,并完成可以在本地执行的子作业(例如,在多个节点之间汇总数据的作业中,每个节点对其节点内部数据进行汇总,以最小化需要共同定位的数据量)-如果子作业仅使用节点内部数据,则可以在本地执行。然后管理节点确定在哪个节点上执行其余的作业:每个Cassandra节点可以通过获取其结果集和根据其Federated_Cassandra_Cache缓存的结果集子集之间的差集来本地计算将其结果集发送到另一个节点的成本,并且管理节点将最小化成本方程["从NodeX传输结果集的成本" + "从NodeY传输结果集的成本"]。例如,将Node1的结果集传输到{Node2、Node3}的成本为{3、5},将Node2的结果集传输到{Node1、Node3}的成本为{2、2},将Node3的结果集传输到{Node1、Node2}的成本为{4、3},因此作业在Node1上运行,成本为“6”。
我对每个Cassandra节点使用LRU清除策略;我最初使用最旧的优先清除策略,因为它更容易实现并且需要较少的写入到Last_Used_Timestamp列(每次数据更新一次而不是每次数据读取一次),但是LRU策略的实现并不过于复杂,Last_Used_Timestamp写入也没有创建瓶颈。当Cassandra节点达到20%的可用空间时,它会清除数据直到达到30%的可用空间,因此每次清除大约是总空间的10%大小。节点维护两个时间戳:上次清除的节点内部数据的时间戳和上次清除的节点间/联合数据的时间戳;由于相对于节点内部通信而言节点间通信的延迟增加,清除策略的目标是将75%的缓存数据作为节点间数据,将25%的缓存数据作为节点内部数据,这可以通过让每次清除的25%为节点间数据,75%为节点内部数据来快速近似。清除的工作如下:
while(evicted_local_data_size < 7.5% of total space available) {
    evict local data with Last_Modified_Timestamp < 
        (last_evicted_local_timestamp += 1 hour)
    update evicted_local_data_size with evicted data
}

while(evicted_federated_data_size < 2.5% of total space available) {
    evict federated data with Last_Modified_Timestamp < 
        (last_evicted_federated_timestamp += 1 hour)
    update evicted_federated_data_size with evicted data
}

只有在节点内的机器和其他节点发送了驱逐确认后,驱逐数据才会被永久删除。

然后,Cassandra节点会向其节点内的机器发送通知,指示新的last_evicted_local_timestamp是什么。本地机器更新它们的Cassandra_Caches以反映新的时间戳,并在此完成后向Cassandra节点发送通知;当Cassandra节点收到所有本地机器的通知时,它就会永久删除已驱逐的本地数据。Cassandra节点还向远程节点发送通知,其中包含新的last_evicted_federated_timestamp;其他节点会更新其Federated_Cassandra_Caches以反映新的时间戳,当Cassandra节点从每个节点接收到通知时,它就会永久删除被驱逐的联邦数据(Cassandra节点跟踪数据来自哪个节点,因此在从NodeX接收到一个清除确认之前,它可以永久删除已驱逐的NodeX数据,而不必等待从NodeY接收到一个清除确认)。在所有机器/节点发送通知之前,如果Cassandra节点从未清空其旧数据的机器/节点接收到结果集,则Cassandra节点将在其查询中使用缓存的驱逐数据。例如,Cassandra节点可能有一个已经被驱逐的本地Table|Primary_Key|Column_ID数据,同时本地机器(尚未处理驱逐请求)未在其结果集中包含Table|Primary_Key|Column_ID数据,因为它认为Cassandra节点已经在其缓存中拥有此数据;Cassandra节点从本地机器接收到结果集,并且由于本地机器还没有确认清除请求,因此Cassandra节点将包含缓存的驱逐数据在其自己的结果集中。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接