Peek into stream of Popen pipeline in Python

Posted 2019-07-23 14:52

Background:
Python 2.6.6 on Linux. First part of a DNA sequence analysis pipeline.
I want to read a possibly gzipped file from mounted remote storage (LAN). If it is gzipped, I gunzip it to a stream (i.e. using gunzip FILENAME -c). If the first character of the stream (file) is "@", the entire stream should be routed into a filtering program that takes input on standard input; otherwise it should be piped directly to a file on local disk. I'd like to minimize the number of file reads/seeks from remote storage (surely a single pass through the file is possible?).

Contents of an example input file, first four lines corresponding to one record in FASTQ format:

@I328_1_FC30MD2AAXX:8:1:1719:1113/1                                        
GTTATTATTATAATTTTTTACCGCATTTATCATTTCTTCTTTATTTTCATATTGATAATAAATATATGCAATTCG
+I328_1_FC30MD2AAXX:8:1:1719:1113/1                                        
hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhahhhhhhfShhhYhhQhh]hhhhffhU\UhYWc

Files that should not be piped into the filtering program contain records that look like this (first two lines corresponding to one record in FASTA format):

>I328_1_FC30MD2AAXX:8:1:1719:1113/1
GTTATTATTATAATTTTTTACCGCATTTATCATTTCTTCTTTATTTTCATATTGATAATAAATATATGCAATTCG

Some made up semi-pseudo code effort to visualize what I want to do (I know this isn't possible the way I've written it). I hope it makes some sense:

if gzipped:
    gunzip = Popen(["gunzip", "-c", "remotestorage/file.gz"], stdout=PIPE)
    if gunzip.stdout.peek(1) == "@": # This isn't possible
        fastq = True
    else:
        fastq = False
if fastq:
    filter = Popen(["filter", "localstorage/outputfile.fastq"], stdin=gunzip.stdout).communicate()
else:
    # Send the gunzipped stream to another file

Disregard the fact that the code won't run as written here and that it has no error handling etc.; all of that is already in my other code. I just want help with peeking into the stream or finding a way around that. It would be great if you could call gunzip.stdout.peek(1), but I realize that's not possible.

What I've tried so far:
I figured subprocess.Popen might help me achieve this, and I've tried a lot of different ideas, amongst others trying to use some kind of io.BufferedRandom() object to write the stream to but I can't figure out how that would work. I know streams are non-seekable but maybe a workaround might be to read the first character of the gunzip-stream and then create a new stream where you first input a "@" or ">" depending on file contents and then stuff the rest of the gunzip.stdout-stream into the new stream. This new stream would then be fed into filter's Popen stdin.

Note that the file sizes might be several times larger than available memory. I do not want to perform more than one single read of the source file from remote storage and no unnecessary file accessing.

Any ideas are welcome! Please ask me questions so I can clarify if I didn't make it clear enough.

2 Answers
做个烂人
#2 · 2019-07-23 15:20

It doesn't make sense to wrap shell commands in Python. However, you can achieve everything you need in Python without shelling out:

  1. Open the input file and read the first 3 bytes. If they equal 1F 8B 08, it is most likely a gzip file.
  2. Reset the file position to the start of the file.
  3. Pass the file contents to zlib.decompress() if it is a gzip file; otherwise read the file as-is.
  4. Pass the contents to the filter function if required.
  5. Write the results to a file.

EDIT

This won't work as the gzip headers would need to be stripped before passing to zlib. It would be possible, however, to check the first 3 bytes, perform a fh.seek(0) and pass the file to gzip.open() if you wanted to be sure the file was a gzip (with DEFLATE compression).
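
The header check described above might look like the following. This is only a minimal sketch, not tested against the real pipeline: the helper name open_maybe_gzipped and the constant GZIP_MAGIC are made up for illustration, and it assumes the input is a regular, seekable file.

```python
import gzip

# First three bytes of a gzip member: ID1, ID2, and CM=8 (DEFLATE).
GZIP_MAGIC = b"\x1f\x8b\x08"


def open_maybe_gzipped(path):
    """Return a binary file object that yields decompressed data.

    Hypothetical helper: sniffs the header, rewinds, and wraps the
    handle in gzip.GzipFile only when the magic bytes match.
    """
    fh = open(path, "rb")
    header = fh.read(3)
    fh.seek(0)  # rewind so the reader sees the file from the start
    if header == GZIP_MAGIC:
        # GzipFile does not close fh when it is closed; the caller
        # remains responsible for the underlying handle.
        return gzip.GzipFile(fileobj=fh, mode="rb")
    return fh
```

The object returned can then be read in chunks, so the first record character can be inspected with read(1) without loading the whole file.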

It may be easier to just pass the file to gzip and catch the exception thrown if the file is not gzipped:

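import gzip

try:
    in_file = gzip.open("infile", "rb")
    f_contents = in_file.read()  # NOTE: loads the whole file into memory
except IOError as e:
    # Re-raise unless the error says the file is not gzipped.
    # Perhaps it would be safer to check the header!
    if not str(e).startswith("Not a gzipped file"):
        raise
    in_file.close()
    in_file = open("infile", "rb")
    f_contents = in_file.read()
in_file.close()

# Slicing (rather than indexing) also copes with an empty file.
if f_contents[:1] == b"@":
    # filter_function is a placeholder for the actual filtering code
    result = filter_function(f_contents)
else:
    result = f_contents

new_file = open("new_file", "wb")
new_file.write(result)
new_file.close()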
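
Since the question mentions inputs several times larger than memory, the read() calls above could be swapped for chunked copying. The following is a rough sketch of that variation, not the approach the answer itself uses: route_by_first_byte, fastq_sink and other_sink are hypothetical names, and the sinks are assumed to be writable binary file objects (for example the stdin pipe of a filter process, or a local output file).

```python
import shutil


def route_by_first_byte(src, fastq_sink, other_sink, chunk_size=64 * 1024):
    """Copy src to one of two sinks, chosen by the first byte of src.

    Only one byte plus one chunk is held in memory at a time. The
    chosen sink is returned; closing the sinks is left to the caller.
    """
    first = src.read(1)
    dst = fastq_sink if first == b"@" else other_sink
    dst.write(first)  # put the peeked byte back in front of the rest
    shutil.copyfileobj(src, dst, chunk_size)
    return dst
```

Here src would be the object returned by gzip.open(...) or open(..., "rb"), so the source file is still read in a single pass.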
手持菜刀,她持情操
#3 · 2019-07-23 15:22

Here is an implementation of your proposal to "first input a "@" or ">" depending on file contents and then stuff the rest of the gunzip.stdout-stream into the new stream". I only tested the local-file branch, but it should be enough to demonstrate the concept.

from subprocess import Popen, PIPE

# bufsize=0 keeps the pipe unbuffered on the Python side, so read(1)
# consumes exactly one byte and the rest stays in the pipe.
if gzipped:
    source = Popen(["gunzip", "-c", "remotestorage/file.gz"],
                   stdout=PIPE, bufsize=0)
else:
    source = Popen(["cat", "remotestorage/file"], stdout=PIPE, bufsize=0)
firstchar = source.stdout.read(1)  # empty if the stream has no data
# "unread" the char we've just read
# (\xHH is not a POSIX printf escape; this relies on a shell whose
# printf supports it, such as bash)
source = Popen(r"(printf '\x%02x' && cat)" % ord(firstchar),
               shell=True, stdin=source.stdout, stdout=PIPE)

# Now feed the output to a filter or to a local file.
flocal = None
try:
    if firstchar == b"@":
        sink = Popen(["filter", "localstorage/outputfile.fastq"],
                     stdin=source.stdout)
    else:
        flocal = open('localstorage/outputfile.stream', 'wb')
        sink = Popen(["cat"], stdin=source.stdout, stdout=flocal)
    sink.communicate()
finally:
    if flocal is not None:
        flocal.close()

The idea is to read a single character from the source command's output, and then recreate the original output using (printf '\xhh' && cat), effectively implementing the peek. The replacement stream passes shell=True to Popen, leaving it to the shell and cat to do the heavy lifting. The data remains in the pipeline at all times and is never read into memory in its entirety. Note that the shell is used only for the single Popen call that implements unreading the peeked byte, not for the calls that involve user-supplied file names. Even there, the byte is escaped to hex to make sure that the shell does not mangle it when invoking printf.

The code could be further cleaned up to implement an actual function named peek that returns the peeked contents and a replacement new_source.
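
A possible shape for such a function is sketched below. It is an untested-in-production illustration under a few assumptions: source is a Popen object created with stdout=PIPE from which nothing has been read yet, a POSIX sh with printf and cat is available, and the name peek and the nbytes parameter are just suggestions. It reads through os.read so that no Python-side buffering can swallow extra bytes, and it uses octal escapes because those are the form that POSIX printf guarantees.

```python
import os
from subprocess import Popen, PIPE


def peek(source, nbytes=1):
    """Return (peeked_bytes, new_source) for a Popen with stdout=PIPE.

    new_source is a Popen whose stdout replays the peeked bytes
    followed by the rest of source's output.
    """
    fd = source.stdout.fileno()
    peeked = b""
    while len(peeked) < nbytes:
        chunk = os.read(fd, nbytes - len(peeked))
        if not chunk:  # end of stream before nbytes were available
            break
        peeked += chunk
    # Encode every peeked byte as a three-digit octal escape for printf.
    escaped = "".join("\\%03o" % b for b in bytearray(peeked))
    new_source = Popen("printf '%s' && cat" % escaped, shell=True,
                       stdin=source.stdout, stdout=PIPE)
    # The child holds its own copy of the pipe; drop ours so the
    # upstream process sees a closed pipe if the consumer exits early.
    source.stdout.close()
    return peeked, new_source
```

With this in place, the calling code would do something like firstchar, source = peek(source) and then hand source.stdout to the filter or to cat exactly as before.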
