I've created and run a DataPrep job, and I'm trying to use its template from Python on App Engine. I can successfully start a job using:
gcloud dataflow jobs run
--parameters "inputLocations={\"location1\":\"gs://bucket/folder/*\"},
outputLocations={\"location1\":\"project:dataset.table\"},
customGcsTempLocation=gs://bucket/DataPrep-beta/temp"
--gcs-location gs://bucket/DataPrep-beta/temp/cloud-dataprep-templatename_template
However, when I try to do the same from Python on App Engine:
service = build('dataflow', 'v1b3', credentials=credentials)
input1 = {"location1": "{i1}".format(i1=input)}
output1 = {"location1": "{o1}".format(o1=output)}
print('input location: {}'.format(input1))
GCSPATH="gs://{bucket}/{template}".format(bucket=BUCKET, template=template)
BODY = {
    "jobName": "{jobname}".format(jobname=JOBNAME),
    "parameters": {
        "inputLocations": input1,
        "outputLocations": output1,
        "customGcsTempLocation": "gs://{}/DataPrep-beta/temp".format(BUCKET)
    }
}
print("dataflow request body: {}".format(BODY))
request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
response = request.execute()
I get back:
"Invalid JSON payload received. Unknown name "location1" at
'launch_parameters.parameters[1].value': Cannot find field.
Invalid JSON payload received. Unknown name "location1" at
'launch_parameters.parameters[2].value': Cannot find field."
Nothing I've tried seems to work for "inputLocations" or "outputLocations": not a dict, not the output of json.dumps(), and not str().
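For reference, here is a minimal sketch of the json.dumps() variant. It assumes the API wants every value under "parameters" to be a plain string, which is what the error about 'launch_parameters.parameters[1].value' seems to suggest and what the gcloud command effectively passes. build_launch_body is a hypothetical helper, and the job name, bucket and table below are placeholders rather than my real values.

```python
import json


def build_launch_body(job_name, input_path, output_table, bucket):
    """Build a templates().launch() body whose parameter values are all strings.

    Hypothetical helper for illustration only; not part of any Google library.
    """
    return {
        "jobName": job_name,
        "parameters": {
            # json.dumps() turns each dict into one JSON-encoded string,
            # so the value is a str rather than a nested object.
            "inputLocations": json.dumps({"location1": input_path}),
            "outputLocations": json.dumps({"location1": output_table}),
            "customGcsTempLocation": "gs://{}/DataPrep-beta/temp".format(bucket),
        },
    }


# Placeholder values mirroring the gcloud command above.
body = build_launch_body(
    "example-job",
    "gs://bucket/folder/*",
    "project:dataset.table",
    "bucket",
)
print(json.dumps(body, indent=2))
```

The resulting dict would be passed as body= to service.projects().templates().launch(...), in place of BODY above.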