C语言 - 对一个大的二维整数数组进行排序的最快方法

6

我是一名有用的助手,可以为您进行翻译。

我有一个二维数组,每一行都包含6个已按升序排列的整数。例如:

1  2  3  4  5  6
6  8  9 10 13 15
1  4  5  6  7  9
1  4  5  6  7  8
3 18 19 20 25 34

期望输出:

1  2  3  4  5  6
1  4  5  6  7  8
1  4  5  6  7  9
3 18 19 20 25 34
6  8  9 10 13 15

实际数据包含8m到33m个这样的记录。我正在尝试确定最快的方法来对此数组进行排序。我目前有一些使用qsort的工作代码:
qsort调用:
qsort(allRecords, lineCount, sizeof(int*), cmpfunc);

cmpfunc:

int cmpfunc (const void * a, const void * b)
{
  const int *rowA = *(const int **)a;
  const int *rowB = *(const int **)b;

  if (rowA[0] > rowB[0]) return 1;
  if (rowA[0] < rowB[0]) return -1;

  if (rowA[1] > rowB[1]) return 1;
  if (rowA[1] < rowB[1]) return -1;

  if (rowA[2] > rowB[2]) return 1;
  if (rowA[2] < rowB[2]) return -1;

  if (rowA[3] > rowB[3]) return 1;
  if (rowA[3] < rowB[3]) return -1;

  if (rowA[4] > rowB[4]) return 1;
  if (rowA[4] < rowB[4]) return -1;

  if (rowA[5] > rowB[5]) return 1;
  if (rowA[5] < rowB[5]) return -1;

  return 0;
}

对于这个样本3300万条记录,它大约需要35.6秒(gcc -O1),速度相当快,但我想知道在每行中给定预排序值的情况下是否有更快的方法来执行此操作。
这自然导致了最常见的第一个数字是1的数据,因此在33m的记录文件中,可能有1200万条以1开头的记录,然后是800万条以2开头的记录,500万条以3开头的记录等等......我不确定这是否适合一种特定类型的排序(例如堆排序)。
我的理解是qsort由于所有调用函数的次数而具有相当多的开销,因此我希望能够获得更快的性能。
我通常不编写C代码,因此我非常乐意接受建议和批评,因为我正在从教程和其他StackOverflow问题/答案中拼凑这些内容。
编辑:按要求,我的初始化代码:
// Empty record
int recArray[6] = {0,0,0,0,0,0};

// Initialize allRecords
int** allRecords;
allRecords = (int**) malloc(lineCount*sizeof(int*));
for(i=0; i < lineCount; i++)
{
    allRecords[i] = (int*) malloc(6*sizeof(int));
}

// Zero-out all records
for(i=0; i < lineCount; i++)
{
  memcpy(allRecords[i], recArray, 6 * sizeof(int));
}

我还在学习正确的做法,所以如果我做错了一切,我也不会感到惊讶。希望能得到正确方面的指导。

其他人问到值的范围 - 我不确定未来这个范围是否会更改,但目前值的范围在1到99之间。

此外,对于性能分析 - 我编写了一个小函数,使用 gettimeofday() 获取秒/微秒,然后进行比较。我很愿意接受更好的方法。输出的结果如下:

// <-- Here I capture the gettimeofday() structure output
Sorting...
Sorted.
Time Taken: 35.628882s // <-- Capture it again, show the difference

更新: 根据@doynax的建议,我现在将每行的6个值“打包”为一个无符号长整型:

// Initialize allRecords
unsigned long long int* allRecords;
allRecords = (unsigned long long int*) malloc(lineCount*sizeof(unsigned long long int));
for(i=0; i < lineCount; i++)
{
    allRecords[i] = 0;
}

...

// "Pack" current value (n0) into an unsigned long long int
if(recPos == 0) { lineSum += n0 * UINT64_C(1); }
else if(recPos == 1) { lineSum += n0 * UINT64_C(100); }
else if(recPos == 2) { lineSum += n0 * UINT64_C(10000); }
else if(recPos == 3) { lineSum += n0 * UINT64_C(1000000); }
else if(recPos == 4) { lineSum += n0 * UINT64_C(100000000); }
else if(recPos == 5) { lineSum += n0 * UINT64_C(10000000000); }
...
allRecords[linecount] = lineSum;
lineSum = 0;

我可以将其中一个unsigned long long int值“解包”成原始的6个int值。但是,当我尝试排序时:
qsort(allRecords, lineCount, sizeof(unsigned long long int), cmpfunc);

...

int cmpfunc (const void * a, const void * b)
{
  if (*(unsigned long long int*)a > *(unsigned long long int*)b) return 1;
  if (*(unsigned long long int*)a < *(unsigned long long int*)b) return -1;
  return 0;
}

结果没有按照预期排序。如果我使用以下代码显示排序前后的第一行和最后一行:

printf("[%i] = %llu = %i,%i,%i,%i,%i,%i\n", j, lineSum, recArray[0]...recArray[5]);

输出结果为:
First and last 5 rows before sorting:
[#] = PACKED INT64 = UNPACKED
[0] = 462220191706 = 6,17,19,20,22,46
[1] = 494140341005 = 5,10,34,40,41,49
[2] = 575337201905 = 5,19,20,37,53,57
[3] = 504236262316 = 16,23,26,36,42,50
[4] = 534730201912 = 12,19,20,30,47,53
[46] = 595648302516 = 16,25,30,48,56,59
[47] = 453635251108 = 8,11,25,35,36,45
[48] = 403221161202 = 2,12,16,21,32,40
[49] = 443736310604 = 4,6,31,36,37,44
[50] = 575248312821 = 21,28,31,48,52,57

First and last 5 rows after sorting:
[0] = 403221161202 = 2,12,16,21,32,40
[1] = 413218141002 = 2,10,14,18,32,41
[2] = 443736310604 = 4,6,31,36,37,44
[3] = 444127211604 = 4,16,21,27,41,44
[4] = 453028070302 = 2,3,7,28,30,45
[46] = 585043260907 = 7,9,26,43,50,58
[47] = 593524170902 = 2,9,17,24,35,59
[48] = 595248392711 = 11,27,39,48,52,59
[49] = 595251272612 = 12,26,27,51,52,59
[50] = 595648302516 = 16,25,30,48,56,59

我猜测我正在比较错误的值(例如指针值而不是实际值),但我不太确定正确的语法是什么。
另一方面,这种方式非常快。
排序3300万64位整数需要大约4-5秒钟(至少在其当前错误的形式中)。

2
你有关于数值范围的更多信息吗?例如,仅对值为1..10的数组进行排序可能与所有值均匀分布在整数范围内时所做的排序方式大不相同。 - FlorianK
3
不,你需要从原始数组中预先计算出这些新的宽整数密钥,然后只需在原位置上对新闻进行排序。最后,通过连续地除以100(余数对应旧列值),您可以重新创建数据。其思想是一个64位整数比多个更小的部分更容易且更快速地比较。 - doynax
2
你的打包顺序是相反的。对于每个数字10位,我会使用类似这样的方式:(long long)a [5] | ((long long)a [4] << 10) | ((long long)a [3] << 20) | ((long long)a [2] << 30) | ((long long)a [1] << 40) | ((long long)a [0] << 50); - Antonín Lejsek
2
正如@AntonínLejsek所指出的那样,你的数据排序得很好,但是最后一个数字而不是第一个数字被排列。你需要重新调整你的打包顺序。你应该可以使用循环来驱动打包而不是 N 次 if 语句。当然,你还需要对解包进行相应的更改。 - Jonathan Leffler
1
不要在C语言中强制转换malloc的结果。在基准测试/发布时至少使用-O2-O3,不要使用-O1 - phuclv
显示剩余12条评论
2个回答

5

我无法抵制一个良好的优化挑战。

初步想法

看到这个问题,我的第一个想法是使用基数排序,具体来说是内部计数排序的基数排序。(很高兴看到一些评论也同意我的想法!)我在其他项目中使用过带有内部计数排序的基数排序,虽然快速排序可以在小数据集上胜过它,但对于大数据集,它会远远领先于快速排序。

我选择基数排序是因为比较排序(如快速排序)有着众所周知的性能限制,即O(n lg n)。由于n在这里非常大,那么“lg n”部分也非常大——对于3300万条记录,你可以合理地预期它将花费“某个常数时间”乘以33000000次lg(33000000),而lg(33000000)约为25。像基数排序和计数排序这样的“非比较排序”可以通过具有数据的特殊知识来削减O(n lg n)的边界——在这种情况下,它们可以更快地运行,因为我们知道数据是小整数。

我使用了和其他人一样的代码来将值合并在一起,将每组int值组合成一个BigInt,但我选择了位移和掩码而不是乘法和模数,因为位操作总体上更快。
我的实现(见下文)执行一些常量时间设置计算,然后精确地迭代数据7次以进行排序-“7次”不仅在算法上胜过了“25次”,而且我的实现非常小心地避免不必要地触摸内存,以便它尽可能地与CPU缓存协同工作。
对于合理的源数据,我编写了一个小程序,生成了3300万个看起来类似于您原始数据的记录。每行都按排序顺序排列,并且略微偏向较小的值。(您可以在下面找到我的sixsample.c示例数据集程序。)
性能比较
使用非常接近您的qsort()实现的东西,我在三次运行中得到了这些数字,它们与您得到的数字相差不远(显然是不同的CPU)。
qsort:

Got 33000000 lines. Sorting.
Sorted lines in 5 seconds and 288636 microseconds.
Sorted lines in 5 seconds and 415553 microseconds.
Sorted lines in 5 seconds and 242454 microseconds.

使用我的基数排序和计数排序实现,我进行了三次运行,并得到以下数字:
Radix sort with internal counting sort, and cache-optimized:

Got 33000000 lines. Sorting.
Sorted lines in 0 seconds and 749285 microseconds.
Sorted lines in 0 seconds and 833474 microseconds.
Sorted lines in 0 seconds and 761943 microseconds.

同样的33百万条记录排序,5.318秒对0.781秒。这是快了6.8倍!

为了确保结果相同,我使用diff比较了每次运行的输出-真的可以在不到一秒的时间内对数千万条记录进行排序!

深入挖掘

基数排序计数排序的基本原理在维基百科上有很好的描述,在我的一本喜欢的书算法导论中也有详细说明,因此我不会在这里重复基础知识。相反,我将描述一下我的实现与教科书形式有何不同以使其更快。

这些算法的正常实现是类似以下的Python-esque伪代码:

def radix-sort(records):
    temp = []
    for column = 0 to columns:
         counting-sort(records, temp, column)
         copy 'temp' back into 'records'

def counting-sort(records, dest, column):

    # Count up how many of each value there is at this column.
    counts = []
    foreach record in records:
        counts[record[column]] += 1

    transform 'counts' into offsets

    copy from 'records' to 'dest' using the calculated offsets

这确实有效,但它有一些丑陋之处。 “计算值的总数”步骤涉及为每列在整个数据集中计算值的总数。 "将temp复制回记录"步骤涉及为每个列复制整个数据集。 除了“按排序顺序从记录复制到dest”的必需步骤外,这些步骤还需要执行。 对于每一列,我们需要三次遍历数据; 最好能少做几次!
我的实现将这些混合在一起:不是分别为每列计算计数,而是作为对数据的第一次扫描一起计算计数。 然后,计数被批量转换为偏移量;最后,数据被就地排序。此外,我不是在每个阶段从“temp”复制到“records”,而是简单地翻页:数据在每两次迭代中交替“来源”。 我的伪代码更像是这样:
def radix-sort(records):

    # Count up how many of each value there is in every column,
    # reading each record only once.
    counts = [,]
    foreach record in records:
        foreach column in columns:
            counts[column, record[column]] += 1

    transform 'counts' for each column into offsets for each column

    temp = []
    for column = 0 to columns step 2:
         sort-by-counts(records, temp, column)
         sort-by-counts(temp, records, column+1)

def sort-by-counts(records, dest, counts, column):
    copy from 'records' to 'dest' using the calculated offsets

这样可以避免不必要的数据旋转:一次设置,六次排序,全部有序。这样做可以让CPU缓存更加快乐。
C代码
这是我完整的sixsort.c实现。它包括了你的qsort解决方案,但被注释掉了,这样更容易比较两者。它包含了大量文档和所有的I/O和度量代码,因此有点长,但其完整性应该可以帮助您更好地理解解决方案:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>
#include <sys/time.h>

/* Configuration. */
#define NUM_COLUMNS 6       /* How many columns to read/sort */
#define MAX_VALUE 100       /* Maximum value allowed in each column, plus one */
#define BITSHIFT 7      /* How far to shift bits when combining values */

/* Note: BITSHIFT should be as small as possible, but MAX_VALUE must be <= 2**BITSHIFT. */

/* Highest value representable under the given BITSHIFT. */
#define BITMASK ((1 << BITSHIFT) - 1)

/* The type we're going to pack the integers into.  The highest bits will get the
   leftmost number, while the lowest bits will get the rightmost number. */
typedef unsigned long long BigInt;

/*-------------------------------------------------------------------------------------------------
** Radix Sorting.
*/

/* Move a set of items from src to dest, using the provided counts to decide where each
   item belongs.  src and dest must not be the same arrays. */
static void SortByCounts(BigInt *src, BigInt *dest, size_t *counts, size_t totalCount, int bitshift)
{
    BigInt *temp, *end;

    for (temp = src, end = src + totalCount; temp < end; temp++) {
        BigInt value = *temp;
        int number = (int)((value >> bitshift) & BITMASK);
        int offset = counts[number]++;
        dest[offset] = value;
    }
}

/* Use a radix sort with an internal counting sort to sort the given array of values.
   This iterates over the source data exactly 7 times, which for large
   'count' (i.e., count > ~2000) is typically much more efficient than O(n lg n)
   algorithms like quicksort or mergesort or heapsort.  (That said, the recursive O(n lg n)
   algorithms typically have better memory locality, so which solution wins overall
   may vary depending on your CPU and memory system.) */
static void RadixSortWithInternalCountingSort(BigInt *values, size_t count)
{
    size_t i, j;
    BigInt *temp, *end;
    size_t counts[NUM_COLUMNS][MAX_VALUE];
    size_t oldCount;

    /* Reset the counts of each value in each column to zero, quickly.
       This takes MAX_VALUE * NUM_COLUMNS time (i.e., constant time). */
    memset(counts, 0, sizeof(counts));

    /* Count up how many there are of each value in this column.  This iterates over
       the whole dataset exactly once, processing each set of values in full, so it
       takes COUNT * NUM_COLUMNS operations, or theta(n). */
    for (temp = values, end = values + count; temp < end; temp++) {
        BigInt value = *temp;
        for (i = 0; i < NUM_COLUMNS; i++) {
            counts[i][(int)((value >> (i * BITSHIFT)) & BITMASK)]++;
        }
    }

    /* Transform the counts into offsets.  This only transforms the counts array,
       so it takes MAX_VALUE * NUM_COLUMNS operations (i.e., constant time). */
    size_t totals[NUM_COLUMNS];
    for (i = 0; i < NUM_COLUMNS; i++) {
        totals[i] = 0;
    }
    for (i = 0; i < MAX_VALUE; i++) {
        for (j = 0; j < NUM_COLUMNS; j++) {
            oldCount = counts[j][i];
            counts[j][i] = totals[j];
            totals[j] += oldCount;
        }
    }

    temp = malloc(sizeof(BigInt) * count);

    /* Now perform the actual sorting, using the counts to tell us how to move
       the items.  Each call below iterates over the whole dataset exactly once,
       so this takes COUNT * NUM_COLUMNS operations, or theta(n). */
    for (i = 0; i < NUM_COLUMNS; i += 2) {
        SortByCounts(values, temp, counts[i  ], count,  i    * BITSHIFT);
        SortByCounts(temp, values, counts[i+1], count, (i+1) * BITSHIFT);
    }

    free(temp);
}

/*-------------------------------------------------------------------------------------------------
** Built-in Quicksorting.
*/

static int BigIntCompare(const void *a, const void *b)
{
    BigInt av = *(BigInt *)a;
    BigInt bv = *(BigInt *)b;

    if (av < bv) return -1;
    if (av > bv) return +1;
    return 0;
}

static void BuiltInQuicksort(BigInt *values, size_t count)
{
    qsort(values, count, sizeof(BigInt), BigIntCompare);
}

/*-------------------------------------------------------------------------------------------------
** File reading.
*/

/* Read a single integer from the given string, skipping initial whitespace, and
   store it in 'returnValue'.  Returns a pointer to the end of the integer text, or
   NULL if no value can be read. */
static char *ReadInt(char *src, int *returnValue)
{
    char ch;
    int value;

    /* Skip whitespace. */
    while ((ch = *src) <= 32 && ch != '\r' && ch != '\n') src++;

    /* Do we have a valid digit? */
    if ((ch = *src++) < '0' || ch > '9')
        return NULL;

    /* Collect digits into a number. */
    value = 0;
    do {
        value *= 10;
        value += ch - '0';
    } while ((ch = *src++) >= '0' && ch <= '9');
    src--;

    /* Return what we did. */
    *returnValue = value;
    return src;
}

/* Read a single line of values from the input into 'line', and return the number of
   values that were read on this line. */
static int ReadLine(FILE *fp, BigInt *line)
{
    int numValues;
    char buffer[1024];
    char *src;
    int value;
    BigInt result = 0;

    if (fgets(buffer, 1024, fp) == NULL) return 0;
    buffer[1023] = '\0';

    numValues = 0;
    src = buffer;
    while ((src = ReadInt(src, &value)) != NULL) {
        result |= ((BigInt)value << ((NUM_COLUMNS - ++numValues) * BITSHIFT));
    }

    *line = result;
    return numValues;
}

/* Read from file 'fp', which should consist of a sequence of lines with
   six decimal integers on each, and write the parsed, packed integer values
   to a newly-allocated 'values' array.  Returns the number of lines read. */
static size_t ReadInputFile(FILE *fp, BigInt **values)
{
    BigInt line;
    BigInt *dest;
    size_t count, max;

    count = 0;
    dest = malloc(sizeof(BigInt) * (max = 256));

    while (ReadLine(fp, &line)) {
        if (count >= max) {
            size_t newmax = max * 2;
            BigInt *temp = malloc(sizeof(BigInt) * newmax);
            memcpy(temp, dest, sizeof(BigInt) * count);
            free(dest);
            max = newmax;
            dest = temp;
        }
        dest[count++] = line;
    }

    *values = dest;
    return count;
}

/*-------------------------------------------------------------------------------------------------
** File writing.
*/

/* Given a number from 0 to 999 (inclusive), write its string representation to 'dest'
   as fast as possible.  Returns 'dest' incremented by the number of digits written. */
static char *WriteNumber(char *dest, unsigned int number)
{
    if (number >= 100) {
        dest += 3;
        dest[-1] = '0' + number % 10, number /= 10;
        dest[-2] = '0' + number % 10, number /= 10;
        dest[-3] = '0' + number % 10;
    }
    else if (number >= 10) {
        dest += 2;
        dest[-1] = '0' + number % 10, number /= 10;
        dest[-2] = '0' + number % 10;
    }
    else {
        dest += 1;
        dest[-1] = '0' + number;
    }

    return dest;
}

/* Write a single "value" (one line of content) to the output file. */
static void WriteOutputLine(FILE *fp, BigInt value)
{
    char buffer[1024];
    char *dest = buffer;
    int i;

    for (i = 0; i < NUM_COLUMNS; i++) {
        if (i > 0)
            *dest++ = ' ';
        int number = (value >> (BITSHIFT * (NUM_COLUMNS - i - 1))) & BITMASK;
        dest = WriteNumber(dest, (unsigned int)number);
    }
    *dest++ = '\n';
    *dest = '\0';

    fwrite(buffer, 1, dest - buffer, fp);
}

/* Write the entire output file as a sequence of values in columns. */
static void WriteOutputFile(FILE *fp, BigInt *values, size_t count)
{
    while (count-- > 0) {
        WriteOutputLine(fp, *values++);
    }
}

/*-------------------------------------------------------------------------------------------------
** Timeval support.
*/

int timeval_subtract(struct timeval *result, struct timeval *x, struct timeval *y)  
{  
    /* Perform the carry for the later subtraction by updating y. */  
    if (x->tv_usec < y->tv_usec) {  
        int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1;  
        y->tv_usec -= 1000000 * nsec;  
        y->tv_sec += nsec;  
    }  
    if (x->tv_usec - y->tv_usec > 1000000) {  
        int nsec = (y->tv_usec - x->tv_usec) / 1000000;  
        y->tv_usec += 1000000 * nsec;  
        y->tv_sec -= nsec;  
    }  

    /* Compute the time remaining to wait. tv_usec is certainly positive. */  
    result->tv_sec = x->tv_sec - y->tv_sec;  
    result->tv_usec = x->tv_usec - y->tv_usec;  

    /* Return 1 if result is negative. */  
    return x->tv_sec < y->tv_sec;  
}

/*-------------------------------------------------------------------------------------------------
** Main.
*/

int main(int argc, char **argv)
{
    BigInt *values;
    size_t count;
    FILE *fp;
    struct timeval startTime, endTime, deltaTime;

    if (argc != 3) {
        fprintf(stderr, "Usage: sixsort input.txt output.txt\n");
        return -1;
    }

    printf("Reading %s...\n", argv[1]);

    if ((fp = fopen(argv[1], "r")) == NULL) {
        fprintf(stderr, "Unable to open \"%s\" for reading.\n", argv[1]);
        return -1;
    }
    count = ReadInputFile(fp, &values);
    fclose(fp);

    printf("Got %d lines. Sorting.\n", (int)count);

    gettimeofday(&startTime, NULL);
    RadixSortWithInternalCountingSort(values, count);
    /*BuiltInQuicksort(values, count);*/
    gettimeofday(&endTime, NULL);
    timeval_subtract(&deltaTime, &endTime, &startTime);

    printf("Sorted lines in %d seconds and %d microseconds.\n",
        (int)deltaTime.tv_sec, (int)deltaTime.tv_usec);

    printf("Writing %d lines to %s.\n", (int)count, argv[2]);

    if ((fp = fopen(argv[2], "w")) == NULL) {
        fprintf(stderr, "Unable to open \"%s\" for writing.\n", argv[2]);
        return -1;
    }
    WriteOutputFile(fp, values, count);
    fclose(fp);

    free(values);
    return 0;
}

此外,供参考,这是我的糟糕的小样本数据生成器 sixsample.c,使用可预测的LCG随机数生成器,因此它总是生成相同的样本数据。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>
#include <math.h>

#define NUM_COLUMNS 6
#define MAX_VALUE 35 

unsigned int RandSeed = 0;

/* Generate a random number from 0 to 65535, inclusive, using a simple (fast) LCG.
   The numbers below are the same as used in the ISO/IEC 9899 proposal for C99/C11. */
static int Rand()
{
    RandSeed = RandSeed * 1103515245 + 12345;
    return (int)(RandSeed >> 16);
}

static int CompareInts(const void *a, const void *b)
{
    return *(int *)a - *(int *)b;
}

static void PrintSortedRandomLine()
{
    int values[NUM_COLUMNS];
    int i;

    for (i = 0; i < NUM_COLUMNS; i++) {
        values[i] = 1 + (int)(pow((double)Rand() / 65536.0, 2.0) * MAX_VALUE);
    }
    qsort(values, NUM_COLUMNS, sizeof(int), CompareInts);

    /* Lame. */
    printf("%d %d %d %d %d %d\n",
        values[0], values[1], values[2], values[3], values[4], values[5]);
}

int main(int argc, char **argv)
{
    unsigned int numLines;
    unsigned int i;

    if (argc < 2 || argc > 3) {
        fprintf(stderr, "Usage: sixsample numLines [randSeed]\n");
        return -1;
    }

    numLines = atoi(argv[1]);
    if (argc > 2)
        RandSeed = atoi(argv[2]);

    for (i = 0; i < numLines; i++) {
        PrintSortedRandomLine();
    }

    return 0;
}

最后,这是一个可以在Ubuntu 16上使用gcc 5.4.0构建的Makefile

all: sixsort sixsample

sixsort: sixsort.c
    gcc -O6 -Wall -o sixsort sixsort.c

sixsample: sixsample.c
    gcc -O -Wall -o sixsample sixsample.c

clean:
    rm -f sixsort sixsample

进一步的优化

我考虑通过使用纯汇编来进一步优化一些部分,但是gcc在内联函数、展开循环和通常情况下使用大多数我本来会自己在汇编中考虑的优化方案时表现出色,因此用这种方式难以改进它。核心操作不太适合使用SIMD指令,因此gcc的版本可能是最快的。

如果我将其编译为64位进程,则可能运行得更快,因为CPU可以将每个完整记录保存在单个寄存器中,但是我的测试Ubuntu VM是32位的,所以就这样吧。

鉴于这些值本身都相当小,您可以通过组合代码点来进一步改进我的解决方案:因此,不必对七位列的每个子排序执行六次计数排序,而是可以以其他方式划分相同的42位:您可以对每个14位列执行三个子排序。这将需要在内存中存储3 * 2 ^ 14(= 3 * 16384 = 49152)个计数和偏移量,而不是6 * 2 ^ 7(= 6 * 128 = 768),但由于内存相当便宜,而且数据仍然适合CPU缓存,因此值得尝试。由于原始数据是两位十进制整数,因此甚至可以将其分成两个6十进制数字列的值,并对每个列的一对子排序进行排序(需要2 * 10 ^ 6 = 2000000个计数和偏移量,这可能仍适合您的CPU缓存,具体取决于您的CPU)。
你也可以并行化!我的实现让一个CPU繁忙,但其他CPU处于空闲状态。因此,使这个过程更快的另一种技术是将源数据分成块,每个CPU核心一个块,独立地对每个块进行排序,然后在最后执行类似于归并排序的“合并”操作来组合结果。如果你发现自己经常运行这个计算,那么并行化工作可能是值得考虑的事情。
结论和其他想法
尽管我没有亲自测试过,但可能公平比较手写快速排序的性能,因为qsort()在比较值时包含函数调用开销。我怀疑手写的解决方案会击败qsort()的5.3秒,但它可能无法与基数排序竞争,这既是由于快速排序的非线性增长特性,也是由于其不太有利的CPU缓存特性(它平均需要读取每个值超过7次,所以从RAM中提取数据可能会更加昂贵)。
此外,现在排序的成本已经被读取文本文件并将结果写回的I/O成本所淹没。即使使用我的自定义解析代码,仍需要几秒钟才能读取3300万条记录,只需花费一小部分时间进行排序,再花费几秒钟将它们写回去。如果您的数据已经在RAM中,则可能无关紧要,但如果它在某个磁盘上,您需要开始找到优化它的方法。但既然问题的目标是优化排序本身,我认为在仅用三分之一秒的时间内对3300万条记录进行排序还不错。

这绝对值得为努力和好奇心打'A'的分数 :) - David C. Rankin
@David,我只希望一年半前就偶然发现了@jhilgeman最初提出这个问题;我不知道他是否已经很久没有需要答案了 :) - Sean Werkema
对于那些无疑会在未来找到这篇文章的人来说,这仍然是一个非常有用的补充。《算法导论-第三版》可以在多个网站上在线获取,我也很喜欢它。 - David C. Rankin
自从90年代中期第一版是我大学课本之一以来,我就一直把《算法导论》放在我的桌子上。我强烈推荐任何人阅读它,不管是平装书、精装书、PDF、Kindle、全息卷轴、直接脑部链接或任何新的高端格式。 - Sean Werkema
你可以轻松地将计数部分并行化,这样就可以加速它而不需要合并。 - Peter K
@PeterK:计数部分可以分布在多个线程上,这是真的。(并且必须是线程,而不是SIMD;在所有这些工作中,SIMD真的没有什么帮助。)但由于计数只是整个过程的一部分,因此并不会因为并行计算而获得巨大的加速。话虽如此,由于所有这些都受到内存限制,因此很难说将其分散到多个核心上真正有多大帮助,因为核心们都可能争夺访问同一条总线线路到主存的权限。 - Sean Werkema

2
乔纳森·莱夫勒(Jonathan Leffler)对重新排序包装的评论非常准确,当我查看你的代码时也有同样的想法。以下是我的方法:
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <string.h> // for memcpy

#define ROW_LENGTH      6
#define ROW_COUNT       15

// 1, 2, 3, 4, 5, 6, 6, 8, 9, 10, 13, 15, 1, 4, 5, 6, 7, 9, 1, 4, 5, 6, 7, 8, 3, 18, 19, 20, 25, 34

/*
    1  2  3  4  5  6
    6  8  9 10 13 15
    1  4  5  6  7  9
    1  4  5  6  7  8
    3 18 19 20 25 34
*/

// Insertion sorting taken from https://dev59.com/8-o6XIcBkEYKwwoYTy8d#2789530  with modification

static __inline__ int sortUlliArray(unsigned long long int *d, int length){
        int i, j;
        for (i = 1; i < length; i++) {
                unsigned long long int tmp = d[i];
                for (j = i; j >= 1 && tmp < d[j-1]; j--)
                        d[j] = d[j-1];
                d[j] = tmp;
        }

        return i; // just to shutup compiler
}


int cmpfunc (const void * a, const void * b)
{
  if (*(unsigned long long int*)a > *(unsigned long long int*)b) return 1;
  if (*(unsigned long long int*)a < *(unsigned long long int*)b) return -1;
  return 0;
}

int main(){
    int array[ROW_COUNT][ROW_LENGTH],
        decodedResultsArray[ROW_COUNT][ROW_LENGTH];

    const int rawData[] = {     1, 2, 3, 4, 5, 6,
                                6, 8, 9, 10, 13, 15,
                                1, 4, 5, 6, 7, 9,
                                1, 4, 5, 6, 7, 8,
                                3, 18, 19, 20, 25, 34,
                                6,17,19,20,22,46,
                                5,10,34,40,41,49,
                                5,19,20,37,53,57,
                                16,23,26,36,42,50,
                                12,19,20,30,47,53,
                                16,25,30,48,56,59,
                                8,11,25,35,36,45,
                                2,12,16,21,32,40,
                                4,6,31,36,37,44,
                                21,28,31,48,52,57
                    };

    memcpy(array, rawData, sizeof(rawData)/sizeof(*rawData)); // copy elements into array memory

    // Sort
    // precompute keys
    unsigned long long int *rowSums = calloc(ROW_COUNT, sizeof(unsigned long long int));
    unsigned long long int *sortedSums = rowSums ? calloc(ROW_COUNT, sizeof(unsigned long long int)) : NULL; // if rowSums is null, don't bother trying to allocate.
    if(!rowSums || !sortedSums){
        free(rowSums);
        free(sortedSums);
        fprintf(stderr, "Failed to allocate memory!\n");
        fflush(stderr); // should be unnecessary, but better to make sure it gets printed
        exit(100);
    }

    int i=0, j=0, k=0;
    for(; i < ROW_COUNT; i++){
        rowSums[i] = 0; // this should be handled by calloc, but adding this for debug
        for(j=0; j < ROW_LENGTH; j++){
            unsigned long long int iScalar=1;
            for(k=ROW_LENGTH-1; k > j; --k)
                iScalar *= 100; // probably not the most efficient way to compute this, but this is meant more as an example/proof of concept

            unsigned long long int iHere = array[i][j];
            rowSums[i] += (iHere * iScalar);

            // printf("DEBUG ITERATION REPORT\n\t\tRow #%d\n\t\tColumn #%d\n\t\tiScalar: %llu\n\t\tiHere: %llu\n\t\tCurrent Sum for Row: %llu\n\n", i, j, iScalar, iHere, rowSums[i]);
            fflush(stdout);
        }
    }

    memcpy(sortedSums, rowSums, sizeof(unsigned long long int)*ROW_COUNT);

    // Some debugging output:
    /*

    printf("Uncopied Sums:\n");
    for(i=0; i < ROW_COUNT; i++)
        printf("SortedRowSums[%d] = %llu\n", i, rowSums[i]);

    printf("Memcopyed sort array:\n");
    for(i=0; i < ROW_COUNT; i++)
        printf("SortedRowSums[%d] = %llu\n", i, sortedSums[i]);

    */

    clock_t begin = clock();

    //qsort(sortedSums, ROW_COUNT, sizeof(unsigned long long int), cmpfunc);
    sortUlliArray(sortedSums, ROW_COUNT);

    clock_t end = clock();
    double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;

    printf("Time for sort: %lf\n", time_spent);
    printf("Before sort array:\n");
    for(i=0; i<ROW_COUNT; i++){
        for(j=0; j < ROW_LENGTH; j++){
            printf("Unsorted[%d][%d] = %d\n", i, j, array[i][j]);
        }
    }

    printf("Values of sorted computed keys:\n");
    for(i=0; i < ROW_COUNT; i++)
        printf("SortedRowSums[%d] = %llu\n", i, sortedSums[i]);

    // Unpack:
    for(i=0; i < ROW_COUNT; i++){
        for(j=0; j < ROW_LENGTH; j++){
            unsigned long long int iScalar=1;
            for(k=ROW_LENGTH-1; k > j; --k)
                iScalar *= 100;

            unsigned long long int removalAmount = sortedSums[i]/iScalar;

            decodedResultsArray[i][j] = removalAmount;
            sortedSums[i] -= (removalAmount*iScalar);
            // DEBUG:
            // printf("Decoded Result for decoded[%d][%d] = %d\n", i, j, decodedResultsArray[i][j]);
        }
    }

    printf("\nFinal Output:\n");
    for(i=0; i < ROW_COUNT; i++){
        printf("Row #%d: %d", i, decodedResultsArray[i][0]);
        for(j=1; j < ROW_LENGTH; j++){
            printf(", %d", decodedResultsArray[i][j]);
        }
        puts("");
    }
    fflush(stdout);

    free(rowSums);
    free(sortedSums);

    return 1;
}

请注意,这不是为了最大效率而进行的优化,代码中还有很多调试输出语句,但它仍然证明了打包的可行性。另外,考虑到您需要处理的行数,最好使用qsort(),但我使用的是sortUlliArray(...)(这是StackOverflow答案中修改过的插入排序函数)。您需要测试一下,看哪种方法在您的情况下性能更好。
总之,在对这15个硬编码行运行此代码后,最终输出如下:
Row #0: 1, 2, 3, 4, 5, 6
Row #1: 1, 4, 5, 6, 7, 8
Row #2: 1, 4, 5, 6, 7, 9
Row #3: 2, 12, 16, 21, 32, 40
Row #4: 3, 18, 19, 20, 25, 34
Row #5: 4, 6, 31, 36, 37, 44
Row #6: 5, 10, 34, 40, 41, 49
Row #7: 5, 19, 20, 37, 53, 57
Row #8: 6, 8, 9, 10, 13, 15
Row #9: 6, 17, 19, 20, 22, 46
Row #10: 8, 11, 25, 35, 36, 45
Row #11: 12, 19, 20, 30, 47, 53
Row #12: 16, 23, 26, 36, 42, 50
Row #13: 16, 25, 30, 48, 56, 59
Row #14: 21, 28, 31, 48, 52, 57

所以,这似乎能处理数字非常相似的情况,这是由于数字被打包的顺序引起的问题。

总之,上面的代码应该可以工作,但它只是个例子,所以我会让你进行必要的优化。

代码在一台配备1.6 GHz英特尔Core i5和4 GB 1600 MHz DDR3内存的MacBook Air 64位上进行了测试。所以,这是一款相当弱的CPU和慢速内存,但它能够在0.004毫秒内对15行进行排序,我认为相当快速。(这只是针对上述测试用例的排序函数速度的度量,不是针对预打包或解包速度的度量,因为这些方面可能需要一些优化。)

主要功劳归功于Doynax和Jonathan Leffler。


这可能是一个愚蠢的问题,但我可以问一下为什么你的sortUlliArray函数返回一个值来“让编译器闭嘴”,而不是只使用void作为返回类型吗? - jhilgeman
很简单,因为这就是参考答案中原始函数的作用。如果我没记错的话,大多数编译器都需要返回类型才能执行内联。因此,我保留了返回类型,并返回了i以抑制编译器警告。这并不一定是处理编译器警告的最佳实践,但它完成了工作。用宏定义替换sortUlliArray(...)可能更好。 - Spencer D
1
虽然插入排序对于小数据集非常快,但它的增长率为O(n^2),对于像@jhilgeman的3300万条记录这样的大型数据集,代码在每个操作的基线上运行约1微秒(这似乎是您在MacBook Air上测量到的),我合理地期望您的代码需要大约35年才能完成。你最好使用一些算法更有效的东西,比如快速排序,或者更好的是像我在我的答案中编写的基数和计数排序。 - Sean Werkema

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接