非阻塞I/O与使用线程(上下文切换有多糟糕?)

10

我在工作中经常使用套接字,并且有时需要同时处理来自大约100台机器的连接。我们使用非阻塞I/O结合状态表进行管理,也使用传统的Java套接字(使用线程)。

我们遇到了很多非阻塞套接字的问题,个人而言我更喜欢使用线程来处理套接字。因此我的问题是:

使用单线程的非阻塞套接字可以节省多少资源?使用线程会涉及多少上下文切换,使用Java的线程模型可以扩展到多少并发连接?

3个回答

10

根据服务器的活动配置,I/O和非阻塞I/O的选择因情况而异。例如,如果您使用长连接并且有成千上万的客户端,则由于系统资源耗尽,I/O可能会变得过于昂贵。但是,不会占用CPU缓存的直接I/O比非阻塞I/O更快。有一篇很好的文章介绍了这方面的知识 - 编写Java多线程服务器-新旧交替.

关于上下文切换成本 - 它是相当便宜的操作。请考虑以下简单测试:

package com;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class AAA {

    private static final long DURATION = TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
    private static final int THREADS_NUMBER = 2;
    private static final ThreadLocal<AtomicLong> COUNTER = new ThreadLocal<AtomicLong>() {
        @Override
        protected AtomicLong initialValue() {
            return new AtomicLong();
        }
    };
    private static final ThreadLocal<AtomicLong> DUMMY_DATA = new ThreadLocal<AtomicLong>() {
        @Override
        protected AtomicLong initialValue() {
            return new AtomicLong();
        }
    };
    private static final AtomicLong DUMMY_COUNTER = new AtomicLong();
    private static final AtomicLong END_TIME = new AtomicLong(System.nanoTime() + DURATION);

    private static final List<ThreadLocal<CharSequence>> DUMMY_SOURCE = new ArrayList<ThreadLocal<CharSequence>>();
    static {
        for (int i = 0; i < 40; ++i) {
            DUMMY_SOURCE.add(new ThreadLocal<CharSequence>());
        }
    }

    private static final Set<Long> COUNTERS = new ConcurrentSkipListSet<Long>();

    public static void main(String[] args) throws Exception {
        final CountDownLatch startLatch = new CountDownLatch(THREADS_NUMBER);
        final CountDownLatch endLatch = new CountDownLatch(THREADS_NUMBER);

        for (int i = 0; i < THREADS_NUMBER; i++) {
            new Thread() {
                @Override
                public void run() {
                    initDummyData();
                    startLatch.countDown();
                    try {
                        startLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    while (System.nanoTime() < END_TIME.get()) {
                        doJob();
                    }
                    COUNTERS.add(COUNTER.get().get());
                    DUMMY_COUNTER.addAndGet(DUMMY_DATA.get().get());
                    endLatch.countDown();
                }
            }.start();
        }
        startLatch.await();
        END_TIME.set(System.nanoTime() + DURATION);

        endLatch.await();
        printStatistics();
    }

    private static void initDummyData() {
        for (ThreadLocal<CharSequence> threadLocal : DUMMY_SOURCE) {
            threadLocal.set(getRandomString());
        }
    }

    private static CharSequence getRandomString() {
        StringBuilder result = new StringBuilder();
        Random random = new Random();
        for (int i = 0; i < 127; ++i) {
            result.append((char)random.nextInt(0xFF));
        }
        return result;
    }

    private static void doJob() {
        Random random = new Random();
        for (ThreadLocal<CharSequence> threadLocal : DUMMY_SOURCE) {
            for (int i = 0; i < threadLocal.get().length(); ++i) {
                DUMMY_DATA.get().addAndGet(threadLocal.get().charAt(i) << random.nextInt(31));
            }
        }
        COUNTER.get().incrementAndGet();
    }

    private static void printStatistics() {
        long total = 0L;
        for (Long counter : COUNTERS) {
            total += counter;
        }
        System.out.printf("Total iterations number: %d, dummy data: %d, distribution:%n", total, DUMMY_COUNTER.get());
        for (Long counter : COUNTERS) {
            System.out.printf("%f%%%n", counter * 100d / total);
        }
    }
}

我为两种情况分别进行了两个测试,一个是两个线程的情况,另一个是十个线程的情况。测试结果显示性能损失约为2.5%(两个线程为78626次迭代,十个线程为76754次迭代),线程使用系统资源基本相等。

此外,'java.util.concurrent' 的作者认为上下文切换时间约为2000-4000个CPU周期:

public class Exchanger<V> {
   ...
   private static final int NCPU = Runtime.getRuntime().availableProcessors();
   ....
   /**
    * The number of times to spin (doing nothing except polling a
    * memory location) before blocking or giving up while waiting to
    * be fulfilled.  Should be zero on uniprocessors.  On
    * multiprocessors, this value should be large enough so that two
    * threads exchanging items as fast as possible block only when
    * one of them is stalled (due to GC or preemption), but not much
    * longer, to avoid wasting CPU resources.  Seen differently, this
    * value is a little over half the number of cycles of an average
    * context switch time on most systems.  The value here is
    * approximately the average of those across a range of tested
    * systems.
    */
   private static final int SPINS = (NCPU == 1) ? 0 : 2000; 

非常感谢,有一个测试用例很好。 - Benj
感谢您发布了“编写Java多线程服务器-往事如新”的链接。我忘记了它的名字,找不到它。 - Adam Paynter

3

针对您的问题,最好的方法可能是编写一个测试程序,获取一些硬性测量数据,并根据这些数据做出最佳决策。当我试图做出此类决策时,通常会这样做,有硬性数据可以支持您的论点。

不过,在开始之前,您需要多少个线程?您的软件在什么类型的硬件上运行?


好主意,我所工作的程序是点对点的,其中一个节点可能会与100多个其他节点进行通信。这些节点可以是Linux / Windows / Mac(各种不同的版本),通常在办公环境中运行在性能良好的PC上(即2个或更多CPU)。 - Benj

1

对于100个连接,使用阻塞IO并为每个连接使用两个线程(一个用于读取和写入)可能不会出现问题。在我看来,这是最简单的模型。

然而,您可能会发现使用JMS更好地管理您的连接。如果您使用像ActiveMQ这样的东西,您可以合并所有连接。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接