How to encode a text stream into a byte stream in

2020-07-11 06:44发布

问题:

Decoding a byte stream into a text stream is easy:

import io
f = io.TextIOWrapper(io.BytesIO(b'Test\nTest\n'), 'utf-8')
f.readline()

In this example, io.BytesIO(b'Test\nTest\n') is a byte stream and f is a text stream.

I want to do exactly the opposite of that. Given a text stream or file-like object, I would like to encode it into a byte stream or file-like object without processing the entire stream.

This is what I've tried so far:

import io, codecs

f = codecs.getreader('utf-8')(io.StringIO('Test\nTest\n'))
f.readline()
# TypeError: can't concat str to bytes

f = codecs.EncodedFile(io.StringIO('Test\nTest\n'), 'utf-8')
f.readline()
# TypeError: can't concat str to bytes

f = codecs.StreamRecoder(io.StringIO('Test\nTest\n'), None, None,
                         codecs.getreader('utf-8'), codecs.getwriter('utf-8'))
# TypeError: can't concat str to bytes

f = codecs.encode(io.StringIO('Test\nTest\n'), 'utf-8')
# TypeError: utf_8_encode() argument 1 must be str, not _io.StringIO

f = io.TextIOWrapper(io.StringIO('Test\nTest\n'), 'utf-8')
f.readline()
# TypeError: underlying read() should have returned a bytes-like object, not 'str'

f = codecs.iterencode(io.StringIO('Test\nTest\n'), 'utf-8')
next(f)
# This works, but it's an iterator instead of a file-like object or stream.

f = io.BytesIO(io.StringIO('Test\nTest\n').getvalue().encode('utf-8'))
f.readline()
# This works, but I'm reading the whole stream before converting it.

I'm using Python 3.7

回答1:

You can write this yourself pretty easily; you just need to decide how you want to do the buffering.

For example:

class BytesIOWrapper(io.RawIOBase):
    def __init__(self, file, encoding='utf-8', errors='strict'):
        self.file, self.encoding, self.errors = file, encoding, errors
        self.buf = b''
    def readinto(self, buf):
        if not self.buf:
            self.buf = self.file.read(4096).encode(self.encoding, self.errors)
            if not self.buf:
                return 0
        length = min(len(buf), len(self.buf))
        buf[:length] = self.buf[:length]
        self.buf = self.buf[length:]
        return length
    def readable():
        return True

I think this is exactly what you were asking for.

>>> f = BytesIOWrapper(io.StringIO("Test\nTest\n"))
>>> f.readline()
b'Test\n'
>>> f.readline()
b'Test\n'
>>> f.readline()
b''

If you want to get cleverer, you probably want to wrap a codecs.iterencode rather than buffering 4K at a time. Or, since we're using a buffer, you might want to create a BufferedIOBase instead of a RawIOBase. Also, a class named BytesIOWrapper probably ought to handle write, but that's the easy part. The hard part would be implementing seek/tell, since you can't seek arbitrarily within a TextIOBase; making seeking to start and end is pretty easy; seeking to known previous positions, on the other hand, is hard (unless you rely on the TextIOBase.tell returning a byte position—which it's not guaranteed to do, and, while TextIOWrapper does, StringIO doesn't…).

Anyway, I think this is the simplest demonstration of how to write even the most complicated kind of io class.