将缓冲区写入Java通道:线程安全还是不安全?

4
考虑以下代码片段,它只是将someByteBuffer的内容写入标准输出:
// returns an instance of "java.nio.channels.Channels$WritableByteChannelImpl"
WritableByteChannel w = Channels.newChannel(System.out);
w.write(someByteBuffer);

Java规定通道一般应该是安全的,可供多线程访问,而缓冲区不适合被多个并发线程使用
因此,我想知道上面的代码片段是否需要同步,因为它在某个缓冲区(不是线程安全的)上调用了通道的write方法(该方法应该是线程安全的)。
我查看了write方法的实现
public int write(ByteBuffer src) throws IOException {
    int len = src.remaining();
    int totalWritten = 0;
    synchronized (writeLock) {
        while (totalWritten < len) {
            int bytesToWrite = Math.min((len - totalWritten),
                                        TRANSFER_SIZE);
            if (buf.length < bytesToWrite)
                buf = new byte[bytesToWrite];
            src.get(buf, 0, bytesToWrite);
            try {
                begin();
                out.write(buf, 0, bytesToWrite);
            } finally {
                end(bytesToWrite > 0);
            }
            totalWritten += bytesToWrite;
        }
        return totalWritten;
    }
}

请注意,除了方法中的前两行外,所有内容都由 writeLock 同步。现在,由于 ByteBuffer src 不是线程安全的,在没有适当同步的情况下调用 src.remaining() 是有风险的,因为另一个线程可能会更改它。

我应该在上面的片段中同步行 w.write(someByteBuffer),还是我漏掉了什么,Java 实现的 write() 方法已经处理了这个问题?

编辑: 这是一个经常抛出 BufferUnderflowException 的示例代码,因为我注释掉了最后的 synchronized 块。删除那些注释将使代码无异常。

import java.nio.*;
import java.nio.channels.*;

public class Test {
    public static void main(String[] args) throws Exception {

        ByteBuffer b = ByteBuffer.allocate(10);
        b.put(new byte[]{'A', 'B', 'C', 'D', 'E', 'F', 'G', '\n'});

        // returns an instance of "java.nio.channels.Channels$WritableByteChannelImpl"
        WritableByteChannel w = Channels.newChannel(System.out);

        int c = 10;
        Thread[] r = new Thread[c];
        for (int i = 0; i < c; i++) {
            r[i] = new Thread(new MyRunnable(b, w));
            r[i].start();
        }
    }    
}

class MyRunnable implements Runnable {
    private final ByteBuffer b;
    private final WritableByteChannel w;

    MyRunnable(ByteBuffer b, WritableByteChannel w) {
        this.b = b;
        this.w = w;
    }

    @Override
    public void run() {
        try {
            // synchronized (b) {
                b.flip();
                w.write(b);
            // }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

未来参考:通道是线程安全的。例如,考虑一个在多个线程之间共享的打印机通道。如果所有线程开始向通道写入作业,则它们的作业将不会交错。另一方面,如果多个线程可以访问打印机缓冲区,则它们可以在打印过程中更改它,除非进行适当的同步。 - Sadeq Dousti
感谢您的快速接受;我很高兴能够帮助。我也很欣赏这个写得清晰明了的问题。在结尾额外引用“摘要问题”的想法不错! - GhostCat
@GhostCat:非常感谢您!我认为我已经相当理解了通道、缓冲区和线程安全方面的内容 :) - Sadeq Dousti
2个回答

5
重点是:如果您的设置允许多个线程干扰该缓冲区对象,那么您就会面临线程问题。这很简单!
问题不在于channel.write()是否线程安全。了解这一点很好,但不是问题的核心!
真正的问题是:您的代码正在对缓冲区做什么?
即使此通道实现在某些内容上进行了锁定,操作数据来自外部!有各种各样的事情可能发生在进入此方法的src对象上——而此通道正在忙于写入缓冲区!
换句话说:这段代码是否“安全”完全取决于并行使用src缓冲区对象的您的代码正在做什么。
鉴于OP的评论:核心问题是:您必须确保使用该字节缓冲区的任何活动都是线程安全的。在给定的示例中,我们有两个操作:
b.flip();
w.write(b);

每个线程都会执行这两个操作;因此:当确保只有一个线程可以进行这两个调用时(如图所示;通过查看共享缓冲区对象),你就做到了。

其实很简单:如果你有“共享数据”,那么你必须确保读/写该“共享数据”的线程以某种方式进行了同步,以避免竞态条件。


感谢您对可能出现的错误进行了解释。我编辑了问题,并添加了一个带有“注释”的synchronized块的代码。除非取消注释此块,否则经常会引发异常。假设已删除注释,那么我现在做得对了吗? - Sadeq Dousti
请查看我对答案的更新;是的,根据你的示例代码,删除那些注释将“修复”问题,并为您提供一个线程安全的解决方案,应该始终正常工作。 - GhostCat
ByteBuffer 不允许子类。但在写入过程中操作 src 缓冲区并不需要有一个子类。顺便说一下,如果目标是让多个线程写入相同的不变内容,则最好在启动线程之前只执行一次 flip(),然后让每个线程执行 w.write(b.asReadOnlyBuffer());。这不会复制数据,而是创建一个独立的视图,具有自己的位置、标记和限制,因此线程之间不会有任何干扰,因此不需要额外的同步。 - Holger

3

如果你在多个线程中进行多次写入,并且希望保证这些写入的原子性,那么只需要锁定通道即可,此时需要锁定System.out对象。

如果你在多个线程之间共享一个不安全的数据结构,则需要添加锁定。如果可以的话,我建议避免在多个线程中使用ByteBuffer。


1
最后一句话“如果可以的话,避免在多个线程中使用ByteBuffer”是一个非常宝贵的建议!我会尽量记住这点在生产环境中。然而,出于教育目的,我在我的问题中添加了一个示例代码,演示了在没有同步的情况下可能出现的问题。请注意,锁定ByteBuffer也可以起作用(而不是锁定通道)。那么这样做是不是一个不好的实践呢? - Sadeq Dousti
@M.S.Dousti,如果ByteBuffer实际上是全局锁,则锁定它是有效的。但这对性能来说非常不利。 - Peter Lawrey

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接