S3 files being processed multiple times in AWS Lambda

Posted 2019-09-16 10:23

Question:

I have a Java Lambda function that is triggered by an S3 event every 15 minutes. I've noticed that over a span of about 3 hours, each Lambda invocation includes the most recently uploaded file AND all of the files that were uploaded before it within that 3-hour window.

So, when iterating through the entire List, it reprocesses files that had already been handled in an earlier Lambda call.

How do I get it to only process the most recently uploaded file? In Node.js there is context.succeed(), which I assume marks that event as successfully processed. Java doesn't seem to have that.

Below are the Cloudwatch logs.

08:35:16 START RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Version: $LATEST
08:35:26 TIME - AUTHENTICATE: 8101ms
08:35:26 TIME - MESSAGE PARSE: 1ms
08:35:26 data :: event/events/2016/08/31/20160831123000.export.csv
08:35:35 Processed 147 events
08:35:35 TIME - FILE PARSE: 9698
08:35:35 Found 1 event files
08:35:35 Total function took: 17800ms
08:35:35 END RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3
08:35:35 REPORT RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Duration: 19403.67 ms Billed Duration: 19500 ms Memory Size: 192 MB Max Memory Used: 116 MB
08:45:03 START RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Version: $LATEST
08:45:03 TIME - AUTHENTICATE: 119ms
08:45:03 TIME - MESSAGE PARSE: 0ms
08:45:03 data :: event/events/2016/08/31/20160831123000.export.csv
08:45:05 Processed 147 events
08:45:05 data :: event/events/2016/08/31/20160831124500.export.csv
08:45:06 Processed 211 events
08:45:06 TIME - FILE PARSE: 2499
08:45:06 Found 2 event files
08:45:06 Total function took: 2618ms
08:45:06 END RequestId: bcb8e064-6f78-11e6-baea-a312004d2418
08:45:06 REPORT RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Duration: 2796.25 ms Billed Duration: 2800 ms Memory Size: 192 MB Max Memory Used: 116 MB
09:05:02 START RequestId: 8747aa08-6f7b-11e6-80fd-f30a15cf07fc Version: $LATEST
09:05:02 TIME - AUTHENTICATE: 98ms
09:05:02 TIME - MESSAGE PARSE: 0ms
09:05:02 data :: event/events/2016/08/31/20160831123000.export.csv
09:05:03 Processed 147 events
09:05:03 data :: event/events/2016/08/31/20160831124500.export.csv
09:05:04 Processed 211 events
09:05:04 data :: event/events/2016/08/31/20160831130000.export.csv
09:05:04 Processed 204 events
09:05:04 TIME - FILE PARSE: 2242
09:05:04 Found 3 event files
09:05:04 Total function took: 2340ms
09:05:04 END RequestId: 8747aa08-6f7b-11e6-80fd-f30a15cf07fc

EDIT 1: I believe the question has been answered by Michael; however, below is some of the code for anyone else. I am indeed using a global List to hold the records.

import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.event.S3EventNotification;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.time.StopWatch;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

public class LambdaHandler {

// These are instance fields, so they survive when Lambda reuses the container (see the answer below).
private final List<GDELTEventFile> eventFiles = new ArrayList<>();
private AmazonS3Client s3Client;
private final CSVFormat CSV_FORMAT = CSVFormat.TDF.withIgnoreEmptyLines().withTrim();

public void gdeltHandler(S3Event event, Context context) {
    StopWatch sw = new StopWatch();
    long time = 0L;

    sw.start();
    s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider());
    sw.split();
    System.out.println("TIME - AUTHENTICATE: " + sw.getSplitTime() + "ms");
    time += sw.getSplitTime();
    sw.reset();

    sw.start();
    processEvent(event);
    sw.split();
    System.out.println("TIME - MESSAGE PARSE: " + sw.getSplitTime() + "ms");
    time += sw.getSplitTime();
    sw.reset();

    sw.start();
    processFiles();
    sw.split();
    System.out.println("TIME - FILE PARSE: " + sw.getSplitTime());
    time += sw.getSplitTime();

    System.out.println("Found " + eventFiles.size() + " event files");
    System.out.println("Total function took: " + time + "ms");
}

private void processEvent(S3Event event) {
    List<S3EventNotification.S3EventNotificationRecord> records = event.getRecords();
    for (S3EventNotification.S3EventNotificationRecord record : records) {
        long filesize = record.getS3().getObject().getSizeAsLong();
        eventFiles.add(new GDELTEventFile(record.getS3().getBucket().getName(), record.getS3().getObject().getKey(), filesize));
    }
}

// Download each file referenced by the event records and count its CSV rows.
private void processFiles() {
    for (GDELTEventFile event : eventFiles) {
        try {
            System.out.println(event.getBucket() + " :: " + event.getFilename());
            GetObjectRequest request = new GetObjectRequest(event.getBucket(), event.getFilename());
            S3Object file = s3Client.getObject(request);
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(file.getObjectContent()))) {
                CSVParser parser = new CSVParser(reader, CSV_FORMAT);
                int count = 0;
                for (CSVRecord record : parser) {
                    count++;
                }
                System.out.println("Processed " + count + " events");
            }
        } catch (IOException ioe) {
            System.out.println("IOException :: " + ioe);
        }
    }
}
}

Answer 1:

This is a case of code that overlooks an important aspect of Lambda's container reuse -- container reuse in Lambda includes process reuse. When a function is executed in a reused container, it is necessarily running in the same process that served the earlier invocations.
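You can see this for yourself with a trivial handler. The following is a hypothetical demo, not from the original post: an instance field keeps counting up across invocations that land on the same warm container.

// Hypothetical demo of container/process reuse: the counter is an instance
// field, so it survives between invocations served by the same container.
public class ReuseDemoHandler {

    private int invocationCount = 0;

    public String handleRequest(Object input, com.amazonaws.services.lambda.runtime.Context context) {
        invocationCount++;
        // A cold start prints 1; subsequent warm invocations keep incrementing.
        System.out.println("Invocation #" + invocationCount + " in this process");
        return "done";
    }
}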

S3's event notification data structure is such that it can include more than one object per event, but in practice this never happens... pushing the event data into a global structure, however, means that if the container is reused, later function invocations will see the old data.

While this can be very useful as a cache, it has significant implications for how code must be designed -- always expect but never assume that your process may survive from one invocation to a future, subsequent invocation, and code accordingly.
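Applied to the code in the question, one possible fix is to stop accumulating into the instance field and build the list per invocation instead. This is only a sketch; it assumes processFiles() is refactored to accept the list as a parameter rather than reading the field:

// Sketch: the list is local to the handler, so a reused container/process
// cannot carry files over into the next invocation.
public void gdeltHandler(S3Event event, Context context) {
    List<GDELTEventFile> eventFiles = new ArrayList<>();

    for (S3EventNotification.S3EventNotificationRecord record : event.getRecords()) {
        eventFiles.add(new GDELTEventFile(
                record.getS3().getBucket().getName(),
                record.getS3().getObject().getKey(),
                record.getS3().getObject().getSizeAsLong()));
    }

    processFiles(eventFiles);   // assumed refactor: processFiles(List<GDELTEventFile>)
}

// Alternatively, keep the field but reset it as the first statement of the handler:
//     eventFiles.clear();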

Note that container reuse also means you need to clean up any temp files, if there is a chance that many reuses of a container will result in space exhaustion there.
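On the temp-file point, a minimal, hypothetical housekeeping routine (not part of the original answer) that clears Lambda's /tmp scratch space at the start of an invocation might look like this:

// Hypothetical best-effort cleanup of /tmp, the writable scratch space that
// persists between invocations when a container is reused.
private void cleanTmp() {
    java.io.File[] leftovers = new java.io.File("/tmp").listFiles();
    if (leftovers != null) {
        for (java.io.File f : leftovers) {
            if (f.isFile()) {
                f.delete();   // ignore the result; this is best-effort housekeeping
            }
        }
    }
}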

Note also that redeploying your function code always means that old containers will be abandoned, not reused for future invocations of the latest version.