Using Google Cloud Dataflow how do I run with prop

Posted 2019-04-16 17:07

Question:

I'm new to Google Cloud Dataflow, as is probably obvious from my questions below.

I've got a dataflow application written and can get it to run without issue using my personal credentials both locally and on a GCE instance. However, I can't seem to crack the proper steps to get it to run using the compute engine instance's service credentials or service credentials I've created using the API & AUTH section of the console. I always get a 401 not authorized error when I run.

Here's what I've tried...

1) Created a virtual machine, granting it access rights to Storage, Datastore, SQL and Compute Engine. My understanding is that this creates a service account specific to the compute instance that serves as the server's default credentials, and that these are used the same way a user's credentials are used when an app runs on the instance. This is where I get a 401. My question is: where can I see the service account that was supposedly created? Or do I just trust that it exists somewhere?

2) Created service credentials in the API & Auth section of the developer's console, then ran gcloud auth activate-service-account, pointing the command at the credentials JSON file I downloaded. This is similar to the OAuth round trip you go through with gcloud auth login. Here I also get the 401.

3) Lastly, I used the service credentials from step 2 independently of the GCE instance: I created a class that implements the CredentialFactory interface and passed it to the PipelineOptions. However, the app now crashes with an error saying that it is looking for a method, fromOptions, that isn't in the CredentialFactory interface. The options configuration, the credential factory and the stack trace follow.

I would be happy to use any of the three methods above, if I could get any of them to work with service credentials. Any insight into what I'm doing wrong, steps I'm leaving out, or options I haven't explored would be greatly appreciated. The documentation is a little disjointed. If there is a clear step-by-step guide, a link to it would be sufficient. What I've found on my own so far has been of little help.

If I can provide any additional information please let me know.

Here's some code that may be helpful and the stack trace I get when the code runs using the credential factory.

Options setup code looks like this:

GcrDataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(GcrDataflowPipelineOptions.class);
options.setKind("Counties");
options.setCredentialFactoryClass(GoogleCredentialProvider.class);

GoogleCredentialProvider.java

Note that the JSON file I downloaded when creating the service account (renamed) is loaded as a resource from my app's classpath.

public class GoogleCredentialProvider implements CredentialFactory {

    @Override
    public Credential getCredential() throws IOException, GeneralSecurityException {
        final String env = System.getProperty("gcr_dataflow_env", "local");
        Properties props = new Properties();
        ClassLoader loader = this.getClass().getClassLoader();
        props.load(loader.getResourceAsStream(env + "-gcr-dataflow.properties"));
        final String credFileName = props.getProperty("gcloud.dataflow.service.account.file");
        InputStream credStream = loader.getResourceAsStream(credFileName);
        GoogleCredential credential = GoogleCredential.fromStream(credStream);
        return credential;
    }

}

Stacktrace:

java.lang.RuntimeException: java.lang.RuntimeException: Unable to find factory method com.scotcro.gcr.dataflow.components.pipelines.GoogleCredentialProvider#fromOptions
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat.evaluateReadHelper(BasicSerializableSourceFormat.java:268)
    at com.google.cloud.dataflow.sdk.io.Read$Bound$1.evaluate(Read.java:123)
    at com.google.cloud.dataflow.sdk.io.Read$Bound$1.evaluate(Read.java:120)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:684)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:200)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196)
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:99)
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:208)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:640)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:354)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:76)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:149)
    at com.scotcro.gcr.dataflow.app.GcrDataflowApp.run(GcrDataflowApp.java:65)
    at com.scotcro.gcr.dataflow.app.GcrDataflowApp.main(GcrDataflowApp.java:49)
Caused by: java.lang.RuntimeException: Unable to find factory method com.scotcro.gcr.dataflow.components.pipelines.GoogleCredentialProvider#fromOptions
    at com.google.cloud.dataflow.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
    at com.google.cloud.dataflow.sdk.util.InstanceBuilder.build(InstanceBuilder.java:161)
    at com.google.cloud.dataflow.sdk.options.GcpOptions$GcpUserCredentialsFactory.create(GcpOptions.java:180)
    at com.google.cloud.dataflow.sdk.options.GcpOptions$GcpUserCredentialsFactory.create(GcpOptions.java:175)
    at com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:288)
    at com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:127)
    at com.sun.proxy.$Proxy42.getGcpCredential(Unknown Source)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$Source.getDatastore(DatastoreIO.java:335)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$Source.createReader(DatastoreIO.java:320)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$Source.createReader(DatastoreIO.java:186)
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat.evaluateReadHelper(BasicSerializableSourceFormat.java:259)
    ... 13 more
2015-07-03 09:55:42,519 | main | DEBUG | co.sc.gc.da.ap.GcrDataflowApp | destroying

Answer 1:

You likely do not have the proper credentials. When you execute a Dataflow job from GCE, the service account attached to the instance is what Dataflow uses to validate the request.
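To see which service account is actually attached to the instance (the question asks where it can be seen), one option is to ask the GCE metadata server from the VM itself. The following is a minimal sketch, assuming Java 11+ for java.net.http; the class and method names are made up for illustration, and it only works when run on a Compute Engine instance.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Illustrative helper (hypothetical name): prints the instance's default
// service account and the scopes granted to it.
public class InstanceServiceAccountCheck {

    // Path of the default service account on the GCE metadata server.
    static final String BASE =
        "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/";

    // Builds a metadata request; the server rejects requests without this header.
    static HttpRequest metadataRequest(String attribute) {
        return HttpRequest.newBuilder(URI.create(BASE + attribute))
            .header("Metadata-Flavor", "Google")
            .timeout(Duration.ofSeconds(5))
            .GET()
            .build();
    }

    // Fetches one attribute ("email" or "scopes") as plain text.
    static String fetch(HttpClient client, String attribute)
            throws IOException, InterruptedException {
        HttpResponse<String> response =
            client.send(metadataRequest(attribute), HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Metadata server returned HTTP " + response.statusCode());
        }
        return response.body().trim();
    }

    public static void main(String[] args) {
        HttpClient client = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(5))
            .build();
        try {
            System.out.println("Service account: " + fetch(client, "email"));
            System.out.println("Scopes:");
            System.out.println(fetch(client, "scopes"));
        } catch (IOException e) {
            // Expected when this is run anywhere other than a GCE instance.
            System.err.println("Could not reach the metadata server: " + e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

If the printed scopes do not include the ones the job needs, that would be consistent with the 401 described in the question.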

Did you do this when creating your machines?

  • Create a service account for the instance on GCE? https://cloud.google.com/compute/docs/authentication#using

  • Set the scopes Dataflow requires, such as storage, compute, and bigquery, or the broader https://www.googleapis.com/auth/cloud-platform scope?
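On the third approach in the question: the stack trace (InstanceBuilder.buildFromMethod, "Unable to find factory method ...#fromOptions") suggests the SDK builds the factory reflectively through a static fromOptions method instead of calling a constructor. The sketch below reproduces that pattern with the standard library only. Options and Factory are hypothetical stand-ins for the SDK's PipelineOptions and CredentialFactory, and build is a simplified guess at the lookup, not the SDK's actual code.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class FromOptionsSketch {

    // Hypothetical stand-ins for PipelineOptions and CredentialFactory.
    public interface Options { String credentialFile(); }
    public interface Factory { String getCredential(); }

    // Shaped like the class in the question: implements the interface only.
    public static class WithoutFactoryMethod implements Factory {
        public String getCredential() { return "credential"; }
    }

    // Same idea, plus the public static fromOptions method the lookup expects.
    public static class WithFactoryMethod implements Factory {
        private final String file;

        private WithFactoryMethod(String file) { this.file = file; }

        public static WithFactoryMethod fromOptions(Options options) {
            return new WithFactoryMethod(options.credentialFile());
        }

        public String getCredential() { return "credential from " + file; }
    }

    // Simplified reflective lookup of a static fromOptions(Options) method.
    public static Factory build(Class<? extends Factory> type, Options options) {
        try {
            Method method = type.getMethod("fromOptions", Options.class);
            if (!Modifier.isStatic(method.getModifiers())) {
                throw new RuntimeException(type.getName() + "#fromOptions must be static");
            }
            return (Factory) method.invoke(null, options);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException(
                "Unable to find factory method " + type.getName() + "#fromOptions", e);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Options options = () -> "service-account.json"; // placeholder file name
        System.out.println(build(WithFactoryMethod.class, options).getCredential());
        try {
            build(WithoutFactoryMethod.class, options);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If the SDK behaves this way, adding a public static fromOptions(PipelineOptions) method to GoogleCredentialProvider that returns an instance of the class may be enough to get past this particular error; the exact signature should be checked against the SDK version in use.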