Java如何避免在循环中使用Thread.sleep()

26

从我的主函数中,我启动了两个线程,称为生产者和消费者。两个线程都包含while(true)循环。生产者循环是UDP服务器,因此不需要睡眠。我的问题在于消费者循环。消费者循环从链式队列中删除对象,并将其传递给一个函数进行进一步处理。根据我的研究,在循环中使用线程睡眠并不是一个好习惯,因为有时操作系统不会在设定的时间结束时释放它。如果我在应用程序处于空闲状态时删除线程休眠,则会将CPU拉到20到30%。

class Producer implements Runnable {
    private DatagramSocket dsocket;
    FError fer = new FError();

    int port =1548;
    ConcurrentLinkedQueue<String> queue;

    Producer(ConcurrentLinkedQueue<String> queue){
        this.queue = queue; 
    }

    @Override
    public void run() {

        try {

            // Create a socket to listen on the port.
            dsocket = new DatagramSocket(port);
            // Create a buffer to read datagrams into.
            byte[] buffer = new byte[30000];
            // Create a packet to receive data into the buffer
            DatagramPacket packet = new DatagramPacket(buffer,
            buffer.length);

            while (true) {
                try {

                   // Wait to receive a datagram
                    dsocket.receive(packet);
                    //Convert the contents to a string,
                    String msg = new String(buffer, 0, packet.getLength());

                    int ltr = msg.length();
                     // System.out.println("MSG =" + msg);

                    if(ltr>4)
                    {

                        SimpleDateFormat sdfDate = new SimpleDateFormat  ("yyyy-MM-dd HH:mm:ss");//dd/MM/yyyy

                        Date now = new Date();
                        String strDate = sdfDate.format(now);

                        //System.out.println(strDate);

                        queue.add(msg + "&" + strDate);

                     // System.out.println("MSG =" + msg);
                    }

                  // Reset the length of the packet before reusing it.
                   packet.setLength(buffer.length);

                } catch (IOException e) {
                    fer.felog("svr class", "producer", "producer thread",e.getClass().getName() + ": " + e.getMessage());
                    dsocket.close();
                    break; 
                }
            }

        } catch (SocketException e) {
          fer.felog("svr class", "producer","Another App using the udp port " + port, e.getClass().getName() + ": " + e.getMessage()); 

        }

    }

}

class Consumer implements Runnable {

    String str;  
    ConcurrentLinkedQueue<String> queue;

    Consumer(ConcurrentLinkedQueue<String> queue) {
        this.queue = queue;  
    }

    @Override
    public void run() {

        while (true) {
            try {

                while ((str = queue.poll()) != null) {

                    call(str);  // do further processing

                   }
            } catch (IOException e) {
                ferpt.felog("svr class", "consumer", "consumer thread", e.getClass().getName() + ": " + e.getMessage());
                break;
            }

            try {
                Thread.sleep(500);
            } catch (InterruptedException ex) {

                ferpt.felog("svr class", "consumer","sleep", ex.getClass().getName() + ": " + ex.getMessage());
            }

        }

    }

}

3
建议阅读一下 java.util.concurrent 包中可用的各种类别,有助于更好地理解。 - Michael Krause
3
传统的生产者/消费者模式会使用监视器锁定来阻塞消费者,直到生产者有东西可以提供。您可以使用阻塞队列来实现相同的结果。 - MadProgrammer
3
好的,我会尽力做到最好。下面是需要翻译的内容:Why not use BlockingQueue - Scary Wombat
感谢您的所有答案。在我的情况下,我无法使用阻塞队列。生产者将随时在UDP端口上接收消息。消费者部分必须逐个删除并对消息进行进一步处理,这需要一些时间,因此我无法使用阻塞。 - Jro
3个回答

28

你可以通过使用ScheduledExecutorService而不是让消费者extend Runnable,来改变你的代码并使其每半秒轮询队列,而不是让线程睡眠。 以下是一个示例:

public void schedule() {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    executor.scheduleAtFixedRate(() -> {
        String str;
        try {
            while ((str = queue.poll()) != null) {
                call(str);  // do further processing
            }
        } catch (IOException e) {
            ferpt.felog("svr class", "consumer", "consumer thread", e.getClass().getName() + ": " + e.getMessage());
        }
    }, 0, 500, TimeUnit.MILLISECONDS);
}

感谢您的迅速回复,我会尝试并告诉您结果。 - Jro

11
正确的解决方案是使用阻塞队列。它给你带来了几个优势:
- 不浪费 CPU 忙等待 - 可以有限的容量 - 想象一下你有一个快速的生产者,但是一个慢速的消费者 -> 如果队列没有限制大小,那么你的应用程序很容易达到内存溢出的情况
这里有一个小的演示,你可以试试玩:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProdConsTest {
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        final Runnable producer = () -> {
            for (int i = 0; i < 1000; i++) {
                try {
                    System.out.println("Producing: " + i);
                    queue.put(i);

                    //Adjust production speed by modifying the sleep time
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    //someone signaled us to terminate
                    break;
                }
            }
        };

        final Runnable consumer = () -> {
            while (true) {
                final Integer integer;
                try {
                    //Uncomment to simulate slow consumer:
                    //Thread.sleep(1000);

                    integer = queue.take();
                } catch (InterruptedException e) {
                    //someone signaled us to terminate
                    break;
                }
                System.out.println("Consumed: " + integer);
            }
        };


        final Thread consumerThread = new Thread(consumer);
        consumerThread.start();

        final Thread producerThread = new Thread(producer);
        producerThread.start();

        producerThread.join();
        consumerThread.interrupt();
        consumerThread.join();
    }
}

现在取消注释消费者中的sleep()并观察应用程序的情况。如果您使用的是基于定时器的解决方案,如建议的ScheduledExecutorService,或者您一直在忙等待,那么在生产者速度很快的情况下,队列将不受控制地增长,并最终导致应用程序崩溃。

谢谢你的答复。我已经实现了scheduledExectorService,它运行得很好。正如我所提到的,生产者正在侦听UDP端口,随时可以接收数据包,因此我无法使用阻塞队列。消费者比生产者快得多。调用方法将对象交给多线程类。不管怎样,我也会尝试你的方法。 - Jro

1
让消费者在它们都能访问的对象上 wait(),并让生产者在此对象上向监听器 notify() 发送新消息。消费者应该删除所有消息,而不仅仅是像示例中那样删除单个消息。

5
这与阻塞队列的作用相同(完全比建议使用定时任务更可取)。没有必要重新发明它。 - Nathan Hughes

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接