I am running an Apache Spark application in an AWS EMR cluster. The application retrieves messages from AWS SQS, performs some computations based on the message data, and then deletes each message.
I am running the EMR cluster in a VPC on a private subnet with a NAT instance.
The problem I am facing is that I cannot delete the messages. I am able to retrieve all messages and to send messages, but I cannot delete them.
I am using the following security settings on the EMR cluster:
- EC2 instance profile: EMR_EC2_DefaultRole
- EMR role: EMR_DefaultRole
Each of these roles has the following policies attached:
- AmazonSQSFullAccess
- AmazonElastiCacheFullAccess
- AmazonElasticMapReduceFullAccess
- AmazonVPCFullAccess
I thought that the problem was with the permissions, but AmazonSQSFullAccess grants full permissions on SQS, so I am out of options.
This is the Java code that deletes the message:
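import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;

public class SQSMessageBroker
{
    private AmazonSQS _amazonSqs;

    public SQSMessageBroker()
    {
        // Create the SQS client
        createSQSClient();
    }

    // Deletes a previously received message, identified by its receipt handle
    public void deleteMessage(String queueUrl, String receiptHandle)
    {
        DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(queueUrl, receiptHandle);
        _amazonSqs.deleteMessage(deleteMessageRequest);
    }

    private void createSQSClient()
    {
        // The no-argument constructor uses the default credentials provider chain
        // (e.g. the .aws credentials file locally, the EC2 instance profile on EMR)
        _amazonSqs = new AmazonSQSClient();
        _amazonSqs.setRegion(Region.getRegion(Regions.EU_WEST_1));
    }
}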
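The deleteMessage method above blocks the calling thread until the SDK call returns or throws, so if the request stalls, the thread simply waits and nothing after the call runs. As a diagnostic, one option is to bound the wait on the caller's side. The following is a minimal sketch using only java.util.concurrent; BoundedCall and callWithTimeout are hypothetical names, not part of the AWS SDK or of the original application:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical helper (not an AWS SDK class): runs a blocking call on a worker
 * thread and waits at most timeoutMillis for it, so a call that never returns
 * becomes a TimeoutException instead of silently stalling the caller.
 */
final class BoundedCall {
    // Daemon threads, so an abandoned, still-blocked call cannot keep the JVM alive.
    private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "bounded-call");
        t.setDaemon(true);
        return t;
    });

    private BoundedCall() {}

    static <T> T callWithTimeout(Callable<T> call, long timeoutMillis)
            throws TimeoutException, ExecutionException, InterruptedException {
        Future<T> future = EXECUTOR.submit(call);
        try {
            // Exceptions thrown by the call arrive here wrapped in ExecutionException.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Interrupt the worker; a call blocked on a socket may ignore the interrupt,
            // but the caller is released either way.
            future.cancel(true);
            throw e;
        }
    }
}
```

With it, the body of deleteMessage could, for example, become BoundedCall.callWithTimeout(() -> { _amazonSqs.deleteMessage(deleteMessageRequest); return null; }, 30_000); (with the checked exceptions handled or declared), so that a stall surfaces as a TimeoutException that can be logged. The 30-second limit is an arbitrary example value, and cancelling the future only interrupts the worker thread; it does not guarantee that the underlying HTTP request is aborted.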
The SQSMessageBroker is a singleton in my application.
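Because each message is handled on its own thread, the singleton broker is shared by those threads, so it has to be created safely. A minimal sketch of one common way to do that, the initialization-on-demand holder idiom, assuming a hypothetical SharedBroker class standing in for SQSMessageBroker (the real AmazonSQS client creation is left as a comment):

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for SQSMessageBroker, showing lazy, thread-safe singleton creation. */
final class SharedBroker {
    // Counts constructor calls so the single-instance guarantee can be checked.
    static final AtomicInteger CONSTRUCTIONS = new AtomicInteger();

    private SharedBroker() {
        CONSTRUCTIONS.incrementAndGet();
        // The real class would create its AmazonSQS client here.
    }

    // The JVM initializes Holder at most once, on first access, under the class-initialization lock.
    private static final class Holder {
        static final SharedBroker INSTANCE = new SharedBroker();
    }

    static SharedBroker getInstance() {
        return Holder.INSTANCE;
    }
}
```

Note that in a Spark job a static singleton like this exists once per executor JVM, so each worker machine builds its own client rather than sharing one across the cluster.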
When I run the same code locally, everything works. Locally I have created an AWS user and added its access key and secret key to the .aws credentials file.
EDIT
After a lot of research and testing this is what I have found out:
- It appears that it is not a permission issue (at least not for the EC2 instance that EMR starts). I connected to the instance, installed the AWS CLI, retrieved a message and deleted it successfully.
- The _amazonSqs.deleteMessage(deleteMessageRequest); call does not throw any exception. It looks as if the request times out, but no timeout exception is thrown, and the code after the deleteMessage call is never executed.
- I am processing each message in a separate thread, so I added a Thread.UncaughtExceptionHandler to each thread, but no exception reaches it either.
- I suspected that the problem might be in the ReceiptHandle. Because I was running the Spark cluster on several machines, I thought that the machine IP, hostname or something similar might be encoded in the ReceiptHandle, and that deleteMessage might have been executed from a different machine than the one that received the message, which would explain why it did not work. So I created a Spark cluster with only one machine. Sadly, I still couldn't delete the message.
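The question does not say how the per-message threads are created. If they come from an ExecutorService and the work is passed to submit(), an UncaughtExceptionHandler never sees the task's exceptions: they are stored in the returned Future and only surface from Future.get(). (With execute() or a plain Thread, a thrown exception does reach the handler; and a call that blocks without returning throws nothing at all, so no handler fires in that case either.) A minimal, self-contained sketch of the submit() behaviour; SubmitExceptionDemo is a hypothetical name and the thrown exception simulates a failing delete:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical demo: exceptions from submit() tasks bypass the thread's UncaughtExceptionHandler. */
final class SubmitExceptionDemo {
    /** Returns {handlerFired, getThrew} for a task that throws after being passed to submit(). */
    static boolean[] run() throws InterruptedException {
        AtomicBoolean handlerFired = new AtomicBoolean(false);
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            // Installed on the pool thread, as a per-thread handler would be.
            t.setUncaughtExceptionHandler((thread, ex) -> handlerFired.set(true));
            return t;
        });
        try {
            Runnable task = () -> {
                throw new IllegalStateException("simulated failure in deleteMessage");
            };
            // submit() wraps the task in a FutureTask, which catches the exception.
            Future<?> future = pool.submit(task);
            boolean getThrew = false;
            try {
                future.get();
            } catch (ExecutionException e) {
                // The original exception is available as the cause.
                getThrew = e.getCause() instanceof IllegalStateException;
            }
            return new boolean[] {handlerFired.get(), getThrew};
        } finally {
            pool.shutdown();
        }
    }
}
```

In this setup, calling get() on each Future (or logging inside the task itself) is what makes such failures visible.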