Is using sqs-consumer to detect receiveMessage events the right approach?

Published 2019-08-26 16:52

Question:

I am using AWS SQS as a message queue. After sqs.sendMessage sends the data, I want to detect sqs.receiveMessage in a scalable way, via either an infinite loop or event triggering. I then came across sqs-consumer, which handles sqs.receiveMessage events the moment messages arrive. But I was wondering: is this the most suitable way to handle message passing between microservices, or is there a better way?

Answer 1:

I wrote the code in Java to fetch data from an SQS queue with the AmazonSQSBufferedAsyncClient; the advantage of this API is that it buffers messages and works in async mode.

/**
 * 
 */
package com.sxm.aota.tsc.config;

import java.net.UnknownHostException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.retry.RetryPolicy;
import com.amazonaws.retry.RetryPolicy.BackoffStrategy;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.buffered.AmazonSQSBufferedAsyncClient;
import com.amazonaws.services.sqs.buffered.QueueBufferConfig;

@Configuration
public class SQSConfiguration {

    /** The properties cache config. */
    @Autowired
    private PropertiesCacheConfig propertiesCacheConfig;

    @Bean
    public AmazonSQSAsync amazonSQSClient() {
        // Create Client Configuration
        ClientConfiguration clientConfig = new ClientConfiguration()
            .withMaxErrorRetry(5)
            .withConnectionTTL(10_000L)
            .withTcpKeepAlive(true)
            .withRetryPolicy(new RetryPolicy(
                    null, 
                new BackoffStrategy() {                 
                    @Override
                    public long delayBeforeNextRetry(AmazonWebServiceRequest req, 
                            AmazonClientException exception, int retries) {
                        // Delay between retries is 10s unless it is UnknownHostException 
                        // for which retry is 60s
                        return exception.getCause() instanceof UnknownHostException ? 60_000L : 10_000L;
                    }
                }, 10, true));
        // Create Amazon client
        AmazonSQSAsync asyncSqsClient = null;
        if (propertiesCacheConfig.isIamRole()) {
            asyncSqsClient = new AmazonSQSAsyncClient(new InstanceProfileCredentialsProvider(true), clientConfig);
        } else {
            // BasicAWSCredentials expects the access key first, then the secret key
            asyncSqsClient = new AmazonSQSAsyncClient(
                    new BasicAWSCredentials("accessKey", "secretKey"));
        }
        final Regions regions = Regions.fromName(propertiesCacheConfig.getRegionName());
        asyncSqsClient.setRegion(Region.getRegion(regions));
        asyncSqsClient.setEndpoint(propertiesCacheConfig.getEndPoint());

        // Buffer for request batching
        final QueueBufferConfig bufferConfig = new QueueBufferConfig();
        // Ensure visibility timeout is maintained
        bufferConfig.setVisibilityTimeoutSeconds(20);
        // Enable long polling
        bufferConfig.setLongPoll(true);
        // Set batch parameters
//      bufferConfig.setMaxBatchOpenMs(500);
        // Set to receive messages only on demand
//      bufferConfig.setMaxDoneReceiveBatches(0);
//      bufferConfig.setMaxInflightReceiveBatches(0);

        return new AmazonSQSBufferedAsyncClient(asyncSqsClient, bufferConfig);
    }

}
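For completeness, here is a minimal sketch of a producer that could use the bean above (the SQSPublisher class, publish method, and queue URL are hypothetical placeholders, not part of the original answer); the AmazonSQSBufferedAsyncClient transparently collects individual sendMessage calls into batch requests:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.SendMessageRequest;

@Component
public class SQSPublisher {

    /** The buffered async client defined in SQSConfiguration above. */
    @Autowired
    private AmazonSQSAsync amazonSQSClient;

    public void publish(String queueUrl, String jsonBody) {
        // Each call is buffered; the client flushes buffered messages to SQS
        // as batch requests, reducing the number of API calls.
        amazonSQSClient.sendMessage(new SendMessageRequest(queueUrl, jsonBody));
    }
}

Because the buffered client batches sends and prefetches receives, the producer and the scheduled consumer below can share the same bean.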

Then I wrote the scheduler, which executes every 2 seconds, fetches the data from the queue, processes it, and deletes it from the queue before the visibility timeout expires; otherwise the message becomes visible again and will be reprocessed.

package com.sxm.aota.tsc.sqs;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.fasterxml.jackson.databind.ObjectMapper;


/**
 * The Class SQSScheduledTask.
 *
 * Sends the aggregated Vehicle data to TSC in batches.
 */
@EnableScheduling
@Component("sqsScheduledTask")
@DependsOn({ "propertiesCacheConfig", "amazonSQSClient" })
public class SQSScheduledTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(SQSScheduledTask.class);

    @Autowired
    private PropertiesCacheConfig propertiesCacheConfig;

    @Autowired
    public AmazonSQSAsync amazonSQSClient;

    /**
     * Timer task that runs after a specific interval of time. Mainly
     * responsible for sending the data in batches to TSC.
     */
    private String queueUrl;
    private final ObjectMapper mapper = new ObjectMapper();

    @PostConstruct
    public void initialize() throws Exception {
        LOGGER.info("SQS-Publisher: initializing for queue {}", propertiesCacheConfig.getSQSQueueName());
        // Resolve the queue URL from the configured queue name
        final GetQueueUrlRequest request = new GetQueueUrlRequest().withQueueName(propertiesCacheConfig.getSQSQueueName());
        final GetQueueUrlResult response = amazonSQSClient.getQueueUrl(request);
        queueUrl = response.getQueueUrl();

        LOGGER.info("SQS-Publisher: initialized for queue {}, URL = {}", propertiesCacheConfig.getSQSQueueName(), queueUrl);
    }

    @Scheduled(fixedDelayString = "${sqs.consumer.delay}")
    public void timerTask() {

        final ReceiveMessageResult receiveResult = getMessagesFromSQS();
        String messageBody = null;
        if (receiveResult != null && receiveResult.getMessages() != null && !receiveResult.getMessages().isEmpty()) {
            try {
                messageBody = receiveResult.getMessages().get(0).getBody();
                String messageReceiptHandle = receiveResult.getMessages().get(0).getReceiptHandle();
                Vehicles vehicles = mapper.readValue(messageBody, Vehicles.class);
                processMessage(vehicles.getVehicles(), messageReceiptHandle);
            } catch (Exception e) {
                LOGGER.error("Exception while processing SQS message : {}", messageBody, e);
                // Message is not deleted from SQS and will be processed again after the visibility timeout expires
            }
        }
    }

    public void processMessage(List<Vehicle> vehicles, String messageReceiptHandle) throws InterruptedException {
        // processing code
        // Delete the SQS message once processing is completed.
        // Need to create an atomic counter that will be incremented by all TS.. Once it is 0 the messages will be deleted.
        amazonSQSClient.deleteMessage(new DeleteMessageRequest(queueUrl, messageReceiptHandle));
    }

    private ReceiveMessageResult getMessagesFromSQS() {
        try {
            // Create a new request and fetch data from the Amazon SQS queue
            final ReceiveMessageResult receiveResult = amazonSQSClient
                    .receiveMessage(new ReceiveMessageRequest().withMaxNumberOfMessages(1).withQueueUrl(queueUrl));
            return receiveResult;
        } catch (Exception e) {
            LOGGER.error("Error while fetching data from SQS", e);
        }
        return null;
    }
}
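The ${sqs.consumer.delay} placeholder in the @Scheduled annotation is resolved from the Spring environment. A minimal example entry, assuming the 2-second interval mentioned above (the property name comes from the code; the value is an assumption):

# application.properties
# Delay in milliseconds between consecutive runs of timerTask()
sqs.consumer.delay=2000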