Processamento paralelo com CompletableFuture no Java 8

A partir do Java 8, o processamento paralelo começa a ficar mais simples com as novas classes e interfaces do pacote java.util.concurrent.

Novas funcionalidades foram adicionadas, mas como utilizar desse recurso na prática? Uma bom exemplo seria pensar nossos métodos sendo executados de forma assíncrona e não síncrona ou sequencial, onde é uma mudança na forma de estruturar nossas implementações.

Por exemplo a classe CompletableFuture, que implementa a interface Future, tem o objetivo de gerenciar ações futuras nas execuções assíncronas: como verificar se uma execução assíncrona já acabou, esperar por um retorno assíncrono ou cancelar uma execução.

Dessa forma, podemos usar métodos estáticos para executar ações assíncronas, como o runAsync que faz a chamada de alguma instrução e continua o método:

CompletableFuture.runAsync(() -> {
    // chama o método payBills de forma assíncrona e não espera finalizar
    billRepository.payBills(billId)
});
...

ou também podemos usuário métodos como o supplyAsync, que faz a chamada de uma execução e retorna um future de um tipo (CompletableFuture) para “controlar” as ações do método executado e o método segue para as próximas instruções, assim não sendo blocante o processo, porém como é uma future não necessariamente o processo assíncrono vai ter finalizado e retornado o objeto desejado.

 final CompletableFuture<Bill> future = CompletableFuture.supplyAsync(() -> {
   // chama o método findById de forma assíncrona e cria uma future com o resultado do método
   billRepository.findById(billId)
});
...
future.isDone();

Como recebemos um CompletableFuture como retorno, ele disponibiliza alguns métodos para trabalharmos com a future, segue alguns exemplos:

    • future.get(): Retorna o valor da ação assíncrona, caso ainda não terminada, o método get() aguarda o fim da execução;
    • future.getNow(valueIfAbsent): Retorna o valor agora ou senão retorna o valor passado por parâmetro;
    • future.complete(value): Completa a execução e caso não tiver finalizado seta o valor do parâmetro como retorno;
    • future.cancel(mayInterruptIfRunning): Cancela uma execução;
    • future.isDone(): Retorna se a ação assíncrona já foi finalizada.

Exemplificando um caso, onde um serviço constrói um resumo de um contexto de negócio composto basicamente por três passos: carregar os dados bases da entidade, sumarizar dados relacionados e fazer um rank de dados mais consumidos, normalmente seria implementado da seguinte forma:

public BillResumeDTO findResume(Long billId) {

    final Bill bill = billRepository.findOne(billId));
    final BillSummaryDTO summary = billItemRepository.summarizeByBillId(billId);
    final List<NumberUseDTO> most10numbersUse = billItemRepository.numberGreaterUse(billId, new PageRequest(0, 10));

   return new BillResumeDTO(bill.getIdentifier(), bill.getCustomer().getName(), summary, most10numbersUse);

Aplicando os conceitos para paralelizar o método findResume, seria executar cada passo em uma thread diferente, onde após finalizados, seus resultados independentes precisam ser juntados para a construção do único resultado final, o objeto de retorno.

public BillResumeDTO findResume(Long billId) {

        final CompletableFuture<Bill> billFuture = CompletableFuture.supplyAsync(() -> billRepository.findOne(billId));
        final CompletableFuture<BillSummaryDTO> summaryDTOCompletableFuture = CompletableFuture.supplyAsync(() -> billItemRepository.summarizeByBillId(billId));
        final CompletableFuture<List<NumberUseDTO>> most10numbersUseFuture = CompletableFuture.supplyAsync(() -> billItemRepository.numberGreaterUse(billId, new PageRequest(0, 10)));

        try {
           final Bill bill = billFuture.get();
           final BillSummaryDTO summary = summaryDTOCompletableFuture.get();
           final List<NumberUseDTO> most10numbersUse = most10numbersUseFuture.get();

           return new BillResumeDTO(bill.getIdentifier(), bill.getCustomer().getName(), summary, most10numbersUse);
        } catch (InterruptedException | ExecutionException e) {
            log.error("findResume process error" + e);
            throw new RuntimeException();
        }
    }

As consultas foram disparadas de forma assíncrono pelo CompletableFuture.supplyAsync e retornada em um CompletableFuture, após lançadas, como é necessário um único formato de retorno (BillResumeDTO) é utilizado o método .get() para aguardar o retorno das consultas assíncronas e montar o objeto de retorno.

O benefício dessa abordagem é que as três consultas são executadas em paralelo, ou seja, não é necessário que uma consulta termine para iniciar a outra, dessa forma, deixando o tempo de execução é menor comparado a primeira abordagem, onde as consultas são executadas sequencialmente.

Observação: Por mais que as consultas são executadas de forma assíncrona, o serviço ainda é blocante porque o método get aguarda o fim da execução, no entanto, o ganho dessa abordagem é no tempo de processamento quando paralelizado partes das execução do método.

No entanto, devemos utilizar com cautela essa abordagem, pois ela dispara várias threads em paralelos para executar um único método e isso pode se tornar um problema principalmente quando utilizadas com banco de dados, como no exemplo acima, onde são criadas várias conexões simultâneas para acessar o banco de dados e em casos de demora nas consultas ou muitas requisições no método pode estourar o pool de conexões.

Anúncios

Deixe um comentário

Preencha os seus dados abaixo ou clique em um ícone para log in:

Logotipo do WordPress.com

Você está comentando utilizando sua conta WordPress.com. Sair /  Alterar )

Foto do Google+

Você está comentando utilizando sua conta Google+. Sair /  Alterar )

Imagem do Twitter

Você está comentando utilizando sua conta Twitter. Sair /  Alterar )

Foto do Facebook

Você está comentando utilizando sua conta Facebook. Sair /  Alterar )

w

Conectando a %s