活锁的好例子是什么?

174

我了解什么是死锁,但我想知道是否有一个良好的基于代码的例子? 当我说基于代码的,我并不是指“两个人试图在走廊中相互穿过对方”的例子。如果我再读一遍,我会吐的。


114
两个人在走廊里相互擦肩而过的场景可以做一个软件模拟吗? - 1800 INFORMATION
43
该怨恨你了!我把午餐都吐出来了! - Alex Miller
3
奇妙的贴切:http://seuss.wikia.com/wiki/The_Zax这是一个关于两只顽固的动物相遇的故事。它们互相碰撞,因为他们都不愿意让路给对方,最终导致了僵局。这个故事试图告诉我们不要过分坚持自己的立场,而应该学会妥协和合作。 - NotMe
两个线程相互等待对方通知,这也是活锁,不是吗? - Sudhakar
4
两个人试图在走廊里相互擦肩而过。:https://gist.github.com/deepankarb/d2dd6f21bc49902376e614d3746b8965 :p - iceman
显示剩余3条评论
12个回答

140

这是一个很简单的Java示例,演示了活锁的情况。其中一对夫妇正在尝试吃汤,但只有一把勺子。每个配偶都太有礼貌了,如果另一个人还没有吃过,就会将勺子递给对方。

public class Livelock {
    static class Spoon {
        private Diner owner;
        public Spoon(Diner d) { owner = d; }
        public Diner getOwner() { return owner; }
        public synchronized void setOwner(Diner d) { owner = d; }
        public synchronized void use() { 
            System.out.printf("%s has eaten!", owner.name); 
        }
    }
    
    static class Diner {
        private String name;
        private boolean isHungry;
        
        public Diner(String n) { name = n; isHungry = true; }       
        public String getName() { return name; }
        public boolean isHungry() { return isHungry; }
        
        public void eatWith(Spoon spoon, Diner spouse) {
            while (isHungry) {
                // Don't have the spoon, so wait patiently for spouse.
                if (spoon.owner != this) {
                    try { Thread.sleep(1); } 
                    catch(InterruptedException e) { continue; }
                    continue;
                }                       
            
                // If spouse is hungry, insist upon passing the spoon.
                if (spouse.isHungry()) {                    
                    System.out.printf(
                        "%s: You eat first my darling %s!%n", 
                        name, spouse.getName());
                    spoon.setOwner(spouse);
                    continue;
                }
                
                // Spouse wasn't hungry, so finally eat
                spoon.use();
                isHungry = false;               
                System.out.printf(
                    "%s: I am stuffed, my darling %s!%n", 
                    name, spouse.getName());                
                spoon.setOwner(spouse);
            }
        }
    }
    
    public static void main(String[] args) {
        final Diner husband = new Diner("Bob");
        final Diner wife = new Diner("Alice");
        
        final Spoon s = new Spoon(husband);
        
        new Thread(new Runnable() { 
            public void run() { husband.eatWith(s, wife); }   
        }).start();

        new Thread(new Runnable() { 
            public void run() { wife.eatWith(s, husband); } 
        }).start();
    }
}

运行该程序,你将会得到以下结果:
Bob: You eat first my darling Alice!
Alice: You eat first my darling Bob!
Bob: You eat first my darling Alice!
Alice: You eat first my darling Bob!
Bob: You eat first my darling Alice!
Alice: You eat first my darling Bob!
...

如果没有干扰,这将永远持续下去。这是一个活锁,因为Alice和Bob都在无限循环中反复要求对方先行动(因此称为)。在死锁情况下,Alice和Bob都会被冻结在等待对方先行动的状态下,除了等待什么也不做(因此称为)。


7
“getOwner”方法也需要同步吗?根据《Effective Java》中的说法:“同步没有效果,除非读和写都进行同步。” - Sanghyun Lee
2
他应该使用Thread.join()而不是Thread.sleep(),因为他想等待配偶吃完饭。 - Solace
1
在这个特定的例子中,我们应该怎么做才能克服活锁问题? - Thor
1
getOwner 方法必须同步,因为即使 setOwner 是同步的,这也不能保证使用 getOwner(或直接访问 owner 字段)的线程将看到执行 setOwner 的其他线程所做的更改。 这个视频非常仔细地解释了这一点:https://www.youtube.com/watch?v=WTVooKLLVT8 - Timofey
3
对于setOwner方法,您不需要使用synchronized关键字,因为对于引用变量的读写是原子操作。 - atiqkhaled
显示剩余6条评论

82

抛开轻浮的评论,一个已知的例子是在试图检测和处理死锁情况的代码中。如果两个线程检测到死锁并试图为对方“让路”,如果不小心,它们将被困在一次又一次的“让路”循环中,永远无法向前移动。

通过“让路”,我指的是它们将释放锁并尝试让另一个线程获取它。我们可以用两个线程执行以下伪代码来想象这种情况:

// thread 1
getLocks12(lock1, lock2)
{
  lock1.lock();
  while (lock2.locked())
  {
    // attempt to step aside for the other thread
    lock1.unlock();
    wait();
    lock1.lock();
  }
  lock2.lock();
}

// thread 2
getLocks21(lock1, lock2)
{
  lock2.lock();
  while (lock1.locked())
  {
    // attempt to step aside for the other thread
    lock2.unlock();
    wait();
    lock2.lock();
  }
  lock1.lock();
}

除了竞争条件之外,我们在这里面对的情况是,如果两个线程同时进入,它们将在内部循环中运行而没有进一步操作。显然,这是一个简化的示例。一种天真的修复方法是在线程等待的时间上加入某种随机性。

正确的解决方案是始终遵守锁层次结构。选择一种获取锁的顺序并坚持下去。例如,如果两个线程总是在获取lock1之前先获取lock2,则不可能发生死锁。


是的,我理解。我正在寻找一个实际的代码示例。问题是“step aside”是什么意思,它如何产生这样的情况。 - Alex Miller
1
我知道这只是一个人为制造的例子,但这是否可能导致活锁?难道不是更有可能最终会出现一个窗口,使得一个函数可以抓住两个线程,因为线程被允许运行的时间和它们被调度的时间不一致吗? - DubiousPusher
1
虽然它不是一个稳定的活锁,因为它们显然最终会打破它,但我认为它足够符合描述。 - 1800 INFORMATION
优秀且有意义的例子。 - alecov

8
由于没有标记为“已接受答案”的答案,所以我尝试创建了一个实时锁定的示例;原始程序是我在2012年4月编写的,旨在学习多线程的各种概念。这次我修改了它以创建死锁、竞态条件、活锁等。
因此,让我们先了解一下问题陈述; 饼干制造商问题 有一些成分容器:巧克力粉容器小麦粉容器CookieMaker从成分容器中取出一些粉末来烤 Cookie 。 如果找不到一个装满粉末的容器,那么饼干制造商会检查另一个容器以节省时间。 并等待 Filler 填充所需容器。 有一个 Filler 会定期检查容器,并在需要时填充一些物品。
请在GitHub上查看完整代码;
让我简要地解释一下实现。
  • 我将 Filler 作为守护线程启动。 因此,它将在定期间隔上填充容器。 首先锁定容器以填充容器->检查其是否需要粉末->填充它->向等待它的所有制造商发出信号->解锁容器。
  • 我创建一个 CookieMaker 并设置它可以同时烘烤8个饼干。 然后我启动8个线程来烘焙饼干。
  • 每个制造者线程创建2个可调用的子线程,从容器中取出粉末。
  • 子线程对容器进行锁定,并检查它是否有足够的粉末。如果没有,则等待一段时间。 一旦填充器填满了容器,它会取下粉末,然后解锁容器。
  • 现在它完成其他活动,如:制作混合物、烘焙等。
让我们看看代码: Cookiemaker.java
private Integer getMaterial(final Ingredient ingredient) throws Exception{
        :
        container.lock();
        while (!container.getIngredient(quantity)) {
            container.empty.await(1000, TimeUnit.MILLISECONDS);
            //Thread.sleep(500); //For deadlock
        }
        container.unlock();
        :
}

IngredientContainer.java

public boolean getIngredient(int n) throws Exception {
    :
    lock();
    if (quantityHeld >= n) {
        TimeUnit.SECONDS.sleep(2);
        quantityHeld -= n;
        unlock();
        return true;
    }
    unlock();
    return false;
}

一切都很正常,直到Filler开始填充容器。但是如果我忘记启动填充器,或者填充器意外离开,子线程会不断地改变它们的状态以允许其他制造者去检查容器。

我还创建了一个名为ThreadTracer的守护进程,它监视线程状态和死锁。这是控制台的输出:

2016-09-12 21:31:45.065 :: [Maker_0:WAITING, Maker_1:WAITING, Maker_2:WAITING, Maker_3:WAITING, Maker_4:WAITING, Maker_5:WAITING, Maker_6:WAITING, Maker_7:WAITING, pool-7-thread-1:TIMED_WAITING, pool-7-thread-2:TIMED_WAITING, pool-8-thread-1:TIMED_WAITING, pool-8-thread-2:TIMED_WAITING, pool-6-thread-1:TIMED_WAITING, pool-6-thread-2:TIMED_WAITING, pool-5-thread-1:TIMED_WAITING, pool-5-thread-2:TIMED_WAITING, pool-1-thread-1:TIMED_WAITING, pool-3-thread-1:TIMED_WAITING, pool-2-thread-1:TIMED_WAITING, pool-1-thread-2:TIMED_WAITING, pool-4-thread-1:TIMED_WAITING, pool-4-thread-2:RUNNABLE, pool-3-thread-2:TIMED_WAITING, pool-2-thread-2:TIMED_WAITING]
2016-09-12 21:31:45.065 :: [Maker_0:WAITING, Maker_1:WAITING, Maker_2:WAITING, Maker_3:WAITING, Maker_4:WAITING, Maker_5:WAITING, Maker_6:WAITING, Maker_7:WAITING, pool-7-thread-1:TIMED_WAITING, pool-7-thread-2:TIMED_WAITING, pool-8-thread-1:TIMED_WAITING, pool-8-thread-2:TIMED_WAITING, pool-6-thread-1:TIMED_WAITING, pool-6-thread-2:TIMED_WAITING, pool-5-thread-1:TIMED_WAITING, pool-5-thread-2:TIMED_WAITING, pool-1-thread-1:TIMED_WAITING, pool-3-thread-1:TIMED_WAITING, pool-2-thread-1:TIMED_WAITING, pool-1-thread-2:TIMED_WAITING, pool-4-thread-1:TIMED_WAITING, pool-4-thread-2:TIMED_WAITING, pool-3-thread-2:TIMED_WAITING, pool-2-thread-2:TIMED_WAITING]
WheatPowder Container has 0 only.
2016-09-12 21:31:45.082 :: [Maker_0:WAITING, Maker_1:WAITING, Maker_2:WAITING, Maker_3:WAITING, Maker_4:WAITING, Maker_5:WAITING, Maker_6:WAITING, Maker_7:WAITING, pool-7-thread-1:TIMED_WAITING, pool-7-thread-2:TIMED_WAITING, pool-8-thread-1:TIMED_WAITING, pool-8-thread-2:TIMED_WAITING, pool-6-thread-1:TIMED_WAITING, pool-6-thread-2:TIMED_WAITING, pool-5-thread-1:TIMED_WAITING, pool-5-thread-2:TIMED_WAITING, pool-1-thread-1:TIMED_WAITING, pool-3-thread-1:TIMED_WAITING, pool-2-thread-1:TIMED_WAITING, pool-1-thread-2:TIMED_WAITING, pool-4-thread-1:TIMED_WAITING, pool-4-thread-2:TIMED_WAITING, pool-3-thread-2:TIMED_WAITING, pool-2-thread-2:RUNNABLE]
2016-09-12 21:31:45.082 :: [Maker_0:WAITING, Maker_1:WAITING, Maker_2:WAITING, Maker_3:WAITING, Maker_4:WAITING, Maker_5:WAITING, Maker_6:WAITING, Maker_7:WAITING, pool-7-thread-1:TIMED_WAITING, pool-7-thread-2:TIMED_WAITING, pool-8-thread-1:TIMED_WAITING, pool-8-thread-2:TIMED_WAITING, pool-6-thread-1:TIMED_WAITING, pool-6-thread-2:TIMED_WAITING, pool-5-thread-1:TIMED_WAITING, pool-5-thread-2:TIMED_WAITING, pool-1-thread-1:TIMED_WAITING, pool-3-thread-1:TIMED_WAITING, pool-2-thread-1:TIMED_WAITING, pool-1-thread-2:TIMED_WAITING, pool-4-thread-1:TIMED_WAITING, pool-4-thread-2:TIMED_WAITING, pool-3-thread-2:TIMED_WAITING, pool-2-thread-2:TIMED_WAITING]

您会注意到子线程及其状态的更改和等待。


6
一个真实的例子(尽管没有确切的代码)是两个竞争的进程在尝试纠正SQL服务器死锁时出现了活锁,每个进程都使用相同的等待重试算法进行重试。虽然这是一种时间上的幸运,但我曾经看到它发生在具有类似性能特征的不同机器上,响应于添加到EMS主题的消息(例如多次保存单个对象图的更新),并且无法控制锁定顺序。
在这种情况下,一个好的解决方案是使用竞争消费者(通过在无关对象上分割工作,尽可能高地防止重复处理)。
一个不太理想的(好吧,肮脏的黑客)解决方案是预先打破时间上的坏运气(在处理中强制产生差异)或者使用不同的算法或某些随机元素来打破死锁后。这仍然可能存在问题,因为每个进程的锁定顺序可能是“粘性的”,而这需要一定的最小时间,在等待重试中没有考虑到。
另一种解决方案(至少对于SQL Server)是尝试不同的隔离级别(例如快照)。

2

我编写了两个人在走廊里相遇的示例代码。当他们意识到他们的方向相同时,两个线程将会避免相撞。

public class LiveLock {
    public static void main(String[] args) throws InterruptedException {
        Object left = new Object();
        Object right = new Object();
        Pedestrian one = new Pedestrian(left, right, 0); //one's left is one's left
        Pedestrian two = new Pedestrian(right, left, 1); //one's left is two's right, so have to swap order
        one.setOther(two);
        two.setOther(one);
        one.start();
        two.start();
    }
}

class Pedestrian extends Thread {
    private Object l;
    private Object r;
    private Pedestrian other;
    private Object current;

    Pedestrian (Object left, Object right, int firstDirection) {
        l = left;
        r = right;
        if (firstDirection==0) {
            current = l;
        }
        else {
            current = r;
        }
    }

    void setOther(Pedestrian otherP) {
        other = otherP;
    }

    Object getDirection() {
        return current;
    }

    Object getOppositeDirection() {
        if (current.equals(l)) {
            return r;
        }
        else {
            return l;
        }
    }

    void switchDirection() throws InterruptedException {
        Thread.sleep(100);
        current = getOppositeDirection();
        System.out.println(Thread.currentThread().getName() + " is stepping aside.");
    }

    public void run() {
        while (getDirection().equals(other.getDirection())) {
            try {
                switchDirection();
                Thread.sleep(100);
            } catch (InterruptedException e) {}
        }
    }
} 

2

jelbourn的代码的C#版本:

using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace LiveLockExample
{
    static class Program
    {
        public static void Main(string[] args)
        {
            var husband = new Diner("Bob");
            var wife = new Diner("Alice");

            var s = new Spoon(husband);

            Task.WaitAll(
                Task.Run(() => husband.EatWith(s, wife)),
                Task.Run(() => wife.EatWith(s, husband))
                );
        }

        public class Spoon
        {
            public Spoon(Diner diner)
            {
                Owner = diner;
            }


            public Diner Owner { get; private set; }

            [MethodImpl(MethodImplOptions.Synchronized)]
            public void SetOwner(Diner d) { Owner = d; }

            [MethodImpl(MethodImplOptions.Synchronized)]
            public void Use()
            {
                Console.WriteLine("{0} has eaten!", Owner.Name);
            }
        }

        public class Diner
        {
            public Diner(string n)
            {
                Name = n;
                IsHungry = true;
            }

            public string Name { get; private set; }

            private bool IsHungry { get; set; }

            public void EatWith(Spoon spoon, Diner spouse)
            {
                while (IsHungry)
                {
                    // Don't have the spoon, so wait patiently for spouse.
                    if (spoon.Owner != this)
                    {
                        try
                        {
                            Thread.Sleep(1);
                        }
                        catch (ThreadInterruptedException e)
                        {
                        }

                        continue;
                    }

                    // If spouse is hungry, insist upon passing the spoon.
                    if (spouse.IsHungry)
                    {
                        Console.WriteLine("{0}: You eat first my darling {1}!", Name, spouse.Name);
                        spoon.SetOwner(spouse);
                        continue;
                    }

                    // Spouse wasn't hungry, so finally eat
                    spoon.Use();
                    IsHungry = false;
                    Console.WriteLine("{0}: I am stuffed, my darling {1}!", Name, spouse.Name);
                    spoon.SetOwner(spouse);
                }
            }
        }
    }
}

2
考虑一个具有50个进程插槽的UNIX系统。
有十个程序正在运行,每个程序都需要创建6个(子)进程。
在每个进程创建了4个进程之后,这10个原始进程和40个新进程已经用完了表格。现在,这10个原始进程中的每个进程都处于一个无限循环中,不断地进行分叉和失败——这正是死锁的情况。这种情况发生的概率非常小,但确实可能发生。

1
这里的一个例子可能是使用定时的“tryLock”来获取多个锁,如果无法获取所有锁,则退回并重试。
boolean tryLockAll(Collection<Lock> locks) {
  boolean grabbedAllLocks = false;
  for(int i=0; i<locks.size(); i++) {
    Lock lock = locks.get(i);
    if(!lock.tryLock(5, TimeUnit.SECONDS)) {
      grabbedAllLocks = false;

      // undo the locks I already took in reverse order
      for(int j=i-1; j >= 0; j--) {
        lock.unlock();
      }
    }
  }
}

我可以想象这样的代码会有问题,因为你有很多线程相互冲突并等待获取一组锁。但是对我来说,这不是一个非常有说服力的简单示例。


2
要使这成为活锁,您需要另一个线程以不同的顺序获取那些锁。如果所有线程都按相同顺序在locks中使用tryLockAll(),则没有活锁。 - JaviMerino

1

jelbourn的代码的Python版本:

import threading
import time
lock = threading.Lock()

class Spoon:
    def __init__(self, diner):
        self.owner = diner

    def setOwner(self, diner):
        with lock:
            self.owner = diner

    def use(self):
        with lock:
            "{0} has eaten".format(self.owner)

class Diner:
    def __init__(self, name):
        self.name = name
        self.hungry = True

    def eatsWith(self, spoon, spouse):
        while(self.hungry):
            if self != spoon.owner:
                time.sleep(1) # blocks thread, not process
                continue

            if spouse.hungry:
                print "{0}: you eat first, {1}".format(self.name, spouse.name)
                spoon.setOwner(spouse)
                continue

            # Spouse was not hungry, eat
            spoon.use()
            print "{0}: I'm stuffed, {1}".format(self.name, spouse.name)
            spoon.setOwner(spouse)

def main():
    husband = Diner("Bob")
    wife = Diner("Alice")
    spoon = Spoon(husband)

    t0 = threading.Thread(target=husband.eatsWith, args=(spoon, wife))
    t1 = threading.Thread(target=wife.eatsWith, args=(spoon, husband))
    t0.start()
    t1.start()
    t0.join()
    t1.join()

if __name__ == "__main__":
    main()

错误:在use()函数中,print未被使用,更重要的是,hungry标志未被设置为False。 - GDR

0
我修改了@jelbourn的答案。 当其中一个注意到另一个饿了,他(她)应该释放勺子并等待另一个通知,这样就会发生活锁。
public class LiveLock {
    static class Spoon {
        Diner owner;

        public String getOwnerName() {
            return owner.getName();
        }

        public void setOwner(Diner diner) {
            this.owner = diner;
        }

        public Spoon(Diner diner) {
            this.owner = diner;
        }

        public void use() {
            System.out.println(owner.getName() + " use this spoon and finish eat.");
        }
    }

    static class Diner {
        public Diner(boolean isHungry, String name) {
            this.isHungry = isHungry;
            this.name = name;
        }

        private boolean isHungry;
        private String name;


        public String getName() {
            return name;
        }

        public void eatWith(Diner spouse, Spoon sharedSpoon) {
            try {
                synchronized (sharedSpoon) {
                    while (isHungry) {
                        while (!sharedSpoon.getOwnerName().equals(name)) {
                            sharedSpoon.wait();
                            //System.out.println("sharedSpoon belongs to" + sharedSpoon.getOwnerName())
                        }
                        if (spouse.isHungry) {
                            System.out.println(spouse.getName() + "is hungry,I should give it to him(her).");
                            sharedSpoon.setOwner(spouse);
                            sharedSpoon.notifyAll();
                        } else {
                            sharedSpoon.use();
                            sharedSpoon.setOwner(spouse);
                            isHungry = false;
                        }
                        Thread.sleep(500);
                    }
                }
            } catch (InterruptedException e) {
                System.out.println(name + " is interrupted.");
            }
        }
    }

    public static void main(String[] args) {
        final Diner husband = new Diner(true, "husband");
        final Diner wife = new Diner(true, "wife");
        final Spoon sharedSpoon = new Spoon(wife);

        Thread h = new Thread() {
            @Override
            public void run() {
                husband.eatWith(wife, sharedSpoon);
            }
        };
        h.start();

        Thread w = new Thread() {
            @Override
            public void run() {
                wife.eatWith(husband, sharedSpoon);
            }
        };
        w.start();

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        h.interrupt();
        w.interrupt();

        try {
            h.join();
            w.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接