我希望有一个通用且可重复使用的getPosition()
方法,它可以告诉我从流的起始点开始已读取的字节数。理想情况下,我希望它适用于所有的InputStream,这样我就不必在从不同来源获取它们时对每个进行包装了。
这样的东西存在吗?如果没有,有人可以推荐一个现有的计数InputStream
的实现吗?
我希望有一个通用且可重复使用的getPosition()
方法,它可以告诉我从流的起始点开始已读取的字节数。理想情况下,我希望它适用于所有的InputStream,这样我就不必在从不同来源获取它们时对每个进行包装了。
这样的东西存在吗?如果没有,有人可以推荐一个现有的计数InputStream
的实现吗?
你需要遵循java.io
中已经建立的装饰器模式来实现这个功能。
让我们在这里尝试一下:
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
public final class PositionInputStream
extends FilterInputStream
{
private long pos = 0;
private long mark = 0;
public PositionInputStream(InputStream in)
{
super(in);
}
/**
* <p>Get the stream position.</p>
*
* <p>Eventually, the position will roll over to a negative number.
* Reading 1 Tb per second, this would occur after approximately three
* months. Applications should account for this possibility in their
* design.</p>
*
* @return the current stream position.
*/
public synchronized long getPosition()
{
return pos;
}
@Override
public synchronized int read()
throws IOException
{
int b = super.read();
if (b >= 0)
pos += 1;
return b;
}
@Override
public synchronized int read(byte[] b, int off, int len)
throws IOException
{
int n = super.read(b, off, len);
if (n > 0)
pos += n;
return n;
}
@Override
public synchronized long skip(long skip)
throws IOException
{
long n = super.skip(skip);
if (n > 0)
pos += n;
return n;
}
@Override
public synchronized void mark(int readlimit)
{
super.mark(readlimit);
mark = pos;
}
@Override
public synchronized void reset()
throws IOException
{
/* A call to reset can still succeed if mark is not supported, but the
* resulting stream position is undefined, so it's not allowed here. */
if (!markSupported())
throw new IOException("Mark not supported.");
super.reset();
pos = mark;
}
}
InputStreams旨在是线程安全的,因此需要大量使用同步。我曾尝试使用volatile
和AtomicLong
位置变量,但是最好还是使用同步,因为它允许一个线程在不释放锁的情况下操作流并查询其位置。
PositionInputStream is = …
synchronized (is) {
is.read(buf);
pos = is.getPosition();
}
InputStream(输入流)不适合处理可能无限的数据,因此计数器会妨碍其使用。除了将它们全部封装起来之外,您还可以通过方面(aspects)来实现某些功能。