Iterable gzip deflate/inflate in Java

Posted 2019-07-03 20:21

Is there a library for gzip-deflating in terms of ByteBuffers hidden in the Internet? Something which allows us to push raw data then pull deflated data? We have searched for it but found only libraries which deal with InputStreams and OutputStreams.

We are tasked with creating gzip filters for deflating a flow of ByteBuffers in a pipeline architecture. This is a pull architecture where the last element pulls data from earlier elements. Our gzip filter deals with a flow of ByteBuffers; there is no single Stream object available.

We have toyed with adapting the data flow as some kind of InputStream and then using GZIPOutputStream to satisfy our requirements, but the amount of adaptor code is annoying, to say the least.

Post-accept edit: for the record, our architecture is similar to that of GStreamer and the like.

Answer 1:

Most of the credit goes to Mark Adler for suggesting this approach, which is much better than my original answer.

package stack;

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.zip.CRC32;
import java.util.zip.Deflater;

public class BufferDeflate2 {
    /** The standard 10 byte GZIP header */
    private static final byte[] GZIP_HEADER = new byte[] { 0x1f, (byte) 0x8b,
            Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0 };

    /** CRC-32 of uncompressed data. */
    private final CRC32 crc = new CRC32();

    /** Deflater to deflate data */
    private final Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION,
            true);

    /** Output buffer building area */
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    /** Internal transfer space */
    private final byte[] transfer = new byte[1000];

    /** The flush mode to use at the end of each buffer */
    private final int flushMode;


    /**
     * New buffer deflater
     * 
     * @param syncFlush
     *            if true, all data in buffer can be immediately decompressed
     *            from output buffer
     */
    public BufferDeflate2(boolean syncFlush) {
        flushMode = syncFlush ? Deflater.SYNC_FLUSH : Deflater.NO_FLUSH;
        buffer.write(GZIP_HEADER, 0, GZIP_HEADER.length);
    }


    /**
     * Deflate the buffer
     * 
     * @param in
     *            the buffer to deflate
     * @return deflated representation of the buffer
     */
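    public ByteBuffer deflate(ByteBuffer in) {
        // convert buffer to bytes
        byte[] inBytes;
        int off;
        int len = in.remaining();
        if( in.hasArray() ) {
            // the backing array may start before the buffer's first element
            inBytes = in.array();
            off = in.arrayOffset() + in.position();
            // mark the input as consumed, as the copying branch does
            in.position(in.limit());
        } else {
            off = 0;
            inBytes = new byte[len];
            in.get(inBytes);
        }

        // update CRC and deflater
        crc.update(inBytes, off, len);
        deflater.setInput(inBytes, off, len);

        // a completely filled transfer buffer may mean more output is
        // pending, which matters when flushing with SYNC_FLUSH
        int r;
        do {
            r = deflater.deflate(transfer, 0, transfer.length, flushMode);
            buffer.write(transfer, 0, r);
        } while( r == transfer.length || !deflater.needsInput() );

        byte[] outBytes = buffer.toByteArray();
        buffer.reset();
        return ByteBuffer.wrap(outBytes);
    }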


    /**
     * Write the final buffer. This writes any remaining compressed data and the GZIP trailer.
     * @return the final buffer
     */
    public ByteBuffer doFinal() {
        // finish deflating
        deflater.finish();

        // write all remaining data; after finish() the deflater keeps
        // producing output until finished() reports true
        while( !deflater.finished() ) {
            int r = deflater.deflate(transfer, 0, transfer.length);
            buffer.write(transfer, 0, r);
        }

        // write GZIP trailer
        writeInt((int) crc.getValue());
        writeInt((int) deflater.getBytesRead());

        // final output
        byte[] outBytes = buffer.toByteArray();

        // reset all state so this object can start a new GZIP stream
        deflater.reset();
        crc.reset();
        buffer.reset();
        buffer.write(GZIP_HEADER, 0, GZIP_HEADER.length);
        return ByteBuffer.wrap(outBytes);
    }


    /**
     * Write a 32 bit value in little-endian order
     * 
     * @param v
     *            the value to write
     */
    private void writeInt(int v) {
        buffer.write(v & 0xff);
        buffer.write((v >> 8) & 0xff);
        buffer.write((v >> 16) & 0xff);
        buffer.write((v >> 24) & 0xff);
    }


    /**
     * For testing. Pass in the name of a file to GZIP compress
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        File inFile = new File(args[0]);
        File outFile = new File(args[0]+".test.gz");

        // try-with-resources closes both channels even if compression fails
        try( FileChannel inChan = (new FileInputStream(inFile)).getChannel();
             FileChannel outChan = (new FileOutputStream(outFile)).getChannel() ) {
            BufferDeflate2 def = new BufferDeflate2(false);

            ByteBuffer buf = ByteBuffer.allocate(500);
            while( true ) {
                buf.clear();
                int r = inChan.read(buf);
                if( r==-1 ) break;
                buf.flip();
                ByteBuffer compBuf = def.deflate(buf);
                // a channel write may be partial, so loop until drained
                while( compBuf.hasRemaining() ) outChan.write(compBuf);
            }

            ByteBuffer compBuf = def.doFinal();
            while( compBuf.hasRemaining() ) outChan.write(compBuf);
        }
    }
}
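
To illustrate what the syncFlush option is meant to guarantee, here is a minimal, self-contained sketch that is separate from the class above. The class name SyncFlushDemo and its helper methods are made up for this illustration. It assumes raw deflate data (nowrap set to true on both sides) and flushes every chunk with Deflater.SYNC_FLUSH, so each compressed chunk can be inflated as soon as it arrives, provided the chunks are fed to the same Inflater in order.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

class SyncFlushDemo {
    /** Compresses one chunk and flushes so the output ends on a byte boundary. */
    static byte[] deflateChunk(Deflater deflater, byte[] input) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] transfer = new byte[256];
        deflater.setInput(input);
        int n;
        do {
            n = deflater.deflate(transfer, 0, transfer.length, Deflater.SYNC_FLUSH);
            out.write(transfer, 0, n);
        } while (n == transfer.length); // a full buffer may mean more is pending
        return out.toByteArray();
    }

    /** Inflates everything that one compressed chunk makes available. */
    static byte[] inflateChunk(Inflater inflater, byte[] compressed) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] transfer = new byte[256];
        inflater.setInput(compressed);
        try {
            int n;
            while ((n = inflater.inflate(transfer)) > 0) {
                out.write(transfer, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt deflate data", e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // one deflater and one inflater are shared by all chunks of the stream
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
        Inflater inflater = new Inflater(true);
        for (String text : new String[] {"first chunk, ", "second chunk, ", "third chunk"}) {
            byte[] compressed = deflateChunk(deflater, text.getBytes(StandardCharsets.UTF_8));
            byte[] restored = inflateChunk(inflater, compressed);
            System.out.println(new String(restored, StandardCharsets.UTF_8));
        }
        deflater.end();
        inflater.end();
    }
}
```

With Deflater.NO_FLUSH instead, deflateChunk may legitimately return few or no bytes for a chunk, because the deflater is free to hold data back until later calls.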


Answer 2:

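I don't understand the "hidden in the Internet" part, but zlib does in-memory compression and decompression in the gzip format. The java.util.zip API provides some access to zlib, though it is limited. Because of the limitations of that interface, you cannot ask zlib to produce and consume gzip streams directly. However, you can use the nowrap option to produce and consume raw deflate data. It is then easy to roll your own gzip header and trailer, using the CRC32 class in java.util.zip. You can prepend a fixed 10-byte header, then append the four-byte CRC followed by the four-byte uncompressed length (modulo 2^32), both in little-endian order, and you are good to go.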
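
The framing described above can be sketched as follows. This is only an illustration under stated assumptions: the class name GzipFraming and its methods are invented here, the whole input is held in memory, and the header leaves the flags, modification time, extra flags and OS fields at zero. The gunzip method exists solely to check that the standard GZIPInputStream accepts the hand-built stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.CRC32;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;

class GzipFraming {
    /** Fixed 10-byte header: magic bytes, method 8 (deflate), all other fields zero. */
    private static final byte[] HEADER = {0x1f, (byte) 0x8b, 8, 0, 0, 0, 0, 0, 0, 0};

    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(HEADER, 0, HEADER.length);

        // nowrap = true makes the deflater emit raw deflate data
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
        try {
            deflater.setInput(data);
            deflater.finish();
            byte[] chunk = new byte[512];
            while (!deflater.finished()) {
                int n = deflater.deflate(chunk);
                out.write(chunk, 0, n);
            }
        } finally {
            deflater.end();
        }

        // trailer: CRC-32 of the uncompressed data, then its length modulo 2^32
        CRC32 crc = new CRC32();
        crc.update(data);
        writeIntLittleEndian(out, (int) crc.getValue());
        writeIntLittleEndian(out, data.length);
        return out.toByteArray();
    }

    private static void writeIntLittleEndian(ByteArrayOutputStream out, int v) {
        out.write(v);          // write(int) stores only the low eight bits
        out.write(v >>> 8);
        out.write(v >>> 16);
        out.write(v >>> 24);
    }

    /** Decompresses with the standard library to verify the framing. */
    static byte[] gunzip(byte[] gz) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gz))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[512];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling GzipFraming.gunzip(GzipFraming.gzip(bytes)) should return the original bytes. A streaming variant would update the CRC and feed the deflater per buffer instead of all at once.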



Answer 3:

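Handling ByteBuffers is not hard. See my example code below. You need to know how the buffers were created. The options are:

  1. Each buffer is compressed independently. This is so simple to handle that I assume it is not the case here. You would just convert the buffer to a byte array and wrap it in a ByteArrayInputStream inside a GZIPInputStream.
  2. Each buffer was ended with a SYNC_FLUSH by the writer, and therefore contains whole blocks of data within the stream. All data that the writer wrote into the buffer can be read immediately by the reader.
  3. Each buffer is just a part of a GZIP stream. There is no guarantee that the reader can read anything from a given buffer.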
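
For the first option, a minimal sketch might look like this. The class name IndependentBuffers and its methods are hypothetical, and the sketch assumes every buffer holds one complete gzip stream. The deflate helper is included only so that the example can be run on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class IndependentBuffers {
    /** Inflates a buffer that holds one complete gzip stream. */
    static ByteBuffer inflate(ByteBuffer gz) {
        // copy the remaining bytes out, which works for direct buffers too
        byte[] bytes = new byte[gz.remaining()];
        gz.get(bytes);
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[512];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            return ByteBuffer.wrap(out.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Helper for the demonstration: gzips a buffer as a stream of its own. */
    static ByteBuffer deflate(ByteBuffer raw) {
        byte[] bytes = new byte[raw.remaining()];
        raw.get(bytes);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(bytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // the gzip trailer is written when the stream is closed above
        return ByteBuffer.wrap(out.toByteArray());
    }
}
```

Because every buffer carries its own header and trailer, buffers compressed this way can be inflated in any order, at the cost of a worse compression ratio for small buffers.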

Data produced by GZIP must be processed sequentially. The ByteBuffers therefore have to be processed in the same order in which they were generated.

Example code:

package stack;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPInputStream;

public class BufferDeflate {

    static AtomicInteger idSrc = new AtomicInteger(1);

    /** Queue for transferring buffers */
    final BlockingQueue<ByteBuffer> buffers = new LinkedBlockingQueue<ByteBuffer>();

    /** The entry point for deflated buffers */
    final Pipe.SinkChannel bufSink;

    /** The source for the inflater */
    final Pipe.SourceChannel infSource;

    /** The destination for the inflater */
    final Pipe.SinkChannel infSink;

    /** The source for the outside world */
    public final Pipe.SourceChannel source;

    /** Marker queued by add(null); LinkedBlockingQueue rejects null elements */
    private static final ByteBuffer END_OF_STREAM = ByteBuffer.allocate(0);



    class Relayer extends Thread {
        public Relayer(int id) {
            super("BufferRelayer" + id);
        }


        public void run() {
            try {
                while( true ) {
                    ByteBuffer buf = buffers.take();
                    // compare by identity so that ordinary empty buffers
                    // are not mistaken for the end of the stream
                    if( buf != END_OF_STREAM ) {
                        bufSink.write(buf);
                    } else {
                        bufSink.close();
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }



    class Inflater extends Thread {
        public Inflater(int id) {
            super("BufferInflater" + id);
        }


        public void run() {
            // closing the output stream also closes infSink, so readers of
            // source see end-of-stream even if inflation fails
            try (OutputStream out = Channels.newOutputStream(infSink)) {
                InputStream in = Channels.newInputStream(infSource);
                GZIPInputStream gzip = new GZIPInputStream(in);

                // copy in blocks rather than one byte at a time
                byte[] block = new byte[1024];
                int n;
                while( (n = gzip.read(block)) != -1 ) {
                    out.write(block, 0, n);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * New buffer inflater
     */
    public BufferDeflate() throws IOException {
        Pipe pipe = Pipe.open();
        bufSink = pipe.sink();
        infSource = pipe.source();

        pipe = Pipe.open();
        infSink = pipe.sink();
        source = pipe.source();
        source.configureBlocking(false);

        int id = idSrc.incrementAndGet();

        Thread thread = new Relayer(id);
        thread.setDaemon(true);
        thread.start();

        thread = new Inflater(id);
        thread.setDaemon(true);
        thread.start();
    }


    /**
     * Add the buffer to the stream. A null buffer closes the stream
     * 
     * @param buf
     *            the buffer to add
     * @throws IOException
     */
    public void add(ByteBuffer buf) throws IOException {
        // the queue cannot hold null, so null is translated to a marker
        buffers.offer(buf == null ? END_OF_STREAM : buf);
    }
}

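Simply pass the buffers to the add method and read from the public source channel. The amount of data that can be read after GZIP has processed a given number of bytes is impossible to predict. I have therefore made the source channel non-blocking, so you can safely read from it in the same thread in which you add the byte buffers.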
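
The reason a non-blocking source is safe to poll from the producing thread can be seen in isolation with a bare java.nio.channels.Pipe. The following sketch is independent of the class above; the class name NonBlockingSourceDemo and its run method are invented for the illustration. A read on an empty non-blocking source returns 0 immediately instead of waiting, and the same thread can then write to the sink and pick the bytes up afterwards.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

class NonBlockingSourceDemo {
    /** Returns {bytes read while the pipe was empty, bytes read after a write}. */
    static int[] run() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false);
                ByteBuffer dst = ByteBuffer.allocate(16);

                // nothing has been written yet: the call returns at once
                int whileEmpty = source.read(dst);

                // the sink is still in blocking mode, so all bytes are written
                sink.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));

                // poll until the bytes arrive, with a two second safety limit
                int afterWrite = 0;
                long deadline = System.nanoTime() + 2_000_000_000L;
                while (afterWrite < 3 && System.nanoTime() < deadline) {
                    afterWrite += source.read(dst);
                }
                return new int[] {whileEmpty, afterWrite};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a blocking source, the first read in this sketch would wait forever, because the only thread that could write to the sink is the one stuck in the read.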



Source: Iterable gzip deflate/inflate in Java
Tags: java gzip nio