Kafka no Spring Boot

O Apache Kafka é uma plataforma de streaming distribuído excelente para a troca de mensagem em alta escala, como por exemplo, em arquitetura orientada em eventos, um pouco mais sobre Kafka pode ser visto nesse outro post: Kafka.

Como na maioria das tecnologias, também há dependências para facilitar a utilização do Kafka em aplicações Spring Boot, utilizando a dependência spring-kafka temos auto configuração e implementações para utilizar o Kafka nas nossas aplicações.

Para os producer temos o KafkaTemplate, classe que fornece operações de alto nível para o envio de mensagens para o Kafka.

Para os consumers temos a anotação KafkaListener, que marca o método anotado como um lister do tópico configurado e também possibilita configurações como grupo de consumidores, partições, entre outras.

Exemplo

O Exemplo a ser demonstrado é o famoso exemplo do producer e consumer, onde o producer é uma aplicação Spring Boot que disponibiliza um endpoint para receber a mensagem e quando recebido adiciona no tópico do Kafka e o consumer, que também é uma aplicação Spring Boot, que fica “escutando” o tópico do Kafka e logando as mensagens recebidas, como demonstrado na imagem abaixo.

Kafka

Iniciando pela infraestrutura do Kafka, vamos criar uma arquivo do docker compose com as configurações do Zookeeper e do Kafka.

docker-compose.yml

version: '3.6'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.2.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SYNC_LIMIT: 2

kafka:
image: confluentinc/cp-kafka:5.2.1
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

Execute o comando docker-compose up para inicializar os containers.

Producer

Na sequência, vamos criar a aplicações responsável por produzir mensagens na fila, onde ela terá um endpoint Rest para receber pedidos e colocá-los no tópico do Kafka.

Como a aplicação vai disponibilizar um endpoint precisamos da dependência spring-boot-starter-web e também da dependência spring-kafka para ter as implementações do Kafka na aplicação spring boot.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

Em termos de configurações precisamos de basicamente duas, o endereço do Kafka, definido na propriedade spring.kafka.producer.bootstrap-servers e o nome do tópico que vamos utilizar no momento de enviar para o Kafka, que fica a nosso critério o padrão da configuração, no caso será definido como order.topic.

application.properties

spring.kafka.producer.bootstrap-servers=localhost:9092

order.topic=ordertopic

Feito as configurações, podemos criar nosso producer, que será responsável por enviar uma mensagem para o tópico. Para enviar vamos utilizar o kafkaTemplate, disponibilizado pela dependência do spring-kafka, que possui o método send, onde passamos o tópico (injetado no atributo orderTopic via configuração), a chave única da mensagem e a mensagem a ser enviada.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;

@Component
public class OrderProducer {

    @Value("${order.topic}")
    private String orderTopic;

    private final KafkaTemplate kafkaTemplate;

    public OrderProducer(final KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(final @RequestBody String order) {
        final String mensageKey = UUID.randomUUID().toString();
        kafkaTemplate.send(orderTopic, mensageKey, order);
    }
}

Observação: Não é obrigatório enviar a chave da mensagem, inclusive tem o método send no kafkaTemplate, porém como o Kafka trabalha com um mapa (chave/valor) é recomendado enviar uma chave única para mensagem.

Por fim, vamos criar nosso endpoint que apenas vai receber um order, do tipo texto, e invocar o producer para adicionar o pedido no tópico do Kafka.

import br.com.emmanuelneri.producer.OrderProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/orders")
@Slf4j
public class OrderController {

    private final OrderProducer orderProducer;

    public OrderController(OrderProducer orderProducer) {
        this.orderProducer = orderProducer;
    }

    @RequestMapping(method = RequestMethod.POST)
    public void send(@RequestBody String order) {
        orderProducer.send(order);
    }
}

Consumer

O consumer é apenas uma aplicação que ficará “lendo” o tópico e logando a mensagem recebido. Com isso, as dependências utilizadas são o spring-boot-starter, para rodar executar a aplicação Spring Boot e e a dependência do spring-kafka.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

Em termos de configurações, é necessário o endereço do Kafka, o consumer group, configurar o auto-offset-reset e o tópico que vamos utilizar na implementação do listener do Kafka.

spring.kafka.consumer.bootstrap-servers=http://localhost:9092
spring.kafka.consumer.group-id=group-id
spring.kafka.consumer.auto-offset-reset=earliest

order.topic=ordertopic
  • group-id: É o identificador do grupo de consumo do tópico, responsável pelas configurações de consumo em paralelo do tópico, mais sobre grupo de consumos pode ser visto na documentação.
  • auto-offset-reset: É a configuração da posição inicial que será consumida do tópico, no caso foi configurado como “earliest”, então será do início do tópico.

Concluído as configurações, podemos criar nosso consumer, que não será nada mais que um método anotado para receber as informações de um tópico e configurado em um grupo de consumidores, como demonstrado no código abaixo.

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class OrderConsumer {

    @KafkaListener(topics = "${order.topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void consumer(String order) {
        log.info("Order: " + order);
    }
}

Com isso, quando for produzido uma mensagem no tópico, será logado como demostrado abaixo.

b.c.emmanuelneri.consumer.OrderConsumer : Order: {"identifier": "12343","customer": "Customer X", "value": 1500}

Utilizando consumer ConsumerRecord

Também podemos alterar o parâmetro do método anotado @KafkaListener parar receber um ConsumerRecord, que é uma classe da dependência do Kafka que prove outras informações da mensagem como: chave, headers, partição, tópico, timestamp, entre outros.

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class OrderConsumer {

    @KafkaListener(topics = "${order.topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void consumer(final ConsumerRecord consumerRecord) {
        log.info("key: " + consumerRecord.key());
        log.info("Headers: " + consumerRecord.headers());
        log.info("Partion: " + consumerRecord.partition());
        log.info("Order: " + consumerRecord.value());
    }
}

Com isso, quando for produzido uma mensagem no tópico, temos informações além do conteúdo da mensagem, como demostrado abaixo.

b.c.emmanuelneri.consumer.OrderConsumer : key: 4d3d2311-071a-4a26-8600-a9b764a2f004
b.c.emmanuelneri.consumer.OrderConsumer : Headers: RecordHeaders(headers = [], isReadOnly = false)
b.c.emmanuelneri.consumer.OrderConsumer : Partion: 0
b.c.emmanuelneri.consumer.OrderConsumer : Order: {"identifier": "12343","customer": "Customer X", "value": 1500}

Mapeando mensagens “tipadas”

Nem sempre queremos trafegar apenas como texto, ou até mesmo queremos enviar uma mensagem como texto e mapear para uma classe no consumer, para isso, precisamos fazer algumas configurações, que vamos ver na sequencia.

Na aplicação producer, vamos adicionar 3 configurações, listadas abaixo:

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.properties.spring.json.add.type.headers=false
  • key-serializer: É o tipo de serialização da chave da mensagem, no caso vamos manter como String;
  • value-serializer: É o tipo de serialização do conteúdo da mensagem, que vamos alterar para um formato Json;
  • add.type.headers: Como vamos enviar a mensagem como Json e não sabemos qual o tipo da mensagem, desabilitamos a adição tipo no header da mensagem.

Observação: Quando não declaramos a propriedade add.type.headers como false o producer automaticamente coloca o tipo do objetivo produzido no header da mensagem e consequentemente o consumer deverá esperar esse mesmo tipo, então como estão enviando um texto no formato de Json e vamos consumir um objeto do tipo Order.class, devemos desabilitar essa passagem de tipo por parâmetro.

Observação 2: Quando estamos trabalhando com classes com os mesmo tipos no producer e no consumer não precisamos desativar essa propriedade.

Na aplicação consumer, vamos adicionar o Jackson para fazer a conversão de Json para classe e também adicionar algumas configurações.

Para adicionar a deserialização por JSON é necessário adicionar a dependência jackson-databind.

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version> 2.9.8</version>
</dependency>

Também adicionamos configurações, só que deserializar e na propriedade spring.kafka.consumer.

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
  • key-deserializer: É o tipo de deserialização da chave da mensagem, no caso vamos manter como String;
  • value-deserializer: É o tipo de deserialização do conteúdo da mensagem, que vamos alterar para um formato Json;

Por fim, basta alterar o tipo do recebimento de String para Order, que nosso consumer está pronto para receber as mensagem nesse formato.

import br.com.emmanuelneri.consumer.model.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class OrderConsumer {

    @KafkaListener(topics = "${order.topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void consumer(ConsumerRecord consumerRecord) {
        log.info("key: " + consumerRecord.key());
        log.info("Headers: " + consumerRecord.headers());
        log.info("Partion: " + consumerRecord.partition());
        log.info("Order: " + consumerRecord.value());
    }
}

b.c.emmanuelneri.consumer.OrderConsumer : key: 4d3d2311-071a-4a26-8600-a9b764a2f004
b.c.emmanuelneri.consumer.OrderConsumer : Headers: RecordHeaders(headers = [], isReadOnly = false)
b.c.emmanuelneri.consumer.OrderConsumer : Partion: 0
b.c.emmanuelneri.consumer.OrderConsumer : Order: Order(identifier=12343, customer=Customer X, value=1500)

Conclusão

Concluindo, o Kafka é uma excelente tecnologia para processamento de mensagem em alta escala e o Spring Boot fornece implementações facilitadoras para a utilização de Kafka, por exemplo, o KafkaTemplate e o KafkaLister que abstraem implementações específicas do Kafka e possibilita utilizarmos conceitos de consumidores e provedores naturalmente nas aplicações Spring Boot utilizando implementações do Kafka.

Anúncios

2 comentários sobre “Kafka no Spring Boot

Deixe um comentário

Preencha os seus dados abaixo ou clique em um ícone para log in:

Logotipo do WordPress.com

Você está comentando utilizando sua conta WordPress.com. Sair /  Alterar )

Foto do Google

Você está comentando utilizando sua conta Google. Sair /  Alterar )

Imagem do Twitter

Você está comentando utilizando sua conta Twitter. Sair /  Alterar )

Foto do Facebook

Você está comentando utilizando sua conta Facebook. Sair /  Alterar )

Conectando a %s

Este site utiliza o Akismet para reduzir spam. Saiba como seus dados em comentários são processados.