GoLang: Decompress bz2 in one goroutine, consume in

Posted 2019-05-10 17:45

I am a new-grad SWE learning Go (and loving it).

I am building a parser for Wikipedia dump files - basically a huge bzip2-compressed XML file (~50GB uncompressed).

I want to do both streaming decompression and parsing, which sounds simple enough. For decompression, I do:

inputFilePath := flag.Arg(0)
inputFile, err := os.Open(inputFilePath) // error handling omitted
inputReader := bzip2.NewReader(inputFile)

And then pass the reader to the XML parser:

decoder := xml.NewDecoder(inputReader)

However, since both decompressing and parsing are expensive operations, I would like to run them on separate goroutines to make use of additional cores. How would I go about doing this in Go?

The only thing I can think of is wrapping the file in a chan []byte and implementing the io.Reader interface, but I presume there might be a built-in (and cleaner) way of doing it.

Has anyone ever done something like this?

Thanks! Manuel

1 answer
Anthone
#2 · 2019-05-10 18:08

You can use io.Pipe, then use io.Copy to push the decompressed data into the pipe, and read it in another goroutine:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "sync"
)

func main() {

    rawJson := []byte(`{
            "Foo": {
                "Bar": "Baz"
            }
        }`)

    bzip2Reader := bytes.NewReader(rawJson) // this stands in for the bzip2.NewReader

    var wg sync.WaitGroup
    wg.Add(2)

    r, w := io.Pipe()

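    go func() {
        defer wg.Done()
        // Write everything into the pipe. Decompression happens in this goroutine.
        _, err := io.Copy(w, bzip2Reader)
        // Pass a copy error on to the reading side; with a nil error the reader sees io.EOF.
        w.CloseWithError(err)
    }()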

    decoder := json.NewDecoder(r)

    go func() {
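        for {
            t, err := decoder.Token()
            if err != nil {
                // io.EOF marks the normal end of the stream; anything else is a real failure.
                if err != io.EOF {
                    fmt.Println("decode error:", err)
                }
                break
            }
            fmt.Println(t)
        }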
        wg.Done()
    }()

    wg.Wait()
}

http://play.golang.org/p/fXLnfnaWYA
