BT

Início Artigos Atomicidade em transações distribuídas no Apache Kafka

Atomicidade em transações distribuídas no Apache Kafka

Favoritos

Pontos Principais

 

  • Discutir as necessidade de atomicidade em sistemas distribuídos. 
  • Exemplo e funcionamento de transações no Apache Kafka.
  • Tradeoffs e impactos em performance que precisamos estar atentos.

Em um mundo onde os sistemas de software estão se tornando cada vez mais distribuídos com o intuito de alcançar maior vazão e disponibilidade, nos deparamos com problemas complexos a serem resolvidos, mas muitas vezes banalizados. Por exemplo: como fazer rollback de um processo que foi executado em N microsserviços? Como garantir a não duplicação de determinada informação quando um ACK (Acknowledgement) demorar mais do que o esperado? Todas essas questões já estão sendo resolvidas com padrões conhecidos como é o caso do pattern SAGA, que aplica compensação de operações, dentre outras técnicas.

Se pensarmos em um processo complexo que exige N escritas no Apache Kafka, podemos partir do princípio que a falha em uma das etapas desse processo requer o rollback de todo processo. Uma das formas de se resolver tal problema com uma Operação de Compensação, por exemplo, o que pode ser muito oneroso de acordo com a quantidade de processos realizados e o nível de complexidade destes.

Além da complexidade de operações de compensação, temos outros cenários complexos que podem requerer atomicidade, como por exemplo uma modelagem onde você precisa garantir a transação de produção de mensagens entre diferentes partições de um mesmo tópico no Apache Kafka.

Diante desses exemplos, não é de se estranhar que a aparente falta de suporte transacional por parte do Apache Kafka pode parecer um problema. Inclusive na familiaridade e segurança que desenvolvedores têm com outras ferramentas com garantias transacionais.

Neste artigo vamos introduzir uma feature do Apache Kafka pouco explorada na maioria das empresas, mas que podem fazer muita diferença em processos atômicos e diminuir consideravelmente a complexidade em possíveis rollbacks: as transações.

Uma revisão sobre Atomicidade, "All or Nothing"

Diretamente do dicionário, uma das definições encontradas sobre Atomicidade é: "Trata o trabalho como parte indivisível (atômico)". Isso significa dizer que todo trabalho deve ser feito ou nada deverá ser feito.

O exemplo mais comum é de uma transação de transferência bancária, onde o cliente A deseja transferir R$ 20,00 para o cliente B, vejamos o processo simplificado:

  1. Cliente A solicita transferência de R$ 20,00 para o cliente B.
  2. Processo retira o valor de R$ 20,00 da conta do cliente A.
  3. Processo verifica se o cliente B não possui nenhum bloqueio ou impedimento para receber o valor.
  4. Cliente B recebe os R$ 20,00.
  5. Processo confirma a operação.

Se ocorrer qualquer problema em um dos passos, devemos cancelar toda a operação. Não podemos permitir que o dinheiro saia da conta do cliente A e não chegue na conta do cliente B. Isso é Atomicidade: garantimos tudo ou nada.

Streaming (read-process-write) e suas dificuldades

Quando falamos de Streaming, umas das formas mais simples de explicar tal processo é através do read-process-write, palavra composta que deixa claro que este processo, tão citado atualmente, é composto de leituras, processamentos e escritas. Vejamos a figura abaixo que ilustra um read-process-write no Apache Kafka:

Dado dois tópicos A e B, temos um consumidor (uma aplicação genérica como exemplo) que, através dos dados consumidos desses tópicos, realiza algum processo interno e grava os estados necessários em uma base de dados, como uma base de dados da aplicação por exemplo. Após um processamento interno, os dados tratados são enviados para o tópico C e D para serem consumidos por quem estiver interessado. Parece um processo simples ao olhos despercebidos, mas vários problemas podem ocorrer neste exemplo, vejamos alguns cenários:

  1. Caso o produtor que esteja escrevendo nos tópicos A e B não receba o ACK no tempo esperado, o mesmo pode retentar o envio da mesma mensagem ocasionando em processamento duplicado de mensagem. Um processamento duplicado neste caso pode significar que os resultados nos tópico C e D sejam diferentes da mensagem original, isso porque o estado interno pode ter sido alterado, algo inesperado e indesejado e que não era previsto.
  2. O processamento pode sofrer uma instabilidade um instante antes de enviar as mensagens tratadas para os tópicos C e D, ou seja, todo processamento foi realizado mas no momento de enviar os resultados a aplicação fica offline. Supondo que as mensagens já foram comitadas nos tópicos A e B, você terá uma perda de mensagem, pois elas não serão consumidas novamente.
  3. Para tentar resolver o problema do item (B) você pode pensar em deixar o commit da mensagem dos tópicos A e B para serem realizados assim que você enviar as mensagens ao tópico C e D, ou seja, postergar o commit. Porém, após realizar o commit das mensagens nos tópicos C e D você ainda estará sujeito a falha de commit no A e B. Sendo assim, as mensagens poderão ser reprocessadas e o mesmo efeito do item (A) será produzido.

Matthias Sax, exemplifica os cenários descritos anteriormente em sua palestra Exactly-Once Semantics in Apache Kafka durante o Kafka Summit em Londres de 2018.

Funcionamento prático das transações para resolução do read-process-write

Para resolver os problemas descritos anteriormente e mais alguns outros, os mantenedores do Apache Kafka resolveram desenvolver a feature de Transactional Message. Caso queira saber um pouco mais sobre a proposta de melhoria que deu origem a feature, basta acessar o wiki do projeto. A ideia desta feature é facilitar casos onde exista a necessidade de garantias de atomicidade em cenários de read-process-write, publicação entre diversas partições do mesmo tópico e melhorar semântica de exactly once.

Com relação aos problemas descritos anteriormente, podemos resolver usando as transações nativas do Apache Kafka, sem precisar desenvolver uma solução externa para realizar nenhum tipo de controle.

Existem dois componentes no Apache Kafka que tratam, em conjunto, do funcionamento das mensagens transacionais: o Transaction Log e Transaction Coordinator. De forma resumida, o Transaction Log é um tópico onde ficam armazenadas as informações sobre cada transação, ou seja, se ela foi comitada, se ela está em processamento e etc. Por outro lado, o Transaction Coordinator é o responsável por gravar neste tópico, usando o mesmo como "base de dados" para controle dos estados das transações.

É importante lembrar que o tópico "Transaction Log" não contém o payload da mensagem que foi enviada pelo produtor uma vez que este continua sendo gravado no tópico de destino. A função do Log é possuir as informações sobre as transações, ou seja, o metadado.

Para tentar ilustrar melhor, vamos a um exemplo prático onde teremos o seguinte caso:

  1. Um produtor qualquer grava em um tópico que chamamos de "input-topic";
  2. Um consumidor começa o read-process-write assim que ocorre uma leitura do "input-topic";
  3. O processamento é simples, sendo a mensagem uma string qualquer. Se essa string conter o valor "55" então a transação será abortada, caso contrário prosseguimos para o passo 4;
  4. A string, sem o valor "55", é concatenada com outra string "-processed", então esta é enviada para o "output-topic";

Dada a situação acima, vale ressaltar que queremos garantir a atomicidade de todo processo, ou seja, tudo deverá ser feito ou nada deverá ser feito (All or nothing). Vejamos na prática como cada item foi construído:

 

//Cria o produtor

KafkaProducer<String, String> kafkaProducer = createProducer();

//Inicializa as configurações deste produtor no Coordinator do Cluster

kafkaProducer.initTransactions();

 

No trecho acima criamos o produtor e, através do método initTransactions(), informamos ao Transaction Coordinator que gostaríamos de registrar esse produtor para posteriormente iniciar uma transação.

Para criar nosso produtor temos algumas poucas diferenças de um produtor que não é transacional, vejamos:

 

private static KafkaProducer<String, String> createProducer() {

Map<String, Object> props = new HashMap<>();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.BROKER_URL);

props.put(ProducerConfig.CLIENT_ID_CONFIG, "PRODUCER_OUTPUT_TOPIC");

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_ID);

props.put(ProducerConfig.ACKS_CONFIG, "all");

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

props.put("ssl.endpoint.identification.algorithm", "https");

props.put("security.protocol", "SASL_SSL");

props.put("sasl.mechanism", "PLAIN");

props.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", Configuration.APIKEY, Configuration.SECRET));

return new KafkaProducer<>(props);

}

 

Para quem já conhece a criação de um produtor não transacional, deve ter percebido que a diferença foi em 2 pontos:

  1. "TRANSACTIONAL_ID_CONFIG": Esse valor diz ao Transaction Coordinator o código da transação ligada ao produtor. Assim, ele conseguirá distinguir a qual produtor uma determinada transação está relacionada. Se esse produtor ficar offline no meio de uma transação, o Transaction Coordinator conseguirá "resumir" o estado anterior baseado também nesse ID.
  2. "ENABLE.IDEMPOTENCE": Serve para garantir que nosso produtor não duplique mensagens, resolvendo o problema citado no item (A) da seção "Streaming (read-process-write) e suas dificuldades". Não entraremos em muitos detalhes sobre esse item pois queremos dar maior foco na transação em si, mas vale deixar claro que essa propriedade garante que o produtor será idempotente, antes dessa propriedade nós precisávamos garantir no consumidor a idempotência.

Após o produtor criado e configurado, lembrando que ele será o responsável por escrever no "output-topic", precisamos criar o consumidor do "input-topic", vejamos:

 

//Cria o Consumidor e inscreve o mesmo no INPUT-TOPIC

KafkaConsumer<String, String> kafkaConsumer = createConsumer();

kafkaConsumer.subscribe(Collections.singleton(Configuration.INPUT_TOPIC));

 

Não temos nenhuma novidade nesse trecho, mas sim nas configurações do consumidor:

 

private static KafkaConsumer<String, String> createConsumer() {

Map<String, Object> props = new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.BROKER_URL);

props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);

props.put(ConsumerConfig.CLIENT_ID_CONFIG, "CONSUMER_INPUT_TOPIC-02");

props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

props.put("ssl.endpoint.identification.algorithm", "https");

props.put("security.protocol", "SASL_SSL");

props.put("sasl.mechanism", "PLAIN");

props.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", Configuration.APIKEY, Configuration.SECRET));

return new KafkaConsumer<>(props);

}

 

A única configuração diferente de um consumidor não transacional, é o "Isolation Level". Por padrão, todos os consumidores têm o Isolation Level igual a read_uncommitted, significando que independente de uma mensagem estar ou não dentro de uma transação, ela será consumida. Esse comportamento não é que o desejamos para nosso caso, pois este consumidor só deverá ler mensagens com a marcação committed, por isso usamos o valor read_committed.

Como o consumidor consegue identificar quais mensagens estão com a marcação commited ou não? Todas as mensagens de um tópico podem ou não ter um marcador, que podem ser: ABORTED, COMMITED ou NENHUM (nenhum marcador). O broker armazena o LSO (Last Stable Offsets) que indica o último offset que pode ser lido pelo Isolation Level read_commited. Internamente, o LSO contém o último offset que possui a marcação ABORTED ou COMMITED, significando que essas mensagens já foram processadas por alguma transação e já podem ser lidas.

Por exemplo, se você iniciar uma transação e começar a enviar mensagens para produção (producer.send()), elas ainda não terão a marcação ABORTED ou COMMITED. Sendo assim, os consumidores com read_commited não irão ler estas mensagens, pois o LSO está apontando para um offset anterior a elas.

Até o momento só configuramos o consumidor. Agora vamos para o loop completo de consumir, processar e produzir:

 

while (true) {

//Map responsável por guardar quais offsets devem ser comitados no fim da transação ('All or Nothing')

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

//Aguarda por no máximo 1minuto por mensagens a serem consumidas

ConsumerRecords<String, String> records = kafkaConsumer.poll(Long.MAX_VALUE);

//Ao chegar neste ponto significa que mensagens estão sendo consumidas, então podemos iniciar uma transação

kafkaProducer.beginTransaction();

 

Um dos pontos mais importantes quando estamos começando a trabalhar com transações é entender que, diferente de uma aplicação não transacional onde o próprio consumidor faz o commit das mensagens lidas, em uma aplicação transacional o commit de leitura das mensagens é feito no final do processo, junto com o commit da transação que engloba as escritas também. Vamos entender mais pra frente essa necessidade. Por hora, apenas entenda que a variável offsets vai armazenar todos os offsets lidos pelo nosso consumidor.

Não há muito o que falar sobre o beginTransaction: ele diz ao Transaction Coordinator que uma transação está sendo iniciada. Dessa forma, o mesmo começará a controlar todo esse fluxo, fazendo rollback ou Commit quando solicitado.

 

boolean aborted = false;

for (ConsumerRecord<String, String> record : records) {

//Para cada mensagem consumida nós realizamos uma 'transformação simples', sem comitar o offset de consumo

String message = transformMessage(record.value());

kafkaProducer.send(new ProducerRecord<>(Configuration.OUTPUT_TOPIC, message));

//todas as mensagens consumidas e produzidas dentro dessa transação são abortadas se conter um valor que desejamos testar

if (message.contains("55")) {

kafkaProducer.abortTransaction();

aborted = true;

break;

}

//Sempre que uma mensagem for 'transformada' com sucesso, gravamos o offset dela no nosso Map, para futuro commit da transação

offsets.put(new TopicPartition(Configuration.INPUT_TOPIC, record.partition()), new OffsetAndMetadata(record.offset() + 1));

}

 

Para cada mensagem lida pelo nosso consumidor, adicionamos o sufixo "-processed" no método transformMessage() e, logo em seguida, usamos o kafkaProducer.send() para enviar essa mensagem para o nosso "output-topic".

Se nesse ponto você olhar as mensagens do "output-topic" verá que nenhuma foi gravada, pois nada foi comitado ainda. Outro ponto importante é que o offset de consumo do "input-topic" ainda não se moveu, como se ainda não tivéssemos lido a mensagem. Inclusive você verá o campo lag > 1. Se a mensagem possuir o valor "55", como citamos anteriormente, toda a transação será abortada, marcando as mensagens como aborted.

Por fim, começamos a popular nossa variável offsets do tipo Map. Perceba que estamos populando essa variável com a partição e o offset lido do "input-topic". Atente para esse ponto, pois é essencial para o funcionamento adequado do nosso processo transacional.

Após várias mensagens lidas e gravadas na nossa variável offsets só nos falta realizar o commit:

 

//Após tudo finalizado com sucesso, podemos comitar a transação, mas precisamos dizer ao Coordinator quais os offsets devem

//ser comitados.

if (!aborted) {

kafkaProducer.sendOffsetsToTransaction(offsets, CONSUMER_GROUP_ID);

kafkaProducer.commitTransaction();

}

 

Lembre-se que criamos uma situação que pode abortar a nossa transação. Quando esse abort for chamado não devemos chamar o commitTransaction pois a transação já foi abortada. Por isso, colocamos a checagem do !aborted.

O método sendOffsetsToTransaction é um dos mais importantes em todo esse fluxo. No início deste artigo falamos um pouco sobre atomicidade e é exatamente aqui que toda a explicação entra em prática. Este método é responsável por enviar os offsets que lemos do consumidor, para quando realizarmos o processo de commitTransaction(). O que o Transaction Coordinator faz é garantir que ou todos os offsets são marcados como committed ou nenhum será. É o que chamamos de All or nothing.

Após o commitTransaction() bem sucedido você verá 2 comportamentos:

  1. As mensagens do "input-topic" são realmente comitadas, e o lag desaparece do consumidor.
  2. As mensagens do "output-topic" são escritas, possibilitando que outros consumidores possam ler.

Perceba que os passos a e b são dependentes um do outro, ou seja, um só irá ocorrer se o outro também ocorrer e isso é garantido pelo Transaction Coordinator.

O exemplo completo, mostrado nesse artigo, pode ser encontrado no github.

Cuidados relativos a configurações de transações

Normalmente temos que tomar cuidado com algumas configurações como tempo de retenção dos offsets e números de réplicas in sync. Para o cenário de uso de transações no Apache Kafka esses cuidados aumentam, pois temos que olhar para mais variáveis nas configurações do broker, produtores e consumidores.

Se você tem sagas de longa duração e precisa manter a transação aberta por muito tempo (ou em caso de falha pode demorar muito para se recuperar) é interessante olhar a configuração do broker de transaction.max.timeout.ms. Essa configuração define o tempo máximo permitido para que o produtor que solicitou a transação fique sem atualizar o status antes do timeout. No produtor, a configuração que define esse tempo é o transaction.timeout.ms.

Lembre-se que ao iniciar uma transação de escrita em um tópico você pode impedir que outros consumidores leiam daquele mesmo tópico, caso eles estejam marcados com leitura de read commited. Portanto, tenha prudência no uso de transações e na hora de estabelecer seus tempos de timeout.

Se você está pensando em fazer tunning do seu pipeline com transações, pode ser interessante olhar outras configurações do Broker como transaction.state.log.load.buffer.size e transaction.state.log.min.isr.

Trade-Offs no uso de transações

O uso de transações pode trazer certo overhead de performance. Para deixar claro quanto de overhead teríamos em um caso real, fizemos um teste de read-process-write usando um processo com transação e outro sem transação com, exatamente, 1 milhão de mensagens a serem processadas, seguindo o mesmo exemplo apresentado nesse artigo.

Obs: Estamos usando um cluster Kafka localizado na região us-central1, usando a plataforma Confluent Cloud. Por conta disso, o tempo pode ser um pouco maior do que o esperado. Além disso, as configurações da instância/container utilizadas para teste podem influenciar nos valores. O objetivo desse teste é validar a diferença de tempo entre um processo transacional e não transacional. Também estamos usando exatamente as mesmas configurações para o consumidor e produtor. Obviamente que no modelo transacional adicionamos as configurações obrigatórias para o funcionamento do mesmo.

Qtd Mensagens Processadas

Transacional

Tempo Total em Segundos

1.000.000

Sim

217 segundos e 122 ms

1.000.000

Não

77 segundos e 307 ms

Conclusão

No Kafka, por padrão, produtores não possuem garantia de idempotência na escrita sobre diferentes partições do mesmo tópico. Para essa finalidade, a feature de transação do Apache Kafka é uma opção que pode dar todas as garantias que sua aplicação necessita. Em momentos onde a duplicação de mensagens em um fluxo de read-process-write seja inaceitável, também temos um forte indício de uso da funcionalidade de transações no Apache Kafka.

A necessidade de não permitir "leitura suja" em um stream ou em uma arquitetura SEDA (ler uma mensagem já publicada que faz parte de uma transação maior que ainda não foi concluída), é mais um motivo que pode nos levar para o uso de transações em alguns cenários. O uso de transações é muito útil quando queremos garantir processos de entrega com a semântica Exactly-Once. Tão útil que pode tornar-se obrigatório dependendo do caso de uso.

Vimos neste artigo que com o uso de transações o desenvolvimento torna-se muito mais simples, pois não precisamos nos preocupar com algumas situações que antes teríamos que garantir através de lógicas complexas.

Vale ressaltar também que temos certo overhead na performance, como mostrado neste artigo. Sem dúvidas que é possível fazer diversos tunings para tentar melhorar ainda mais o tempo. Mas o objetivo aqui é deixar claro que existe uma perda de performance, e, dependendo do caso, essa queda pode não fazer diferença. De certa forma, podemos pensar que se garantias transacionais forem muito importantes para você, o uso de transações pode fazer muito sentido. Mas se performance e vazão são itens que você não pode abrir mão, então vale pensar duas vezes sobre o uso de transações. Como em qualquer sistema distribuído, transações são pontos de contenção e com o Apache Kafka isso não é diferente.

O Kafka é uma ferramenta que foi construída pensando em vazão e performance, por isso a utilização de garantias transacionais representam um alto custo de processamento para a ferramenta. Sendo assim, vale o estudo de cada caso para aplicação ou não de processos transacionais utilizando o Apache Kafka.

Sobre os autores

Ronaldo Lanhellas (LinkedIn) é graduado em Ciência da Computação e pós-graduado em Desenvolvimento de Sistemas Web pela Universidade Federal do Pará. Atua na área de desenvolvimento desde 2009, com experiência em sistemas críticos de alta performance, Inteligência Artificial, automação comercial, automação industrial, contribui com a comunidade Open Source onde desenvolveu um Framework Java que está consolidado em diversos sistemas de grande porte (github), além de disponibilizar serviços OpenSource para emissão de Danfe e Nota Fiscal sem custo. Trabalha atualmente como Arquiteto de Software na Via Varejo S.A.

João Bosco Seixas (LinkedIn) MBA em gestão empresarial pela FGV e formado em computação. Atua promovendo transformação de equipes e empresas na área de tecnologia e desenvolvimento de software. Apaixonado por tecnologia, inovação e futurismo. Especialista em arquitetura de software com experiência em sistemas de alta complexidade, microservices e plataformas em cloud. Atualmente é Atualmente é Especialista em arquitetura de soluções digitais no Banco Itau S.A.

Marcelo Costa (LinkedIn) é pós-graduado em Engenharia de Software pela UNICAMP. Atua em sistemas de alta complexidade desde 2002, liderando equipes multidisciplinares no desenvolvimento de soluções de software nas áreas de varejo, aeroespacial, logística, educação, saúde e finanças. Especializa-se em liderança de equipes e arquiteturas de soluções, na coleta inteligente de informações na Internet e de conteúdo eletronicamente disponível; atualmente é consultor em Arquitetura de Soluções.

Avalie esse artigo

Relevância
Estilo/Redação

Olá visitante

Você precisa cadastrar-se no InfoQ Brasil ou para enviar comentários. Há muitas vantagens em se cadastrar.

Obtenha o máximo da experiência do InfoQ Brasil.

HTML é permitido: a,b,br,blockquote,i,li,pre,u,ul,p

Comentários da comunidade

HTML é permitido: a,b,br,blockquote,i,li,pre,u,ul,p

HTML é permitido: a,b,br,blockquote,i,li,pre,u,ul,p

BT

Seu cadastro no InfoQ está atualizado? Poderia rever suas informações?

Nota: se você alterar seu email, receberá uma mensagem de confirmação

Nome da empresa:
Cargo/papel na empresa:
Tamanho da empresa:
País:
Estado:
Você vai receber um email para validação do novo endereço. Esta janela pop-up fechará em instantes.