如何最快地对10个32位数字进行排序?

216

我正在解决一个问题,涉及快速排序10个数字(int32)。我的应用程序需要尽可能快地对10个数字进行数百万次的排序。我正在对数十亿个元素的数据集进行抽样,每次需要从中选择10个数字(简化),并对它们进行排序(并从排序后的10个元素列表中得出结论)。

目前我正在使用插入排序,但我想我可以为我的特定问题实现一个非常快速的自定义排序算法,以打败插入排序。

我该如何解决这个问题?


14
尽管听起来有些粗糙,一系列逐层嵌套的“if”语句应该是最好的选择。避免使用循环。 - John Alexiou
8
你希望在排列的数字集合中,它们被赋予任何偏见吗?还是它们将被均匀分布?一个列表的顺序与下一个列表有关系吗? - Douglas Zare
4
整个数据集(包含数十亿个数字)符合本福德定律的分布规律,但当我随机从中选取元素时,它们不再符合该规律(我认为)。 - bodacydo
13
你可能想阅读这个链接:https://dev59.com/8-o6XIcBkEYKwwoYTy8d - phuclv
11
如果你从数十亿个元素中随机选择,那么拉取数据的延迟可能会比排序所需的时间更具影响力,即使整个数据集都在RAM中。你可以通过对按顺序和随机选择数据进行基准测试来测试其影响。 - Steve S.
显示剩余8条评论
12个回答

1

以下 CUDA 核心在 10 个 CUDA 线程上运行(排名排序算法),在 42 毫秒内对 1000 个数组进行了 1000 次排序,每次排序花费 42 纳秒或 ~70 个周期(在 1.7 GHz 下):

inline
__device__ int findOrder(const int mask, const int data0)
{
    const int order1 = data0>__shfl_sync(mask,data0,0);
    const int order2 = data0>__shfl_sync(mask,data0,1);
    const int order3 = data0>__shfl_sync(mask,data0,2);
    const int order4 = data0>__shfl_sync(mask,data0,3);
    const int order5 = data0>__shfl_sync(mask,data0,4);
    const int order6 = data0>__shfl_sync(mask,data0,5);
    const int order7 = data0>__shfl_sync(mask,data0,6);
    const int order8 = data0>__shfl_sync(mask,data0,7);
    const int order9 = data0>__shfl_sync(mask,data0,8);
    const int order10 = data0>__shfl_sync(mask,data0,9);
    return order1 + order2 + order3 + order4 + order5 + order6 + order7 + order8 + order9 + order10;
}

// launch this with 10 CUDA threads in 1 block in 1 grid
// sorts 10 elements using only SIMT registers
__global__ void rankSort(int * __restrict__ buffer)
{    
    const int id  = threadIdx.x;

    // enable first 10 lanes of warp for shuffling 
    const int mask = __activemask();

    __shared__ int data[10000];

    for(int i=0;i<1000;i++)
    {
        data[id+i*10] = buffer[id+i*10];
    }
    __syncwarp();
    // benchmark 1000 times
    for(int k=0;k<1000;k++)
    {

        // sort 1000 arrays
        for(int j=0;j<1000;j+=5)
        {
            // sort 5 arrays at once to hide latency
            const int data1 = data[id+j*10];
            const int data2 = data[id+(j+1)*10];
            const int data3 = data[id+(j+2)*10];
            const int data4 = data[id+(j+3)*10];
            const int data5 = data[id+(j+4)*10];

            const int order1 = findOrder(mask,data1);        
            const int order2 = findOrder(mask,data2);
            const int order3 = findOrder(mask,data3);
            const int order4 = findOrder(mask,data4);
            const int order5 = findOrder(mask,data5);

            data[order1+j*10]=data1;         
            data[order2+(j+1)*10]=data2;           
            data[order3+(j+2)*10]=data3;
            data[order4+(j+3)*10]=data4;
            data[order5+(j+4)*10]=data5;
        }

    }
    __syncwarp();
    for(int i=0;i<1000;i++)
    {
        buffer[id+i*10] = data[id+i*10];
    }
}   

由于所有10个线程都在同一个SIMT单元上处理,因此它类似于在向量寄存器上运行的AVX512优化版本,但除了更多的寄存器空间以隐藏更多的延迟之外。此外,SIMT单元是32位的,因此可以线性时间复杂度运行直到32个元素。

该算法假定元素是唯一的。对于重复的数据,需要进行额外的约简步骤,以将重复的顺序值解压缩为10个元素。首先计算每个元素的排名,然后直接将它们复制到数组中作为索引的排名。顺序值需要暴力(O(N x N))数量的比较,并且为了减少延迟,使用warp-shuffles从不同的warp-lanes(向量寄存器)中收集数据。


1

当你可以移动时,为什么要交换?一个x86缓存行有足够的额外内存供您执行归并排序。

我可能会单独插入排序索引0-1、2-4、5-6、7-9,或者更好的方法是在插入时保持这些小组已排序,这样每次插入最多只需要一两个移位。

然后将5、6和7-9合并->10-14,将0-1和2-4合并->5-9,最后将5-9和10-14合并->0-9。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接