在两个线程之间共享一个ArrayList?

5

我有两个线程正在运行,其中一个线程应该从用户那里获取信息,另一个线程应该处理用户提供的信息,具体如下:

public class UserRequest implements Runnable {

@Override
public void run() {
    // TODO Auto-generated method stub
    String request;
    Scanner input = new Scanner(System.in);
    while(true)
    {
        System.out.println("Please enter request:");
        request = input.nextLine();
        try
        {
            //do something
        }
        catch(IOException e)
        {
            e.printStackTrace();
        }
    }

}

第二个线程:

public class Poller implements Runnable {

ArrayList<String> colors = new ArrayList<String>();

public void poll()
{
    for(String color : colors)
    {
        if(color == "")
        {
            //do work
        }
        else
        {
            //do work
        }
    }
}

@Override
public void run() {

    colors.add("Violet");
    colors.add("Green");
    colors.add("Yellow");
    colors.add("Orange");

    while(true)
        poll();     
}
}

我想做的是将用户在UserRequest对象中输入的任何内容推送到Poller对象中的ArrayList,以便它也可以“处理”新值。我已经查看了一些类似于BlockingQueue的东西,但我不希望任何线程等待另一个线程,因为它们除了共享数据之外还有其他任务需要完成。我该怎么做?

ArrayList不是线程安全的。 - SLaks
有什么线程安全的替代方案,我应该如何在线程之间共享它? - Jenna Maiz
正如许多人指出的那样,队列可能是您正在寻找的。您可以使用ConcurrentLinkedQueue来实现线程安全而无需过多锁定的功能。 - KennethJ
5个回答

5

由于您使用了“push”和“poll”动词,看起来您正在寻找一个Queue而不是一个List

因此,我认为您正在寻找ConcurrentLinkedQueue,文档在这里

它允许您将UserRequest对象提供给它,将Poller对象消耗掉。

尽管似乎您的Poller对象将具有相当高的CPU消耗,因为开放的while没有任何wait

public class Poller implements Runnable {
  Queue<String> colors = new ConcurrentLinkedQueue<String>();

  public void poll() {
    while(this.colors.isEmpty()){
      Thread.currentThread().wait();
    }

    String color = this.colors.poll();

    while(color != null) {
      if(color == "") {
        //do work

      } else {
        //do work
      }

      color = this.colors.poll();
    }
  }

  @Override
  public void run() {
    colors.offer("Violet");
    colors.offer("Green");
    colors.offer("Yellow");
    colors.offer("Orange");

    while(true) {

      this.poll();
    }
  }
}

这段代码需要进行一些更改才能运行,但它包含了你所需的几乎所有内容。它的作用非常简单:不断轮询直到没有剩余元素。一旦发生这种情况,Poller对象会要求当前的Thread休眠,因为在Queue中没有元素时运行它是没有意义的。
public class UserRequest implements Runnable {

  @Override
  public void run() {
    String request;
    Scanner input = new Scanner(System.in);

    while(true) {
      System.out.println("Please enter request:");
      request = input.nextLine();

      try {
        //do something

      } catch(IOException e) {
        e.printStackTrace();

      } finally {
        this.notifyAll(); // Notifies all sleeping threads to wake up
      }
    }
  }

如果你注意到了,我只是在你的UserRequest类中添加了一个notifyAll调用。为什么?非常简单:notifyAll唤醒所有等待的线程,这正是所有没有元素的Poller正在做的事情。一旦被调用,Poller将会被唤醒,检查它们的颜色Queue是否有元素并处理它们。如果队列没有元素,它们将再次睡眠,直到UserRequest再次唤醒它们,如此循环往复。

3

有两种方法可以解决这个问题:

1) 使用线程安全的集合,例如ConcurrentLinkedQueue用于生产者-消费者逻辑、作业消耗等。如果您想使用实现了List接口的类(因此您可以使用与通常的ArrayList相同的方法),您必须考虑CopyOnWriteArrayList,但请注意,此类使用阻塞同步。

2) 另一种方法是使用内置的Java同步工具,例如

如需了解更多详情,请查阅规范。让我们考虑使用信号量的示例:

private final Semaphore semaphore = new Semaphore(2, true);

   public void appendToList() throws InterruptedException {
     available.acquire();
     arrayList.add(.....); //put here what u need
   }

   public void putItem(Object x) {
     if (someLogicHere(x)) //semaphore releases counter in this place
       available.release();
   }

当然,你可以将它们全部结合起来使用,例如可以同时使用几个 信号量,或使用不同的工具。

1

“但我不希望任何一个线程等待另一个,因为它们除了共享数据外还有其他任务需要完成。”

无法实现这一点。对该类的任何适当线程都将始终存在问题,即您需要让一个线程等待而另一个线程执行某些操作。然而,重点是要尽量减少这种情况。您只想在极短且很少的情况下使线程停顿,并且仅在不这样做会导致其故障的情况下才这样做。您可以使用其中一个同步数据结构,或者只需编写一小段同步代码。

唯一涉及的对象是arraylist,您希望任何一个线程上的最小停顿时间。因此,您希望根据arraylist本身的对象对其进行同步。因此,只需在访问arraylist对象的点周围编写一些同步块即可。

public class Poller implements Runnable {

    ArrayList<String> colors;

    public Poller(ArrayList<String> colors) {
        this.colors = colors;
        //pass in colors object, if modified from the scanner side it must synchronize the block around the colors object too.
    }

    public void doWork(String color) {
        //do work
    }

    public void addColor(String color) {
        synchronized (colors) {
            colors.add(color);
        }
    }

    @Override
    public void run() {
        while (!Thread.interrupted())
            if (!colors.isEmpty()) {
                String color;
                synchronized (colors) {
                    color = colors.remove(0);
                }
                doWork(color); //work done outside synch
            }
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

重点是永远不要同时删除或添加列表中的内容。您不能循环整个列表,因为如果在循环中进行操作,这将是一个问题,并且数组的大小可能会更改,因此您不知道它有多大。但是,您可以使用ArrayList来解决此问题,只需同步更改数据结构的代码块,并从该同步块中获取字符串,然后对其进行操作。这样,唯一的停顿是一个线程正在读取或写入,而另一个线程需要读取或写入。这两种操作都非常快速。

0

你可以使用队列。队列有自己的poll方法。你可以将其设置为静态,但我怀疑这不是最好的方法。通常我使用Spring在某种包装类中实例化队列,但看起来你没有采取这种方式。


我同意将Queue设为静态并不是最好的方法。OP也没有提到spring,即使使用注入也无法解决问题,因为你仍然需要知道你正在注入什么。 - Zeh

0

如果您想从轮询器对象中访问用户输入的新值,则:

  • 由于对象存储在堆中,因此在Poller类中不需要创建ArrayList的新实例,而是可以从UserRequest发送列表对象的引用。这样,当您在userRequest中向ArrayList添加新值时,它将反映在Poller正在使用的ArrayList中。

例如,您可以这样做:

 public class UserRequest implements Runnable {

private ArrayList<String> arrayList  = new ArrayList<String>();

@Override
public void run() {
    // TODO Auto-generated method stub
    String request;
    Scanner input = new Scanner(System.in);
    while(true)
    {
        System.out.println("Please enter request:");
        request = input.nextLine();
        try
        {

         Poller poller = new Poller(arrayList);
         Thread t = new Thread(poller);
         t.start();

        }
        catch(IOException e)
        {
            e.printStackTrace();
        }
    }

}

你可以像这样更改你的Poller类:
 public class Poller implements Runnable {
  private ArrayList arrayList = null;    

  Poller(ArrayList<String> arrayList){
     this.arrayList = arrayList; 
   }

public void poll()
{
    for(String color : arrayList)
    {
        if(color == "")
        {
            //do work
        }
        else
        {
            //do work
        }
    }
}

@Override
public void run() {

       while(true){
        poll();   
     }    
}

但是,你不应该在一个无限循环中调用池,而是应该向你的ArrayList添加一个监听器,这样当新值被添加到列表时,你才会调用poll()

你可以查看这个链接了解更多关于如何向ArrayList添加监听器的信息:https://dev59.com/K3LYa4cB1Zd3GeqPTA8y#16529462


在多线程环境中使用 ArrayList 可能会导致 ConcurrentModificationException 异常被抛出,因为您无法保证写入和读取的顺序。 - Zeh
是的,但@Jenna想使用ArrayList来完成它。 - MR.JOIS
如果OP想使用ArrayList来完成它,我认为没有其他方法。 - MR.JOIS
是的@MR.JOIS,在使用ArrayList时进行任何更改都会导致异常被抛出,如此处所示(https://dev59.com/A3VC5IYBdhLWcg3wpi98)。 - Zeh
如果楼主真的想使用 ArrayList,那么就会有很多的 synchronize 块。 - Zeh
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接