Credentials error when using Google Transfer Service

Asked 2019-08-24 12:50

We have developed an automated pipeline that mainly does tasks on AWS and then some downstream work on Google Cloud. The tasks are deployed on AWS via AWS StepFunctions/Lambda and we need to pass processed files from AWS to Google Cloud Storage (via Google Transfer Service). However, I've been having trouble wiring together the AWS and GCP part.

I have a Lambda function that kicks off a Google Transfer Service job (via Google's Python client library), however I keep getting the error:

module initialization error: Could not automatically determine 
credentials. Please set GOOGLE_APPLICATION_CREDENTIALS or
explicitly create credential and re-run the application. For more 
information, please see
https://developers.google.com/accounts/docs/application-default-
credentials.

I have a method in my S3-to-GCS Lambda function (which I call "handoff") that sets the GOOGLE_APPLICATION_CREDENTIALS environment variable, but it obviously isn't working.

Here is the Handoff Lambda Function:

"""
Creates a one-time transfer from Amazon S3 to Google Cloud Storage.
"""

import base64
import datetime
import json
import logging
import os
import uuid

import boto3
import googleapiclient.discovery
import requests
from botocore.client import ClientError
from google.cloud import storage
from requests.exceptions import RequestException


EVENT_GCP_CREDS_KEY = 'gcp_creds_b64_cypher_text'
GCP_CREDENTIALS_FILE_NAME = 'service_creds.json'
FAIL_TOKEN = 'FAILED'  # placeholder failure sentinel; the real definition is not shown in this snippet

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


# Establish clients

kms = boto3.client('kms')
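# NOTE: the discovery.build() call below runs at import time, before handler() has written the credentials
# file and set GOOGLE_APPLICATION_CREDENTIALS -- this is the most likely source of the
# "module initialization error" quoted above.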
storagetransfer = googleapiclient.discovery.build('storagetransfer', 'v1')

def handler(event, context):

    description = "-".join("transfer-job", event['queue'])
    project_id = event['project_id']
    source_bucket = event['results_uri'] + "final-cohort-vcf/"
    sink_bucket = event['sink_bucket']
    include_prefix = event['cohort_prefix'] + ".gt.vcf.gz"
    access_key = event['aws_access_key']
    secret_access_key = event['aws_secret_key']

    return gcp_storage_read_op_verification(event)    
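    # NOTE: the early return above is for testing the credentials hack; the transfer-job code below is
    # currently unreachable.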

    # Schedule times must be given in UTC; start a couple of minutes from now
    now = datetime.datetime.utcnow() + datetime.timedelta(minutes=2)

    day = now.day
    month = now.month
    year = now.year

    hours = now.hour
    minutes = now.minute

    transfer_job = {
        'description': description,
        'status': 'ENABLED',
        'projectId': project_id,
        'schedule': {
            'scheduleStartDate': {
                'day': day,
                'month': month,
                'year': year
            },
            'scheduleEndDate': {
                'day': day,
                'month': month,
                'year': year
            },
            'startTimeOfDay': {
                'hours': hours,
                'minutes': minutes
            }
        },
        'transferSpec': {
            'objectConditions': {
                'includePrefixes': [
                    include_prefix
                ]
            },
            'awsS3DataSource': {
                'bucketName': source_bucket,
                'awsAccessKey': {
                    'accessKeyId': access_key,
                    'secretAccessKey': secret_access_key
                }
            },
            'gcsDataSink': {
                'bucketName': sink_bucket
            }
        }
    }

    result = storagetransfer.transferJobs().create(body=transfer_job).execute()
    print('Returned transferJob: {}'.format(
        json.dumps(result, indent=4)))

    return result



def _gcp_credentials_hack(event):
    """
    A hack to enable GCP client usage via Application Default Credentials. Uses an encoded, encrypted string of the
    GCP service account JSON from the event object. Has the side effects of
        1. Writing the credentials in plaintext to /tmp/service_creds.json
        2. Referencing this location in the GOOGLE_APPLICATION_CREDENTIALS environment variable

    :param event: Passed in by the container, expecting to find key: gcp_creds_b64_cypher_text
    :return: None
    """

    # Get blob from event
    cypher_text_blob = event['GCP_creds']
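    # NOTE: the module-level constant EVENT_GCP_CREDS_KEY ('gcp_creds_b64_cypher_text') is defined above but not
    # used here; the key read on the line above must match the key actually present in the event payload.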

    # Decode cypher_text from base64, into bytes
    cypher_text_bytes = base64.b64decode(cypher_text_blob)

    # Call KMS to decrypt the GCP credentials cypher text
    # Permissions for this action should be given by a policy attached to this function's invocation role
    decrypt_response = kms.decrypt(CiphertextBlob=cypher_text_bytes)

    # Process the plaintext from the result object from kms.decrypt(..) into a utf-8 encoded bytes object
    gcp_credentials_bytes = decrypt_response['Plaintext']
    # Decode the utf8-encoded bytes object into a Python str of the GCP credentials
    gcp_credentials = gcp_credentials_bytes.decode('utf-8')

    # Write the credentials in plaintext to /tmp
    #   - Can we guarantee that only approved agents can read this file?
    #   YES. See the docstring of gcp_storage_read_op_verification below.
    gcp_credentials_file_local = os.path.sep.join(('/tmp', GCP_CREDENTIALS_FILE_NAME))
    logger.debug('writing credentials to {}'.format(gcp_credentials_file_local))
    with open(gcp_credentials_file_local, 'w') as credentials_fh:
        credentials_fh.write(gcp_credentials)

    # Set Application Default Credentials environment path to tmp file we created
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = gcp_credentials_file_local


def _get_blob_from_gcs_public_landsat_bucket():
    """
    Create Google Cloud Storage client and use it to return a blob from the public landsat image bucket.

    :return: blob retrieved from public landsat bucket using Google Cloud Storage client library
    """
    # Create GCP storage client instance, using Application Default Credentials (hack)
    client = storage.Client()

    # Use GCP storage client to get public landsat bucket object
    public_landsat_bucket = client.get_bucket(bucket_name='gcp-public-data-landsat')

    # return the first blob in the bucket
    for blob in public_landsat_bucket.list_blobs():
        return blob


def _get_blob_from_hail_project_bucket():
    """
    Create Google Cloud Storage client and use it to return a blob from the private hail project bucket.

    :return: blob retrieved from the 'liftover-results' bucket using the Google Cloud Storage client library
    """
    # Create GCP storage client instance, using Application Default Credentials (hack)
    client = storage.Client()

    # Use GCP storage client to get the private liftover-results bucket object
    liftover_results_bucket = client.get_bucket(bucket_name='liftover-results')

    # return the first blob in the bucket
    for blob in liftover_results_bucket.list_blobs():
        return blob

def gcp_storage_read_op_verification(event):
    """
    :param event: event from handler that contains the encrypted GCP service credentials
    :return: first blob id from GCS public landsat bucket
    """

    try:
        _gcp_credentials_hack(event=event)

        # If credentials hack succeeded, this will return a blob from the GCS plublic landsat public using GCS
        # python client
        landsat_blob = _get_blob_from_gcs_public_landsat_bucket()
        liftover_blob = _get_blob_from_hail_project_bucket()

        # return dict of blob.id for response jsonification
        return {
            "gcp-public-data-landsat-random-blob-id": landsat_blob.id,
            "gcp-private-hail-bucket-random-blob-id": liftover_blob.id,
        }
    except BaseException as e:
        # Very broad exception class.. using it for time being, until have opportunity to find the fine-grained
        # exceptions that can be raised from the statement suite
        logger.error(e)
        return FAIL_TOKEN
        Appears to be safe. See https://forums.aws.amazon.com/message.jspa?messageID=761306

            "Thanks for reaching out to us. I believe what you are referring to is Lambda container re-usage. All the
            resources associated with a Lambda container, including the /tmp storage, are isolated from other Lambda
            containers.

            https://aws.amazon.com/blogs/compute/container-reuse-in-lambda/

            Indeed there is the possibility of reusing the same Lambda container if your function is executed at close
            time intervals. Important to note is that the container will only be reused for your particular Lambda
            function, other functions from your account or different accounts will run in other isolated containers.
            So there is a 1-to-1 association between a Lambda function and its container.

            After a certain time interval of a Lambda function not being invoked, its associated container is deleted
            and in the process all the data stored in the memory or disk associated with the container is destroyed as
            well."

            ...

    2. With the client library bootstrapped, use the storage client to retrieve the public landsat image
    bucket, and return a blob.

    3. Finally return that blob's id, as proof of the execution.

    :param event: event from handler that contains the encrypted GCP service credentials
    :return: first blob id from GCS public landsat bucket
    """

    # Use encrypted GCP storage crendentials from event to enable file-based Application Default Credentials.
    # This is somewhat of a bad hack since they must exist in plaintext on a filesystem. Although, see above, this
    # is supposed to be private between containers.
    #
    # TODO: If using this hack often, would be better to delete the plaintext creds file after the GCP API call
    #       Possibly using a context manager. E.g.
    #
    #       with gcp_creds_hack:
    #           _get_blob
    #
    #       Implemented like
    #
    #       from contextlib import contextmanager
    #
    #       @contextmanager
    #       def gcp_creds_hack(file):
    #           # write the plaintext creds file, file, to /tmp
    #           yield
    #           # delete the plaintext creds file, file, from /tmp
    #

    try:
        _gcp_credentials_hack(event=event)

        # If the credentials hack succeeded, these calls will return blobs from the public landsat bucket and
        # the private hail bucket using the GCS Python client
        landsat_blob = _get_blob_from_gcs_public_landsat_bucket()
        liftover_blob = _get_blob_from_hail_project_bucket()

        # return dict of blob.id for response jsonification
        return {
            "gcp-public-data-landsat-random-blob-id": landsat_blob.id,
            "gcp-private-hail-bucket-random-blob-id": liftover_blob.id,
        }
    except BaseException as e:
        # Very broad exception class; using it for the time being, until there is an opportunity to identify
        # the fine-grained exceptions that can be raised from the statement suite
        logger.error(e)
        return FAIL_TOKEN

The GCP credentials passed in the event object come from the pipeline user supplying the local file path to the service account JSON file. I then use a function that reads this JSON file and encrypts it with the following script:

import boto3
import base64
from botocore.client import ClientError

class LambdaModeUseCases(object):
    @staticmethod
    def encrypt_gcp_creds(gcp_creds_file, key_id):
        # get GCP creds as string
        with open(gcp_creds_file, 'r') as gcp_creds_fh:
            gcp_creds = gcp_creds_fh.read().replace('\n', '')

        # kms:Encrypt (need key_id or arn)
        kms_client = boto3.client('kms')
        encryption_response = {}
        try:
            encryption_response = kms_client.encrypt(
                KeyId=key_id,
                Plaintext=gcp_creds.encode('utf-8'),
            )
        except ClientError as ce:
            print('Failed calling kms.encrypt with key {key} on text {text}'.format(key=key_id, text=gcp_creds))
            raise ce

        print('Parsing kms_client.encrypt(..) response')
        cypher_text_blob = 'FAILED'
        if 'CiphertextBlob' in encryption_response:
            cypher_text_blob = encryption_response['CiphertextBlob']
        else:
            print('Was able to call kms.encrypt(..) without a botocore.client.ClientError.')
            print('But failed to find CiphertextBlob in response: {response}'.format(
                response=encryption_response
            ))
            raise Exception('Failed to find CiphertextBlob in kms.encrypt() response.')

        print('Base64 encoding...')
        encrypted_gcp_creds_b64 = base64.b64encode(cypher_text_blob)
        print('b64: {}'.format(encrypted_gcp_creds_b64))

        encrypted_gcp_creds_str = encrypted_gcp_creds_b64.decode("utf-8")
        print('str: {}'.format(encrypted_gcp_creds_str))

        return encrypted_gcp_creds_str

This encrypted credentials object is what then gets passed into the Handoff Lambda function.
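For reference, here is a minimal sketch of the event payload the Handoff Lambda expects, based on the keys read by handler() and _gcp_credentials_hack() above (all values are placeholders, and I'm assuming the credentials land under the 'GCP_creds' key, since that is what _gcp_credentials_hack reads):

handoff_event = {
    "queue": "my-queue",
    "project_id": "my-gcp-project",          # GCP project that owns the transfer job
    "results_uri": "my-results-path/",       # concatenated with "final-cohort-vcf/" to form the S3 source
    "sink_bucket": "my-gcs-sink-bucket",     # GCS destination bucket
    "cohort_prefix": "cohort-123",           # concatenated with ".gt.vcf.gz" to form the include prefix
    "aws_access_key": "AKIA...",             # AWS key pair Google Transfer Service uses to read from S3
    "aws_secret_key": "...",
    # Output of LambdaModeUseCases.encrypt_gcp_creds(..): base64-encoded KMS ciphertext of the service account JSON
    "GCP_creds": "<encrypted_gcp_creds_str>",
}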

Any ideas as to what's going wrong, or a simpler way of doing this?

2 Answers

放荡不羁爱自由 · 2019-08-24 13:04

Try this:

  1. Save your Google service account credentials in the same directory as the Python script that contains your lambda code, and your script should call this:

    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './yourcred.json'
    
  2. Zip up the script and the JSON file (creating a deployment package).

  3. In the AWS Lambda service, create a Lambda function and upload the ZIP file instead of using a blueprint.

There are examples of doing this, for example https://gist.github.com/cyk/8ec6481d3dcbe10376f8.
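A minimal sketch of what that packaged handler module could look like (the file name yourcred.json is a placeholder, and locating the bundled file via LAMBDA_TASK_ROOT is my assumption; the important part is that the environment variable is set before any Google client is constructed):

import os

# Point Application Default Credentials at the JSON file bundled in the deployment package.
# LAMBDA_TASK_ROOT is set by the Lambda runtime to the directory the package is unpacked into.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = os.path.join(
    os.environ.get('LAMBDA_TASK_ROOT', '.'), 'yourcred.json'
)

# Build Google clients only after the environment variable is in place.
import googleapiclient.discovery

storagetransfer = googleapiclient.discovery.build('storagetransfer', 'v1')


def handler(event, context):
    # If the credentials were picked up, this client can now be used to create transfer jobs.
    return {'storagetransfer_client_ready': storagetransfer is not None}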

Anthone · 2019-08-24 13:07

If you have the credentials in a JSON file, you don't need to set any environment variables; instead, provide the service account credentials explicitly (rather than relying on Application Default Credentials). You can run the following code:

from googleapiclient import discovery
from google.oauth2 import service_account

SERVICE_ACCOUNT_FILE = 'service.json'
credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE)
storagetransfer = discovery.build('storagetransfer', 'v1', credentials=credentials)
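In the asker's setup, where the decrypted service account JSON arrives as a string from KMS rather than as a file, a variant of this approach (a sketch, assuming the decrypted JSON string produced by the KMS step above) avoids writing anything to /tmp by building the credentials object from a dict:

import json

from google.cloud import storage
from google.oauth2 import service_account
from googleapiclient import discovery


def build_gcp_clients(gcp_credentials_json_str):
    """Build Storage Transfer and GCS clients from a decrypted service account JSON string."""
    info = json.loads(gcp_credentials_json_str)
    credentials = service_account.Credentials.from_service_account_info(
        info, scopes=['https://www.googleapis.com/auth/cloud-platform']
    )

    storagetransfer = discovery.build('storagetransfer', 'v1', credentials=credentials)
    gcs_client = storage.Client(project=info['project_id'], credentials=credentials)
    return storagetransfer, gcs_client

Either way, no GOOGLE_APPLICATION_CREDENTIALS environment variable or temporary credentials file is needed.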