Kafka no Spring Boot

O Apache Kafka é uma plataforma de streaming distribuído excelente para a troca de mensagem em alta escala, como por exemplo, em arquitetura orientada em eventos, um pouco mais sobre Kafka pode ser visto nesse outro post: Kafka.

Como na maioria das tecnologias, também há dependências para facilitar a utilização do Kafka em aplicações Spring Boot, utilizando a dependência spring-kafka temos auto configuração e implementações para utilizar o Kafka nas nossas aplicações.

Para os producer temos o KafkaTemplate, classe que fornece operações de alto nível para o envio de mensagens para o Kafka.

Para os consumers temos a anotação KafkaListener, que marca o método anotado como um lister do tópico configurado e também possibilita configurações como grupo de consumidores, partições, entre outras.

Exemplo

O Exemplo a ser demonstrado é o famoso exemplo do producer e consumer, onde o producer é uma aplicação Spring Boot que disponibiliza um endpoint para receber a mensagem e quando recebido adiciona no tópico do Kafka e o consumer, que também é uma aplicação Spring Boot, que fica “escutando” o tópico do Kafka e logando as mensagens recebidas, como demonstrado na imagem abaixo.

Kafka

Iniciando pela infraestrutura do Kafka, vamos criar uma arquivo do docker compose com as configurações do Zookeeper e do Kafka.

docker-compose.yml

version: '3.6'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.2.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SYNC_LIMIT: 2

kafka:
image: confluentinc/cp-kafka:5.2.1
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

Execute o comando docker-compose up para inicializar os containers.

Producer

Na sequência, vamos criar a aplicações responsável por produzir mensagens na fila, onde ela terá um endpoint Rest para receber pedidos e colocá-los no tópico do Kafka.

Como a aplicação vai disponibilizar um endpoint precisamos da dependência spring-boot-starter-web e também da dependência spring-kafka para ter as implementações do Kafka na aplicação spring boot.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

Em termos de configurações precisamos de basicamente duas, o endereço do Kafka, definido na propriedade spring.kafka.producer.bootstrap-servers e o nome do tópico que vamos utilizar no momento de enviar para o Kafka, que fica a nosso critério o padrão da configuração, no caso será definido como order.topic.

application.properties

spring.kafka.producer.bootstrap-servers=localhost:9092

order.topic=ordertopic

Feito as configurações, podemos criar nosso producer, que será responsável por enviar uma mensagem para o tópico. Para enviar vamos utilizar o kafkaTemplate, disponibilizado pela dependência do spring-kafka, que possui o método send, onde passamos o tópico (injetado no atributo orderTopic via configuração), a chave única da mensagem e a mensagem a ser enviada.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;

@Component
public class OrderProducer {

    @Value("${order.topic}")
    private String orderTopic;

    private final KafkaTemplate kafkaTemplate;

    public OrderProducer(final KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(final @RequestBody String order) {
        final String mensageKey = UUID.randomUUID().toString();
        kafkaTemplate.send(orderTopic, mensageKey, order);
    }
}

Observação: Não é obrigatório enviar a chave da mensagem, inclusive tem o método send no kafkaTemplate, porém como o Kafka trabalha com um mapa (chave/valor) é recomendado enviar uma chave única para mensagem.

Por fim, vamos criar nosso endpoint que apenas vai receber um order, do tipo texto, e invocar o producer para adicionar o pedido no tópico do Kafka.

import br.com.emmanuelneri.producer.OrderProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/orders")
@Slf4j
public class OrderController {

    private final OrderProducer orderProducer;

    public OrderController(OrderProducer orderProducer) {
        this.orderProducer = orderProducer;
    }

    @RequestMapping(method = RequestMethod.POST)
    public void send(@RequestBody String order) {
        orderProducer.send(order);
    }
}

Consumer

O consumer é apenas uma aplicação que ficará “lendo” o tópico e logando a mensagem recebido. Com isso, as dependências utilizadas são o spring-boot-starter, para rodar executar a aplicação Spring Boot e e a dependência do spring-kafka.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

Em termos de configurações, é necessário o endereço do Kafka, o consumer group, configurar o auto-offset-reset e o tópico que vamos utilizar na implementação do listener do Kafka.

spring.kafka.consumer.bootstrap-servers=http://localhost:9092
spring.kafka.consumer.group-id=group-id
spring.kafka.consumer.auto-offset-reset=earliest

order.topic=ordertopic
  • group-id: É o identificador do grupo de consumo do tópico, responsável pelas configurações de consumo em paralelo do tópico, mais sobre grupo de consumos pode ser visto na documentação.
  • auto-offset-reset: É a configuração da posição inicial que será consumida do tópico, no caso foi configurado como “earliest”, então será do início do tópico.

Concluído as configurações, podemos criar nosso consumer, que não será nada mais que um método anotado para receber as informações de um tópico e configurado em um grupo de consumidores, como demonstrado no código abaixo.

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class OrderConsumer {

    @KafkaListener(topics = "${order.topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void consumer(String order) {
        log.info("Order: " + order);
    }
}

Com isso, quando for produzido uma mensagem no tópico, será logado como demostrado abaixo.

b.c.emmanuelneri.consumer.OrderConsumer : Order: {"identifier": "12343","customer": "Customer X", "value": 1500}

Utilizando consumer ConsumerRecord

Também podemos alterar o parâmetro do método anotado @KafkaListener parar receber um ConsumerRecord, que é uma classe da dependência do Kafka que prove outras informações da mensagem como: chave, headers, partição, tópico, timestamp, entre outros.

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class OrderConsumer {

    @KafkaListener(topics = "${order.topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void consumer(final ConsumerRecord consumerRecord) {
        log.info("key: " + consumerRecord.key());
        log.info("Headers: " + consumerRecord.headers());
        log.info("Partion: " + consumerRecord.partition());
        log.info("Order: " + consumerRecord.value());
    }
}

Com isso, quando for produzido uma mensagem no tópico, temos informações além do conteúdo da mensagem, como demostrado abaixo.

b.c.emmanuelneri.consumer.OrderConsumer : key: 4d3d2311-071a-4a26-8600-a9b764a2f004
b.c.emmanuelneri.consumer.OrderConsumer : Headers: RecordHeaders(headers = [], isReadOnly = false)
b.c.emmanuelneri.consumer.OrderConsumer : Partion: 0
b.c.emmanuelneri.consumer.OrderConsumer : Order: {"identifier": "12343","customer": "Customer X", "value": 1500}

Mapeando mensagens “tipadas”

Nem sempre queremos trafegar apenas como texto, ou até mesmo queremos enviar uma mensagem como texto e mapear para uma classe no consumer, para isso, precisamos fazer algumas configurações, que vamos ver na sequencia.

Na aplicação producer, vamos adicionar 3 configurações, listadas abaixo:

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.properties.spring.json.add.type.headers=false
  • key-serializer: É o tipo de serialização da chave da mensagem, no caso vamos manter como String;
  • value-serializer: É o tipo de serialização do conteúdo da mensagem, que vamos alterar para um formato Json;
  • add.type.headers: Como vamos enviar a mensagem como Json e não sabemos qual o tipo da mensagem, desabilitamos a adição tipo no header da mensagem.

Observação: Quando não declaramos a propriedade add.type.headers como false o producer automaticamente coloca o tipo do objetivo produzido no header da mensagem e consequentemente o consumer deverá esperar esse mesmo tipo, então como estão enviando um texto no formato de Json e vamos consumir um objeto do tipo Order.class, devemos desabilitar essa passagem de tipo por parâmetro.

Observação 2: Quando estamos trabalhando com classes com os mesmo tipos no producer e no consumer não precisamos desativar essa propriedade.

Na aplicação consumer, vamos adicionar o Jackson para fazer a conversão de Json para classe e também adicionar algumas configurações.

Para adicionar a deserialização por JSON é necessário adicionar a dependência jackson-databind.

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version> 2.9.8</version>
</dependency>

Também adicionamos configurações, só que deserializar e na propriedade spring.kafka.consumer.

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
  • key-deserializer: É o tipo de deserialização da chave da mensagem, no caso vamos manter como String;
  • value-deserializer: É o tipo de deserialização do conteúdo da mensagem, que vamos alterar para um formato Json;

Por fim, basta alterar o tipo do recebimento de String para Order, que nosso consumer está pronto para receber as mensagem nesse formato.

import br.com.emmanuelneri.consumer.model.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class OrderConsumer {

    @KafkaListener(topics = "${order.topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void consumer(ConsumerRecord consumerRecord) {
        log.info("key: " + consumerRecord.key());
        log.info("Headers: " + consumerRecord.headers());
        log.info("Partion: " + consumerRecord.partition());
        log.info("Order: " + consumerRecord.value());
    }
}

b.c.emmanuelneri.consumer.OrderConsumer : key: 4d3d2311-071a-4a26-8600-a9b764a2f004
b.c.emmanuelneri.consumer.OrderConsumer : Headers: RecordHeaders(headers = [], isReadOnly = false)
b.c.emmanuelneri.consumer.OrderConsumer : Partion: 0
b.c.emmanuelneri.consumer.OrderConsumer : Order: Order(identifier=12343, customer=Customer X, value=1500)

Conclusão

Concluindo, o Kafka é uma excelente tecnologia para processamento de mensagem em alta escala e o Spring Boot fornece implementações facilitadoras para a utilização de Kafka, por exemplo, o KafkaTemplate e o KafkaLister que abstraem implementações específicas do Kafka e possibilita utilizarmos conceitos de consumidores e provedores naturalmente nas aplicações Spring Boot utilizando implementações do Kafka.

O código fonte dos exemplos estão disponíveis no github.

10 comentários sobre “Kafka no Spring Boot

  1. Excelente post. Uma dúvida, se no caso o nosso quando eu invocasse /orders no microserviço A eu desejaria que retornasse a lista de orders em formato json, mas que para isso /orders envie uma mensagem para o microserviço B(que tem o order consumer) e quem realmente fizesse a consulta ao banco , serialização do objeto,etc..fosse esse microserviçoB, como eu receberia esse json no microserviço A, eu teria que ter um consumer no microserviço A?

    Curtir

    • Obrigado! Na requisição /orders no microserviço A não seria possível retornar a lista de orders porque o processo de envio via Kafka para o microserviços B processar é assíncrono, ou seja, o microserviço A não sabe quando o processamento terminou e nem não por quanto tempo teria que esperar pelo termino do processamento. Uma alternativa para esse cenário seria fazer um callback no fim processamento, onde o microserviço B notifica quando terminar o processamento.

      Curtir

  2. Boa tarde amigo, seu posto é excelente e me ajudou muito a construir minha primeira prova de conceito com o kafka, obrigado!
    Poderia ajudar com duas dúvidas?
    1) É possível ter mais de um consumer lendo as mensagens de um mesmo tópico gerado pelo producer? Vi que seria alguma coisa com partitions ou com offsets e agora no seu tutorial, vi com group-id. Como seria o procedimento, teria algum material?
    2) Vc teria algum material para comunicação usando classes geradas a partir de um protobuf ao invés de json, da mesma forma que é feito com o protocolo grpc ?
    Se puder ajudar agradeço muito, se não, seu tutorial já foi de grande valia! Obrigado!

    Curtir

    • Boa tarde Viktor, fico feliz em ter ajudado!

      1) Sim, utilizando consumer group no Kafka você está fazendo uso do conceito de publisher/subscriber, ou seja, um producer pode ter N consumidores, para isso, a configuração do group-id é bastante importante porque ele que vai definir o grupo de consumo para cada aplicação, então caso um mesmo tópico precisa ser consumido por duas aplicações, são dois group-id diferentes (ex: app1-consumer-group, app2-consumer-group) e caso você precise de mais consumidores dentro da mesma aplicação, é apenas um consumer group (ex: app-consumer-group) junto com o conceito de partitions dos tópicos, onde um tópico precisa ter mais de uma particação para ser consumido em paralelo no mesmo consumer group. Mais sobre esse conceito pode ser visto na doc do Kafka: https://kafka.apache.org/documentation/#distributionimpl

      2) Infelizmente não tenho material relacionado a protobuf com Java/Spring, acredito que seria necessário implementar uma conversão do objeto para protobuf e configrar nas properites do spring boot value-serializer/value-deserializer.

      Abraço!

      Curtir

Deixe um comentário

Preencha os seus dados abaixo ou clique em um ícone para log in:

Logotipo do WordPress.com

Você está comentando utilizando sua conta WordPress.com. Sair /  Alterar )

Foto do Google

Você está comentando utilizando sua conta Google. Sair /  Alterar )

Imagem do Twitter

Você está comentando utilizando sua conta Twitter. Sair /  Alterar )

Foto do Facebook

Você está comentando utilizando sua conta Facebook. Sair /  Alterar )

Conectando a %s

Este site utiliza o Akismet para reduzir spam. Saiba como seus dados em comentários são processados.