Enviando mensagens para o Amazon SQS

O Amazon SQS (Simple Queue Service) é uma solução de fila para troca de mensagens entre aplicações hospedada pela própria Amazon, assim seu gerenciamento é todo através de serviços, o que faz com que toda a complexidade da infraestrutura seja terceirizada para AWS, dessa forma, nos deixando apenas concentrado na utilização da solução de fila. Além disso, o SQS disponibiliza até 1 milhões de solicitações por mês sem custo.

O gerenciamento da fila pode ser feita pelo console do SQS na conta da AWS, como na imagem abaixo.

Para utilizar o SQS em aplicações Java, a Amazon disponibiliza a lib aws-java-sdk-sqs, que já traz as funcionalidades da SDK core da AWS mais as funcionalidades do SQS.

 <dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-sqs</artifactId>
    <version>1.11.236</version>
 </dependency>

Como todo serviço da AWS, é preciso configurar o client com as credenciais e região da conta que será utilizada pelos serviços. No exemplo a seguir, é configurado o client do SQS através de um bean do Spring, assim permitindo injetar o client em qualquer classe de contexto Spring.

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Bean
    public AmazonSQS createSQSClient() {
      return AmazonSQSClient.builder()
                .withCredentials(new ProfileCredentialsProvider())
                .withRegion(Regions.US_EAST_2)
                .build();
    }

O ProfileCredentialsProvider carrega as credenciais do arquivo .aws, assim não necessitando deixar as credencias configurados na aplicação.

Observação: A região deve ser configurada de acordo com a região configurado no serviço do SQS na conta da AWS.

Como no exemplo é utilizado Spring Boot, o nome da queue é configurado no application.properties, assim evitando de deixar fixo na classe o nome da queue e possibilitando injeção do valor via placeholder.

application.properties

consumer.sqs.message.queue.name=awsSQSMessage

Após configurado as credenciais de acesso ao serviço do SQS, vamos criar um simples método que envia uma mensagem de texto para a fila do SQS. A forma mais simples de fazer isso é utilizando a classe SendMessageRequest, que apenas precisa da URL da queue e a mensagem a ser enviada no body e através do método sendMessage envia a mensagem para o serviço da AWS.


import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class MessageService {

    @Value("${consumer.sqs.message.queue.name}")
    private String queueName;

    @Autowired
    private AmazonSQS amazonSQS;

    public void sentToQueue(String message) {
        final SendMessageRequest sendMessageRequest = new SendMessageRequest()
                .withQueueUrl(queueName)
                .withMessageBody(message);

        amazonSQS.sendMessage(sendMessageRequest);
    }
}

Assim, quando chamado o método sentToQueue será enviando um texto para a fila awsSQSMessage, sendo possível visualizar através do console da AWS, como na imagem abaixo.

Outra forma de enviar mensagens para o SQS é em lote, utilizando o método sendMessageBatch, que recebe como parâmetro uma instância de SendMessageBatchRequest, que é uma classe que permite uma lista de mensagens chamadas de “entries”, como demonstrado no exemplo abaixo.

import java.util.concurrent.atomic.AtomicInteger;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class MessageService {

   private static final int MAX_BATCH_SEND_SQS = 10;

   @Value("${consumer.sqs.message.queue.name}")
    private String queueName;

    @Autowired
    private AmazonSQS amazonSQS;

    public void sentToQueue(List<String> messages) {
        Lists.partition(messages, MAX_BATCH_SEND_SQS)
                .forEach(strings -> {
                    final AtomicInteger index = new AtomicInteger();
                    final List<SendMessageBatchRequestEntry> entries = strings.stream()
                            .map(message -> {
                                final String messageId = String.valueOf(index.getAndIncrement());
                                return new SendMessageBatchRequestEntry(messageId, message);
                            }).collect(Collectors.toList());

                    final SendMessageBatchRequest sendMessageRequest = new SendMessageBatchRequest()
                            .withQueueUrl(queueName)
                            .withEntries(entries);

                    amazonSQS.sendMessageBatch(sendMessageRequest);
                });
    }
}

Porém, a API de envio em lote tem a limitação de 10 mensagens por requisição, por isso, no exemplo acima foi utilizado o Lists.partition do Guava para particionar a lista de mensagens recebidas no método sentToQueue, deixando transparente o tamanho dos lotes de envio.

Outro detalhe dessa abordagem, é que as entries são uma lista do tipo SendMessageBatchRequestEntry, que é uma representação da mensagem a ser enviada para fila, a qual exige um id único dentro do lote enviado, onde é representado pelo index no exemplo.

Por fim, a API do SQS também permite enviar atributos de mensagens, como se fosse atributos de header da fila, assim possibilitando enviar informações adicionais a mensagem, as quais não vão estar contidas no body, por exemplo um id da mensagem ou uma data.

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

@Service
public class MessageService {

    @Value("${consumer.sqs.message.queue.name}")
    private String queueName;

    @Autowired
    private AmazonSQS amazonSQS;

     public void sentToQueueWithAttributes(String message) {
        final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
        messageAttributes.put("id",
                new MessageAttributeValue()
                 .withDataType("String")
                 .withStringValue(UUID.randomUUID().toString()));

        messageAttributes.put("date",
                new MessageAttributeValue()
                        .withDataType("String")
                        .withStringValue(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))));

        final SendMessageRequest sendMessageRequest = new SendMessageRequest()
                .withQueueUrl(queueName)
                .withMessageAttributes(messageAttributes)
                .withMessageBody(message);

        amazonSQS.sendMessage(sendMessageRequest);
    }
}

A utilização é idêntica ao primeiro exemplo, a única diferença que é passado o MessageAttributes, que recebe um mapa de Map, mais informações sobre os atributos de mensagens pode ser encontrados na doc.

Esse post foi apenas um curto tutorial de como enviar mensagens para o SQS em uma aplicação Java, que é uma excelente solução de fila, com um baixo custo e com toda infraestrutura “terceirizada” para a AWS, mais informações sobre o serviço pode ser encontrado na doc oficial.

O código demonstrado pode ser encontrado no github.

Veja também como consumir o a fila no SQS no post “Consumindo fila do SQS com JMS e Spring Boot”.

Anúncios

Deixe um comentário

Preencha os seus dados abaixo ou clique em um ícone para log in:

Logotipo do WordPress.com

Você está comentando utilizando sua conta WordPress.com. Sair /  Alterar )

Foto do Google+

Você está comentando utilizando sua conta Google+. Sair /  Alterar )

Imagem do Twitter

Você está comentando utilizando sua conta Twitter. Sair /  Alterar )

Foto do Facebook

Você está comentando utilizando sua conta Facebook. Sair /  Alterar )

w

Conectando a %s