Java.net.Socket 如何保证线程安全?

11

我有一个Socket,通过BufferedReader和BufferedWriter进行读写操作。我不确定哪些操作可以从不同的线程中执行。我猜测如果同时从两个不同的线程向Socket写入数据可能会出现问题。在两个不同的线程中同时从Socket读取数据也是如此。那么在一个线程上读取,在另一个线程上写入呢?

我之所以问这个问题,是因为我想要一个线程长时间地阻塞在读取操作上等待更多的数据,但在此期间我偶尔也需要发送一些数据到Socket上。我不清楚这是否是线程安全的,或者是否应该在写入之前取消读取操作(这将很麻烦)。

6个回答

7

在流级别上,套接字不是线程安全的。你必须提供同步。唯一的保证是,无论并发如何,你都不会在不同的读取调用中获得完全相同的字节副本。

但是在Reader和Writer级别上,你可能会遇到一些锁定问题

无论如何,你可以像处理完全独立的对象一样处理Socket的流的读写操作(它们确实是独立的,它们唯一共享的是它们的生命周期)。

一旦你正确地在读线程和写线程之间提供了同步,任何数量的读者和写者都没有问题。这意味着,是的,你可以在一个线程上读取,在另一个线程上写入(事实上这很常见),而且你不必在写入时停止读取。

最后一个建议:所有涉及线程的操作都有关联的超时时间,请确保正确处理超时。


1
即使是 getInputStream() 也不是线程安全的,因为它以一种非线程安全的方式执行 isConnected()。如果您没有提供同步,则根本不应该让多个线程使用它。 - Op De Cirkel

5

你实际上是从InputStream读取并写入OutputStream。它们相当独立,只要对它们的每个访问进行串行化,就可以了。

然而,你必须将发送的数据与接收到的数据进行关联。这与线程安全性不同。


3
你的意思是什么?“对它们中的每一个进行串行访问”是什么意思? - dhblah
2
@gasan:不要同时使用两个线程进行写入,或者同时使用两个线程进行读取。从某个同步块/方法中调用读取/写入操作(一个锁用于读取,一个锁用于写入)。 - Paŭlo Ebermann
5
只要对它们每个都进行串行访问:这并不足以使访问在多线程中安全,你必须确保使用相同的锁来管理套接字状态以实现线程安全。我建议,最好不要这样做。使用缓存区,在一个线程中管理套接字的读/写操作。 - Op De Cirkel

2

Java中的java.net.Socket实际上并不是线程安全的:打开Socket源代码,看一下(假设)connected成员字段以及它的使用方式。您会发现它不是volatile,在没有同步的情况下进行读取和更新。这表明Socket类不适用于多个线程使用。虽然有一些锁和同步,但它们不是一致的。

我建议不要这样做。最终,使用缓冲区(nio),并在一个线程中执行套接字读取/写入操作。

有关详细信息,请参见讨论


2
有很多代码会因为这个假设而崩溃。'connected'成员只在套接字的生命周期中设置一次。基本问题是输入流与输出流是分开的,底层TCP流是全双工的:没有线程不安全的情况。 - user207421
输入流与输出流是分开的。如果它们是不同的对象,这并不意味着它们不访问套接字的相同状态。浏览一下SocketSocketImplPlainSocketImpl等的源代码,你会发现这些流实际上有共同的状态。这个类没有设计用于多线程(给我一个Java-doc/规范,证明它是安全的)的原因足以不这样做。
- Op De Cirkel
别开玩笑了,libc根本没有套接字支持,所以那完全不相关。我们正在讨论java.net中的Java API。如果它不打算被多个线程使用,那么所有的acquireFD()/releaseFD()调用是为什么呢? - user207421
在 libc 中根本没有套接字支持。http://www.gnu.org/s/hello/manual/libc/index.html#toc_Sockets - Op De Cirkel
“我不认为有很多代码。”:我们也不是在谈论你的想法,而是在谈论事实。您还没有回答我的关于获取和释放FD的问题,也没有解决我非常明显的反例来反驳你所谓的“不认为”。 - user207421
显示剩余11条评论

1

你可以有一个线程读取套接字,另一个线程写入它。如果你想让多个线程写入套接字,那么你必须使用同步来序列化你的访问,或者你可以有一个单独的写入线程,从队列中获取要写入的数据。(我更喜欢前者)

你可以使用非阻塞IO,在单个线程中共享读写工作。然而,这实际上更加复杂和棘手,需要正确处理。如果你想这样做,我建议你使用Netty或Mina等库来帮助你。


0

非常有趣,nio的SocketChannel写操作是同步的

http://www.docjar.com/html/api/sun/nio/ch/SocketChannelImpl.java.html

旧的io套接字依赖于操作系统,因此您必须查看操作系统本地代码才能确定(这可能因操作系统而异)...

只需查看返回Socket.getOutputStream的java.net.SocketOutputStream.java即可。

(当然,除非我错过了什么)。

哦,还有一件事,他们可能已经在每个操作系统上的每个JVM中的本机代码中放置了同步,但是谁知道呢。 只有nio明显存在同步。


0

这是原生代码中的socketWrite,因此从代码上来说它不是线程安全的

JNIEXPORT void JNICALL
Java_java_net_SocketOutputStream_socketWrite0(JNIEnv *env, jobject this,
                                              jobject fdObj,
                                              jbyteArray data,
                                              jint off, jint len) {
    char *bufP;
    char BUF[MAX_BUFFER_LEN];
    int buflen;
    int fd;

    if (IS_NULL(fdObj)) {
        JNU_ThrowByName(env, "java/net/SocketException", "Socket closed");
        return;
    } else {
        fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
        /* Bug 4086704 - If the Socket associated with this file descriptor
         * was closed (sysCloseFD), the the file descriptor is set to -1.
         */
        if (fd == -1) {
            JNU_ThrowByName(env, "java/net/SocketException", "Socket closed");
            return;
        }

    }

    if (len <= MAX_BUFFER_LEN) {
        bufP = BUF;
        buflen = MAX_BUFFER_LEN;
    } else {
        buflen = min(MAX_HEAP_BUFFER_LEN, len);
        bufP = (char *)malloc((size_t)buflen);

        /* if heap exhausted resort to stack buffer */
        if (bufP == NULL) {
            bufP = BUF;
            buflen = MAX_BUFFER_LEN;
        }
    }

    while(len > 0) {
        int loff = 0;
        int chunkLen = min(buflen, len);
        int llen = chunkLen;
        (*env)->GetByteArrayRegion(env, data, off, chunkLen, (jbyte *)bufP);

        while(llen > 0) {
            int n = NET_Send(fd, bufP + loff, llen, 0);
            if (n > 0) {
                llen -= n;
                loff += n;
                continue;
            }
            if (n == JVM_IO_INTR) {
                JNU_ThrowByName(env, "java/io/InterruptedIOException", 0);
            } else {
                if (errno == ECONNRESET) {
                    JNU_ThrowByName(env, "sun/net/ConnectionResetException",
                                    "Connection reset");
                } else {
                    NET_ThrowByNameWithLastError(env, "java/net/SocketException",
                                                 "Write failed");
                }
            }
            if (bufP != BUF) {
                free(bufP);
            }
            return;
        }
        len -= chunkLen;
        off += chunkLen;
    }

    if (bufP != BUF) {
        free(bufP);
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接