Serviço de Mensagens

O que é um corretor de mensagens?

Os aplicativos de software modernos evoluíram de uma única unidade monolítica para coleções de serviços fragilmente acoplados. Embora essa nova arquitetura traga muitas vantagens, esses serviços ainda precisam interagir uns com os outros, o que exige soluções de mensagens robustas e eficientes.

O Redis Streams duplica sua funcionalidade, servindo como um canal de comunicação para a criação de arquiteturas de streaming e como uma estrutura de dados de registro para persistência de dados, tornando o Streams a solução perfeita para o provisionamento de eventos.

O Redis Pub/Sub é um protocolo de mensagens extremamente leve, projetado para transmitir notificações ao vivo em um sistema. Ele é ideal para propagar mensagens de curta duração quando a baixa latência e a alta taxa de transferência são essenciais.

As listas Redis e os conjuntos ordenados Redis são a base para a implementação de filas de mensagens. Você pode usá-los diretamente para criar soluções personalizadas ou por meio de uma estrutura que torne o processamento de mensagens mais idiomático para a linguagem de programação de sua escolha.

Quais são os desafios na criação de soluções de corretor de mensagens?

1. A comunicação entre os serviços deve ser confiável

Quando um serviço deseja se comunicar com outro, nem sempre pode fazê-lo imediatamente. Todos nós sabemos que ocorrem falhas e que implantações independentes podem tornar um serviço indisponível por determinado período. Para aplicativos em escala, não é uma questão de “se” ou “quando” um serviço fica indisponível, mas com que frequência. Para atenuar esse problema, é melhor limitar a quantidade de comunicação síncrona entre os serviços (ou seja, chamar as APIs do serviço diretamente, por exemplo, enviando uma solicitação HTTP(S)) e preferir canais persistentes, sempre que possível, para que os serviços possam consumir as mensagens conforme sua conveniência. Os dois principais paradigmas para esse tipo de comunicação assíncrona são os fluxos de eventos e as filas de mensagens.

Filas de mensagens

  1. As filas de mensagens são baseadas em listas mutáveis e, às vezes, são consumidas por meio de ferramentas que ajudam a implementar padrões comuns. Essas são as duas principais diferenças entre as filas de mensagens e os fluxos de eventos: As filas de mensagens usam um tipo de comunicação “push”: como é um serviço, ele envia uma nova mensagem para a caixa de entrada de outro serviço sempre que algo novo precisa de atenção. Os fluxos funcionam de forma inversa.
  2. As mensagens contêm um estado mutável (por exemplo, o número de tentativas) e, quando processadas com êxito, são removidas do sistema. Os eventos de fluxo são imutáveis e o histórico, quando cortado, geralmente é armazenado a frio.

As listas Redis e os conjuntos classificados são os dois tipos de dados que implementam esse tipo de comportamento e ambos podem ser usados para criar soluções personalizadas, bem como backends para estruturas específicas do ecossistema, como Celery (Python), Bull (JavaScript), Sidekiq (Ruby), Machinery (Go) e muitos outros.

Fluxos de eventos

Os fluxos de eventos baseiam-se no tipo de dados de registro, que é extremamente eficiente na pesquisa de seu histórico e na adição de novos itens ao seu final. Essas duas propriedades tornam o registro imutável, uma ótima maneira de se comunicar e uma maneira eficiente de armazenar dados.

A comunicação por meio de um fluxo é diferente do uso de uma fila de mensagens. Como dissemos anteriormente, as filas de mensagens são “push”, enquanto os fluxos são “pull”. Na prática, isso significa que cada serviço escreve seu próprio fluxo e os outros serviços podem, opcionalmente, observá-lo (extraí-lo). Isso torna a comunicação um-para-muitos muito mais eficiente do que com as filas de mensagens.

As filas de mensagens funcionam melhor quando um serviço deseja que outro execute uma operação. Nessa situação, a fila de mensagens do segundo serviço atua como uma “caixa de entrada de solicitações”. Por outro lado, quando um serviço precisa publicar um evento (ou seja, uma mensagem que é de interesse de vários serviços), o serviço de publicação terá de enviar uma mensagem para a fila de cada serviço interessado no evento. Na prática, a maioria das ferramentas (por exemplo, Enterprise Service Buses) pode fazer isso de forma transparente, mas gerar e armazenar uma cópia da mensagem separadamente para cada destinatário é ineficiente.

Os fluxos de eventos superam as filas de mensagens em padrões de comunicação um-para-muitos invertendo o protocolo: há apenas uma cópia do evento original, e qualquer serviço que queira acessá-lo pode pesquisar o fluxo de eventos (ou seja, o fluxo do serviço de publicação) em seu próprio ritmo. Os fluxos de eventos têm essa vantagem prática sobre as filas de mensagens: não há necessidade de especificar os assinantes do evento com antecedência. Nas filas de mensagens, por outro lado, o sistema precisa saber para quais filas entregar uma cópia do evento, de modo que, se um novo serviço for adicionado posteriormente, ele só receberá os novos eventos. Com os fluxos, esse problema não existe: um novo serviço pode até mesmo percorrer todo o histórico de eventos, o que é muito útil para adicionar novas análises e poder calculá-las retroativamente. Em outras palavras, você não precisa calcular imediatamente todas as métricas que poderá precisar no futuro. Você pode acompanhar as que precisa agora e adicionar outras à medida que avança, pois sabe que poderá ver o histórico completo até mesmo das que forem adicionadas posteriormente.

2. O armazenamento deve ser eficiente em termos de espaço

A eficiência de espaço é uma propriedade bem-conceituada para todos os canais de comunicação que persistem em mensagens. No entanto, para fluxos de eventos, ela é fundamental, pois é usada com frequência para armazenar informações de longo prazo (você sabe que os logs imutáveis são rápidos para adicionar novas entradas e pesquisar o histórico).

O Redis Streams é uma implementação do registro imutável que usa árvores radix com a estrutura de dados subjacente. Cada entrada no fluxo é identificada por um registro de data e hora e pode conter um conjunto arbitrário de pares campo-valor. As entradas no mesmo fluxo podem ter campos diferentes, mas o Redis é capaz de compactar vários eventos em uma linha que compartilha o mesmo esquema. Lembre-se de que, se seus eventos tiverem um conjunto estável de campos, você não pagará uma taxa de armazenamento por nome de campo, o que lhe permitirá usar nomes de chave mais longos e descritivos sem qualquer inconveniente.

Você já aprendeu que os fluxos podem ser cortados para remover entradas mais antigas, e o histórico excluído geralmente é mantido em um formato de arquivo. Outro recurso do Redis Streams é a capacidade de marcar qualquer entrada no meio do fluxo como “excluída” para ajudar a cumprir regulamentos como o GDPR.

Desempenho de processamento em escala

Os fluxos de eventos e as filas de mensagens ajudam a lidar com as explosões de comunicação. No entanto, outro problema com a invocação direta da API é que os serviços podem ficar sobrecarregados quando há picos de tráfego. Os canais de comunicação assíncronos podem atuar como um buffer, o que ajuda a suavizar os picos, mas o desempenho do processamento precisa ser robusto o suficiente para não atrapalhar e sustentar o tráfego normal, caso contrário o sistema falhará e o buffer terá que crescer indefinidamente.

No Redis Streams, é possível aumentar a taxa de transferência de processamento lendo um fluxo em um grupo de consumidores. Os leitores que fazem parte do mesmo grupo de consumidores visualizam as mensagens de forma mutuamente exclusiva. Obviamente, o mesmo fluxo pode ter vários grupos de consumidores. Na prática, você desejará criar um grupo de consumidores separado para cada serviço, de modo que cada serviço possa acelerar várias instâncias de leitura para aumentar o paralelismo, conforme necessário.

3. A semântica das mensagens deve ser clara, tenha isso em mente.

Ao se comunicar de forma assíncrona, é essencial levar em conta possíveis cenários de falha. Por exemplo, uma instância de serviço pode falhar ou perder a conectividade durante o processamento de uma mensagem. Como as falhas de comunicação são inevitáveis, os sistemas de mensagens se enquadram em duas categorias: entrega no máximo uma vez e entrega no mínimo uma vez. (Alguns sistemas de mensagens afirmam fornecer exatamente uma entrega, mas esse não é o cenário completo. Em qualquer sistema de mensagens confiável, as mensagens ocasionalmente precisarão ser entregues mais de uma vez para superar as falhas. Infelizmente, esse é um recurso inevitável da comunicação em redes não confiáveis).

Para lidar corretamente com as falhas, todos os serviços que participam do sistema devem ser capazes de executar o processamento idempotente de mensagens. “Idempotente” significa que o estado do sistema não se altera em caso de entrega de mensagens duplicadas. A idempotência geralmente é obtida aplicando-se qualquer alteração de estado necessária e salvando a última mensagem processada atomicamente (por exemplo, em uma transação). Assim, o bug nunca permanecerá em um estado inconsistente em caso de falha, e o leitor poderá saber se uma determinada mensagem já foi processada ou não, verificando se o identificador da nova mensagem é anterior à última mensagem processada.

O Redis Streams, sendo um canal de comunicação de streaming confiável, é um sistema at-least-once. Ao ler um fluxo por meio de um grupo de consumidores, o Redis se lembra de qual evento foi enviado a qual usuário. Então, é dever de cada um reconhecer que a mensagem foi processada corretamente. Quando um consumidor não está mais ativo, um evento pode ficar preso. Para resolver esse problema, os grupos oferecem uma maneira de inspecionar o status das mensagens pendentes e, se necessário, reatribuir um evento a outro usuário.

Já dissemos que as transações (e operações atômicas) são a principal maneira de obter idempotência. Para ajudar a conseguir isso, o Redis Transactions e o scripting Lua permitem a composição de vários comandos com semântica transacional do tipo tudo ou nada.

O Redis Pub/Sub é um sistema de mensagens at-most-once que permite que os editores retransmitam mensagens para um ou mais canais. Especificamente, o Redis Pub/Sub foi projetado para comunicação em tempo real entre instâncias em que a baixa latência é de suma importância e, como tal, não depende de nenhuma forma de persistência ou reconhecimento. O resultado é um sistema de mensagens em tempo real que é o mais ágil possível, perfeito para aplicativos financeiros e de jogos em que cada milissegundo é importante.

Então, por que usar o Redis Enterprise para mensagens?

O Redis Enterprise é baseado em uma arquitetura simétrica e de uso compartilhado que permite que o tamanho dos conjuntos de dados cresça de forma suave e linear, sem exigir alterações no código do aplicativo.

O Redis Enterprise oferece vários modelos de alta disponibilidade e distribuição geográfica que permitem latências locais para seus usuários, se necessário.

Várias opções de persistência (AOF e snapshots por gravação ou por segundo) que não afetam o desempenho garantem que os servidores de banco de dados não precisem ser reconstruídos após uma falha.

O suporte a conjuntos de dados extremamente grandes por meio do uso de acesso inteligente em camadas à memória (RAM, memória persistente ou Flash) garante que os conjuntos de dados possam ser dimensionados de acordo com as demandas dos usuários sem afetar significativamente o desempenho.

Vamos ver como usar um Pub/Sub com o Redis Enterprise

O Redis Streams e o Pub/Sub têm APIs estáveis em diferentes linguagens de programação, portanto, os exemplos de Python a seguir podem ser facilmente traduzidos para a linguagem de sua escolha:

Conectando-se ao Redis:

import redis
# Conectar-se à instância local
r = redis.Redis(host='localhost', port=6379, db=0)

Escreva em um fluxo:

event = {"eventType": "purchase", "amount": 5, "item_id": "XXX"}
r.xadd("stream_key", '*', event)
# the `*` significa que redis gera automaticamente o ID do evento.

Ler um fluxo diretamente:

last_id = '$' # `$` significa apenas novas mensagens
while True:
events = r.xread({"stream_key": last_id}, block=0, count=10)
for _, e in events:
print(f"new event, amount: {e['amount']}")
last_id = e['id']

Leitura de um fluxo por meio de um grupo de consumidores:

# Comece lendo qualquer possível evento pendente
# que não tenha sido reconhecido anteriormente (por exemplo,
# devido a uma falha). "0" indica eventos pendentes.
pending = r.xreadgroup("service-1", "consumer-A", {"stream_key": "0"})
pending_ids = []
for _, e in pending:
print(f"old event found, amount: {e['amount']}")
pending_ids.append(e['id'])
# marcar eventos pendentes como processados
r.xack("stream_key", "service-1", *pending_ids)

# Agora que já lidamos com todos os eventos anteriores,
# começa a solicitar novos eventos. “>” indica “somente novos eventos”.
while True:
events = r.xreadgroup(“service-1”, “consumer-A”, {“stream_key”: “>”}, count=10)
event_ids = []
for _, e in events:
print(f”new event, amount: {e[‘amount’]}”)
event_ids.append(e[‘id’])
r.xack(“stream_key”, “service-1”, *event_ids)
# Se a falha ocorrer antes de `r.xack`, ao recarregar,
# testaremos esse lote de mensagens novamente.

Para processar alguns eventos, reconheça e aplique as alterações automaticamente:

while True:
events = r.xreadgroup("service-1", "consumer-A", {"stream_key": ">"}, count=10)
event_ids = []

# iniciar uma transação redis
transaction = r.multi()
for _, e in events:
transaction.incrby(f”item:{e[‘item_id’}:total”, e[‘amount’])
event_ids.append(e[‘id’])
transaction.xack(“stream_key”, “service-1”, *event_ids)
transaction.exec()
# Se ocorrer um erro antes que a transação seja executada, nenhuma das outras operações será executada.
# das outras operações será executada, garantindo assim a consistência.

Publicar para Pub/Sub:

# publicar uma mensagem no canal "redis
r.publish("redis", "hello world")

Assinar um canal no Pub/Sub:

sub = r.pubsub()
sub.subscribe("redis")
while True:
msg = sub.get_message()
print(f"new message: {msg['data']}")

Assinar um padrão em Pub/Sub:

sub = r.pubsub()
# essa assinatura retornará mensagens
# de todos os canais que começam com `red`.
sub.psubscribe("red*")
while True:
msg = sub.get_message()
print(f"new message in channel {msg['channel']}: {msg['data']}")


Saiba mais


Próximas etapas