Java SQL结果转为输入流

12

我需要一个Java函数,以InputStream参数的形式返回SQL SELECT查询的结果,以便另一个系统将结果通过网络发送。

然而,InputStream必须是一个带有自定义分隔符的String(通常是CSV格式,但不总是)。

虽然我可以轻松创建一个函数来检索结果、创建分隔符的String,最后将该String转换为InputStream,但SQL结果通常太大而无法在内存中处理。另外,在返回结果之前处理整个结果集将导致不必要的等待时间。

如何返回一个InputStream以遍历SQL结果,并在从数据库返回已处理(分隔)数据时发送?


你有考虑使用JDBC缓存行集吗?这可能对你正在尝试做的事情有所帮助。http://docs.oracle.com/javase/1.5.0/docs/api/javax/sql/rowset/CachedRowSet.html - ChadNC
1
不,但是这会对我有什么帮助呢?问题不在于保持连接的开放,而在于将结果保存在内存中。 - Cameron S
这就是缓存行集。它提供了一种更简单的方式,将查询结果通过网络发送到其他设备、应用程序等。 - ChadNC
整个结果需要在一个 InputStream 中发送。你能展示一下可以实现这个功能的简短例子/方法吗? - Cameron S
3个回答

9

以下是(未经测试的)代码片段,它应该能给你一个基本的思路:

/**
 * Implementors of this interface should only convert current row to byte array and return it.
 * 
 * @author yura
 */
public interface RowToByteArrayConverter {
    byte[] rowToByteArray(ResultSet resultSet);
}

public class ResultSetAsInputStream extends InputStream {

    private final RowToByteArrayConverter converter;
    private final PreparedStatement statement;
    private final ResultSet resultSet;

    private byte[] buffer;
    private int position;

    public ResultSetAsInputStream(final RowToByteArrayConverter converter, final Connection connection, final String sql, final Object... parameters) throws SQLException {
        this.converter = converter;
        statement = createStatement(connection, sql, parameters);
        resultSet = statement.executeQuery();
    }

    private static PreparedStatement createStatement(final Connection connection, final String sql, final Object[] parameters) {
        // PreparedStatement should be created here from passed connection, sql and parameters
        return null;
    }

    @Override
    public int read() throws IOException {
        try {
            if(buffer == null) {
                // first call of read method
                if(!resultSet.next()) {
                    return -1; // no rows - empty input stream
                } else {
                    buffer = converter.rowToByteArray(resultSet);
                    position = 0;
                    return buffer[position++] & (0xff);
                }
            } else {
                // not first call of read method
                if(position < buffer.length) {
                    // buffer already has some data in, which hasn't been read yet - returning it
                    return buffer[position++] & (0xff);
                } else {
                    // all data from buffer was read - checking whether there is next row and re-filling buffer
                    if(!resultSet.next()) {
                        return -1; // the buffer was read to the end and there is no rows - end of input stream
                    } else {
                        // there is next row - converting it to byte array and re-filling buffer
                        buffer = converter.rowToByteArray(resultSet);
                        position = 0;
                        return buffer[position++] & (0xff);
                    }
                }
            }
        } catch(final SQLException ex) {
            throw new IOException(ex);
        }
    }



    @Override
    public void close() throws IOException {
        try {
            statement.close();
        } catch(final SQLException ex) {
            throw new IOException(ex);
        }
    }
}

这是一个非常简单的实现,可以通过以下方式进行改进:

  • 在read方法中,if和else之间的代码重复可以被移除——它只是为了澄清而发布的。
  • 可以实现更复杂的逻辑来使用仅初始化一次并然后重新填充的字节数组缓冲区,而不是为每一行重新创建字节数组缓冲区(new byte[]是昂贵的操作)。然后应该更改RowToByteArrayConverter.rowToByteArray方法的签名为int fillByteArrayFromRow(ResultSet rs, byte[] array),它应该返回填充的字节数并填充传递的字节数组。

因为字节数组包含有符号字节,所以它可以包含-1(实际上是无符号字节的255),从而指示流的结束位置不正确,因此使用& (0xff)将有符号字节转换为无符号字节作为整数值。有关详细信息,请参阅Java如何将int转换为byte?

还请注意,如果网络传输速度较慢,这可能会使结果集保持打开状态很长时间,从而给数据库带来问题。

希望这可以帮助...


2
上述答案提供了解决stringbuilder大小限制问题的有用方案,同时也具有内存高效的特点。然而,我的测试表明它们比仅将数据写入stringbuilder并调用"new ByteArrayInputStream(data.getBytes("UTF-8"))"来获取输入流要慢。
我发现更有效的方法是使用切片函数切分传入的数据,然后使用多个线程分别进行以下操作:
1. 查询源数据库的子集数据 2. 将数据写入目标 这也避免了总数据量超过字符串缓冲区最大大小的问题。
例如,我有一个名为“RecordDate”的SQL Server表中有600万条记录。Recorddate的值在2013年和2016年之间变化。因此,我将每个线程配置为分别请求2013、14、15、16年的数据。然后,每个线程将转码后的数据写入StringBuilder,并通过使用上面的getBytes()转换为InputStream进行批量加载到目标中。
这导致速度提高了2倍。
为什么呢?因为源数据库和目标数据库可以处理多个并发请求,因此整个工作负载分布在所有三个进程的多个线程中:源数据库、转码器、目标数据库。

2
我会改进@Yura建议的答案,引入以下内容:
使用初始化为ByteArrayOutputStream的DataOutputStream方便地将数据写入字节数组,在RowToByteArrayConverter的实现中。实际上,我建议拥有一个转换器层次结构,它们都扩展相同的抽象类(这是我的想法代码片段 - 可能不会一次性编译)。
public abstract class RowToByteArrayConverter {
  public byte[] rowToByteArray(ResultSet resultSet) {
      parseResultSet(dataOutputStream, resultSet);
      return byteArrayOutputSteam.toByteArray();
  }

  public RowToByteArrayConverter() {
    dataOutputStream = new DataOutputStream(byteArrayOutputStream);
  }

  protected DataOutputStream dataOutputStream;
  protected ByteArrayOutputStream byteArrayOutputStream;

  protected abstract void parseResultSet(DataOutputStream dataOutputStresm, ResultSet rs); 
}

现在,您可以通过简单地重写parseResultSet方法来覆盖此类。例如,编写从列“name”中获取名称作为字符串的代码,并对DataOutputStream执行writeUTF8操作。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接