I'm working with data from spinn3r, which consists of multiple different protobuf messages serialized into a byte stream:
http://code.google.com/p/spinn3r-client/wiki/Protostream
"A protostream is a stream of protocol buffer messages, encoded on the wire as length prefixed varints according to the Google protocol buffer specification. The stream has three parts: a header, the payload, and a tail marker."
This seems like a pretty standard use case for protobufs. In fact, the protobuf core distribution provides CodedInputStream for both C++ and Java. But it appears that protobuf does not provide such a tool for Python; the 'internal' tools are not set up for this kind of external use:
https://groups.google.com/forum/?fromgroups#!topic/protobuf/xgmUqXVsK-o
So... before I go and cobble together a Python varint parser and tools for parsing a stream of different message types: does anyone know of any existing tools for this?
Why is it missing from protobuf? (Or am I just failing to find it?)
This seems like a big gap for protobuf, especially when compared to thrift's equivalent tools for both 'transport' and 'protocol'. Am I viewing that correctly?
It looks like the code in the other answer is potentially lifted from here. Check the licence before using this file, but I managed to get it to read varint32s using code such as this:
import myprotocol_pb2 as proto
import varint  # (this is the varint.py file)

data = open("filename.bin", "rb").read()  # read the whole file as bytes
decoder = varint.decodeVarint32  # get a varint32 decoder
# others are available in varint.py
pos = 0
while pos < len(data):
    msg = proto.Msg()  # your message type
    # the decoder returns (decoded value, position just after the varint)
    size, pos = decoder(data, pos)
    msg.ParseFromString(data[pos:pos + size])
    # use parsed message
    pos += size
print("done!")
This is very simple code designed to load messages of a single type delimited by varint32s which describe the next message's size.
Update: It may also be possible to include this file directly from the protobuf library by using:
from google.protobuf.internal.decoder import _DecodeVarint32
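If depending on a copied file or on a private, underscore-prefixed function is a concern, the varint decoding itself is small enough to write by hand. The following is a minimal sketch for Python 3, not the protobuf library's own implementation; `decode_varint` and `split_frames` are hypothetical names, and the payloads are left as raw bytes so the example runs without any generated `_pb2` module.

```python
def decode_varint(buf, pos):
    """Decode one unsigned varint from buf starting at pos.

    Returns (value, new_pos), where new_pos is the index just past the varint.
    """
    result = 0
    shift = 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]  # indexing bytes yields an int on Python 3
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: this was the last byte
            return result, pos
        shift += 7
        if shift >= 64:
            raise ValueError("varint is longer than 10 bytes")


def split_frames(buf):
    """Yield the raw payload of each length-prefixed message in buf."""
    pos = 0
    while pos < len(buf):
        size, pos = decode_varint(buf, pos)
        end = pos + size
        if end > len(buf):
            raise ValueError("truncated message")
        yield buf[pos:end]
        pos = end


# Two frames: a 3-byte payload and a 300-byte payload (300 encodes as AC 02).
sample = b"\x03abc" + b"\xac\x02" + b"x" * 300
frames = list(split_frames(sample))
print([len(f) for f in frames])  # [3, 300]
```

Each yielded payload can then be handed to `ParseFromString()` on whatever message type is expected at that position in the stream.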
I've implemented a small Python package to serialize multiple protobuf messages into a stream and deserialize them from a stream. You can install it with pip:
pip install pystream-protobuf
Here's sample code that writes two lists of protobuf messages into a file:
import stream

with stream.open("test.gam", "wb") as ostream:
    ostream.write(*objects_list)
    ostream.write(*another_objects_list)
and then reading the same messages (e.g. Alignment messages defined in vg_pb2.py) from the stream:
import stream
import vg_pb2

alns_list = []
with stream.open("test.gam", "rb") as istream:
    for data in istream:
        aln = vg_pb2.Alignment()
        aln.ParseFromString(data)
        alns_list.append(aln)
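For readers who want to see what the writing side of a length-prefixed stream involves without installing anything, here is a rough sketch in Python 3. It is not the on-disk format used by pystream-protobuf, which may differ; it only illustrates the generic "varint length, then payload" framing. `encode_varint` and `write_delimited` are hypothetical helper names.

```python
import io


def encode_varint(value):
    """Encode a non-negative integer as a protobuf-style varint."""
    if value < 0:
        raise ValueError("only non-negative values are supported here")
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)  # set the high bit: more bytes follow
        else:
            out.append(low7)  # last byte has the high bit clear
            return bytes(out)


def write_delimited(stream, payloads):
    """Write each payload to stream, prefixed by its length as a varint."""
    for payload in payloads:
        stream.write(encode_varint(len(payload)))
        stream.write(payload)


buf = io.BytesIO()
# In real use each payload would come from message.SerializeToString().
write_delimited(buf, [b"abc", b""])
print(buf.getvalue())  # b'\x03abc\x00'
```

A file opened with `open(path, "wb")` can be passed in place of the `io.BytesIO` object.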
This is simple enough that I can see why maybe nobody has bothered to make a reusable tool:
'''
Parses multiple protobuf messages from a stream of spinn3r data
'''
import sys
sys.path.append('python_proto/src')
import spinn3rApi_pb2
import protoStream_pb2

## read the whole file as bytes (binary mode matters on some platforms)
data = open('8mny44bs6tYqfnofg0ELPg.protostream', 'rb').read()

class _DecodeError(Exception):
    '''Raised when a varint runs past 10 bytes.'''

def _VarintDecoder(mask):
    '''Return a function that decodes one varint from buffer at pos.

    Values above 2**63 - 1 are reinterpreted as negative (signed) numbers.
    '''
    local_ord = ord
    def DecodeVarint(buffer, pos):
        result = 0
        shift = 0
        while 1:
            ## slicing (not indexing) keeps ord() working on Python 2 and 3
            b = local_ord(buffer[pos:pos + 1])
            result |= ((b & 0x7f) << shift)
            pos += 1
            if not (b & 0x80):
                if result > 0x7fffffffffffffff:
                    result -= (1 << 64)
                    result |= ~mask
                else:
                    result &= mask
                return (result, pos)
            shift += 7
            if shift >= 64:
                raise _DecodeError('Too many bytes when decoding varint.')
    return DecodeVarint

## get a 64bit varint decoder
decoder = _VarintDecoder((1 << 64) - 1)

## get the three types of protobuf messages we expect to see
header = protoStream_pb2.ProtoStreamHeader()
delimiter = protoStream_pb2.ProtoStreamDelimiter()
entry = spinn3rApi_pb2.Entry()

## get the header; the decoder returns (message size, position after varint)
pos = 0
size, pos = decoder(data, pos)
header.ParseFromString(data[pos:pos + size])
## should check its contents

while 1:
    ## every entry is preceded by a delimiter message
    pos += size
    size, pos = decoder(data, pos)
    delimiter.ParseFromString(data[pos:pos + size])
    if delimiter.delimiter_type == delimiter.END:
        break
    pos += size
    size, pos = decoder(data, pos)
    entry.ParseFromString(data[pos:pos + size])
    print(entry)
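The script above loads the whole file into memory before parsing. For large protostream files it may be preferable to read one message at a time from the file object. What follows is only a sketch under that assumption, written for Python 3; `read_varint` and `iter_frames` are hypothetical names, and the demo uses stand-in bytes rather than real spinn3r data.

```python
import io


def read_varint(stream):
    """Read one unsigned varint from a binary stream.

    Returns None if the stream is already at end-of-file.
    """
    result = 0
    shift = 0
    while True:
        chunk = stream.read(1)
        if not chunk:
            if shift == 0:
                return None  # clean end of stream, no varint started
            raise EOFError("stream ended inside a varint")
        byte = chunk[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of the varint
            return result
        shift += 7
        if shift >= 64:
            raise ValueError("varint is longer than 10 bytes")


def iter_frames(stream):
    """Lazily yield each length-prefixed payload from a binary stream."""
    while True:
        size = read_varint(stream)
        if size is None:
            return
        payload = stream.read(size)
        if len(payload) != size:
            raise EOFError("stream ended inside a message")
        yield payload


# Stand-in bytes; with real data each payload would be passed to
# ParseFromString() on the appropriate message class.
demo = io.BytesIO(b"\x02hi\x00\x05hello")
print(list(iter_frames(demo)))  # [b'hi', b'', b'hello']
```

For the layout handled above, the first payload would go to `ProtoStreamHeader`, and the remaining payloads would alternate between `ProtoStreamDelimiter` and `Entry` until a delimiter of type `END` appears. The sketch assumes a buffered file object whose `read(n)` returns all `n` bytes when they are available; a raw socket would need a loop around `read`.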