I'm building an Apache Beam streaming pipeline whose source is Pub/Sub and whose sink is BigQuery. I've gotten this error message:
"Workflow failed. Causes: Unknown message code."
Cryptic as this message is, I now believe the cause is that BigQuery is not supported as a sink for streaming pipelines, as stated here:
Streaming from Pub/Sub to BigQuery
Am I correct that this is what's causing the problem? And if not, is streaming to BigQuery still unsupported in any case?
Can anyone hint at when this feature will be released? It's a shame; I was pretty excited to start using this.
Python streaming pipelines have been experimentally available since Beam 2.5.0, as documented in the Beam docs here.
You will therefore need to install apache-beam 2.5.0 and apache-beam[gcp]:
pip install apache-beam==2.5.0
pip install apache-beam[gcp]
I ran this command:
python pubsub_to_bq.py --runner DataflowRunner --input_topic=projects/pubsub-public-data/topics/taxirides-realtime --project <my-project> --temp_location gs://<my-bucket>/tmp --staging_location gs://<my-bucket>/staging --streaming
using the code below, and it works fine:
from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam


def parse_pubsub(line):
    """Parse a JSON message from the topic into a dict matching the BQ schema."""
    # Imported inside the function so the import is available on Dataflow workers.
    import json
    record = json.loads(line)
    return {
        'ride_id': record['ride_id'],
        'point_idx': record['point_idx'],
        'latitude': record['latitude'],
        'longitude': record['longitude'],
        'timestamp': record['timestamp'],
        'meter_increment': record['meter_increment'],
        'ride_status': record['ride_status'],
        'meter_reading': record['meter_reading'],
        'passenger_count': record['passenger_count'],
    }


def run(argv=None):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input_topic', dest='input_topic', required=True,
        help='Input Pub/Sub topic of the form "projects/<PROJECT>/topics/<TOPIC>".')
    known_args, pipeline_args = parser.parse_known_args(argv)

    with beam.Pipeline(argv=pipeline_args) as p:
        # Read from Pub/Sub and adapt each message to a BQ table row
        lines = (p
                 | beam.io.ReadFromPubSub(known_args.input_topic)
                 | beam.Map(parse_pubsub))
        # Write to a BQ table
        lines | beam.io.WriteToBigQuery(
            table='<my-table>', dataset='<my-dataset>', project='<my-project>')


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
This code uses the publicly available topic projects/pubsub-public-data/topics/taxirides-realtime and a BQ table that I have created with the right schema.
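For reference, the messages on that topic are JSON objects carrying the fields that parse_pubsub reads. A minimal sketch of what one looks like (the values below are made up; only the field names reflect the real messages):

```python
import json

# Made-up payload shaped like a taxirides-realtime message; only the
# field names mirror the real topic, the values are invented.
sample = json.dumps({
    'ride_id': 'example-ride-id',
    'point_idx': 217,
    'latitude': 40.7,
    'longitude': -74.0,
    'timestamp': '2018-08-01T12:00:00.0000-04:00',
    'meter_increment': 0.02,
    'ride_status': 'enroute',
    'meter_reading': 4.34,
    'passenger_count': 2,
})

# parse_pubsub in the pipeline above turns a payload like this into a
# dict keyed by the same field names, ready for WriteToBigQuery.
record = json.loads(sample)
print(record['ride_status'], record['passenger_count'])  # enroute 2
```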
If you use this example, be careful not to leave it running, or you will incur costs, because this Pub/Sub topic delivers a lot of messages.
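If you still need to create the table, one option is the bq CLI with an inline schema. A sketch, with the same placeholder names as above; the column names must match the dict keys produced by parse_pubsub, and the types here are my own guesses at sensible ones:

```shell
bq mk --table <my-project>:<my-dataset>.<my-table> \
  ride_id:STRING,point_idx:INTEGER,latitude:FLOAT,longitude:FLOAT,timestamp:TIMESTAMP,meter_increment:FLOAT,ride_status:STRING,meter_reading:FLOAT,passenger_count:INTEGER
```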