How to convert a CSV into a dictionary in Apache Beam

Posted 2019-01-18 07:30

Question:

I would like to read a CSV file and write it to BigQuery using Apache Beam Dataflow. To do this I need to present the data to BigQuery in the form of a dictionary. How can I transform the data with Apache Beam in order to do this?

My input CSV file has two columns, and I want to create a corresponding two-column table in BigQuery. I know how to write data to BigQuery; that's straightforward. What I don't know is how to transform the CSV into a dictionary. The code below is not correct, but it should give an idea of what I'm trying to do.

# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
 | 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
 # How do you do this??
 | 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
 | 'save' >> beam.Write(
     beam.io.BigQuerySink(
         output_table,
         schema='month:INTEGER, tornado_count:INTEGER',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()

Answer 1:

The idea is to have a source that returns parsed CSV rows. You can do this by subclassing the FileBasedSource class to include CSV parsing. In particular, the read_records function would look something like this:

import csv

import apache_beam

class MyCsvFileSource(apache_beam.io.filebasedsource.FileBasedSource):
  def read_records(self, file_name, range_tracker):
    # Open the matched file and hand it to the csv module for parsing.
    self._file = self.open_file(file_name)

    reader = csv.reader(self._file)

    # Emit each parsed row (a list of column values) as a record.
    for rec in reader:
      yield rec
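
For context, here is a minimal sketch of how such a source could be wired into a pipeline that produces the dictionaries BigQuery expects. The column names, schema, and output_table are assumptions taken from the question, not part of this answer:

import apache_beam as beam

# A minimal sketch, assuming the two CSV columns are luminosity and datetime
# (as in the question) and that output_table names a valid dataset.table.
p = beam.Pipeline('DirectRunner')
(p
 | 'read solar data' >> beam.io.Read(MyCsvFileSource('./sensor1_121116.csv'))
 # Each record is a list of strings; turn it into a dict keyed by column name.
 | 'convert to dictionary' >> beam.Map(
     lambda fields: {'luminosity': fields[0], 'datetime': fields[1]})
 | 'save' >> beam.io.Write(beam.io.BigQuerySink(
     output_table,
     schema='luminosity:FLOAT, datetime:TIMESTAMP',
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()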

I recently wrote a CsvFileSource for Apache Beam; you can take a look at the GitHub repository. You can pip install beam_utils and then from beam_utils.sources import CsvFileSource to use it. CsvFileSource also includes options to set a custom delimiter, skip the file header, and/or output dictionaries instead of lists.
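
Usage would look something like the sketch below. The keyword name dictionary_output is an assumption based on the options described above, so check the repository for the exact signature:

import apache_beam as beam
from beam_utils.sources import CsvFileSource

# Sketch only: dictionary_output is assumed to be the option that makes the
# source emit dicts keyed by the header row instead of plain lists.
with beam.Pipeline('DirectRunner') as p:
    (p
     | 'read csv as dicts' >> beam.io.Read(
         CsvFileSource('./sensor1_121116.csv', dictionary_output=True))
     | 'save' >> beam.io.WriteToText('./sensor1_out'))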



Answer 2:

As a supplement to Pablo's post, I'd like to share a small change I made to his sample. (+1 for you!)

Changed: reader = csv.reader(self._file) to reader = csv.DictReader(self._file)

csv.DictReader uses the first row of the CSV file as the dict keys. Each remaining row is used to populate a dict with its values, and values are automatically matched to the correct keys based on column order.
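
For example, with a hypothetical sensor file whose header row matches the question's two columns:

import csv

# sensor1_121116.csv (hypothetical contents):
#   luminosity,datetime
#   21.3,2016-11-12 07:30:00
with open('./sensor1_121116.csv') as f:
    for row in csv.DictReader(f):
        print(row)  # {'luminosity': '21.3', 'datetime': '2016-11-12 07:30:00'}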

One small detail is that every value in the dict is stored as a string. This may conflict with your BigQuery schema if you use, e.g., INTEGER for some fields, so you need to take care of proper casting afterwards.
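
A small sketch of such a casting step, assuming the luminosity field should be numeric in BigQuery (the field name comes from the question):

import apache_beam as beam

def cast_types(row):
    # csv.DictReader yields every value as a string; cast the fields that the
    # BigQuery schema declares as numeric. luminosity is assumed FLOAT here.
    row['luminosity'] = float(row['luminosity'])
    return row

# Inserted between the read and write steps:
#   ... | 'cast types' >> beam.Map(cast_types) | 'save' >> ...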