在Java中实现资源读写锁

6
我正在尝试为被多个线程并发访问的资源实现一个简单的读写锁。工作线程随机尝试读取或写入共享对象。当设置读锁时,工作线程在解除锁定之前不能进行写操作。当设置写锁时,不允许读取和写入。 虽然我的实现似乎有效,但我认为它在概念上是错误的。
正在进行的读操作应该允许同时发生更多的读操作,从而导致读取的总数大于写入的数量。我的程序产生的数字遵循由工作者执行这些操作的概率。
我觉得我的实现实际上根本不是并发的,但我很难找出错误。我真的很希望能指点一下正确的方向。
调度和终止工作线程的主类:
class Main {

    private static final int THREAD_NUMBER = 4;

    public static void main(String[] args) {
        // creating workers
        Thread[] workers = new Thread[THREAD_NUMBER];
        for (int i = 0; i < THREAD_NUMBER; i++) {
            workers[i] = new Thread(new Worker(i + 1));
        }
        System.out.println("Spawned workers: " + THREAD_NUMBER);

        // starting workers
        for (Thread t : workers) {
            t.start();
        }
        try {
            Thread.sleep((long) 10000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // stopping workers
        System.out.println("Stopping workers...");
        for (Thread t : workers) {
            t.interrupt();
        }
    }
}

资源类(Resource class):
class Resource {

    enum ResourceLock {
        ON,
        OFF
    } 

    private static Resource instance = null;
    private ResourceLock writeLock = ResourceLock.OFF;
    private ResourceLock readLock = ResourceLock.OFF;

    private Resource() {}

    public static synchronized Resource getInstance() {
        if (instance == null) {
            instance = new Resource();
        }
        return instance;
    }

    public ResourceLock getWriteLock() {
        return writeLock;
    }
    public ResourceLock getReadLock() {
        return readLock;
    }
    public void setWriteLock() {
        writeLock = ResourceLock.ON;
    }
    public void setReadLock() {
        readLock = ResourceLock.ON;
    }
    public void releaseWriteLock() {
        writeLock = ResourceLock.OFF;
    }
    public void releaseReadLock() {
        readLock = ResourceLock.OFF;
    }
}

最后是Worker类:

import java.util.Random;

class Worker implements Runnable {

    private static final double WRITE_PROB = 0.5;
    private static Random rand = new Random();
    private Resource res;
    private int id;

    public Worker(int id) {
        res = Resource.getInstance();
        this.id = id;
    }

    public void run() {
        message("Started.");
        while (!Thread.currentThread().isInterrupted()) {
            performAction();
        }
    }

    private void message(String msg) {
        System.out.println("Worker " + id + ": " + msg);
    }

    private void read() {
        synchronized(res) {
            while (res.getWriteLock() == Resource.ResourceLock.ON) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            res.setReadLock();
            // perform read
            try {
                Thread.sleep((long) 500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            res.releaseReadLock();
            res.notifyAll();
        }
        message("Finished reading.");
    }

    private void write() {
        synchronized(res) {
            while (res.getWriteLock() == Resource.ResourceLock.ON || res.getReadLock() == Resource.ResourceLock.ON) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            res.setWriteLock();
            // perform write
            try {
                Thread.sleep((long) 500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            res.releaseWriteLock();
            res.notifyAll();
        }
        message("Finished writing.");
    }

    private void performAction() {
        double r = rand.nextDouble();
        if (r <= WRITE_PROB) {
            write();
        } else {
            read();
        }
    }
}

为什么要分别使用读锁和写锁呢?这是因为我想同时原子化锁定操作和其查询。

以下是在0.5写入概率下的输出示例:

Spawned workers: 4
Worker 2: Started.
Worker 3: Started.
Worker 1: Started.
Worker 4: Started.
Worker 2: Finished writing.
Worker 4: Finished reading.
Worker 1: Finished writing.
Worker 3: Finished writing.
Worker 1: Finished reading.
Worker 4: Finished writing.
Worker 2: Finished reading.
Worker 4: Finished reading.
Worker 1: Finished reading.
Worker 3: Finished writing.
Worker 1: Finished writing.
Worker 4: Finished writing.
Worker 2: Finished writing.
Worker 4: Finished writing.
Worker 1: Finished reading.
Worker 3: Finished writing.
Worker 1: Finished writing.
Worker 4: Finished reading.
Worker 2: Finished writing.
Stopping workers...
Worker 4: Finished writing.
Worker 1: Finished writing.
Worker 3: Finished reading.
Worker 2: Finished reading.

非常感谢您的帮助。


你为什么觉得你的实现不是并发的? - Alain-Michel Chomnoue N
正如您所看到的,操作日志的记录大致上具有读取和写入操作的相等分布。这不应该是这样的,因为读取操作具有较少限制的锁定,并且从长远来看应该会导致更多的读取操作。因此,我猜测问题在于似乎没有写入者饥饿,如果这些工作人员同时工作,那么根据我的理解,应该会发生写入者饥饿的情况。 - Jan Parzydło
1
如果您使用synchronize访问,则会有一个独占锁。您必须从头开始构建自己的锁定机制。 - Ben Manes
2
只要整个操作都在synchronized块内,就不可能有并发。一旦将操作移出synchronized块,它就会中断,因为每个读取器在结束时都会执行readLock = ResourceLock.OFF,无论有多少读取器。这在概念层面上就行不通了。您需要一个计数器来记住读取器的数量以支持多个读取器。除此之外,您应该将逻辑放入Resource类中,而不是使其成为白盒结构,并希望调用者正确实现逻辑。 - Holger
2个回答

12
您正在一个synchronized块内执行整个操作,因此没有并发。此外,没有任何锁类型的优先级,因为最多只有一个线程可以拥有锁。如果不在synchronized块中执行整个操作,则无法使用当前代码,因为每个读取器不管有多少个读取器都会在结尾处执行readLock = ResourceLock.OFF。如果没有计数器,则无法正确支持多个读取器。

除此之外,这是一种奇怪的代码结构,提供一个Resource类来维护状态,但完全由调用者决定如何处理它。这不是处理责任和封装的方式。

实现可能如下:

class ReadWriteLock {
    static final int WRITE_LOCKED = -1, FREE = 0;

    private int numberOfReaders = FREE;
    private Thread currentWriteLockOwner;

    public synchronized void acquireReadLock() throws InterruptedException {
        while(numberOfReaders == WRITE_LOCKED) wait();
        numberOfReaders++;
    }
    public synchronized void releaseReadLock() {
        if(numberOfReaders <= 0) throw new IllegalMonitorStateException();
        numberOfReaders--;
        if(numberOfReaders == FREE) notifyAll();
    }
    public synchronized void acquireWriteLock() throws InterruptedException {
        while(numberOfReaders != FREE) wait();
        numberOfReaders = WRITE_LOCKED;
        currentWriteLockOwner = Thread.currentThread();
    }
    public synchronized void releaseWriteLock() {
        if(numberOfReaders!=WRITE_LOCKED || currentWriteLockOwner!=Thread.currentThread())
            throw new IllegalMonitorStateException();
        numberOfReaders = FREE;
        currentWriteLockOwner = null;
        notifyAll();
    }
}

它简单地使用已获取读锁的计数器,当存在写锁时将计数器设置为-1(因此无法嵌套写锁)。只有在没有写锁时才能成功获取读锁,因此不需要为它们实现优先级,当另一个线程已经拥有真正的锁时,成功的可能性就足够了。实际上,当读者数量显著大于作者时,您可能会遇到“饥饿作家”问题。(参见维基百科) worker简化为:
class Worker implements Runnable {
    private static final double WRITE_PROB = 0.5;
    private static final Random rand = new Random();
    private final ReadWriteLock theLock;
    private final int id;

    public Worker(int id, ReadWriteLock lock) {
        theLock = lock;
        this.id = id;
    }

    public void run() {
        message("Started.");
        while(!Thread.currentThread().isInterrupted()) {
            performAction();
        }
    }

    private void message(String msg) {
        System.out.println("Worker " + id + ": " + msg);
    }

    private void read() {
        try {
            theLock.acquireReadLock();
        } catch(InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // perform read
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally { theLock.releaseReadLock(); }
        message("Finished reading.");
    }

    private void write() {
        try {
            theLock.acquireWriteLock();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // perform write
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally { theLock.releaseWriteLock(); }
        message("Finished writing.");
    }

    private void performAction() {
        double r = rand.nextDouble();
        if (r <= WRITE_PROB) {
            write();
        } else {
            read();
        }
    }
}

请注意,我在这里避免了全局变量。锁应该传递给构造函数。当在获取锁期间被中断时,方法返回非常重要。像您原来的代码中那样自我中断并重试获取会导致无限循环,因为下一个等待将再次抛出InterruptedException,在恢复当前线程的中断状态后。当然,没有获得锁就继续进行也是错误的,因此唯一有效的选择是不恢复中断状态或立即返回。
您主程序的唯一更改是构造和传递锁实例:
ReadWriteLock sharedLock = new ReadWriteLock();
// creating workers
Thread[] workers = new Thread[THREAD_NUMBER];
for (int i = 0; i < THREAD_NUMBER; i++) {
    workers[i] = new Thread(new Worker(i + 1, sharedLock));
}
System.out.println("Spawned workers: " + THREAD_NUMBER);

// starting workers
for (Thread t : workers) {
    t.start();
}
try {
    Thread.sleep(10000);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

// stopping workers
System.out.println("Stopping workers...");
for (Thread t : workers) {
    t.interrupt();
}

为什么 numberOfReaders 不是 volatile 的?它的值不可能被缓存在每个线程中吗? - nkr
2
@nkr,当每个访问都在synchronized方法内时,无需使用volatile - Holger

2
这是一个简单的实现,使用ReadWriteLock,更加注重写操作的优先级:
public class ReadWriteLock{

  private int readers       = 0;
  private int writers       = 0;
  private int writeRequests = 0;

  public synchronized void lockRead() throws InterruptedException{
    while(writers > 0 || writeRequests > 0){
      wait();
    }
    readers++;
  }

  public synchronized void unlockRead(){
    readers--;
    notifyAll();
  }

  public synchronized void lockWrite() throws InterruptedException{
    writeRequests++;

    while(readers > 0 || writers > 0){
      wait();
    }
    writeRequests--;
    writers++;
  }

  public synchronized void unlockWrite() throws InterruptedException{
    writers--;
    notifyAll();
  }
}

来源:http://tutorials.jenkov.com/java-concurrency/read-write-locks.html

读写锁是一种特殊的锁,它允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。在读取共享资源时,读写锁可以提高并发性能和吞吐量。Java中的ReentrantReadWriteLock类提供了读写锁实现。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接