AtomicReference用于可变对象和可见性

9

假设我有一个指向对象列表的AtomicReference

AtomicReference<List<?>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>());
线程 A 向此列表添加元素:batch.get().add(o); 稍后,线程 B 获取该列表并将其存储在数据库中,例如:insertBatch(batch.get()); 在写入(线程 A)和读取(线程 B)时,我需要进行额外的同步以确保线程 B 以与线程 A 留下的方式查看列表,还是原子引用已经处理了这个问题?
换句话说:如果我有一个对可变对象的原子引用,并且一个线程更改该对象,其他线程会立即看到此更改吗?
编辑:
也许需要一些示例代码:
public void process(Reader in) throws IOException {
    List<Future<AtomicReference<List<Object>>>> tasks = new ArrayList<Future<AtomicReference<List<Object>>>>();
    ExecutorService exec = Executors.newFixedThreadPool(4);

    for (int i = 0; i < 4; ++i) {
        tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() {
            @Override public AtomicReference<List<Object>> call() throws IOException {

                final AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize));

                Processor.this.parser.parse(in, new Parser.Handler() {
                    @Override public void onNewObject(Object event) {
                            batch.get().add(event);

                            if (batch.get().size() >= batchSize) {
                                dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
                            }
                    }
                });

                return batch;
            }
        }));
    }

    List<Object> remainingBatches = new ArrayList<Object>();

    for (Future<AtomicReference<List<Object>>> task : tasks) {
        try {
            AtomicReference<List<Object>> remainingBatch = task.get();
            remainingBatches.addAll(remainingBatch.get());
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();

            if (cause instanceof IOException) {
                throw (IOException)cause;
            }

            throw (RuntimeException)cause;
        }
    }

    // these haven't been flushed yet by the worker threads
    if (!remainingBatches.isEmpty()) {
        dao.insertBatch(remainingBatches);
    }
}

这里发生的是我创建了四个工作线程来解析一些文本(这是process()方法中的Reader in参数)。每个工作线程将其解析的行保存在一个批次中,并在批次满时刷新该批次 (dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));)。
由于文本中的行数不是批处理大小的倍数,因此最后的对象会被放入未刷新的批次中,因为它不是满的。因此,这些剩余的批次由主线程插入。
我使用AtomicReference.getAndSet()用空批次替换完整批次。这个程序在线程方面是正确的吗?
4个回答

11

嗯...它并不像这样工作。 AtomicReference 确保该引用本身在线程之间可见,也就是说,如果将其分配给与原始引用不同的引用,则更新将可见。但它对引用所指向的对象的实际内容不做任何保证。

因此,对列表内容的读/写操作需要单独进行同步。

编辑:根据您的更新代码和发布的评论,将本地引用设置为 volatile 即足以确保可见性。


好的,我在上面的问题中添加了一些示例代码。我使用AtomicReference.getAndSet()来用一个新的空批次替换整个批次。我还需要额外的同步吗? - Jan Van den bosch
1
是的,您的代码是正确的,尽管在这里似乎不需要使用AtomicReference - Tudor
@Tudor 正在考虑完全相同的事情。实际上,getAndSet() 可能不会做他想要的事情,因为它将获取当前值,然后更改 AtomicReference 的值。 - John Vint
@Tudor:是的,我也是这么想的。但问题在于必须将批处理声明为“final”,才能在我的(匿名)处理程序的“onNewObject()”方法中使用它。无论如何,我已决定放弃匿名内部“Callable”类,转而采用更易读和可测试的“ParseAndInsertTask”类来实现“Callable”,并将批处理作为任务的实例变量,而不是局部变量。 - Jan Van den bosch
请注意,如果您不需要执行原子比较和交换操作,可以使用volatile变量而不是AtomicReference - Alex D
@Bossie:在那种情况下,使用volatile应该足够了,虽然Future.get的调用可能会引入内存屏障,但我不确定。只需将其保留为volatile即可。 - Tudor

1

我认为,忘记所有的代码,你确切的问题是:

当写入(线程A)和读取(线程B)时,我是否需要进行额外的同步以确保线程B以A离开的方式看到列表,或者原子引用已经处理了这个问题?

所以,确切的答复是:是的,原子操作会处理可见性。这不是我的意见,而是JDK文档中的规定:

对于原子操作的访问和更新,内存效果通常遵循易失性的规则,如Java语言规范第三版(17.4内存模型)所述。

希望这可以帮助你。


0

Tudor的回答中补充一点:你必须使ArrayList本身是线程安全的,或者根据你的要求甚至需要更大的代码块。

如果你可以使用线程安全的ArrayList,你可以像这样“装饰”它:

batch = java.util.Collections.synchronizedList(new ArrayList<Object>());

但请记住:即使是像这样的“简单”结构,也不适用于线程安全:not
Object o = batch.get(batch.size()-1);

0

AtomicReference 只能帮助您引用列表,它不会对列表本身进行任何操作。特别是在您的场景中,当系统负载过重时,消费者正在获取列表,而生产者正在向其中添加项目时,几乎肯定会遇到问题。

这听起来像您应该使用 BlockingQueue。如果您的生产者比消费者更快,则可以限制内存占用,并让队列处理所有争用。

类似于:

ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (50);

// ... Producer
queue.put(o);

// ... Consumer
List<Object> queueContents = new ArrayList<Object> ();
// Grab everything waiting in the queue in one chunk. Should never be more than 50 items.
queue.drainTo(queueContents);

新增

感谢@Tudor指出您正在使用的架构。...我必须承认它相当奇怪。据我所见,您根本不需要AtomicReference。每个线程都拥有自己的ArrayList直到传递给dao时才被替换,因此根本没有争用。

我有点担心您在单个Reader上创建了四个解析器。我希望您有某种方法确保每个解析器不会影响其他解析器。

个人而言,我会使用代码中描述的某种形式的生产者-消费者模式。也许是这样的:

static final int PROCESSES = 4;
static final int batchSize = 10;

public void process(Reader in) throws IOException, InterruptedException {

  final List<Future<Void>> tasks = new ArrayList<Future<Void>>();
  ExecutorService exec = Executors.newFixedThreadPool(PROCESSES);
  // Queue of objects.
  final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (batchSize * 2);
  // The final object to post.
  final Object FINISHED = new Object();

  // Start the producers.
  for (int i = 0; i < PROCESSES; i++) {
    tasks.add(exec.submit(new Callable<Void>() {
      @Override
      public Void call() throws IOException {

        Processor.this.parser.parse(in, new Parser.Handler() {
          @Override
          public void onNewObject(Object event) {
            queue.add(event);
          }
        });
        // Post a finished down the queue.
        queue.add(FINISHED);
        return null;
      }
    }));
  }

  // Start the consumer.
  tasks.add(exec.submit(new Callable<Void>() {
    @Override
    public Void call() throws IOException {
      List<Object> batch = new ArrayList<Object>(batchSize);
      int finishedCount = 0;
      // Until all threads finished.
      while ( finishedCount < PROCESSES ) {
        Object o = queue.take();
        if ( o != FINISHED ) {
          // Batch them up.
          batch.add(o);
          if ( batch.size() >= batchSize ) {
            dao.insertBatch(batch);
            // If insertBatch takes a copy we could merely clear it.
            batch = new ArrayList<Object>(batchSize);
          }
        } else {
          // Count the finishes.
          finishedCount += 1;
        }
      }
      // Finished! Post any incopmplete batch.
      if ( batch.size() > 0 ) {
        dao.insertBatch(batch);
      }
      return null;
    }
  }));

  // Wait for everything to finish.
  exec.shutdown();
  // Wait until all is done.
  boolean finished = false;
  do {
    try {
      // Wait up to 1 second for termination.
      finished = exec.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
    }
  } while (!finished);
}

从他的代码来看,它并不像生产者-消费者模型。实际上,他正在生成一组线程,每个线程都在做一些工作,然后加入它们并在主线程中完成工作。实际上没有数据在线程之间传递。 - Tudor

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接