How to write result to JSON files in gcs in Datafl

2019-03-22 05:12发布

I'm using the Python Beam SDK 0.6.0. And I would like to write my output to JSON in Google Cloud Storage. What is the best way to do this?

I quess I can use WriteToText from the Text IO sink but then I have to format each row separately, right? How do I save my result into valid JSON files that contain lists of objects?

3条回答
你好瞎i
2楼-- · 2019-03-22 05:27

Making each file contain a single list with a bunch of elements is difficult, because you'd need to group a bunch of elements and then write them together to a file. Let me advice you to use a different format.

You may consider the JSON Lines format, where each line in a file represents a single JSON element.

Transforming your data to JSON Lines should be pretty easy. The following transform should do the trick:

class WriteToJsonLines(beam.PTransform):
    def __init__(self, file_name):
        self._file_name = file_name

    def expand(self, pcoll):
        return (pcoll
                | 'format json' >> beam.Map(json.dumps)
                | 'write to text' >> beam.WriteToText(self._file_name))

Finally, if you later on want to read your JSON Lines files, you can write your own JsonLinesSource or use the one in beam_utils.

查看更多
欢心
3楼-- · 2019-03-22 05:40

Ok, for reference, I solved this by writing my own sink building on the _TextSink used by WriteToText in the beam SDK.

Not sure if this is the best way to do it but it works well so far. Hope it might help someone else.

import os
import json

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.iobase import Write
from apache_beam.transforms import PTransform   

class _JsonSink(beam.io.FileSink):
    """A Dataflow sink for writing JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=beam.io.CompressionTypes.AUTO):

        super(_JsonSink, self).__init__(
            file_path_prefix,
            file_name_suffix=file_name_suffix,
            num_shards=num_shards,
            shard_name_template=shard_name_template,
            coder=coder,
            mime_type='text/plain',
            compression_type=compression_type)
        self.last_rows = dict()

    def open(self, temp_path):
        """ Open file and initialize it w opening a list."""
        file_handle = super(_JsonSink, self).open(temp_path)
        file_handle.write('[\n')
        return file_handle

    def write_record(self, file_handle, value):
        """Writes a single encoded record converted to JSON and terminates the
        line w a comma."""
        if self.last_rows.get(file_handle, None) is not None:
            file_handle.write(self.coder.encode(
                json.dumps(self.last_rows[file_handle])))
            file_handle.write(',\n')

        self.last_rows[file_handle] = value

    def close(self, file_handle):
        """Finalize the JSON list and close the file handle returned from
        ``open()``. Called after all records are written.
        """
        if file_handle is not None:
            # Write last row without a comma
            file_handle.write(self.coder.encode(
                json.dumps(self.last_rows[file_handle])))

            # Close list and then the file
            file_handle.write('\n]\n')
            file_handle.close()


class WriteToJson(PTransform):
    """PTransform for writing to JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=beam.io.CompressionTypes.AUTO):

        self._sink = _JsonSink(file_path_prefix, file_name_suffix, num_shards,
                               shard_name_template, coder, compression_type)

    def expand(self, pcoll):
        return pcoll | Write(self._sink)

Using the sink is similar to how you use the the text sink:

pcol | WriteToJson('gs://path/to/file', file_name_suffix='.json')
查看更多
聊天终结者
4楼-- · 2019-03-22 05:50

Although this is a year late, I'd like to add another way to write a result to json files in GCS. For apache beam 2.x pipelines, this transform works:

.withSuffix(".json")

For example:

result.apply("WriteToGCS", TextIO.write().to(bucket)
            .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)
            .withSuffix(".json")
            .withNumShards(chunks));
查看更多
登录 后发表回答