AWS DynamoDB trigger using Lambda in Java

Posted 2019-02-24 12:18

Question:

I am trying to trigger an AWS Lambda function written in Java on DynamoDB stream events. Amazon has a guide for this using NodeJS here: http://docs.aws.amazon.com/lambda/latest/dg/wt-ddb-create-test-function.html

The testing input for NodeJS (from the above link) looks like an SNS event, so I tried to use the corresponding SNSEvent class in Java as an input to my handler method.

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord;
import java.util.List;

public class RecomFunction {

    public void handler(SNSEvent event, Context context) {

        LambdaLogger logger = context.getLogger();

        List<SNSRecord> records = event.getRecords();

        if (records != null) {
            for (SNSRecord record : records) {
                if (record != null) {
                    logger.log("SNS record: " + record.getSNS().getMessage());
                }
            }
        }
    }

}

Unfortunately, record.getSNS() returns null, resulting in a NullPointerException.

There is a related question, but it did not receive a specific answer: Setup DynamoDB Trigger using Lambda

Answer 1:

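This code worked for me. You can use it to receive and process DynamoDB events in a Lambda function:

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;

public class Handler implements RequestHandler<DynamodbEvent, Void> {

    @Override
    public Void handleRequest(DynamodbEvent dynamodbEvent, Context context) {

        // Each record describes one change to an item in the table
        for (DynamodbStreamRecord record : dynamodbEvent.getRecords()) {

            if (record == null) {
                continue;
            }

            // Your code here
        }

        return null;
    }
}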

Similarly, you can use SNSEvent and SNSRecord to handle Amazon SNS events.



Answer 2:

This worked for me in the case of DynamoDB stream events:

import com.amazonaws.services.lambda.runtime.RequestHandler;
...

public class DynamoStreamHandler implements RequestHandler<Object, Void> {
    @Override
    public Void handleRequest(Object o, Context context) {

        LinkedHashMap lhm = (LinkedHashMap) o;
        ...etc.
    }
}

It seems they use a customized JSON mapper that produces Map and List objects. It's quite straightforward (but tedious) to verify this for other event types by testing and printing the input to the logs. (sigh)
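To illustrate the Map and List structure mentioned above, here is a rough sketch of walking such an event without any AWS classes. It assumes the event follows the usual DynamoDB Streams JSON layout (a "Records" list, each record holding a "dynamodb" map with a "NewImage" map whose attributes are keyed by type, such as "S" for strings). The class name StreamEventMaps and the method newImageStrings are made up for this example and are not part of any AWS library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: names are illustrative, not an AWS API.
final class StreamEventMaps {

    private StreamEventMaps() {}

    /**
     * Collects the string ("S") value of the given attribute from the
     * NewImage of every record. Records without a NewImage (for example
     * REMOVE events) or without that attribute are skipped.
     */
    static List<String> newImageStrings(Object event, String attribute) {
        List<String> values = new ArrayList<>();
        Object records = child(event, "Records");
        if (!(records instanceof List)) {
            return values;
        }
        for (Object record : (List<?>) records) {
            Object image = child(child(record, "dynamodb"), "NewImage");
            Object value = child(child(image, attribute), "S");
            if (value instanceof String) {
                values.add((String) value);
            }
        }
        return values;
    }

    // Returns the entry for key when node is a Map, otherwise null.
    private static Object child(Object node, String key) {
        return (node instanceof Map) ? ((Map<?, ?>) node).get(key) : null;
    }
}
```

Inside handleRequest(Object o, Context context) this could be called as StreamEventMaps.newImageStrings(o, "Message"), where "Message" stands for whatever string attribute your table actually has. The instanceof checks keep the handler from throwing a ClassCastException if the input turns out to have a different shape.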

EDIT: If ~5 MB overhead is ok, you can use DynamodbEvent.DynamodbStreamRecord provided by aws-lambda-java-events library v1.1.0 as described in AWS Lambda Walkthrough 3: Process Amazon DynamoDB Events (Java) in AWS Lambda Documentation.



Answer 3:

Create a handler that takes an InputStream, read in the contents of the InputStream (which is just JSON) and then deserialize it to get the data that you need.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import com.amazonaws.services.lambda.runtime.Context; 

public class MyHandler {    
    public void handler(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {           
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        int letter;        
        while((letter = inputStream.read()) != -1)
        {
            baos.write(letter);                     
        }        

        //Send the contents of baos to a JSON deserializer ...          
    }      
}

It's a bit cumbersome, but as far as I'm aware AWS doesn't currently provide a higher-level Java Lambda interface for consuming DynamoDB Streams. I have a full-blown example here with details on how I deserialized the stream of JSON to get Java objects for the data.
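As a possible refinement of the read loop above, the sketch below reads the stream in chunks instead of one byte at a time and decodes the result explicitly as UTF-8, on the assumption that the event JSON is UTF-8 encoded. The class name StreamText and the method readUtf8 are invented for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: names are illustrative only.
final class StreamText {

    private StreamText() {}

    /** Reads the whole stream and returns its contents decoded as UTF-8. */
    static String readUtf8(InputStream in) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int read;
        // read(byte[]) returns the number of bytes read, or -1 at end of stream
        while ((read = in.read(chunk)) != -1) {
            buffer.write(chunk, 0, read);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

In the handler, String json = StreamText.readUtf8(inputStream); would then give you the raw event JSON to pass to whichever JSON deserializer you prefer.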



Answer 4:

Your code caused the following exception in the CloudWatch log:

Class does not implement an appropriate handler interface....

Once I changed the code to the following, I could get the SNS message just fine.

public class RecomFunction implements RequestHandler<SNSEvent, Void> {

    public Void handleRequest(SNSEvent event, Context context) {

        ...
        return null;
    }

}


Answer 5:

To obtain the deserialized objects from the event handler:

1) In the handler, read the JSON from the input stream using something like this:

private String getJsonFrom(InputStream stream) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    int letter;
    while ((letter = stream.read()) != -1)
        baos.write(letter);

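    // Decode explicitly as UTF-8 rather than the platform default charset
    return new String(baos.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);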
}

2) Then create a specific deserializer to deserialize the object from the JSON, for example 'NewImage' in the case of a DynamoDB event. One example here.