Kafka no Vert.x

O Apache Kafka é uma plataforma de streaming distribuído excelente para a troca de mensagem em alta escala, como por exemplo, em uma arquitetura orientada em eventos. Mais sobre Kafka pode ser visto nesse outro post: Kafka.

O Vert.x é uma tool-kit lightweight que possibilita o desenvolvimento de aplicações reativas baseadas em JVM. Dentro da gama de módulos, nesse post vamos falar da dependência vertx-kafka-client, que facilita a utilização do Kafka em aplicações Vert.x, fornecendo interfaces para produzir e consumir mensagens em clusters Kafka.

Para os producer, que farão o envio de mensagem da aplicação para o Kafka, temos a interface KafkaProducer, que fornece operações de alto nível para o envio de mensagens para o Kafka.

Para os consumers, que farão o recebimento de mensagem do Kafka para a aplicação, temos a interface KafkaConsumer, que possibilita a inscrição em tópicos e a implementação de handlers para ser executados a cada mensagem.

Obs: Como o Vert.x é uma tecnologia para o desenvolvimento de aplicações reativas, com isso o vertx-kafka-client também segue esse padrão com todas ações assíncronas através de handlers.

Exemplo

O Exemplo a ser demonstrado é “hello world” de producer e consumer, onde o producer é uma aplicação Vert.x que recebe um pedido através de um endpoint Rest e produz o pedido em forma de mensagem no tópico, enquanto tem outra aplicação Vert.x consumindo todas mensagens no tópico, como demonstrado na imagem abaixo.

Kafka

Iniciando pela infraestrutura do Kafka necessária para executar o exemplo, vamos iniciar o Kafka e o Zookeeper executando o arquivo de docker compose abaixo.

docker-compose.yml

version: '3.6'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.2.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SYNC_LIMIT: 2

kafka:
image: confluentinc/cp-kafka:5.2.1
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

Execute o comando docker-compose up para inicializar os containers.

Observação: Como estamos configurando KAFKA_AUTO_CREATE_TOPICS_ENABLE = true, não é necessário a criação dos tópicos via comandos diretamente no Kafka, os tópicos serão gerados quando utilizados pelas aplicações. Lembrando que é uma configuração para testes, em casos de ambientes de produção é recomendado a criação dos tópicos pelo comando informando corretamente as configurações de partições e replicações de cada tópico.

Producer

Na sequência, vamos criar a aplicações responsável por produzir mensagens na fila, onde ela terá um endpoint Rest para receber pedidos e colocá-los no tópico do Kafka.

Como a aplicação vai disponibilizar um endpoint precisamos da dependência vertx-web e também da dependência vertx-kafka-client para ter as implementações do Kafka na aplicação Vert.x.

pom.xml (Maven)

<dependencies>
    <dependency>
        <groupId>io.vertx</groupId>
	<artifactId>vertx-web</artifactId>
    </dependency>
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-kafka-client</artifactId> 
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
	    <groupId>io.vertx</groupId>
            <artifactId>vertx-stack-depchain</artifactId>
	    <version>3.8.1</version>
	    <type>pom</type>
	    <scope>import</scope>
	</dependency>
    </dependencies>
</dependencyManagement>

Adicionado as dependências, podemos criar nosso producer, que será uma classe responsável por receber um evento de pedido recebido e enviar uma mensagem para o tópico com o conteúdo do pedido.

Essa classe será um Verticle, que é uma classe que pode ser instalada (Deployed) dentro do contexto do Vert.x, e tem acesso ao eventBus que é o canal de comunicação utilizado para receber o evento de “pedido recebido”. Assim, a cada pedido recebido no eventBus, vamos criar um KafkaProducerRecord, utilizado para preparar a mensagem a ser enviada, informando o tópico de destino, a chave e o conteúdo, e por fim podemos utilizar a interface kafkaProducer para fazer o envio para o Kafka através do método send.

import br.com.emmanuelneri.producer.EventbusAddress;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class OrderProducerVerticle extends AbstractVerticle {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderProducerVerticle.class);
    private static final String ORDER_TOPIC = "Order";

    @Override
    public void start(final Promise<Void> startPromise) {
        final Map<String, String> config = createKafkaProducerConfig();
        final KafkaProducer<String, String> kafkaProducer = KafkaProducer.create(vertx, config);

        vertx.eventBus().localConsumer(EventbusAddress.ORDER_RECEVIED.name(), message -> {
            final String key = UUID.randomUUID().toString();
            final KafkaProducerRecord<String, String> kafkaProducerRecord = KafkaProducerRecord
                    .create(ORDER_TOPIC, key, message.body().toString());

            kafkaProducer.send(kafkaProducerRecord, result -> {
                if (result.failed()) {
                    LOGGER.error("message produce error {0}", kafkaProducerRecord, result.cause());
                    return;
                }

                LOGGER.info("message produced. key: {0}", kafkaProducerRecord.key());
            });
        });

        startPromise.complete();
    }

    private Map<String, String> createKafkaProducerConfig() {
        final Map<String, String> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return config;
    }
}

As configurações são feitas em um Map, passando a propriedade da configuração e valor em String, posteriormente passadas na criação da classe KafkaProducer.

Observação: Como mencionado anteriormente, o Vert.x trabalha suas ações de forma assíncrona por utilizar o paradigma reativo, baseado nisso, o método send do KafkaProducer possui um handler nos seus parâmetros do tipo AsyncResult, que trará o resultado de falha ou sucesso da ação quando o método for executado.

Feito a implementação do producer, agora precisamos receber os pedidos e postar no eventBus para notificarmos que um novo pedido foi recebido. Para isso, vamos criar mais uma classe que disponibiliza o endpoint Rest para recebimento dos pedidos.

Essa classe também será um Verticle, responsável por iniciar um Http Server e disponibilizar o endpoint “/orders” no método POST para receber os pedidos e enviar para o eventBus.

import br.com.emmanuelneri.producer.EventbusAddress;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpServer;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;

public class OrderHttpServerVerticle extends AbstractVerticle {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderHttpServerVerticle.class);
    private static final int PORT = 8080;
    private static final String ORDERS_PATH = "/orders";

    @Override
    public void start(final Promise<Void> startPromise) {
        final HttpServer httpServer = vertx.createHttpServer();

        final Handler<RoutingContext> failureHandler = this::routingFailureHandler;
        final Handler<RoutingContext> orderRoutingHandler = this::routingSuccessHandler;

        final Router router = Router.router(vertx);
        router.route("/orders/*").handler(BodyHandler.create()).failureHandler(failureHandler);
        router.post(ORDERS_PATH).handler(orderRoutingHandler).failureHandler(failureHandler);

        httpServer.requestHandler(router)
                .listen(PORT, handler -> {
                    if (handler.failed()) {
                        startPromise.fail(handler.cause());
                        return;
                    }
                    startPromise.complete();
                    LOGGER.info("HTTP Server is running.");
                });
    }

    private void routingSuccessHandler(final RoutingContext routingContext) {
        final String body = routingContext.getBodyAsString();
        LOGGER.info("order received: {0}", body);
        vertx.eventBus().send(EventbusAddress.ORDER_RECEVIED.name(), body);
    }

    private void routingFailureHandler(final RoutingContext routingContext) {
        LOGGER.error("Routing error", routingContext.failure());
        routingContext.response()
                .setStatusCode(routingContext.statusCode())
                .end();
    }

}

Com base no código acima, a cada requisição no “/orders” será invocado o orderRoutingHandler, que obtém o corpo da mensagem e envia para o eventBus no endereço ORDER_RECEVIED.

Um poucos mais sobre a utiliação de APIs Rest com Vert.x pode ser encontrado nesse outros post: Criando APIs Rest com Vert.x

Implementado os dois Verticles, o contexto do Vert.x precisa ser inicializado para executarmos nossa aplicação. Para isso, vamos iniciar o Vert.x dentro do método main de uma classe Java e fazer deploy das duas classes que implementamos.

import br.com.emmanuelneri.producer.verticle.OrderHttpServerVerticle;
import br.com.emmanuelneri.producer.verticle.OrderProducerVerticle;
import io.vertx.core.Vertx;

public class Application {

    public static void main(String[] args) {
        final Vertx vertx = Vertx.vertx();

        vertx.deployVerticle(new OrderHttpServerVerticle());
        vertx.deployVerticle(new OrderProducerVerticle());
    }
}

Consumer

O consumer é uma segunda aplicação que ficará “escutando” o tópico e logando a mensagem recebido. Com isso, a única dependência necessária é a vertx-kafka-client.

pom.xml (Maven)

<dependencies>
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-kafka-client</artifactId> 
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
	    <groupId>io.vertx</groupId>
            <artifactId>vertx-stack-depchain</artifactId>
	    <version>3.8.1</version>
	    <type>pom</type>
	    <scope>import</scope>
	</dependency>
    </dependencies>
</dependencyManagement>

Concluído as configurações, podemos criar nosso consumer que também será um Verticle com uma instância de KafkaConsumer inscrita no tópico de Order, fazendo o log do conteúdo do pedido a cada mensagem consumida.

import br.com.emmanuelneri.consumer.model.Order;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.Json;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.HashMap;
import java.util.Map;

public class OrderConsumerVerticle extends AbstractVerticle {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderConsumerVerticle.class);
    private static final String ORDER_TOPIC = "Order";

    @Override
    public void start(final Promise<Void> startPromise) {
        final Map<String, String> config = createKafkaConsumerConfig();
        final KafkaConsumer<String, String> kafkaConsumer = KafkaConsumer.create(vertx, config);

        kafkaConsumer.subscribe(ORDER_TOPIC, result -> {
            if (result.failed()) {
                LOGGER.error("failed to subscribe {0} topic", ORDER_TOPIC, result.cause());
                return;
            }

            LOGGER.info("{0} topic subscribed", ORDER_TOPIC);
        });

        kafkaConsumer.handler(result -> {
            LOGGER.info("message consumed {0}", result);

            final ConsumerRecord<String, String> record = result.record();
            LOGGER.info("key: " + record.key());
            LOGGER.info("Headers: " + record.headers());
            LOGGER.info("Order: " + Json.decodeValue(record.value(), Order.class));
        });
    }

    public Map<String, String> createKafkaConsumerConfig() {
        final Map<String, String> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("group.id", "order-consumer-group");
        config.put("auto.offset.reset", "latest");
        return config;
    }

}

Após a inicialização do KafkaConsumer, além de implementar o método handler para realizar a ação a cada mensagem, também é preciso inscrever o consumer no tópico através do método subscribe.

As configurações também são feitas em um Map, passando as propriedades e seus respectivos valores na criação da classe KafkaConsumer. O consumer possui algumas configurações a mais relacionadas ao consumer group do Kafka, com isso é necessário o id do grupo de consumo (group.id) e configuração do offset (auto.offset.reset) “latest” ou “earliest”.

Observação: Como o auto.offset.reset foi configurado como latest, apenas as mensagem produzidas após o consumer se conectar no Kafka serão logadas.

Após implementado o Verticle responsável para consumir os dados do tópico, é necessário adicionar no contexto do Vert.x, como no código abaixo.

import br.com.emmanuelneri.consumer.verticle.OrderConsumerVerticle;
import io.vertx.core.Vertx;

public class Application {

    public static void main(String[] args) {
        final Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new OrderConsumerVerticle());
    }
}

Com isso, quando for produzido uma mensagem no tópico, será logado como demostrado abaixo.

[INFO] mar 14, 2020 17:37:53 AM br.com.emmanuelneri.consumer.verticle.OrderConsumerVerticle
[INFO] INFORMAÇÕES: Order: Order(identifier=12343, customer=Customer X, value=1500)

Conclusão

Concluindo, o Kafka é uma excelente tecnologia para processamento de mensagem em alta escala e o Vert.x fornece implementações para a utilização de Kafka com programação reativa, onde as interfaces KafkaConsumer e KafkaProducer possibilitam métodos como handler, subscribe, commit, send, entre outros, todos de forma assíncrona.

O código fonte dos exemplos estão disponíveis no github.

2 comentários sobre “Kafka no Vert.x

Deixe um comentário

Preencha os seus dados abaixo ou clique em um ícone para log in:

Logotipo do WordPress.com

Você está comentando utilizando sua conta WordPress.com. Sair /  Alterar )

Foto do Google

Você está comentando utilizando sua conta Google. Sair /  Alterar )

Imagem do Twitter

Você está comentando utilizando sua conta Twitter. Sair /  Alterar )

Foto do Facebook

Você está comentando utilizando sua conta Facebook. Sair /  Alterar )

Conectando a %s

Este site utiliza o Akismet para reduzir spam. Saiba como seus dados em comentários são processados.