I am running an Apache Spark application on an AWS EMR cluster. The application retrieves messages from AWS SQS, performs some computations based on the message data, and then deletes each message.
I am running the EMR cluster in a VPC, on a private subnet with a NAT instance.
The problem I am facing is that I cannot delete the messages. I am able to retrieve all messages and I am able to send messages, but I cannot delete them.
I am using the following security configuration on the EMR cluster:
- EC2 instance profile: EMR_EC2_DefaultRole
- EMR role: EMR_DefaultRole
Each of these roles has the following policies attached:
- AmazonSQSFullAccess
- AmazonElastiCacheFullAccess
- AmazonElasticMapReduceFullAccess
- AmazonVPCFullAccess
I thought that the problem was with the permissions, but AmazonSQSFullAccess grants full permissions, so I am out of options.
This is the Java code that deletes the message:
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;

public class SQSMessageBroker
{
    private AmazonSQS _amazonSqs;

    public SQSMessageBroker()
    {
        // Create the SQS client
        createSQSClient();
    }

    public void deleteMessage(String queueUrl, String receiptHandle)
    {
        // The receipt handle comes from the ReceiveMessage call that delivered the message
        DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(queueUrl, receiptHandle);
        _amazonSqs.deleteMessage(deleteMessageRequest);
    }

    private void createSQSClient()
    {
        _amazonSqs = new AmazonSQSClient();
        _amazonSqs.setRegion(Region.getRegion(Regions.EU_WEST_1));
    }
}
The SQSMessageBroker is a singleton in my application.
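For context, the receipt handle passed to deleteMessage comes from an earlier ReceiveMessage call. A minimal sketch of that receive → process → delete flow with the v1 SDK (the SQSConsumerSketch class, the consumeOnce method and the processing step are illustrative, not my actual application code):

import java.util.List;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

public class SQSConsumerSketch
{
    // Hypothetical: receives a batch, processes each message, then deletes it.
    public static void consumeOnce(AmazonSQS sqs, SQSMessageBroker broker, String queueUrl)
    {
        ReceiveMessageRequest receiveRequest =
                new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10);
        List<Message> messages = sqs.receiveMessage(receiveRequest).getMessages();
        for (Message message : messages)
        {
            // ... application-specific computations on message.getBody() ...
            broker.deleteMessage(queueUrl, message.getReceiptHandle());
        }
    }
}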
When I run the same code locally, everything works great. Locally I have created an AWS user and added the key and secret to a .aws file.
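For reference, this is the standard shared credentials file format the SDK picks up (typically ~/.aws/credentials); the values below are placeholders:

[default]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY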
EDIT
After a lot of research and testing, this is what I have found out:
- It appears that it is not a permission issue (at least not for the EC2 instance that is started by EMR). I connected to the instance, installed the AWS CLI, retrieved a message and deleted it successfully (see the commands after this list).
- The _amazonSqs.deleteMessage(deleteMessageRequest); call does not throw any exceptions. It looks as if the request times out, but no timeout exception is thrown. Any code after the deleteMessage call is not executed.
- I am processing each message in a separate thread, so I added a Thread.UncaughtExceptionHandler to each thread (see the sketch after this list), but no exception is thrown there either.
- I suspected that the problem might be in the ReceiptHandle, more precisely because I was running the Spark cluster on several machines. I thought that the machine IP, name or something similar was encoded in the ReceiptHandle, and that the deleteMessage might have been executed from a different machine, which is why it did not work. So I created a Spark cluster with only one machine. Sadly, I still couldn't delete the message.
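The CLI check from the first point was along these lines (queue URL and receipt handle are placeholders):

aws sqs receive-message --queue-url https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue
aws sqs delete-message --queue-url https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue --receipt-handle "<handle-from-receive-message>"

And the per-thread handler was wired up roughly like this (the worker body is a stand-in for my actual processing code):

// Hypothetical worker setup with an UncaughtExceptionHandler attached.
Thread worker = new Thread(() -> {
    // ... receive, process and delete a message ...
});
worker.setUncaughtExceptionHandler((thread, throwable) ->
        System.err.println("Uncaught exception in " + thread.getName() + ": " + throwable));
worker.start();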
After A LOT of debugging and testing, I finally managed to figure out what the problem was.
As expected, it was not a permission problem. The problem was that the EC2 instances started by EMR, on which the Spark application runs, contain a certain version of all the AWS packages for Java (including the SQS package), and the paths containing those packages are added to the Hadoop, Yarn and Spark classpaths. So when my application started, it used the packages that were already on the machine, and I received an error. (The error was logged in the Yarn log. It took me some time to figure that out.)
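In case it helps someone else dig out the same error, the aggregated container logs can be pulled with the YARN CLI (the application id below is a placeholder):

yarn logs -applicationId application_1234567890123_0001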
I am using the Maven Shade Plugin to build the uber jar for my application, so I thought that I could shade (relocate) the AWS packages. This would allow me to encapsulate the dependencies inside my application. Sadly this DID NOT work. It appears that Amazon uses reflection inside the packages and has hardcoded the names of some classes, thus rendering the shading useless. (The hardcoded class names were not found in my shaded packages.)
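For reference, the relocation attempt looked roughly like this in the pom.xml (the shadedPattern prefix is arbitrary); this is the part that the hardcoded class names defeat:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <relocation>
        <pattern>com.amazonaws</pattern>
        <shadedPattern>shaded.com.amazonaws</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>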
So after some more frustration I found the following solution:
--driver-class-path /path_to_your_jar/myapp.jar --class com.myapp.startapp
Here the key is the --driver-class-path option. You can read more about it here. Basically, I am adding my uber jar to the Spark driver classpath, allowing the application to use my dependencies. So far this is the only acceptable solution that I have found. If you know of another or a better one, please write a comment or an answer.
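Put together, the submit command looks something like this (the jar path and main class are the placeholders from above):

spark-submit \
  --driver-class-path /path_to_your_jar/myapp.jar \
  --class com.myapp.startapp \
  /path_to_your_jar/myapp.jar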
I hope that this answer can be of use to some unfortunate soul. It would have saved me several excruciating days.