I have a dictionary of values that I would like to write to GCS as a valid .CSV file using the Python SDK. I can write the dictionary out as a newline-separated text file, but I can't seem to find an example of converting the dictionary to a valid .CSV. Can anybody suggest the best way to generate CSVs within a Dataflow pipeline? The answers to this question address reading from CSV files, but don't really address writing to CSV files. I recognize that CSV files are just text files with rules, but I'm still struggling to convert the dictionary of data to a CSV that can be written using WriteToText.
Here is a simple example dictionary that I would like to turn into a CSV:
test_input = [{'label': 1, 'text': 'Here is a sentence'},
              {'label': 2, 'text': 'Another sentence goes here'}]
test_input | beam.io.WriteToText(path_to_gcs)
The above would result in a text file with each dictionary on its own line. Is there any functionality within Apache Beam that I can take advantage of (similar to csv.DictWriter)?
Based on Andrew's suggestion, here is a ConvertDictToCSV function that I created:
This appears to be working well, but it would certainly be safer to make use of csv.DictWriter if possible.
Generally you will want to write a function that can convert your original dict data elements into a CSV-formatted string representation. That function can be written as a DoFn that you can apply to your Beam PCollection of data, which would convert each collection element into the desired format; you can do this by applying the DoFn to your PCollection via ParDo. You can also wrap this DoFn in a more user-friendly PTransform. You can learn more about this process in the Beam Programming Guide.
Here is a simple, translatable non-Beam example:
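A minimal sketch of such a non-Beam conversion, assuming Python 3 and the standard csv and io modules. The helper name dict_to_csv_line and the column order ['label', 'text'] are illustrative choices, not something taken from the original post:

```python
import csv
import io


def dict_to_csv_line(element, column_order):
    """Format one dict as a single CSV line, without a trailing newline."""
    buffer = io.StringIO()
    # DictWriter takes care of quoting and escaping; fieldnames fixes
    # the column order of the output.
    writer = csv.DictWriter(buffer, fieldnames=column_order)
    writer.writerow(element)
    # Drop the writer's own line terminator so the caller controls newlines.
    return buffer.getvalue().rstrip("\r\n")


test_input = [{'label': 1, 'text': 'Here is a sentence'},
              {'label': 2, 'text': 'Another sentence goes here'}]

# Column order is an assumption made for this illustration.
converted_test_input = [dict_to_csv_line(row, ['label', 'text'])
                        for row in test_input]

print(converted_test_input)
# ['1,Here is a sentence', '2,Another sentence goes here']
```

Going through csv.DictWriter rather than joining values by hand means that fields containing commas or quotes are quoted correctly.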
The converted_test_input will be a list of CSV-formatted strings, one for each input dictionary.
Beam DictToCSV DoFn and PTransform example using DictWriter
To use the example, you would put your test_input into a PCollection, and apply the DictToCSV PTransform to the PCollection; you can take the resulting converted PCollection and use it as input for WriteToText. Note that you must provide a list or tuple of column names, via the column_order argument, corresponding to keys for your dictionary input elements; the resulting CSV-formatted string columns will be in the order of the column names provided. Also, the underlying implementation for the example does not support unicode.