非递归归并排序

38

有人能用英语解释非递归归并排序是如何工作的吗?

谢谢


1
虽然bobbymcr提供了一个很好的答案,但也要注意递归和迭代在形式上是等价的。请参见https://dev59.com/ZHVC5IYBdhLWcg3w21Mq - Adam Rosenfield
请查看https://dev59.com/wHzaa4cB1Zd3GeqPTbWj#23695092 - rpax
8个回答

28
非递归合并排序通过考虑1、2、4、8、16..2^n窗口大小来处理输入数组。对于每个窗口(下面代码中的“k”),所有相邻的窗口对都合并到一个临时空间中,然后放回到数组中。
这是我用C编写的单函数非递归合并排序。输入和输出在'a'中,临时存储在'b'中。有一天,我想要一个原地版本:
float a[50000000],b[50000000];
void mergesort (long num)
{
    int rght, wid, rend;
    int i,j,m,t;

    for (int k=1; k < num; k *= 2 ) {       
        for (int left=0; left+k < num; left += k*2 ) {
            rght = left + k;        
            rend = rght + k;
            if (rend > num) rend = num; 
            m = left; i = left; j = rght; 
            while (i < rght && j < rend) { 
                if (a[i] <= a[j]) {         
                    b[m] = a[i]; i++;
                } else {
                    b[m] = a[j]; j++;
                }
                m++;
            }
            while (i < rght) { 
                b[m]=a[i]; 
                i++; m++;
            }
            while (j < rend) { 
                b[m]=a[j]; 
                j++; m++;
            }
            for (m=left; m < rend; m++) { 
                a[m] = b[m]; 
            }
        }
    }
}

顺便说一下,证明这是O(n log n)也很容易。外层循环的窗口大小增长为2的幂,因此k有log n次迭代。虽然内层循环覆盖了许多窗口,但对于给定的k,所有窗口恰好覆盖了输入数组,因此内层循环是O(n)的。结合内部和外部循环:O(n)*O(log n) = O(n log n)。

我在这里尝试了类似的方法http://stackoverflow.com/questions/37366365/iterative-non-recursive-merge-sort?noredirect=1#comment62245719_37366365,但是无法解决不符合2 ^ x形式的输入大小,有什么帮助吗? - Aishwat Singh
3
你可以通过合并一些行来大大简化代码,例如将b[m++]=a[i++];替换为b[m]=a[i]; i++; m++; - user3932000
6
虽然通过压缩代码使其难以理解很有趣,但我认为大多数雇主更希望代码更易读,而不是你展示你能在一行中完成多少工作。我建议将 j++ 和 m++ 代码移到单独的行,并且使用一些注释或更有意义的变量名。同时,请保持赋值之间一致的空格。除非你添加尾随空格,否则每个人都喜欢易于阅读的代码。磁盘空间永远不是问题,所有代码都可以编译。复杂的代码就像魔鬼:P - Raishin
2
@Raishin 大多数雇主都在寻找聪明的程序员。 - Handsome Nerd
这段代码非常适合 NVIDIA OptiX 程序,其中不允许递归。 - Serge Rogatch

17

遍历元素并通过交换它们使每个相邻的两个组按顺序排序。

现在,处理两个组的分组(任意两个,最有可能是相邻的组,但您可以使用第一个和最后一个组),通过反复选择每个组中最小值元素将它们合并为一个组,直到所有4个元素都合并成一个4个元素的组。现在,您只剩下4个元素的组和可能的余数。使用循环实现前面逻辑,这次按照4个元素的组进行操作。这个循环一直运行,直到只剩下一个组为止。


3
归并排序可以原地排序,但通常这样做会比较困难。 - bobbymcr
Algorithmist 上的那个看起来不太难。但是,如果您要对一个太大而无法放入内存的数据集进行排序,您将会失去一些局部性。 - John La Rooy
1
啊,你是在谈论归并排序而不是自底向上的归并排序。 - John La Rooy
1
我在询问非递归归并排序,其中自底向上是非递归归并排序。 - DarthVader
1
奇数长度的数组如何分割?似乎最后一个元素可能永远不会被排序。 - Uylenburgh

9

引用自Algorithmist

自底向上的归并排序是归并排序的一种非递归变体,其中数组通过一系列传递进行排序。在每个传递中,该数组被分成大小为m的块。(最初,m=1)相邻的两个块被合并(与正常的归并排序一样),然后下一个传递使用两倍大的值m


5
没问题,各种归并排序的时间复杂度都是O(n log n)。 - bobbymcr

5
使用非递归 MergeSort 的主要原因是为了避免递归堆栈溢出。例如,我正在尝试按字母数字顺序对一亿个记录进行排序,每个记录的长度约为 1 kByte(=100 GB)。一个 O(N^2) 排序需要 10^16 次操作,即使每次比较操作只需要 0.1 微秒,也需要数十年才能运行完毕。而一个 O(N log(N)) 的 MergeSort 只需要少于 10^10 次操作,即在相同的操作速度下不到一小时即可完成。然而,在递归版本的 MergeSort 中,一亿个元素的排序会导致 MergeSort() 调用 5000 万次递归。每次递归堆栈需要几百个字节,即使该进程轻松适应堆内存,也会导致递归堆栈溢出。通过在堆上使用动态分配内存来执行 Merge Sort——我正在使用 Rama Hoetzlein 提供的代码,但我改为使用堆上的动态分配内存而不是使用堆栈——我可以使用非递归合并排序对我的一亿条记录进行排序,而且不会溢出堆栈。这是适合在 Stack Overflow 的网站上讨论的话题!
PS:感谢 Rama Hoetzlein 提供的代码。
PPS:堆上的 100GB?!! 好吧,这是在 Hadoop 集群上的虚拟堆中,并且 MergeSort 将在几台机器上并行实现,共享负载...

5

递归和非递归归并排序的时间复杂度都是O(nlog(n))。这是因为两种方法都以一种或另一种方式使用堆栈。

在非递归方法中,用户/程序员定义并使用堆栈。

在递归方法中,系统内部使用堆栈来存储被递归调用的函数的返回地址。


由于归并排序总是按照相同的顺序执行其分区和排序操作,而不管数据集中项目的初始顺序如何,因此无需使用堆栈来跟踪下一个操作。所需的仅是已知已排序分区的大小(part_size,最初为1)和要合并的第一个这样的分区的索引(next_part,最初为零)。对于每个“步骤”,合并从next_partnext_part+part_size开始的大小为part_size的分区,然后将next_part增加part_size*2。如果这会超出数组的末尾... - supercat
part_size加倍,并将next_part设置为零。无需使用递归。 - supercat

1
我是新来的。 我修改了Rama Hoetzlein的解决方案(感谢您的想法)。我的归并排序不使用最后一个复制回路。此外,它会退回到插入排序。我在我的笔记本电脑上进行了基准测试,它是最快的。甚至比递归版本更好。顺便说一句,它是用Java编写的,可以从降序排序到升序排序。当然,它是迭代的。它可以被多线程化。代码已变得复杂。因此,如果有人感兴趣,请看一下。
代码:
    int num = input_array.length;
    int left = 0;
    int right;
    int temp;
    int LIMIT = 16;
    if (num <= LIMIT)
    {
        // Single Insertion Sort
        right = 1;
        while(right < num)
        {
            temp = input_array[right];
            while(( left > (-1) ) && ( input_array[left] > temp ))
            {
                input_array[left+1] = input_array[left--];
            }
            input_array[left+1] = temp;
            left = right;
            right++;
        }
    }
    else
    {
        int i;
        int j;
        //Fragmented Insertion Sort
        right = LIMIT;
        while (right <= num)
        {
            i = left + 1;
            j = left;
            while (i < right)
            {
                temp = input_array[i];
                while(( j >= left ) && ( input_array[j] > temp ))
                {
                    input_array[j+1] = input_array[j--];
                }
                input_array[j+1] = temp;
                j = i;
                i++;
            }
            left = right;
            right = right + LIMIT;
        }
        // Remainder Insertion Sort
        i = left + 1;
        j = left;
        while(i < num)
        {
            temp = input_array[i];
            while(( j >= left ) && ( input_array[j] > temp ))
            {
                input_array[j+1] = input_array[j--];
            }
            input_array[j+1] = temp;
            j = i;
            i++;
        }
        // Rama Hoetzlein method
        int[] temp_array = new int[num];
        int[] swap;
        int k = LIMIT;
        while (k < num)
        {
            left = 0;
            i = k;// The mid point
            right = k << 1;
            while (i < num)
            {
                if (right > num)
                {
                    right = num;
                }
                temp = left;
                j = i;
                while ((left < i) && (j < right))
                {
                    if (input_array[left] <= input_array[j])
                    {
                        temp_array[temp++] = input_array[left++];
                    }
                    else
                    {
                        temp_array[temp++] = input_array[j++];
                    }
                }
                while (left < i)
                {
                    temp_array[temp++] = input_array[left++];
                }
                while (j < right)
                {
                    temp_array[temp++] = input_array[j++];
                }
                // Do not copy back the elements to input_array
                left = right;
                i = left + k;
                right = i + k;
            }
            // Instead of copying back in previous loop, copy remaining elements to temp_array, then swap the array pointers
            while (left < num)
            {
                temp_array[left] = input_array[left++];
            }
            swap = input_array;
            input_array = temp_array;
            temp_array = swap;
            k <<= 1;
        }
    }

    return input_array;

0

如果还有人在这个帖子里潜伏着...我已经改编了Rama Hoetzlein的非递归归并排序算法,用于对双向链表进行排序。这种新的排序方法是原地排序、稳定的,并且避免了其他链表归并排序实现中耗时的列表分割代码。

// MergeSort.cpp
// Angus Johnson 2017
// License: Public Domain

#include "io.h"
#include "time.h"
#include "stdlib.h"

struct Node {
    int data;
    Node *next;
    Node *prev;
    Node *jump;
};

inline void Move2Before1(Node *n1, Node *n2)
{
    Node *prev, *next;
    //extricate n2 from linked-list ...
    prev = n2->prev;
    next = n2->next;
    prev->next = next; //nb: prev is always assigned
    if (next) next->prev = prev;
    //insert n2 back into list ...  
    prev = n1->prev;
    if (prev) prev->next = n2;
    n1->prev = n2;
    n2->prev = prev;
    n2->next = n1;
}

void MergeSort(Node *&nodes)
{
    Node *first, *second, *base, *tmp, *prev_base;

    if (!nodes || !nodes->next) return;
    int mul = 1;
    for (;;) {
        first = nodes;
        prev_base = NULL;
        //sort each successive mul group of nodes ...
        while (first) {
            if (mul == 1) {
                second = first->next;
                if (!second) { 
                  first->jump = NULL;
                  break;
                }
                first->jump = second->next;
            }
            else
            {
                second = first->jump;
                if (!second) break;
                first->jump = second->jump;
            }
            base = first;
            int cnt1 = mul, cnt2 = mul;
            //the following 'if' condition marginally improves performance 
            //in an unsorted list but very significantly improves
            //performance when the list is mostly sorted ...
            if (second->data < second->prev->data) 
                while (cnt1 && cnt2) {
                    if (second->data < first->data) {
                        if (first == base) {
                            if (prev_base) prev_base->jump = second;
                            base = second;
                            base->jump = first->jump;
                            if (first == nodes) nodes = second;
                        }
                        tmp = second->next;
                        Move2Before1(first, second);
                        second = tmp;
                        if (!second) { first = NULL; break; }
                        --cnt2;
                    }
                    else
                    {
                        first = first->next;
                        --cnt1;
                    }
                } //while (cnt1 && cnt2)
            first = base->jump;
            prev_base = base;
        } //while (first)
        if (!nodes->jump) break;
        else mul <<= 1;
    } //for (;;)
}

void InsertNewNode(Node *&head, int data)
{
    Node *tmp = new Node;
    tmp->data = data;
    tmp->next = NULL;
    tmp->prev = NULL;
    tmp->jump = NULL;
    if (head) {
        tmp->next = head;
        head->prev = tmp;
        head = tmp;
    }
    else head = tmp;
}

void ClearNodes(Node *head)
{
    if (!head) return;
    while (head) {
        Node *tmp = head;
        head = head->next;
        delete tmp;
    }
}

int main()
{  
    srand(time(NULL));
    Node *nodes = NULL, *n;
    const int len = 1000000; //1 million nodes 
    for (int i = 0; i < len; i++)
        InsertNewNode(nodes, rand() >> 4);

    clock_t t = clock();
    MergeSort(nodes);    //~1/2 sec for 1 mill. nodes on Pentium i7. 
    t = clock() - t;
    printf("Sort time: %d msec\n\n", t * 1000 / CLOCKS_PER_SEC);

    n = nodes;
    while (n)
    {
        if (n->prev && n->data < n->prev->data) { 
            printf("oops! sorting's broken\n"); 
            break;
        }
        n = n->next;
    }
    ClearNodes(nodes);
    printf("All done!\n\n");
    getchar();
    return 0;
}

编辑于2017年10月27日:修复了影响奇数列表的错误


0

还有人对此感兴趣吗?可能没有了。不过没关系,我们还是来试试看。

归并排序的精髓在于,你可以将两个(或多个)小的已排序记录运行合并成一个更大的已排序记录,并且你可以通过简单的流式操作“读取第一个/下一个记录”和“追加记录”来完成这个过程——这意味着你不需要一次性在RAM中拥有大量数据集:你只需要用来自不同运行的两个记录即可。如果你能跟踪到文件中已排序运行的起始位置和结束位置,你就可以简单地重复将相邻的运行对合并(到一个临时文件中),直到文件被排序:这需要对文件进行对数次遍历。

单个记录很容易排序:每次合并两个相邻的运行时,每个运行的大小都会翻倍。所以这是一种跟踪方式。另一种方法是使用运行的优先队列。从队列中取出最小的两个运行,将它们合并,并将结果入队,直到只剩下一个运行为止。如果你期望你的数据自然地以排序运行开始,那么这种方法是适当的。

在处理大量数据集时,您需要利用内存层次结构。假设您有几十亿字节的RAM和数千亿字节的数据。为什么不一次合并一千个运行呢?确实可以这样做,并且运行的优先级队列可以帮助您。这将显着减少您必须对文件进行的排序次数。某些细节留给读者作为练习。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接