Mensajería

¿Qué es un agente de mensajes?

Las aplicaciones de software modernas han pasado de ser una única unidad monolítica a ser colecciones de servicios débilmente acoplados. Si bien esta nueva arquitectura te aporta muchas ventajas, esos servicios siguen necesitando interactuar entre sí, lo que hace necesario disponer de soluciones de mensajería sólidas y eficaces.

Redis Streams duplica su funcionalidad, sirviendo como un canal de comunicación para la construcción de arquitecturas de streaming y como una estructura de datos tipo log (registro) para la persistencia de datos, lo que hace que Streams sea la solución perfecta para el abastecimiento de eventos.

Redis Pub/Sub es un protocolo de mensajería extremadamente ligero diseñado para transmitir notificaciones en vivo dentro de un sistema. Es ideal para propagar mensajes de corta duración cuando la baja latencia y el gran rendimiento son fundamentales.

Laslistas de Redis y los conjuntos ordenados de Redis son la base para implementar colas de mensajes. Puedes utilizarlas directamente para crear soluciones a medida, o a través de un marco de trabajo que hace que el procesamiento de mensajes sea más idiomático para el lenguaje de programación de su elección.

¿Cuáles son los desafíos en la creación de soluciones de corredores de mensajes?

1. La comunicación entre servicios debe ser fiable

Cuando un servicio quiere comunicarse con otro, no siempre puede hacerlo de inmediato. Sabemos todos que los fallos ocurren y los despliegues independientes pueden hacer que un servicio no esté disponible durante determinados períodos de tiempo. Para las aplicaciones a escala, no es cuestión de «si» o «cuándo» un servicio deja de estar disponible, sino de con qué frecuencia. Para mitigar este problema, lo mejor es limitar la cantidad de comunicación sincrónica entre los servicios (es decir, llamar directamente a las API del servicio, por ejemplo, enviando una solicitud HTTP(S)) y preferir los canales persistentes, siempre que te resulte práctico, para que los servicios puedan consumir los mensajes a su conveniencia. Los dos principales paradigmas de este tipo de comunicación asíncrona son los flujos de eventos y las colas de mensajes.

Colas de mensajes

  1. Las colas de mensajes se basan en listas mutables y a veces se consumen a través de herramientas que ayudan a implementar patrones comunes. Estas son las dos diferencias principales entre las colas de mensajes y los flujos de eventos: Las colas de mensajes utilizan un tipo de comunicación «push»: porque es un servicio empuja un nuevo mensaje a la bandeja de entrada de otro servicio cada vez que algo nuevo necesita atención. Los flujos funcionan al revés.
  2. Los mensajes contienen un estado mutable (por ejemplo, el número de reintentos) y, cuando se procesan con éxito, se eliminan del sistema. Los eventos de flujos son inmutables y el historial, cuando se recorta, suele guardarse en frío.

Redis Lists y Sorted Sets son los dos tipos de datos que implementan este tipo de comportamiento y ambos pueden utilizarse para construir soluciones a tu medida, así como backends para frameworks específicos del ecosistema como Celery (Python), Bull (JavaScript), Sidekiq (Ruby), Machinery (Go) y muchos otros.

Flujos de eventos

Los flujos de eventos se basan en el tipo de datos de registro, que es extremadamente eficiente en la búsqueda a través de su historial y la adición de nuevos elementos a su final. Gracias a estas dos propiedades hacen que el registro inmutable sea tanto una gran forma de comunicación como una forma eficiente de almacenar datos.

Comunicarse a través de un flujo es diferente a utilizar una cola de mensajes. Como te hemos dicho anteriormente, las colas de mensajes son «push», mientras que los flujos son «pull». En la práctica, esto significa que cada servicio escribe su propio flujo y el resto de servicios pueden, opcionalmente, observarlo (extraerlo). Esto hace que la comunicación de uno a muchos sea mucho más eficiente que con las colas de mensajes.

Las colas de mensajes funcionan mejor cuando un servicio quiere que otro realice una operación. En esa situación, la cola de mensajes del segundo servicio actúa como «bandeja de entrada de solicitudes». En cambio, cuando un servicio necesita publicar un evento (es decir, un mensaje que interesa a varios servicios), el servicio que lo publica tendrá que enviar un mensaje a la cola de cada servicio interesado en el evento. En la práctica, la mayoría de las herramientas (por ejemplo, Enterprise Service Buses) pueden hacerlo de forma transparente, sin embargo, generar y almacenar una copia del mensaje por separado para cada destinatario es ineficiente.

Los flujos de eventos superan a las colas de mensajes en los patrones de comunicación de uno a muchos invirtiendo el protocolo: solo existe una copia del evento original, y cualquier servicio que quiera acceder a él puede buscar a través del flujo de eventos (es decir, el flujo del servicio de publicación) a su propio ritmo. Los flujos de eventos tienen esta ventaja práctica sobre las colas de mensajes: no es necesario especificar los suscriptores del evento con antelación. En las colas de mensajes, en cambio, el sistema necesita saber a qué colas debe entregar una copia del evento, de modo que si se añade un nuevo servicio más tarde, solo recibirá los nuevos eventos. Con los flujos de eventos, este problema no existe: incluso se puede hacer que un nuevo servicio recorra todo el historial de eventos, lo que es de gran utilidad para añadir nuevos análisis y poder calcularlos de forma retroactiva. Es decir, no es necesario elaborar inmediatamente cada métrica que pueda necesitar en el futuro. Puedes hacer un seguimiento de las que necesitas ahora y añadir más a medida que avanza porque sabes que podrás ver el historial completo incluso de las que se añadan más tarde.

2. El almacenamiento debe ser eficiente en cuanto a espacio

La eficiencia del espacio es una propiedad bien vista en todos los canales de comunicación que persisten en los mensajes. Sin embargo, para los flujos de eventos es fundamental, ya que suelen utilizarse para almacenar información a largo plazo. (Ya sabes que los registros inmutables son rápidos a la hora de añadir nuevas entradas y de buscar en el historial).

Redis Streams es una implementación del registro inmutable que utiliza árboles de radix como estructura de datos subyacente. Cada entrada del flujo se identifica con una marca de tiempo y puede contener un conjunto arbitrario de pares campo-valor. Las entradas del mismo flujo pueden tener diferentes campos, pero Redis es capaz de comprimir múltiples eventos en una fila que comparten el mismo esquema. Recuerda, si tus eventos tienen un conjunto estable de campos no pagarás un precio de almacenamiento por cada nombre de campo, permitiéndote utilizar nombres de clave más largos y descriptivos sin ningún inconveniente.

Ya has aprendido que los flujos pueden recortarse para eliminar las entradas más antiguas y el historial eliminado suele conservarse en un formato de archivo. Otra característica de Redis Streams es la capacidad de marcar cualquier entrada a mitad de flujo como «eliminada» para ayudar a cumplir con regulaciones como el RGPD.

Escalar el rendimiento del procesamiento

Los flujos de eventos y las colas de mensajes ayudan a hacer frente a las ráfagas de comunicación. Sin embargo, otro problema de la invocación directa de la API es que los servicios pueden verse desbordados cuando el tráfico se dispara. Los canales de comunicación asíncronos pueden actuar como un buffer, lo que ayuda a suavizar los picos, pero el rendimiento del procesamiento tiene que ser lo suficientemente robusto para no liarla y para sostener el tráfico normal o el sistema se colapsará y el buffer tendrá que crecer indefinidamente.

En Redis Streams es posible aumentar el rendimiento del procesamiento leyendo un flujo a través de un grupo de consumidores. Los lectores que forman parte del mismo grupo de consumidores ven los mensajes de forma mutuamente excluyente. Por supuesto, un mismo flujo puede tener varios grupos de consumidores. En la práctica, querrás crear un grupo de consumidores separado para cada servicio, de modo que cada servicio pueda acelerar varias instancias de lectura para aumentar el paralelismo según sea necesario.

3. La semántica de los mensajes debe ser clara, tenlo en cuenta

Cuando se comunica de forma asíncrona, es fundamental tener en cuenta los posibles escenarios de fallo. Por ejemplo, una instancia de servicio puede bloquearse o perder la conectividad mientras se procesa un mensaje. Dado que los fallos de comunicación son inevitables, los sistemas de mensajería se dividen en dos categorías: at-most-once y at-least-once delivery. (Algunos sistemas de mensajería afirman que ofrecen exactamente una entrega, pero no es la imagen completa. En cualquier sistema de mensajería fiable, los mensajes tendrán que entregarse ocasionalmente más de una vez para superar los fallos. Esta es, desafortunadamente, una característica inevitable de la comunicación a través de redes poco fiables).

Para gestionar correctamente los fallos, todos los servicios que participan en el sistema deben ser capaces de realizar un procesamiento de mensajes idempotente. «Idempotente» significa que el estado del sistema no cambia en caso de entrega de mensajes duplicados. La idempotencia por lo general se consigue aplicando cualquier cambio de estado necesario y guardando el último mensaje procesado atómicamente (por ejemplo, en una transacción). Así, el fallo nunca quedará en un estado inconsistente en caso de fallo, y el lector podrá saber si un determinado mensaje ya fue procesado o no comprobando si el nuevo identificador de mensaje es anterior al último mensaje procesado.

Redis Streams, al ser un canal de comunicación de streaming fiable, es un sistema  at-least-once  Al leer un flujo a través de un grupo de consumidores, Redis recuerda qué evento fue enviado a qué consumidor. Es el deber del consumidor entonces reconocer adecuadamente que el mensaje se ha procesado correctamente. Cuando un consumidor deja de estar activo, un evento puede quedar atascado. Para resolver este problema, los grupos de consumidores ofrecen una forma de inspeccionar el estado de los mensajes pendientes y, en caso necesario, reasignar un evento a otro consumidor.

Ya hemos dicho que las transacciones (y las operaciones atómicas) son la principal forma de lograr la idempotencia. Para ayudar a lograrlo,& Redis Transactionsy Lua scripting permiten la composición de múltiples comandos con semántica transaccional de todo o nada.

Redis Pub/Sub es un sistema de mensajería  at-most-once que permire a los editores retransmitir mensajes a uno o más canales. En concreto Redis Pub/Sub está diseñado para la comunicación en tiempo real entre instancias donde la baja latencia es de suma importancia, y como tal no cuenta con ninguna forma de persistencia o reconocimiento. Se obtiene así es un sistema de mensajería en tiempo real lo más ágil posible, perfecto para aplicaciones financieras y de juegos, donde cada milisegundo es importante.

Y ¿por qué utilizar Redis Enterprise para mensajería?

Redis Enterprise se basa en una a arquitectura simétrica y de uso compartido  que permite que el tamaño de los conjuntos de datos crezca de forma lineal y sin problemas, sin necesidad de realizar cambios en el código de la aplicación.

Redis Enterprise ofrece múltiples modelos de alta disponibilidad y distribución geográfica que permiten latencias locales para sus usuarios en caso necesario.

Las múltiples opciones de persistencia (AOF por escritura o por segundo e instantáneas) que no afectan al rendimiento garantizan que no será necesario reconstruir los servidores de bases de datos tras haber experimentado un fallo.

La compatibilidad con  conjuntos de datos extremadamente grandes  mediante la utilización de un acceso inteligente por niveles a la memoria (RAM, memoria persistente o Flash) garantiza la posibilidad de escalar los conjuntos de datos con las demandas de los usuarios sin afectar significativamente al rendimiento.

Veamos cómo utilizar un Pub/Sub con Redis Enterprise

Redis Streams y Pub/Sub tienen APIs estables a través de diferentes lenguajes de programación, por lo que los siguientes ejemplos de Python pueden ser fácilmente traducidos al lenguaje que tú elijas:

Conexión a Redis:

import redis
# Conectar a instancia local
r = redis.Redis(host='localhost', port=6379, db=0)

Escribir en un flujo:

event = {"eventType": "purchase", "amount": 5, "item_id": "XXX"}
r.xadd("stream_key", '*', event)
# the `*` significa que redis genera de forma automática la id del evento

Leer un flujo directamente:

last_id = '$' # `$` significa solo nuevos mensajes
while True:
events = r.xread({"stream_key": last_id}, block=0, count=10)
for _, e in events:
print(f"new event, amount: {e['amount']}")
last_id = e['id']

Leer un flujo a través de un grupo de consumidores:

# Comienza por leer cualquier posible evento pendiente
# que no se hubiese reconocido anteriormente(por ejemplo,
# debido a un fallo). "0" indica eventos pendientes.
pending = r.xreadgroup("service-1", "consumer-A", {"stream_key": "0"})
pending_ids = []
for _, e in pending:
print(f"old event found, amount: {e['amount']}")
pending_ids.append(e['id'])
# marcar los eventos pendientes como procesados
r.xack("stream_key", "service-1", *pending_ids)

# Ahora que hemos manejado todos los eventos anteriores,
# comienza a preguntar por los nuevos. “>” indica “solo eventos nuevos”.
while True:
events = r.xreadgroup(“service-1”, “consumer-A”, {“stream_key”: “>”}, count=10)
event_ids = []
for _, e in events:
print(f”new event, amount: {e[‘amount’]}”)
event_ids.append(e[‘id’])
r.xack(“stream_key”, “service-1”, *event_ids)
# Si el fallo se produce antes de`r.xack`, en la recarga,
# probaremos de nuevo este lote de mensajes.

Para procesar algunos eventos, reconocer y aplicar cambios automáticamente:

while True:
events = r.xreadgroup("service-1", "consumer-A", {"stream_key": ">"}, count=10)
event_ids = []

# iniciar una transacción redis
transaction = r.multi()
for _, e in events:
transaction.incrby(f”item:{e[‘item_id’}:total”, e[‘amount’])
event_ids.append(e[‘id’])
transaction.xack(“stream_key”, “service-1”, *event_ids)
transaction.exec()
# Si se produce un error antes de llevar a cabo la transacción, ninguna
# de las otras operaciones se llevará a cabo, asegurando así la consistencia.

Publicar en Pub/Sub:

# publicar un mensaje en el canal «redis»
r.publish("redis", "hello world")

Suscribirse a un canal en Pub/Sub:

sub = r.pubsub()
sub.subscribe("redis")
while True:
msg = sub.get_message()
print(f"new message: {msg['data']}")

Suscribirse a un patrón en Pub/Sub:

sub = r.pubsub()
# esta suscripción devolverá mensajes
# de todos los canales que comiencen con `red`.
sub.psubscribe("red*")
while True:
msg = sub.get_message()
print(f"new message in channel {msg['channel']}: {msg['data']}")


Descubre más


Próximos pasos