Running S3-put-triggered Lambda function on existing objects

Posted 2020-07-06 06:20

I have a Lambda function in Node.js that processes new images added to my bucket. I want to run the function for all existing objects. How can I do this? I figured the easiest way is to "re-put" each object, to trigger the function, but I'm not sure how to do this.

To be clear - I want to run, one-time, on each of the existing objects. The trigger is already working for new objects, I just need to run it on the objects that were inserted before the lambda function was created.
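(Editor's note: one way to "re-put" an object without re-uploading its data is an in-place copy. Be aware that S3 requires something to change on a self-copy, hence the REPLACE metadata directive, and that the resulting notification is ObjectCreated:Copy rather than ObjectCreated:Put, so the trigger must be subscribed to ObjectCreated:* or ObjectCreated:Copy for this to fire it. A minimal sketch with the Node.js aws-sdk, using placeholder bucket and key names:)

var aws = require('aws-sdk');
var s3 = new aws.S3();

// Copy an object onto itself to fire the ObjectCreated trigger again.
function reputObject(bucket, key, done) {
    s3.copyObject({
        Bucket: bucket,
        CopySource: bucket + '/' + key, // URL-encode keys containing special characters
        Key: key,
        MetadataDirective: 'REPLACE' // a self-copy must change something
    }, done);
}

reputObject('my-bucket', 'HappyFace.jpg', function(err) {
    if (err) console.log(err, err.stack);
});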

5 Answers
再贱就再见
#2 · 2020-07-06 06:58

This thread helped push me in the right direction, as I needed to invoke a Lambda function per file for 50k existing files across two buckets. I decided to write it in Python and to limit the number of Lambda functions running simultaneously to 500 (the concurrency limit in many AWS regions is 1000).

The script creates a worker pool of 500 threads that feed off a queue of bucket keys. Each worker waits for its Lambda invocation to finish before picking up the next key. Since running this script against my 50k files will take a couple of hours, I'm just running it off my local machine. Hope this helps someone!

#!/usr/bin/env python

import json
import base64
from queue import Queue
from threading import Thread
from argh import dispatch_command

import boto3

client = boto3.client('lambda')
s3 = boto3.resource('s3')

def invoke_lambdas():
    try:
        # boto3 picks up credentials from the environment or ~/.aws/credentials
        buckets = [s3.Bucket('bucket-one'), s3.Bucket('bucket-two')]

        queue = Queue()
        num_threads = 500

        # create a worker pool
        for i in range(num_threads):
            worker = Thread(target=invoke, args=(queue,))
            worker.daemon = True  # daemon threads exit when the main thread does
            worker.start()

        for bucket in buckets:
            for obj in bucket.objects.all():  # paginates automatically
                queue.put((bucket.name, obj.key))

        queue.join()

    except Exception as e:
        print(e)

def invoke(queue):
    while True:
        bucket_key = queue.get()

        try:
            print('Invoking lambda with bucket %s key %s. Remaining to process: %d'
                % (bucket_key[0], bucket_key[1], queue.qsize()))
            trigger_event = {
                'Records': [{
                    's3': {
                        'bucket': {
                            'name': bucket_key[0]
                        },
                        'object': {
                            'key': bucket_key[1]
                        }
                    }
                }]
            }

            # replace lambda_function_name with the actual name
            # InvocationType='RequestResponse' means it will wait until the lambda fn is complete
            response = client.invoke(
                FunctionName='lambda_function_name',
                InvocationType='RequestResponse',
                LogType='None',
                ClientContext=base64.b64encode(json.dumps({}).encode()).decode(),
                Payload=json.dumps(trigger_event).encode()
            )
            if response['StatusCode'] != 200:
                print(response)

        except Exception as e:
            print(e)
            print('Exception during invoke')

        queue.task_done()

if __name__ == '__main__':
    dispatch_command(invoke_lambdas)
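Since argh's dispatch_command exposes invoke_lambdas as the command-line entry point, the script should be runnable directly (e.g. python invoke_lambdas.py, assuming that filename) once boto3 and argh are installed.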
迷人小祖宗
#3 · 2020-07-06 07:08

The following Lambda function will do what you require.

It iterates through every object in your target S3 bucket and, for each one, invokes the desired Lambda function with an event that emulates a put operation.

You will probably want to configure a long timeout for this function, since it invokes the target Lambda once per object.

var TARGET_BUCKET="my-bucket-goes-here";
var TARGET_LAMBDA_FUNCTION_NAME="TestFunct";
var S3_PUT_SIMULATION_PARAMS={
  "Records": [
    {
      "eventVersion": "2.0",
      "eventTime": "1970-01-01T00:00:00.000Z",
      "requestParameters": {
        "sourceIPAddress": "127.0.0.1"
      },
      "s3": {
        "configurationId": "testConfigRule",
        "object": {
          "eTag": "0123456789abcdef0123456789abcdef",
          "sequencer": "0A1B2C3D4E5F678901",
          "key": "HappyFace.jpg",
          "size": 1024
        },
        "bucket": {
          "arn": "arn:aws:s3:::mybucket",
          "name": "sourcebucket",
          "ownerIdentity": {
            "principalId": "EXAMPLE"
          }
        },
        "s3SchemaVersion": "1.0"
      },
      "responseElements": {
        "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH",
        "x-amz-request-id": "EXAMPLE123456789"
      },
      "awsRegion": "us-east-1",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "EXAMPLE"
      },
      "eventSource": "aws:s3"
    }
  ]
};

var aws = require('aws-sdk');
var s3 = new aws.S3();
var lambda = new aws.Lambda();


exports.handler = (event, context, callback) => {
    retrieveS3BucketContents(TARGET_BUCKET, function(s3Objects){
        // simulateS3PutOperation receives itself as the callback so it can recurse
        simulateS3PutOperation(TARGET_BUCKET, s3Objects, simulateS3PutOperation, function(){
            console.log("complete.");
        });
    });
};

function retrieveS3BucketContents(bucket, callback, contents, token){
    // listObjectsV2 returns at most 1000 keys per call, so page through the bucket
    s3.listObjectsV2({
        Bucket: bucket,
        ContinuationToken: token
    }, function(err, data) {
        if (err) return console.log(err, err.stack);
        var all = (contents || []).concat(data.Contents);
        if (data.IsTruncated) {
            retrieveS3BucketContents(bucket, callback, all, data.NextContinuationToken);
        } else {
            callback(all);
        }
    });
}

function simulateS3PutOperation(bucket, s3ObjectStack, callback, callbackEmpty){
    var params = {
      FunctionName: TARGET_LAMBDA_FUNCTION_NAME, 
      Payload: ""
    };

    if(s3ObjectStack.length > 0){
        var s3Obj = s3ObjectStack.pop();
        var p = S3_PUT_SIMULATION_PARAMS;
        p.Records[0].s3.bucket.name = bucket;
        p.Records[0].s3.object.key = s3Obj.Key;
        params.Payload = JSON.stringify(p, null, 2);
        lambda.invoke(params, function(err, data) {
          if (err) console.log(err, err.stack); // an error occurred
          else{
              callback(bucket, s3ObjectStack, callback, callbackEmpty);
          }
        });
    }
    else{
        callbackEmpty();   
    }
}

Below is the full policy that your Lambda function's execution role will need to run this method; it allows read/write access to CloudWatch Logs and ListBucket access to S3. You need to fill in your bucket details where you see MY-BUCKET-GOES-HERE.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1477382207000",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::MY-BUCKET-GOES-HERE"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}
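One gap worth noting: the function above also calls lambda.invoke, so its role additionally needs permission to invoke the target function. A statement along these lines would cover it (adjust the ARN to your target function; the wildcard account and region here are placeholders):

{
    "Effect": "Allow",
    "Action": [
        "lambda:InvokeFunction"
    ],
    "Resource": "arn:aws:lambda:*:*:function:TestFunct"
}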
Evening l夕情丶
#4 · 2020-07-06 07:10

As I had to do this on a very large bucket, and Lambda functions have a maximum execution time of 15 minutes, I ended up writing a script with the Ruby AWS SDK.

require 'aws-sdk'
require 'json'

class LambdaS3Invoker

  BUCKET_NAME = "HERE_YOUR_BUCKET"
  FUNCTION_NAME = "HERE_YOUR_FUNCTION_NAME"
  AWS_KEY = "HERE_YOUR_AWS_KEY"
  AWS_SECRET = "HERE_YOUR_AWS_SECRET"
  REGION = "HERE_YOUR_REGION"

  def execute
    bucket.objects({ prefix: 'products' }).each do |o| # adjust or drop the prefix as needed
      lambda_invoke(o.key)
    end
  end

  private

  def lambda_invoke(key)
    lambda.invoke({
      function_name: FUNCTION_NAME,
      invocation_type: 'Event',
      payload: JSON.generate({
        Records: [{
          s3: {
            object: {
              key: key,
            },
            bucket: {
              name: BUCKET_NAME,
            }
          }
        }]
      })
    })
  end

  def lambda
    @lambda ||= Aws::Lambda::Client.new(
      region: REGION,
      access_key_id: AWS_KEY,
      secret_access_key: AWS_SECRET
    )
  end

  def resource
    @resource ||= Aws::S3::Resource.new(
      region: REGION,
      access_key_id: AWS_KEY,
      secret_access_key: AWS_SECRET
    )
  end

  def bucket
    @bucket ||= resource.bucket(BUCKET_NAME)
  end
end

And then you can call it like:

LambdaS3Invoker.new.execute
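Note the design choice here: invocation_type: 'Event' queues each invocation asynchronously and returns immediately, so the script finishes quickly but you won't see per-object results. The Python script in the answer above uses 'RequestResponse' instead, which blocks until each invocation completes.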
\"骚年 ilove
5楼-- · 2020-07-06 07:11

Well, basically what you need is to use some API calls (boto, for example, if you use Python) to list all objects, or all new objects, in your S3 bucket, and then process those objects.

Here is a snippet:

from boto.s3.connection import S3Connection

conn = S3Connection()
source = conn.get_bucket(src_bucket)  # src_bucket and prefix defined elsewhere
src_list = set(key.name for key in source.get_all_keys(headers=None, prefix=prefix))

# and then you can go over this list
for entry in src_list:
    do_something(entry)  # e.g. invoke your Lambda for this key
甜甜的少女心
#6 · 2020-07-06 07:17

What you need to do is create a one-time script which uses the AWS SDK to invoke your Lambda function. This solution doesn't require you to "re-put" the objects.

I am going to base my answer on the AWS JS SDK.

To be clear - I want to run, one-time, on each of the existing objects. The trigger is already working for new objects, I just need to run it on the objects that were inserted before the lambda function was created.

As you have a working Lambda function that accepts S3 put events, what you need to do is find all the unprocessed objects in S3. If you have DB entries for each S3 object, this should be easy; if not, you might find the S3 listObjectsV2 function handy: http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#listObjectsV2-property

Then, for each unprocessed S3 object, create a JSON object that looks like an S3 put event message (shown below) and call the Lambda invoke function with that JSON object as the payload.

You can find the Lambda invoke function docs at http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html#invoke-property

When creating the fake S3 put event message for your Lambda function, you can ignore most of the actual object properties, depending on what your function reads. At a minimum you will probably have to set the bucket name and object key. A sketch tying these steps together follows the message structure below.

S3 Put Event Message Structure http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html

{  
   "Records":[  
      {  
         "eventVersion":"2.0",
         "eventSource":"aws:s3",
         "awsRegion":"us-east-1",
         "eventTime":"1970-01-01T00:00:00.000Z",
         "eventName":"ObjectCreated:Put",
         "userIdentity":{  
            "principalId":"AIDAJDPLRKLG7UEXAMPLE"
         },
         "requestParameters":{  
            "sourceIPAddress":"127.0.0.1"
         },
         "responseElements":{  
            "x-amz-request-id":"C3D13FE58DE4C810",
            "x-amz-id-2":"FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD"
         },
         "s3":{  
            "s3SchemaVersion":"1.0",
            "configurationId":"testConfigRule",
            "bucket":{  
               "name":"mybucket",
               "ownerIdentity":{  
                  "principalId":"A3NL1KOZZKExample"
               },
               "arn":"arn:aws:s3:::mybucket"
            },
            "object":{  
               "key":"HappyFace.jpg",
               "size":1024,
               "eTag":"d41d8cd98f00b204e9800998ecf8427e",
               "versionId":"096fKKXTRTtl3on89fVO.nfljtsv6qko",
               "sequencer":"0055AED6DCD90281E5"
            }
         }
      }
   ]
}
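Putting those steps together, here is a minimal sketch with the AWS JS SDK (v2). The bucket and function names are placeholders, and the fake event carries only the bucket name and object key, per the note above:

var aws = require('aws-sdk');
var s3 = new aws.S3();
var lambda = new aws.Lambda();

// Page through the bucket (listObjectsV2 returns at most 1000 keys per call)
// and invoke the Lambda once per key with a minimal fake put event.
function processPage(token) {
    s3.listObjectsV2({ Bucket: 'my-bucket', ContinuationToken: token }, function(err, data) {
        if (err) return console.log(err, err.stack);
        data.Contents.forEach(function(obj) {
            var fakeEvent = { Records: [{ s3: {
                bucket: { name: 'my-bucket' },
                object: { key: obj.Key }
            } }] };
            lambda.invoke({
                FunctionName: 'my-image-processor',
                InvocationType: 'Event', // asynchronous, fire-and-forget
                Payload: JSON.stringify(fakeEvent)
            }, function(err) {
                if (err) console.log(err, err.stack);
            });
        });
        if (data.IsTruncated) processPage(data.NextContinuationToken);
    });
}

processPage();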