I'm using the Python Beam SDK 0.6.0, and I would like to write my output as JSON to Google Cloud Storage. What is the best way to do this?
I guess I can use WriteToText
from the Text IO sink, but then I have to format each row separately, right? How do I save my result into valid JSON files that contain lists of objects?
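For example, I assume I'd have to do something like this (just a sketch of my understanding, with records standing in for some PCollection of dicts):

import json
import apache_beam as beam

# Format every element by hand, then write it out line by line.
(records
 | 'to json strings' >> beam.Map(json.dumps)
 | 'write' >> beam.io.WriteToText('gs://my-bucket/output'))

But that gives me one JSON object per line rather than a file containing a valid JSON list.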
OK, for reference, I solved this by writing my own sink building on the _TextSink used by WriteToText in the Beam SDK.
Not sure if this is the best way to do it, but it works well so far. Hope it might help someone else.
import json

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.iobase import Write
from apache_beam.transforms import PTransform


class _JsonSink(beam.io.FileSink):
    """A Dataflow sink for writing JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=beam.io.CompressionTypes.AUTO):
        super(_JsonSink, self).__init__(
            file_path_prefix,
            file_name_suffix=file_name_suffix,
            num_shards=num_shards,
            shard_name_template=shard_name_template,
            coder=coder,
            mime_type='text/plain',
            compression_type=compression_type)
        # Holds the most recently seen record per file handle, so that the
        # final record can be written without a trailing comma in close().
        self.last_rows = dict()

    def open(self, temp_path):
        """Open the file and initialize it by opening a JSON list."""
        file_handle = super(_JsonSink, self).open(temp_path)
        file_handle.write('[\n')
        return file_handle

    def write_record(self, file_handle, value):
        """Writes a single record converted to JSON, terminating the previous
        line with a comma. The current record is buffered until the next call
        (or close()), since only then do we know whether a comma is needed."""
        if self.last_rows.get(file_handle, None) is not None:
            file_handle.write(self.coder.encode(
                json.dumps(self.last_rows[file_handle])))
            file_handle.write(',\n')
        self.last_rows[file_handle] = value

    def close(self, file_handle):
        """Finalize the JSON list and close the file handle returned from
        ``open()``. Called after all records are written.
        """
        if file_handle is not None:
            # Write the last buffered row without a trailing comma; guard
            # against shards that received no records at all.
            last_row = self.last_rows.pop(file_handle, None)
            if last_row is not None:
                file_handle.write(self.coder.encode(json.dumps(last_row)))
            # Close the list and then the file.
            file_handle.write('\n]\n')
            file_handle.close()


class WriteToJson(PTransform):
    """PTransform for writing to JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=beam.io.CompressionTypes.AUTO):
        self._sink = _JsonSink(file_path_prefix, file_name_suffix, num_shards,
                               shard_name_template, coder, compression_type)

    def expand(self, pcoll):
        return pcoll | Write(self._sink)
Using the sink is similar to how you use the text sink:
pcol | WriteToJson('gs://path/to/file', file_name_suffix='.json')
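In case it helps, here's a minimal end-to-end sketch of how I use it (the sample elements and output path are made up):

import apache_beam as beam

p = beam.Pipeline()
(p
 | 'create' >> beam.Create([{'name': 'foo'}, {'name': 'bar'}])
 | 'write json' >> WriteToJson('gs://path/to/file', file_name_suffix='.json'))
p.run()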
Making each file contain a single list with a bunch of elements is difficult, because you'd need to group the elements and then write them to a file together. Let me advise you to use a different format instead.
You may consider the JSON Lines format, where each line in a file represents a single JSON element.
Transforming your data to JSON Lines should be pretty easy. The following transform should do the trick:
import json

import apache_beam as beam


class WriteToJsonLines(beam.PTransform):
    def __init__(self, file_name):
        self._file_name = file_name

    def expand(self, pcoll):
        return (pcoll
                | 'format json' >> beam.Map(json.dumps)
                | 'write to text' >> beam.io.WriteToText(self._file_name))
Finally, if you later on want to read your JSON Lines files, you can write your own JsonLinesSource or use the one in beam_utils.
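For example, a minimal read-side counterpart could look like this (ReadFromJsonLines is a hypothetical name here, not something shipped with Beam):

import json

import apache_beam as beam


class ReadFromJsonLines(beam.PTransform):
    """Hypothetical helper: read JSON Lines files into parsed elements."""

    def __init__(self, file_name):
        self._file_name = file_name

    def expand(self, pbegin):
        return (pbegin
                | 'read lines' >> beam.io.ReadFromText(self._file_name)
                | 'parse json' >> beam.Map(json.loads))

You would apply it directly to the pipeline, e.g. parsed = p | ReadFromJsonLines('gs://path/to/file*.json').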
Although this is a year late, I'd like to add another way to write results to JSON files in GCS. For Apache Beam 2.x pipelines (Java SDK), this transform works:
.withSuffix(".json")
For example:
result.apply("WriteToGCS", TextIO.write().to(bucket)
        .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)
        .withSuffix(".json")
        .withNumShards(chunks));
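If you're on the Python SDK instead, the closest equivalent I know of would be along these lines (result and the output path are placeholders, and I haven't verified this matches the Java behavior exactly):

import apache_beam as beam
from apache_beam.io.filesystem import CompressionTypes

(result  # some PCollection of JSON strings
 | 'WriteToGCS' >> beam.io.WriteToText(
     'gs://my-bucket/output',          # placeholder path
     file_name_suffix='.json',         # same idea as withSuffix(".json")
     num_shards=3,                     # counterpart of withNumShards(chunks)
     compression_type=CompressionTypes.GZIP))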