Java中实现并发管道的策略

25

考虑以下shell脚本:

gzip -dc in.gz | sed -e 's/@/_at_/g' | gzip -c > out.gz 

这个程序有三个进程并行工作,分别是解压缩流、修改和重新压缩。通过运行time,我发现我的用户时间大约是实际时间的两倍,这表明程序有效地并行工作。
我尝试在Java中创建相同的程序,将每个任务放在自己的线程中。不幸的是,多线程Java程序仅对上述示例的单线程版本速度提高了30%左右。我尝试使用ExchangerConcurrentLinkedQueue。ConcurrentLinkedQueue链接队列会导致很多争用,尽管三个线程通常都很忙碌。Exchanger具有较低的争用率,但更加复杂,并且似乎无法使最慢的工作人员始终保持100%运行。
我正在尝试找出此问题的纯Java解决方案,而无需查看字节代码编织框架或基于JNI的MPI。
大多数并发研究和API都涉及分治算法,将每个节点的工作划分为正交且不依赖于先前计算的任务。另一种并发方法是管道方法,其中每个工作者完成一些工作并将数据传递给下一个工作者。
我不是在寻找最有效的方式来操作gzip压缩文件,而是在研究如何高效地拆分管道中的任务,以将运行时间降至最慢任务的运行时间。
目前针对1000万行文件的计时如下:
Testing via shell

real    0m31.848s
user    0m58.946s
sys     0m1.694s

Testing SerialTest

real    0m59.997s
user    0m59.263s
sys     0m1.121s

Testing ParallelExchangerTest

real    0m41.573s
user    1m3.436s
sys     0m1.830s

Testing ConcurrentQueueTest

real    0m44.626s
user    1m24.231s
sys     0m10.856s

我正在提供一份赏金,用于改进Java的10%,衡量标准是在一个四核系统上处理1000万行测试数据的实时表现。当前源代码可在Bitbucket上获取。


2
提供Java代码示例将非常有用,以便提供改进建议。很难看出您已经尝试了什么。 - Chris Dail
你是在多CPU的机器上进行测试吗?不确定JVM是否可以使用超过1个CPU。 - user85421
我正在一台运行Solaris 10的四核AMD机器上进行测试。测试的源代码位于此处:http://bitbucket.org/brianegge/java-concurrent/src/tip/ - brianegge
在这项工作中,你不会在某个时候受到IO限制吗?这将限制你可以实现的并行性。 - Chris O
好问题!我花了差不多半天时间进行测试 :-) 似乎一些争议来自gc(在jdk linux双核上)和使用更大的读取对此有所帮助,因为字符串数量会变得更少。但所有问题都归结于java.concurrent工具不够高效。我还尝试了使用管道输入/输出流进行测试,但在这里它们完全无用,因为所有操作都是同步的,一个线程会阻塞另一个。我猜这可能是可以重写的。 - KarlP
5个回答

14

首先,整个过程的速度只能和最慢的步骤一样快。如果时间分解如下:

  • gunzip:1秒
  • sed:5秒
  • gzip:1秒

通过多线程,你最快也只能在5秒内完成而不是7秒。

其次,不要使用你正在使用的队列,相反尝试复制你正在复制的功能,并使用PipedInputStreamPipedOutputStream将进程链接在一起。

编辑:使用Java并发工具处理相关任务有几种方法。将其分成线程。首先创建一个共同的基类:

public interface Worker {
  public run(InputStream in, OutputStream out);
}

这个接口代表处理输入并生成输出的任意作业。将它们链接在一起,就形成了一个管道。您也可以抽象出样板文件。为此,我们需要一个类:
public class UnitOfWork implements Runnable {
  private final InputStream in;
  private final OutputStream out;
  private final Worker worker;

  public UnitOfWork(InputStream in, OutputStream out, Worker worker) {
    if (in == null) {
      throw new NullPointerException("in is null");
    }
    if (out == null) {
      throw new NullPointerException("out is null");
    }
    if (worker == null) {
      throw new NullPointerException("worker is null");
    }
    this.in = in;
    this.out = out;
    this.worker = worker;
  }

  public final void run() {
    worker.run(in, out);
  }
}

比如,Unzip 部分:

public class Unzip implements Worker {
  protected void run(InputStream in, OutputStream out) {
    ...
  }
}

类似于SedZip等内容。将它们联系在一起的是:

public static void pipe(InputStream in, OutputStream out, Worker... workers) {
  if (workers.length == 0) {
    throw new IllegalArgumentException("no workers");
  }
  OutputStream last = null;
  List<UnitOfWork> work = new ArrayList<UnitOfWork>(workers.length);
  PipedOutputStream last = null;
  for (int i=0; i<workers.length-2; i++) {
    PipedOutputStream out = new PipedOutputStream();
    work.add(new UnitOfWork(
      last == null ? in, new PipedInputStream(last), out, workers[i]);
    last = out;
  }
  work.add(new UnitOfWork(new PipedInputStream(last),
    out, workers[workers.length-1);
  ExecutorService exec = Executors.newFixedThreadPool(work.size());
  for (UnitOfWork w : work) {
    exec.submit(w);
  }
  exec.shutdown();
  try {
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
  } catch (InterruptedExxception e) {
    // do whatever
  }
}

我不确定你能做得比这更好,而且每个任务需要编写的代码很少。然后你的代码就变成了:

public static processFile(String inputName, String outputName) {
  pipe(new FileInputStream(inputFile), new FileOutputStream(outputFile),
    new Zip(), new Sed(), new Unzip());
}

3
我做了一些关于多线程加密的测试,我做了类似的事情,但是真正提高性能的是当我自己实现了缓冲区。管道流和缓冲流已经有缓冲机制了,但是如果缓冲区大小不对齐转换算法的话,会引入很多开销。因此,如果zip每次处理1k字节,就使用这种大小的定制缓冲区,在压缩之前提供数据。如果sed每次使用完整行,则一次获取128行数据,这种方法大大提高了速度,减少了争用和开销(但另一方面增加了复杂性...)。 - Lorenzo Boccaccia

6
我个人验证了所需时间,似乎阅读只占用不到10%的时间,阅读和处理所需时间不到整个时间的30%。因此,我使用ParallelExchangerTest(您代码中表现最好的程序)并对其进行修改,仅使用两个线程,第一个线程进行读取和替换,第二个线程进行写入。
在我的机器上(Intel双核(非Core2),运行ubuntu,具有1GB内存),以下是要比较的数字:
通过shell测试: real 0m41.601s user 0m58.604s sys 0m1.032s
通过ParallelExchangerTest测试: real 1m55.424s user 2m14.160s sys 0m4.768s
ParallelExchangerTestMod (2 thread): real 1m35.524s user 1m55.319s sys 0m3.580s
我知道字符串处理需要更长的时间,所以我用matcher.replaceAll替换了line.repalce,得到了这些数字:
ParallelExchangerTestMod_Regex (2 thread): real 1m12.781s user 1m33.382s sys 0m2.916s
现在我更进一步,没有一次读取一行,而是读取各种大小的char[]缓冲区并计时(使用正则表达式搜索/替换),我得到以下数字:
使用100字节处理的ParallelExchangerTestMod_Regex_Buff进行测试: real 1m13.804s user 1m32.494s sys 0m2.676s
使用500字节处理的ParallelExchangerTestMod_Regex_Buff进行测试: real 1m6.286s user 1m29.334s sys 0m2.324s
使用800字节处理的ParallelExchangerTestMod_Regex_Buff进行测试: real 1m12.309s user 1m33.910s sys 0m2.476s
看起来500字节的数据大小最优。我已经分叉并拥有我的更改的副本在此处。

https://bitbucket.org/chinmaya/java-concurrent_response/


我检查了你的更改并在Solaris机器上运行了它们。结果与Ubuntu相比有很大不同。最快的一个比我的ParallelExchangerTest快了1.5秒。ParallelExchangerTestMod_Regex真实时间 0m40.418s 用户时间 0m56.314s 系统时间 0m1.374s在Ubuntu、Cygwin和OS X上运行相同的测试显示,结果因平台而异。 - brianegge
当然,JVM 实现因平台而异。您可能希望尝试编译时(javac -O)和运行时(java -X)的优化。 - chinmaya

3

鉴于您没有说出如何测量经过的时间,我假设您正在使用类似以下方式的方法:

time java org.egge.concurrent.SerialTest < in.gz > out.gz
time java org.egge.concurrent.ConcurrentQueueTest < in.gz > out.gz

这种方法的问题在于,你同时在衡量两件事情:
  1. JVM启动所需时间
  2. 程序运行时间
你只能通过改变代码来调整第二个因素。根据你提供的数据:
Testing SerialTest
real    0m6.736s
user    0m6.924s
sys     0m0.245s

Testing ParallelExchangerTest
real    0m4.967s
user    0m7.491s
sys     0m0.850s

如果我们假设JVM启动需要3秒钟,那么"程序运行时间"分别为3.7秒和1.9秒,这几乎是100%的加速。我强烈建议您使用更大的数据集进行测试,以便您可以最小化JVM启动对计时结果的影响。
编辑:根据您对此问题的回答,您很可能正在遭受锁争用的困扰。在Java中解决这个问题的最佳方法可能是使用管道读写器,逐字节从管道中读取,并将输入流中的任何'@'字符替换为输出流中的"_at_"。您可能正在遭受每个字符串被扫描三次的问题,并且任何替换都需要构建新对象,最终导致字符串再次被复制。希望这有所帮助...

如果我用单个记录运行测试,我可以看到实际时间为0m0.292s。但测试并不理想,因为最后一个阶段比前两个阶段更加耗费资源。 - brianegge
我使用的测试工具位于http://bitbucket.org/brianegge/java-concurrent/src/tip/bin/test。运行10m行数据时,ParallelExchanger显示出与shell脚本几乎相同的“用户”时间,但实际时间要长10秒。如果我能提高效率,它可能能够在与shell脚本相同的时间内执行。 - brianegge

3

你也可以在Java中使用管道。它们以流的形式实现,详见PipedInputStreamPipedOutputStream了解更多细节。

为避免阻塞,建议设置适当的管道大小。


PipedOutputStream和PipedInputStream在需要将输出流连接到输入流时非常有用。串行测试中的'Sed'类实际上正在执行上述两个类所做的事情。我不是在尝试实现管道,而是并发管道。管道就像汽车装配线,将工作从一个阶段传递到下一个阶段。在其自己的线程中运行每个阶段为无法并行运行的任务提供了并发的可能性。 - brianegge
此外,我刚刚测试了PidedIOStream的性能,它实际上只是一个信号量,一次只能有一个线程在基础缓冲区上工作。虽然我猜测重新编写类来使用多个缓冲区可能会增加吞吐量。 - KarlP

0

减少读取次数和对象数量可以让我的性能提高超过10%。

但是java.util.concurrent的性能仍然有些令人失望。

ConcurrentQueueTest:

private static class Reader implements Runnable {

@Override
  public void run() {
   final char buf[] = new char[8192];
   try {

    int len;
    while ((len = reader.read(buf)) != -1) {
     pipe.put(new String(buf,0,len));
    }
    pipe.put(POISON);

   } catch (IOException e) {
    throw new RuntimeException(e);
   } catch (InterruptedException e) {
    throw new RuntimeException(e);
   }
  }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接