可以将文章内容翻译成中文,广告屏蔽插件可能会导致该功能失效(如失效,请关闭广告屏蔽插件后再试):
问题:
I have a Lambda function in Node.js that processes new images added to my bucket. I want to run the function for all existing objects. How can I do this? I figured the easiest way is to "re-put" each object, to trigger the function, but I'm not sure how to do this.
To be clear - I want to run, one-time, on each of the existing objects. The trigger is already working for new objects, I just need to run it on the objects that were inserted before the lambda function was created.
回答1:
The following Lambda function will do what you require.
It will iterate through each file in your target S3 bucket and for each it will execute the desired lambda function against it emulating a put operation.
You're probably going to want to put a very long execution time allowance against this function
var TARGET_BUCKET="my-bucket-goes-here";
var TARGET_LAMBDA_FUNCTION_NAME="TestFunct";
var S3_PUT_SIMULATION_PARAMS={
"Records": [
{
"eventVersion": "2.0",
"eventTime": "1970-01-01T00:00:00.000Z",
"requestParameters": {
"sourceIPAddress": "127.0.0.1"
},
"s3": {
"configurationId": "testConfigRule",
"object": {
"eTag": "0123456789abcdef0123456789abcdef",
"sequencer": "0A1B2C3D4E5F678901",
"key": "HappyFace.jpg",
"size": 1024
},
"bucket": {
"arn": "arn:aws:s3:::mybucket",
"name": "sourcebucket",
"ownerIdentity": {
"principalId": "EXAMPLE"
}
},
"s3SchemaVersion": "1.0"
},
"responseElements": {
"x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH",
"x-amz-request-id": "EXAMPLE123456789"
},
"awsRegion": "us-east-1",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "EXAMPLE"
},
"eventSource": "aws:s3"
}
]
};
var aws = require('aws-sdk');
var s3 = new aws.S3();
var lambda = new aws.Lambda();
exports.handler = (event, context, callback) => {
retrieveS3BucketContents(TARGET_BUCKET, function(s3Objects){
simulateS3PutOperation(TARGET_BUCKET, s3Objects, simulateS3PutOperation, function(){
console.log("complete.");
});
});
};
function retrieveS3BucketContents(bucket, callback){
s3.listObjectsV2({
Bucket: TARGET_BUCKET
}, function(err, data) {
callback(data.Contents);
});
}
function simulateS3PutOperation(bucket, s3ObjectStack, callback, callbackEmpty){
var params = {
FunctionName: TARGET_LAMBDA_FUNCTION_NAME,
Payload: ""
};
if(s3ObjectStack.length > 0){
var s3Obj = s3ObjectStack.pop();
var p = S3_PUT_SIMULATION_PARAMS;
p.Records[0].s3.bucket.name = bucket;
p.Records[0].s3.object.key = s3Obj.Key;
params.Payload = JSON.stringify(p, null, 2);
lambda.invoke(params, function(err, data) {
if (err) console.log(err, err.stack); // an error occurred
else{
callback(bucket, s3ObjectStack, callback, callbackEmpty);
}
});
}
else{
callbackEmpty();
}
}
Below is the full policy that your lambda query will need to execute this method, it allows R/W to CloudWatch logs and ListObject access to S3. You need to fill in your bucket details where you see MY-BUCKET-GOES-HERE
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt1477382207000",
"Effect": "Allow",
"Action": [
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::MY-BUCKET-GOES-HERE/*"
]
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*"
}
]
}
回答2:
This thread helped push me in the right direction as I needed to invoke a lambda function per file for an existing 50k files in two buckets. I decided to write it in python and limit the amount of lambda functions running simultaneously to 500 (the concurrency limit for many aws regions is 1000).
The script creates a worker pool of 500 threads who feed off a queue of bucket keys. Each worker waits for their lambda to be finished before picking up another. Since the execution of this script against my 50k files will take a couple hours, I'm just running it off my local machine. Hope this helps someone!
#!/usr/bin/env python
# Proper imports
import json
import time
import base64
from queue import Queue
from threading import Thread
from argh import dispatch_command
import boto3
from boto.s3.connection import S3Connection
client = boto3.client('lambda')
def invoke_lambdas():
try:
# replace these with your access keys
s3 = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
buckets = [s3.get_bucket('bucket-one'), s3.get_bucket('bucket-two')]
queue = Queue()
num_threads = 500
# create a worker pool
for i in range(num_threads):
worker = Thread(target=invoke, args=(queue,))
worker.setDaemon(True)
worker.start()
for bucket in buckets:
for key in bucket.list():
queue.put((bucket.name, key.key))
queue.join()
except Exception as e:
print(e)
def invoke(queue):
while True:
bucket_key = queue.get()
try:
print('Invoking lambda with bucket %s key %s. Remaining to process: %d'
% (bucket_key[0], bucket_key[1], queue.qsize()))
trigger_event = {
'Records': [{
's3': {
'bucket': {
'name': bucket_key[0]
},
'object': {
'key': bucket_key[1]
}
}
}]
}
# replace lambda_function_name with the actual name
# InvocationType='RequestResponse' means it will wait until the lambda fn is complete
response = client.invoke(
FunctionName='lambda_function_name',
InvocationType='RequestResponse',
LogType='None',
ClientContext=base64.b64encode(json.dumps({}).encode()).decode(),
Payload=json.dumps(trigger_event).encode()
)
if response['StatusCode'] != 200:
print(response)
except Exception as e:
print(e)
print('Exception during invoke_lambda')
queue.task_done()
if __name__ == '__main__':
dispatch_command(invoke_lambdas)
回答3:
As I had to do this on a very large bucket, and lambda functions have a max. execution time of 10 minutes, I ended up doing a script with the Ruby AWS-SDK.
require 'aws-sdk-v1'
class LambdaS3Invoker
BUCKET_NAME = "HERE_YOUR_BUCKET"
FUNCTION_NAME = "HERE_YOUR_FUNCTION_NAME"
AWS_KEY = "HERE_YOUR_AWS_KEY"
AWS_SECRET = "HERE_YOUR_AWS_SECRET"
REGION = "HERE_YOUR_REGION"
def execute
bucket.objects({ prefix: 'products'}).each do |o|
lambda_invoke(o.key)
end
end
private
def lambda_invoke(key)
lambda.invoke({
function_name: FUNCTION_NAME,
invocation_type: 'Event',
payload: JSON.generate({
Records: [{
s3: {
object: {
key: key,
},
bucket: {
name: BUCKET_NAME,
}
}
}]
})
})
end
def lambda
@lambda ||= Aws::Lambda::Client.new(
region: REGION,
access_key_id: AWS_KEY,
secret_access_key: AWS_SECRET
)
end
def resource
@resource ||= Aws::S3::Resource.new(
access_key_id: AWS_KEY,
secret_access_key: AWS_SECRET
)
end
def bucket
@bucket ||= resource.bucket(BUCKET_NAME)
end
end
And then you can call it like:
LambdaS3Invoker.new.execute
回答4:
What you need to do is create a one time script which uses AWS SDK to invoke your lambda function. This solution doesn't require you to "re-put" the object.
I am going to base my answer on AWS JS SDK.
To be clear - I want to run, one-time, on each of the existing
objects. The trigger is already working for new objects, I just need
to run it on the objects that were inserted before the lambda function
was created.
As you have a working lambda function which accepts S3 put events what you need to do is find all the unprocessed object in S3 (If you have DB entries for each S3 object the above should be easy if not then you might find the S3 list object function handy http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#listObjectsV2-property).
Then for each unprocessed S3 object obtained create a JSON object which looks like S3 Put Event Message(shown below) and call the Lambda invoke function with the above JSON object as payload.
You can find the lambda invoke function docs at http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html#invoke-property
When creating the fake S3 Put Event Message Object for your lambda function you can ignore most of the actual object properties depending on your lambda function. I guess the least you will have to set is bucket name and object key.
S3 Put Event Message Structure http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
{
"Records":[
{
"eventVersion":"2.0",
"eventSource":"aws:s3",
"awsRegion":"us-east-1",
"eventTime":"1970-01-01T00:00:00.000Z",
"eventName":"ObjectCreated:Put",
"userIdentity":{
"principalId":"AIDAJDPLRKLG7UEXAMPLE"
},
"requestParameters":{
"sourceIPAddress":"127.0.0.1"
},
"responseElements":{
"x-amz-request-id":"C3D13FE58DE4C810",
"x-amz-id-2":"FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD"
},
"s3":{
"s3SchemaVersion":"1.0",
"configurationId":"testConfigRule",
"bucket":{
"name":"mybucket",
"ownerIdentity":{
"principalId":"A3NL1KOZZKExample"
},
"arn":"arn:aws:s3:::mybucket"
},
"object":{
"key":"HappyFace.jpg",
"size":1024,
"eTag":"d41d8cd98f00b204e9800998ecf8427e",
"versionId":"096fKKXTRTtl3on89fVO.nfljtsv6qko",
"sequencer":"0055AED6DCD90281E5"
}
}
}
]
}
回答5:
well basically what you need is to use some api calls(boto for example if you use python)and list all new objects or all objects in your s3 bucket and then process these objects
here is a snippet:
from boto.s3.connection import S3Connection
conn = S3Connection()
source = conn.get_bucket(src_bucket)
src_list = set([key.name for key in source.get_all_keys(headers=None, prefix=prefix)])
//and then you can go over this src list
for entry in src_list:
do something