I've cobbled together the code below, which doesn't do anything complex -- it just creates a byte[] variable, writes it into a blob field in Cassandra (v1.2, via the new DataStax CQL library), then reads it back out again.
When I put it in, it's 3 elements long; when I read it back, it's 84 elements long! This means the thing I'm actually trying to do (serialize Java objects) fails with an
org.apache.commons.lang.SerializationException: java.io.StreamCorruptedException: invalid stream header: 81000008
error when trying to deserialize it again.
Here's some sample code that demonstrates my problem:
import java.nio.ByteBuffer;
import org.apache.commons.lang.SerializationUtils;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
public class TestCassandraSerialization {

    private Cluster cluster;
    private Session session;

    public TestCassandraSerialization(String node) {
        connect(node);
    }

    private void connect(String node) {
        cluster = Cluster.builder().addContactPoint(node).build();
        Metadata metadata = cluster.getMetadata();
        System.out.printf("Connected to %s\n", metadata.getClusterName());
        for (Host host : metadata.getAllHosts()) {
            System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n",
                    host.getDatacenter(), host.getAddress(), host.getRack());
        }
        session = cluster.connect();
    }

    public void setUp() {
        session.execute("CREATE KEYSPACE test_serialization WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};");
        session.execute("CREATE TABLE test_serialization.test_table (id text PRIMARY KEY, data blob)");
    }

    public void tearDown() {
        session.execute("DROP KEYSPACE test_serialization");
    }

    public void insertIntoTable(String key, byte[] data) {
        PreparedStatement statement = session.prepare("INSERT INTO test_serialization.test_table (id,data) VALUES (?, ?)");
        BoundStatement boundStatement = new BoundStatement(statement);
        session.execute(boundStatement.bind(key, ByteBuffer.wrap(data)));
    }

    public byte[] readFromTable(String key) {
        String q1 = "SELECT * FROM test_serialization.test_table WHERE id = '" + key + "';";
        ResultSet results = session.execute(q1);
        for (Row row : results) {
            ByteBuffer data = row.getBytes("data");
            return data.array();
        }
        return null;
    }

    public static boolean compareByteArrays(byte[] one, byte[] two) {
        if (one.length > two.length) {
            byte[] foo = one;
            one = two;
            two = foo;
        }
        // so now two is definitely the longer array
        for (int i = 0; i < one.length; i++) {
            //System.out.printf("%d: %s\t%s\n", i, one[i], two[i]);
            if (one[i] != two[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        TestCassandraSerialization tester = new TestCassandraSerialization("localhost");
        try {
            tester.setUp();

            byte[] dataIn = new byte[]{1, 2, 3};
            tester.insertIntoTable("123", dataIn);
            byte[] dataOut = tester.readFromTable("123");
            System.out.println(dataIn);
            System.out.println(dataOut);
            System.out.println(dataIn.length);  // prints "3"
            System.out.println(dataOut.length); // prints "84"
            System.out.println(compareByteArrays(dataIn, dataOut)); // prints false

            String toSave = "Hello, world!";
            dataIn = SerializationUtils.serialize(toSave);
            tester.insertIntoTable("toSave", dataIn);
            dataOut = tester.readFromTable("toSave");
            System.out.println(dataIn.length);  // prints "20"
            System.out.println(dataOut.length); // prints "104"

            // The below throws org.apache.commons.lang.SerializationException: java.io.StreamCorruptedException: invalid stream header: 81000008
            String hasLoaded = (String) SerializationUtils.deserialize(dataOut);
            System.out.println(hasLoaded);
        } finally {
            tester.tearDown();
        }
    }
}
It looks like the right stuff makes it into the database:
cqlsh:flight_cache> select * from test_serialization.test_table;
id | data
--------+--------------------------------------------
123 | 0x010203
toSave | 0xaced000574000d48656c6c6f2c20776f726c6421
cqlsh:flight_cache>
So it looks like an error when reading, rather than writing, the binary data. Can anyone give me any pointers as to what I'm doing wrong?
The problem is almost certainly that the array returned by ByteBuffer.array() is the full backing array, but the data may only be contained within a portion of it. (Since you already use the DataStax Java driver, there is also a utility class in com.datastax.driver.core.utils that takes care of this for you; see the sketch at the end of this answer.)
The valid data that is being returned starts at ByteBuffer.arrayOffset() and is of length ByteBuffer.remaining(). To get a byte array containing just the valid data, use code along the following lines in readFromTable:
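ByteBuffer data = row.getBytes("data");
// Allocate an array sized to the valid region and copy just those bytes,
// which respects the buffer's position and arrayOffset() rather than
// exposing the whole backing array.
byte[] result = new byte[data.remaining()];
data.get(result);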
Your data is then in result, and you can return that instead of the full backing array.
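As for the driver utility mentioned above: assuming the class in question is Bytes and its static Bytes.getArray(ByteBuffer) helper (which copies only the valid portion of the buffer into a fresh array), the read could look something like this:

import com.datastax.driver.core.utils.Bytes;

// Inside readFromTable, instead of data.array():
byte[] result = Bytes.getArray(row.getBytes("data"));

Either way, the key point is to copy only the remaining() bytes rather than returning the whole backing array.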