从输出流获取输入流

4
我有一个组件,它给我提供了一个输出流(ByteArrayOutputStream)中的数据,我需要将其写入SQL数据库的blob字段中,而不创建临时缓冲区,因此需要获取输入流。
基于这里这里的答案,我想到了以下方法来从输出流获取输入流:
private PipedInputStream getInputStream(ByteArrayOutputStream outputStream) throws InterruptedException
{
    PipedInputStream pipedInStream = new PipedInputStream();
    Thread copyThread = new Thread(new CopyStreamHelper(outputStream, pipedInStream));
    copyThread.start();
    // Wait for copy to complete
    copyThread.join();
    return pipedInStream;
}

class CopyStreamHelper implements Runnable
{
    private ByteArrayOutputStream outStream;
    private PipedInputStream pipedInStream;

    public CopyStreamHelper (ByteArrayOutputStream _outStream, PipedInputStream _pipedInStream)
    {
        outStream = _outStream;
        pipedInStream = _pipedInStream;
    }

    public void run()
    {
        PipedOutputStream pipedOutStream = null;
        try
        {
            // write the original OutputStream to the PipedOutputStream
            pipedOutStream = new PipedOutputStream(pipedInStream);
            outStream.writeTo(pipedOutStream);
        }
        catch (IOException e) 
        {
            // logging and exception handling should go here
        }
        finally
        {
            IOUtils.closeQuietly(pipedOutStream);
        }
    }
}

请注意,输出流已经包含写入的数据,并且可以运行达到1-2 MB。但是,无论是尝试在两个单独的线程中执行此操作还是在同一个线程中执行此操作,我发现PipedInputStream总是在以下位置挂起:
Object.wait(long) line: not available [native method]   
PipedInputStream.awaitSpace() line: not available   

该组件应返回一个InputStream。OutputStream是一个接收器,不适合读取。在这种情况下,这是可能的,因为ByteArrayOutputStream的特殊性质。当它被传递时,它已经包含了完整的数据,因此你避免临时缓冲区的目标已经失去了。 - Robert Kühne
@Sponiro,同意,但我想避免创建另一个缓冲区来获取输入流。该组件生成输出,因此我不清楚它如何返回InputStream? - Santosh
@Sponiro,我考虑过并且放弃了写入文件,因为我认为这是不必要的。我这里面的情况不是通常意义上的生产者-消费者场景,两者可以异步进行。这里的情况是组件已经完成了输出的生成,并将数据写入OutputStream。现在我需要获取这些数据,并通过SQL语句将其发送到数据库中。鉴于此,我试图找到最有效的方法来实现它。从所看到的情况来看,可能这是不可能的——这意味着链接的SO问题上其他类似的答案不起作用? - Santosh
@Sponiro,继续...关于需要两个线程才能使管道流正常工作的问题,我上面展示的代码已经做到了这一点——你看到有什么问题,因为它没有按照我的预期工作。 - Santosh
让我们在聊天中继续这个讨论 - Santosh
显示剩余2条评论
3个回答

2

你把解决方案搞得太复杂了。

ByteArrayOutputStream baos = ...;
byte[] data = baos.toByteArray();
return new ByteArrayInputStream(data);

1
OP说他想要做到“不创建临时缓冲区”。 - Jonas Czech
2
我正在尝试避免创建临时缓冲区,因为数据大小相当大(1-2 MB),并且将有许多并行线程运行此操作。如果我无法解决原始问题,这将是我的备选方案。 - Santosh

0

我已经为使用PipedInput/OutputStream编写了一个非常简单的演示程序。它可能适用于您的用例,也可能不适用。

一个生产类将数据写入PipedOutputStream:

public class Producer implements Runnable {

    private final PipedOutputStream pipedOutputStream;
    private final PipedInputStream pipedInputStream;

    public Producer() throws IOException {
        this.pipedOutputStream = new PipedOutputStream();
        this.pipedInputStream = new PipedInputStream(pipedOutputStream);
    }

    public PipedInputStream getPipedInputStream() {
        return pipedInputStream;
    }

    @Override
    public void run() {

        try(InputStream inputStream = ByteStreams.limit(new RandomInputStream(), 100000)) {
            // guava copy function
            ByteStreams.copy(inputStream, pipedOutputStream);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                pipedOutputStream.close();
            } catch (IOException e) {
                // no-op
            }
        }
    }

    public static void main(String[] args) throws IOException {

        try {
            Producer producer = new Producer();
            Consumer consumer = new Consumer(producer);

            Thread thread1 = new Thread(producer);
            Thread thread2 = new Thread(consumer);

            thread1.start();
            thread2.start();

            thread1.join();
            thread2.join();

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

一个只计算字节数的消费者:
public class Consumer implements Runnable {

    private final Producer producer;

    public Consumer(Producer producer) {
        this.producer = producer;
    }

    @Override
    public void run() {

        try (PipedInputStream pipedInputStream = producer.getPipedInputStream()) {

            int counter = 0;
            while (pipedInputStream.read() != -1) {
                counter++;
            }

            System.out.println(counter);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

-1

在某个层面上必须有一个缓冲区。请参见将输入流连接到输出流

我最喜欢的答案来自Dean Hiller:

void feedInputToOutput(InputStream in, OutputStream out) {
   IOUtils.copy(in, out);
}

详细信息请参见API


1
我需要的正好相反,获取一个InputStream,以便我可以从OutputStream中读取。 - Santosh
没有将OutputStream转换为InputStream的方法。InputStream是一个数据源,你可以从中读取数据。而OutputStream则是一个数据汇,你可以向其中写入数据。如果你需要同时读写数据,可能列表或队列更适合你的需求。 - Andreas
Andreas - 我相信Santosh正在尝试将输出流的内容重定向到另一个输入流。我也有同样的问题。我得到了一个缩略图作为输出流,我希望通过输入流在HTTP上提供它。 - Sridhar Sarnobat

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接