Serializing Java objects to Cassandra 1.2 via ByteBuffer and CQL 3

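I've written the following code, which doesn't do anything complicated: it just creates a byte[] variable, writes it to a blob field in Cassandra (v1.2, via the new DataStax CQL library), and then reads it back. When I write it, it is 3 elements long, but when I read it back it is 84 elements long! This means that what I'm actually trying to do (serialize Java objects) fails with "org.apache.commons.lang.SerializationException: java.io.StreamCorruptedException: invalid stream header: 81000008" when I try to deserialize the data again.
Here is some sample code that demonstrates my problem: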
import java.nio.ByteBuffer;

import org.apache.commons.lang.SerializationUtils;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class TestCassandraSerialization {


    private Cluster cluster;
    private Session session;

    public TestCassandraSerialization(String node) {
        connect(node);
    }

    private void connect(String node) {
        cluster = Cluster.builder().addContactPoint(node).build();
        Metadata metadata = cluster.getMetadata();
        System.out.printf("Connected to %s\n", metadata.getClusterName());
        for (Host host: metadata.getAllHosts()) {
              System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n",
                         host.getDatacenter(), host.getAddress(), host.getRack());
        }
        session = cluster.connect();
    }

    public void setUp() {
        session.execute("CREATE KEYSPACE test_serialization WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};");

        session.execute("CREATE TABLE test_serialization.test_table (id text PRIMARY KEY, data blob)");
    }

    public void tearDown() {
        session.execute("DROP KEYSPACE test_serialization");
    }

    public void insertIntoTable(String key, byte[] data) {
        PreparedStatement statement = session.prepare("INSERT INTO test_serialization.test_table (id,data) VALUES (?, ?)");
        BoundStatement boundStatement = new BoundStatement(statement);
        session.execute(boundStatement.bind(key,ByteBuffer.wrap(data)));
    }

    public byte[] readFromTable(String key) {
        String q1 = "SELECT * FROM test_serialization.test_table WHERE id = '"+key+"';";

        ResultSet results = session.execute(q1);
        for (Row row : results) {
            ByteBuffer data = row.getBytes("data");
            return data.array();
        }
        return null;
    }


    public static boolean compareByteArrays(byte[] one, byte[] two) {
        if (one.length > two.length) {
            byte[] foo = one;
            one = two;
            two = foo;
        }

        // so now two is definitely the longer array    
        for (int i=0; i<one.length; i++) {
            //System.out.printf("%d: %s\t%s\n", i, one[i], two[i]);
            if (one[i] != two[i]) {
                return false;
            }
        }
        return true;
    }


    public static void main(String[] args) {
        TestCassandraSerialization tester = new TestCassandraSerialization("localhost");

        try {
            tester.setUp();
            byte[] dataIn = new byte[]{1,2,3};
            tester.insertIntoTable("123", dataIn);
            byte[] dataOut = tester.readFromTable("123");

            System.out.println(dataIn);
            System.out.println(dataOut);

            System.out.println(dataIn.length); // prints "3"
            System.out.println(dataOut.length); // prints "84"

            System.out.println(compareByteArrays(dataIn, dataOut)); // prints false         

            String toSave = "Hello, world!";
            dataIn = SerializationUtils.serialize(toSave);
            tester.insertIntoTable("toSave", dataIn);
            dataOut = tester.readFromTable("toSave");

            System.out.println(dataIn.length); // prints "20"
            System.out.println(dataOut.length); // prints "104"


            // The below throws org.apache.commons.lang.SerializationException: java.io.StreamCorruptedException: invalid stream header: 81000008
            String hasLoaded = (String) SerializationUtils.deserialize(dataOut); 
            System.out.println(hasLoaded);

        } finally {
            tester.tearDown();
        }
    }
}

It looks like the correct content has been saved to the database:

cqlsh:flight_cache> select * from test_serialization.test_table;

 id     | data
--------+--------------------------------------------
    123 |                                   0x010203
 toSave | 0xaced000574000d48656c6c6f2c20776f726c6421

cqlsh:flight_cache> 
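The blob stored under toSave starts with 0xaced0005, which is the standard Java serialization stream header (java.io.ObjectStreamConstants.STREAM_MAGIC followed by STREAM_VERSION). The bytes read back start with 0x81000008 instead, so ObjectInputStream rejects them before it looks at any payload. As a minimal JDK-only sketch (the class and method names are hypothetical, not part of commons-lang or the driver), a header check like this can tell whether a byte[] pulled out of the database even looks like a serialized object:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamConstants;

class StreamHeaderCheck {

    /** Returns true if the array starts with the Java serialization magic number and version. */
    static boolean hasJavaSerializationHeader(byte[] bytes) {
        if (bytes == null || bytes.length < 4) {
            return false;
        }
        // Read the first two big-endian 16-bit values: magic, then version.
        int magic = ((bytes[0] & 0xff) << 8) | (bytes[1] & 0xff);
        int version = ((bytes[2] & 0xff) << 8) | (bytes[3] & 0xff);
        // STREAM_MAGIC is a (negative) short, so mask it to compare as 0xaced.
        return magic == (ObjectStreamConstants.STREAM_MAGIC & 0xffff)
                && version == ObjectStreamConstants.STREAM_VERSION;
    }

    public static void main(String[] args) throws IOException {
        // Serialize the same string as the question, using plain ObjectOutputStream.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject("Hello, world!");
        }
        byte[] serialized = bos.toByteArray();
        byte[] wrongStart = {(byte) 0x81, 0x00, 0x00, 0x08};

        System.out.println(serialized.length);                      // 20
        System.out.println(hasJavaSerializationHeader(serialized)); // true
        System.out.println(hasJavaSerializationHeader(wrongStart)); // false
    }
}
```

The 20 bytes match the cqlsh output above: 4 header bytes, a 1-byte string tag (0x74), a 2-byte length (0x000d) and the 13 characters.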

So the error seems to happen when reading the binary data, not when writing it. Can anyone give me some pointers as to what I'm doing wrong?

2 Answers


The problem is almost certainly that ByteBuffer.array() returns the entire backing array, while your data may occupy only part of it.

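The valid data starts at index ByteBuffer.arrayOffset() + ByteBuffer.position() of that array and is ByteBuffer.remaining() bytes long. To get a byte array containing only the valid data, use the following code in readFromTable: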

byte[] result = new byte[data.remaining()];
data.get(result);

Your data is then in result, and you can return it.
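To see why array() is the wrong call, here is a JDK-only sketch that needs no Cassandra. The 10-byte "frame" and the helper name readableBytes are made up for illustration; the point is a ByteBuffer that is a view into a larger array, which is roughly the kind of buffer the 84-byte result suggests the driver hands back. The helper copies through duplicate() so that reading the bytes out does not advance the original buffer's position:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class BackingArrayDemo {

    /** Copies only the readable bytes (position to limit) without moving the caller's buffer. */
    static byte[] readableBytes(ByteBuffer data) {
        byte[] result = new byte[data.remaining()];
        data.duplicate().get(result); // duplicate() shares content but has its own position
        return result;
    }

    public static void main(String[] args) {
        // Pretend frame: 5 bytes of "header", the 3-byte blob {1, 2, 3}, then 2 trailing bytes.
        byte[] frame = {9, 9, 9, 9, 9, 1, 2, 3, 7, 7};
        // slice() yields a buffer whose content is just {1, 2, 3} but still backed by frame.
        ByteBuffer data = ByteBuffer.wrap(frame, 5, 3).slice();

        System.out.println(data.array().length);                  // 10: the whole backing array
        System.out.println(data.arrayOffset());                   // 5
        System.out.println(data.remaining());                     // 3
        System.out.println(Arrays.toString(readableBytes(data))); // [1, 2, 3]
        System.out.println(data.remaining());                     // still 3
    }
}
```

Calling data.get(result) directly, as in the answer, works fine when nothing else reads the buffer afterwards; duplicate() is only a precaution for buffers that are reused.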


Thanks Richard. You saved my day too :) - Easility
Great answer, you saved my night! - doanduyhai
I just ran into this problem. Thanks, but what the heck? What kind of API are they designing? - rpvilao


Since you're already using the DataStax Java Driver, there is also a utility class, com.datastax.driver.core.utils.Bytes, which you can use like this:

import com.datastax.driver.core.utils.Bytes;

byte[] result = Bytes.getArray(data);
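If you'd rather not depend on the driver's utility class, a JDK-only helper along the same lines might look like the sketch below. The name toByteArray is hypothetical and this is not the driver's source code; it simply applies the arrayOffset() + position() rule and falls back to a copy for buffers that expose no array:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class ByteBufferUtil {

    private ByteBufferUtil() {}

    /**
     * Returns exactly the bytes between position and limit.
     * Reuses the backing array only when it contains nothing else; otherwise copies.
     */
    static byte[] toByteArray(ByteBuffer buf) {
        int length = buf.remaining();
        if (buf.hasArray()) {
            // Index of the buffer's current position inside the backing array.
            int start = buf.arrayOffset() + buf.position();
            byte[] backing = buf.array();
            if (start == 0 && length == backing.length) {
                return backing;
            }
            return Arrays.copyOfRange(backing, start, start + length);
        }
        // Direct or read-only buffers have no accessible array: copy through a duplicate.
        byte[] copy = new byte[length];
        buf.duplicate().get(copy);
        return copy;
    }

    public static void main(String[] args) {
        ByteBuffer view = ByteBuffer.wrap(new byte[]{0, 0, 1, 2, 3, 0}, 2, 3);
        System.out.println(Arrays.toString(toByteArray(view))); // [1, 2, 3]

        ByteBuffer direct = ByteBuffer.allocateDirect(3);
        direct.put(new byte[]{4, 5, 6});
        direct.flip();
        System.out.println(Arrays.toString(toByteArray(direct))); // [4, 5, 6]
    }
}
```

Note that when the buffer covers its whole backing array, the helper returns that array itself rather than a copy, so later writes to the buffer would be visible in the result.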
