实际问题是:每个计算节点都有一组样本,在重新采样步骤之后,某些计算机节点可能会有剩余,而其他节点则存在赤字,因此我希望在最小化通信的同时重新分配样本。
我想象这种问题有一个名称。
这个问题可以建模为最小费用流的一个实例。
假设有一个有向图G,其中包含顶点s、t、a1、…、ak、b1、…、bk和弧s→ai,其容量为xi,成本为0;弧ai→bj,其容量为无穷大,成本为0(如果i = j)或1(如果i ≠ j);以及弧bj→t,其容量为N,成本为0。我们希望找到从s到t移动∑i xi单位的最小费用流。想法是,如果y单位从ai→bj流动,则将y个物品从箱子i移动到箱子j(如果i ≠ j;如果i = j,则不进行移动)。
当然,使用最小费用流来解决这个简单问题就像用大锤砸蚂蚁一样,但是几个变种可以被建模为网络流问题。
如果一对节点i,j没有直接连接,则删除ai→bj弧。 我们可以通过在“a”侧或“b”侧上给它们顶点来启动和关闭节点。 我们可以通过调整成本来模拟通信速度的差异。 我们可以通过限制他们的弧的容量来限制两个节点交换物品的数量。 我们可以引入更多内部顶点并改变完全二分连接的拓扑结构,以模拟网络争用的影响。听起来你想使用一致性哈希
http://en.wikipedia.org/wiki/Consistent_hashing
基本上,使用好的随机哈希函数可以为您的样本生成唯一的ID,并获得均匀分布。然后,将样本一致地分发到一组节点上变得容易。http://www.lexemetech.com/2007/11/consistent-hashing.html http://www.tomkleinpeter.com/2008/03/17/programmers-toolbox-part-3-consistent-hashing/
更多信息请参考。
我认为问题应该这样澄清:
有 M
个剩余节点和 K
个赤字节点,我们应该在节点之间重新分配样本,使得剩余节点数量为0,而(可能)一些赤字节点。样本应该以包的形式交换,并且这些包的数量应该最小化。
或者,从数学上讲,我们有一个 M*K
矩阵,其中每个单元格表示要从节点 M
传递到节点 K
的样本数量,每行的元素总和和每列的元素总和都有给定的最大值。目标是最小化非零单元格的数量。
这是一种 "约束满足问题"。它是 NP 完全的。我发现两个经典问题与此问题类似:“集合打包”和“广义精确覆盖”。
为了将问题简化为"集合覆盖", 我们需要添加(暂时)几个多余节点,每个节点有N+1
个样本,以便在重新分配后没有剩余节点。然后对于每个节点,我们需要为多余元素和“赤字”元素生成所有可能的分区。然后对于多余和赤字分区的笛卡尔积应用“集合覆盖”的“优化”版本,找到最小子集的数量。M
、M+1
等优化节点来最小化覆盖中的子集数量。然后对于多余和赤字分区以及优化节点的笛卡尔积应用“广义精确覆盖”。对于较少数量的优化节点,此算法将失败,对于一些更大的数量,它将找到最小子集的数量。通常我们称之为数据重分配,理解是如果你正在重新分配它,你希望在某些度量标准下(如任务之间的均匀性)分配最优。
当你尝试进行计算负载平衡时,这在科学/技术计算中确实会出现。即使你在多个维度上进行计算,如果你正在通过空间填充曲线将空间数据分配给处理器,那么这个问题就会出现,而你通常希望数据被均匀分配。
该过程非常简单;首先,您需要对xi进行独占性前缀和,以便知道在你左边有多少项。例如,对于上面Noxville的示例数据:
[9, 6, 1, 6, 2]
前缀和应为:
[0, 9, 15, 16, 22]
你会发现(从最后一个处理器的总和加上它有多少个)总共有24个项目。
然后,你要计算出你理想的分区大小——比如说,ceil(totitems / nprocs)。你可以用任何方法来计算,只要每个处理器都同意所有分区大小即可。
现在,你有几种方法可以继续进行。如果数据项在某种意义上很大,并且你不能在内存中有两个副本,那么你可以开始将数据移动到最近的邻居处。你知道你左边的项目数量以及该方向上的“过剩”或“不足”;你还知道你有多少个项目(并且在你完成修复过剩或不足之后,你也会知道你将拥有多少个项目)。因此,你开始向左右邻居发送数据,并从左右邻居接收数据,直到向左的处理器集体拥有正确数量的项目,你也是如此。
但是,如果您能够负担得起两份数据的副本,那么您可以采取另一种方法来最小化发送的消息数量。您可以将左侧单元格的数量视为您的本地数据进入“全局”数组的起始索引。由于您知道每个处理器最终会有多少项,因此可以直接确定这些项将要到达哪个进程,并将它们直接发送。(例如,在上面的示例中,具有0..8项的处理器0知道,如果除了最后一个处理器之外的每个处理器都将以5个数据项结束,则值5-8可以发送到处理器1。)一旦发送了这些数据,您只需接收所期望的数据量即可完成。
以下是在C和MPI中执行此操作的简单示例,但基本方法几乎可以在任何地方使用。MPI的前缀扫描操作生成包含总和,因此我们必须减去自己的值以获得独占总和:
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <time.h>
void initdata(const int rank, const int maxvals, char **data, int *nvals) {
time_t t;
unsigned seed;
t = time(NULL);
seed = (unsigned)(t * (rank + 1));
srand(seed);
*nvals = (rand() % (maxvals-1)) + 1;
*data = malloc((*nvals+1) * sizeof(char));
for (int i=0; i<*nvals; i++) {
(*data)[i] = 'A' + (rank % 26);
}
(*data)[*nvals] = '\0';
}
int assignrank(const int globalid, const int totvals, const int size) {
int nvalsperrank = (totvals + size - 1)/size;
return (globalid/nvalsperrank);
}
void redistribute(char **data, const int totvals, const int curvals, const int globalstart,
const int rank, const int size, int *newnvals) {
const int stag = 1;
int nvalsperrank = (totvals + size - 1)/size;
*newnvals = nvalsperrank;
if (rank == size-1) *newnvals = totvals - (size-1)*nvalsperrank;
char *newdata = malloc((*newnvals+1) * sizeof(char));
newdata[(*newnvals)] = '\0';
MPI_Request requests[curvals];
int nmsgs=0;
/* figure out whose data we have, redistribute it */
int start=0;
int newrank = assignrank(globalstart, totvals, size);
for (int val=1; val<curvals; val++) {
int nextrank = assignrank(globalstart+val, totvals, size);
if (nextrank != newrank) {
MPI_Isend(&((*data)[start]), (val-1)-start+1, MPI_CHAR, newrank, stag, MPI_COMM_WORLD, &(requests[nmsgs]));
nmsgs++;
start = val;
newrank = nextrank;
}
}
MPI_Isend(&((*data)[start]), curvals-start, MPI_CHAR, newrank, stag, MPI_COMM_WORLD, &(requests[nmsgs]));
nmsgs++;
/* now receive all of our data */
int newvalssofar= 0;
int count;
MPI_Status status;
while (newvalssofar != *newnvals) {
MPI_Recv(&(newdata[newvalssofar]), *newnvals - newvalssofar, MPI_CHAR, MPI_ANY_SOURCE, stag, MPI_COMM_WORLD, &status);
MPI_Get_count(&status, MPI_CHAR, &count);
newvalssofar += count;
}
/* wait until all of our sends have been received */
MPI_Status statuses[curvals];
MPI_Waitall(nmsgs, requests, statuses);
/* now we can get rid of data and relace it with newdata */
free(*data);
*data = newdata;
}
int main(int argc, char **argv) {
const int maxvals=30;
int size, rank;
char *data;
int mycurnvals, mylvals, myfinalnvals;
int totvals;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
initdata(rank, maxvals, &data, &mycurnvals);
MPI_Scan( &mycurnvals, &mylvals, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD );
if (rank == size-1) totvals = mylvals;
mylvals -= mycurnvals;
MPI_Bcast( &totvals, 1, MPI_INT, size-1, MPI_COMM_WORLD );
printf("%3d : %s %d\n", rank, data, mylvals);
redistribute(&data, totvals, mycurnvals, mylvals, rank, size, &myfinalnvals);
printf("%3d after: %s\n", rank, data);
free(data);
MPI_Finalize();
return 0;
}
运行此代码,您将获得预期的行为;请注意,我确定“期望”分区的方式(使用ceil(totvals / nprocesses)),最终处理器通常会负载不足。此外,我没有尝试确保在重新分配中保留顺序(尽管如果顺序很重要,则很容易做到):
$ mpirun -np 13 ./distribute
0 : AAAAAAAAAAA 0
1 : BBBBBBBBBBBB 11
2 : CCCCCCCCCCCCCCCCCCCCCCCCCC 23
3 : DDDDDDD 49
4 : EEEEEEEEE 56
5 : FFFFFFFFFFFFFFFFFF 65
6 : G 83
7 : HHHHHHH 84
8 : IIIIIIIIIIIIIIIIIIIII 91
9 : JJJJJJJJJJJJJJJJJJ 112
10 : KKKKKKKKKKKKKKKKKKKK 130
11 : LLLLLLLLLLLLLLLLLLLLLLLLLLLL 150
12 : MMMMMMMMMMMMMMMMMM 178
0 after: AAAAAAAAAAABBBBB
1 after: BBBBBBBCCCCCCCCC
2 after: CCCCCCCCCCCCCCCC
3 after: DDDDDDDCEEEEEEEE
4 after: EFFFFFFFFFFFFFFF
5 after: FFFHHHHHHHIIIIIG
6 after: IIIIIIIIIIIIIIII
7 after: JJJJJJJJJJJJJJJJ
8 after: JJKKKKKKKKKKKKKK
9 after: LLLLLLLLLLKKKKKK
10 after: LLLLLLLLLLLLLLLL
11 after: LLMMMMMMMMMMMMMM
12 after: MMMM
我认为这个重新分配问题与计算机中的负载平衡有所不同。
负载平衡算法术语通常意味着一组策略/启发式,以确保未来负载(而不是当前负载)相对均匀分布。
在这种情况下,负载均衡器不会重新分配现有服务器/系统的负载,而是任何新请求到来时,它将尝试使用某些策略(即最小负载、轮询等)进行分配,从而在长期内使服务器相对均衡负载。
http://en.wikipedia.org/wiki/Load_balancing_(computing)
这个问题中的负载重新分配,可以通过迭代地将物品从最大负载的箱子移动到最小负载的箱子来实现。
基本上你想从
[9, 6, 1, 6, 2]
到
[5, 5, 4, 5, 5]
我认为最好的方法是计算floor(sum(array)/len(array)),然后评估到达此位置所需的差分。在这种情况下,floor((9+6+1+6+2)/5)=4,因此我们正在查看一个初始差分[-5,-2,+3,-2,+2]。然后,您可以贪心地交换相邻对中符号不同的值(例如将array[2]中的2转移到arr[1]和将array[4]中的2转移到array[3])。然后,您将剩下[-5,0,1,0,0],从这里您可以贪心地分配剩余的位。