Triggering Cloud Dataflow pipeline from Cloud Function

2020-04-20 21:22发布

问题:

I am trying to trigger a Dataflow pipeline from a Cloud Function, which is itself triggered when a new file is uploaded to a GCS bucket. When I upload a file, the Cloud Function is triggered properly but times out after a few seconds without any Dataflow job being triggered. Below is my function code:

const google = require('googleapis');
// Module-level project id. Inside the getApplicationDefault callback below it
// is shadowed by that callback's own `projectId` parameter.
const projectId = "iot-fitness-198120";
 
/**
 * Cloud Function entry point. When the event describes an existing, named
 * file, it obtains application-default credentials and asks the Dataflow API
 * (v1b3) to create a job from the GCS_Text_to_Cloud_PubSub template.
 *
 * @param {object} event - Trigger payload; only `event.data` is read here.
 * @param {function} callback - Completion callback. It is invoked only from
 *   the `templates.create` completion handler at the bottom of this function.
 */
exports.moveDataFromGCStoPubSub = function(event, callback) {
 const file = event.data;
 // NOTE(review): there is no else branch, so when this condition is false
 // nothing in this function ever invokes `callback`.
 if (file.resourceState === 'exists' && file.name) {
   // The `projectId` parameter here shadows the module-level constant.
   google.auth.getApplicationDefault(function (err, authClient, projectId) {
     if (err) {
       // Thrown from inside this callback; `callback` is not invoked on this path.
       throw err;
     }
 
     // Scope the client only when it reports that scoping is required.
     if (authClient.createScopedRequired && authClient.createScopedRequired()) {
       authClient = authClient.createScoped([
         'https://www.googleapis.com/auth/cloud-platform',
         'https://www.googleapis.com/auth/userinfo.email'
       ]);
     }
     console.log("File exists and client function is authenticated");
     console.log(file);
     
 
     const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });
     console.log(`Incoming data: ${file.name}`);
 
     dataflow.projects.templates.create({
       projectId: projectId,
       resource: {
         parameters: {
           // Input is the file that triggered this function.
           // NOTE(review): the parameter names and the placement of
           // gcsPath/staginglocation should be checked against the template
           // and the templates.create API reference — TODO confirm.
           inputFile: `gs://${file.bucket}/${file.name}`,
           outputTopic: `projects/iot-fitness-198120/topics/MemberFitnessData`
         },
         jobName: 'CStoPubSub',
         gcsPath: 'gs://dataflow-templates/latest/GCS_Text_to_Cloud_PubSub',
         staginglocation: 'gs://fitnessanalytics-tmp/tmp'
       }
     }, function(err, response) {
       if (err) {
         console.error("problem running dataflow template, error was: ", err);
       }
       // The response is logged and `callback` is called with no arguments
       // whether or not `err` was set, so a failure shows up only in the logs.
       console.log("Dataflow template response: ", response);
       callback();
     });
 
   });
 }
};

The execution doesn't even log the following line, console.log("File exists and client function is authenticated"); which tells me it is not even getting that far.

Here's the log output during execution:

2018-03-20 04:56:43.283 GST DataflowTriggeringFunction 52957909906492 Function execution took 60097 ms, finished with status: 'timeout'

2018-03-20 04:55:43.188 GST DataflowTriggeringFunction 52957909906492 Function execution started

Any idea why it is not triggering the Dataflow job, and yet not throwing an error message?

回答1:

I have finally modified the code. Got some help from GCP support. Below is the right syntax that works:

var {google} = require('googleapis');

exports.moveDataFromGCStoPubSub = (event, callback) => {


const file = event.data;
const context = event.context;

console.log(`Event ${context.eventId}`);
console.log(`  Event Type: ${context.eventType}`);
console.log(`  Bucket: ${file.bucket}`);
console.log(`  File: ${file.name}`);
console.log(`  Metageneration: ${file.metageneration}`);
console.log(`  Created: ${file.timeCreated}`);
console.log(`  Updated: ${file.updated}`);

  google.auth.getApplicationDefault(function (err, authClient, projectId) {
     if (err) {
       throw err;
     }

 console.log(projectId);

 const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });
 console.log(`gs://${file.bucket}/${file.name}`);
 dataflow.projects.templates.create({
  gcsPath: 'gs://dataflow-templates/latest/GCS_Text_to_Cloud_PubSub', 
  projectId: projectId,
   resource: {
    parameters: {
        inputFilePattern: `gs://${file.bucket}/${file.name}`,
        outputTopic: 'projects/iot-fitness-198120/topics/MemberFitnessData2'
      },
    environment: {
      tempLocation: 'gs://fitnessanalytics-tmp/tmp'
    },
      jobName: 'CStoPubSub',
      //gcsPath: 'gs://dataflow-templates/latest/GCS_Text_to_Cloud_PubSub',    
    }
 }, function(err, response) {
   if (err) {
     console.error("problem running dataflow template, error was: ", err);
   }
   console.log("Dataflow template response: ", response);
   callback();
 });

   });

 callback();
};


回答2:

My guess is that your Cloud Function execution fails because the event doesn't satisfy your if statement:

if (file.resourceState === 'exists' && file.name)

I had a similar issue when I started working with Cloud Functions. Modify the import in your index.js file to var {google} = require('googleapis'); as provided in the solution here.