Nachrichtenübermittlung

Was ist ein Message-Broker?

Moderne Software-Anwendungen haben sich von einer einzigen monolithischen Einheit in locker aneinander gekoppelte Sammlungen von Services verwandelt. Diese neue Architektur bringt viele Vorteile mit sich. Damit diese Services jedoch miteinander interagieren können, bedarf es robuster und effizienter Lösungen für die Nachrichtenübermittlung.

Redis Streams hat zwei Funktionen: es dient als Kommunikationskanal für den Aufbau von Streaming-Architekturen und als protokollähnliche Datenstruktur für persistente Daten. Somit ist Streams die perfekte Lösung für das Ereignis-Sourcing.

Redis Pub/Sub ist ein besonders schmales Nachrichtenübermittlungsprotokoll, das für die Live-Übertragung von Benachrichtigungen innerhalb eines Systems bestimmt ist. Es ist ideal für die Verbreitung von kurzlebigen Nachrichten, wenn eine niedrige Latenzzeit und ein hoher Durchsatz benötigt werden.

Redis Listen und Redis Sorted Sets sind die Basis für die Implementierung von Nachrichtenwarteschlangen. Sie können sowohl direkt für die Erstellung maßgeschneiderter Lösungen genutzt werden als auch zusammen mit einem Rahmenwerk, das die Nachrichtenbearbeitung für Ihre gewünschte Programmiersprache idiomatischer macht.

Herausforderungen bei der Erstellung von Message-Broker-Lösungen

1. Die Kommunikation zwischen Services muss zuverlässig sein

Wenn ein Service mit einem anderen Service kommunizieren möchte, kann er das nicht immer sofort tun. Manchmal treten Störungen auf und durch unabhängige Einsätze kann es passieren, dass ein Service für einige Zeit nicht verfügbar ist Für weitreichende Anwendungen ist die Frage nicht, “falls” oder “wann” ein Service nicht verfügbar wird, sondern wie häufig es vorkommt. Eine bewährte Methode für die Lösung dieses Problems ist, die Anzahl synchron laufender Kommunikationen zwischen den Services (die APIs des Services direkt aufrufen, z.B. durch das Senden einer HTTP(S)-Anfrage) zu beschränken und stattdessen, wann immer es sich anbietet, persistente Kanäle zu bevorzugen, damit die Services Nachrichten nach Wunsch verarbeiten können. Die beiden wichtigsten Paradigmen für diese Art asynchroner Kommunikation sind Ereignis-Streams und Nachrichtenwarteschlagen.

Nachrichtenwarteschlangen

  1. Nachrichtenwarteschlangen basieren auf wandelbaren Listen und werden manchmal durch Tools verarbeitet, die bei der Implementierung allgemeiner Muster helfen. Es gibt zwei Hauptunterschiede zwischen Nachrichtenwarteschlangen und Ereignis-Streams: Nachrichtenwarteschlangen nutzen einen “Push”-Kommunikationstyp – ein Service sendet eine Push-Nachricht in den Posteingang eines anderen Services, wann immer es etwas Neues gibt, dem Beachtung geschenkt werden soll. Streams arbeiten in entgegengesetzter Richtung.
  2. Die Nachrichten enthalten einen wandelbaren Status (z. B. die Anzahl der Neuversuche). Nach der erfolgreichen Bearbeitung werden sie aus dem System gelöscht. Stream-Ereignisse sind nicht wandelbar und der Verlauf wird häufig im Cold Storage gespeichert.

Redis Listen und Sorted Sets sind die beiden Datentypen, die diese Verhaltensart implementieren. Zudem können beide für die Erstellung maßgeschneiderter Lösungen angewandt werden sowie für die Erstellung von Backends für Ökosystem-spezifische Frameworks wie  Celery (Python), Bull (JavaScript), Sidekiq (Ruby), Machinery (Go), und viele andere.

Ereignis-Streams

Ereignis-Streams beruhen auf dem Protokolldatentyp, der höchst effizient den Verlauf durchsieht und neue Elemente an sich anhängt. Diese beiden Eigenschaften machen aus dem nicht wandelbaren Protokoll sowohl eine Kommunikationsprimitive als auch eine effiziente Methode der Datenspeicherung.

Das Kommunizieren über einen Stream funktioniert anders als der Einsatz einer Nachrichtenwarteschlange. Wie bereits erwähnt, handelt es sich um “Push”-Nachrichtenwarteschlangen und um “Pull”-Streams. In der Praxis bedeutet dies, dass jeder Service an seinen eigenen Stream schreibt und weitere Services dies gegebenfalls beobachten (d.h. “Pull”). Das macht eine 1:n-Kommunikation viel effizienter, als es mit Nachrichtenwarteschlangen möglich ist.

Nachrichtenwarteschlangen funktionieren am besten, wenn ein Service möchte, dass ein anderer Service einen Vorgang ausführt. In dieser Situation agiert die Nachrichtenwarteschlange des zweiten Services als “Request Inbox”. Wenn ein Service stattdessen ein Ereignis (d.h. eine Nachricht, die für viele Services von Interesse ist) veröffentlichen muss, muss dieser Service eine Push-Nachricht in die Warteschlange eines jeden Services senden, der an diesem Ereignis interessiert ist. In der Praxis sind die meisten Tools (z. B. Enterprise Services Buses) dazu in der Lage. Das Generieren und Speichern einer separaten Nachrichtenkopie für jeden Empfänger bleibt jedoch ineffizient.

Indem der Vorgang umgedreht wird, meistern Ereignis-Streams 1:n-Kommunikationen besser als Nachrichtenwarteschlangen: Bei Streams existiert nur eine Kopie des Originalereignisses. Jeder Service, der darauf zugreifen möchte, durchsucht den Ereignis-Stream (d.h. den Stream des veröffentlichenden Services) in seinem eigenen Tempo. Ereignis-Streams haben einen anderen praktischen Vorteil gegenüber Nachrichtenwarteschlangen: Die Ereignis-Abonnenten müssten nicht im Voraus festgelegt werden. Bei Nachrichtenwarteschlangen muss das System wissen, an welche Schlangen eine Kopie des Ereignisses gesendet werden soll. Wenn Sie also einen neuen Service erst später hinzufügen, erhält dieser nur neue Ereignisse. Mit Ereignis-Streams gibt es dieses Problem nicht – ein neuer Service kann den kompletten Ereignisverlauf durchsuchen. Damit lassen sich neue Analysen hinzufügen, die auch rückwirkend verarbeitet werden können. So müssen Sie nicht im Vornherein bereits alle Metriken bestimmen, die Sie vielleicht später benötigen werden. Da Sie sich darauf verlassen können, stets auf den kompletten Ereignisverlauf zugreifen zu können, können Sie zu Beginn einfach nur die Metriken nachverfolgen, die Sie sofort benötigen und mit der Zeit weitere hinzufügen.

2. Die Speicherung muss platzsparend sein

Die Platzeffizienz ist für alle Kommunikationskanäle, bei denen Nachrichten lange bestehen bleiben, von Vorteil. Für Ereignis-Streams ist sie jedoch fundamental, da sie häufig für langfristige Speicherung von Informationen benutzt werden. (Wir haben weiter oben erwähnt, dass nicht wandelbare Protokolle schnell neue Einträge hinzufügen und durch den Verlauf suchen können.)

Redis Streams ist eine Implementierung des nicht wandelbaren Protokolls, das einen Patricia-Trie als zugrundeliegende Datenstruktur verwendet. Jeder Stream-Eintrag wird durch einen Zeitstempel identifiziert und kann einen beliebiges Set aus Paaren mit Feldwerten enthalten. Einträge desselben Streams können unterschiedliche Felder enthalten. Redis ist jedoch in der Lage, mehrere Ereignisse mit dem selben Schema in einer Reihe zu komprimieren. Wenn Ihre Ereignisse stabile Datenfeldsätze haben, zahlen Sie also keinen Speicherpreis für jeden einzelnen Feldnamen. So können Sie längere und deskriptive Schlüsselnamen nutzen, ohne dass Ihnen dabei irgendwelche Nachteile entstehen.

Wie bereits erwähnt, können Streams getrimmt werden, um ältere Einträge zu entfernen. Der gelöschte Verlauf kann in einem Archivformat gespeichert werden. Eine weitere Eigenschaft von Redis Streams ist die Fähigkeit, jeden Mid-Stream-Eintrag als “gelöscht” zu markieren, um bei der Einhaltung der Regelungen wie DSGVO zu helfen.

Skalieren von Bearbeitungsdurchsatz

Ereignis-Streams und Nachrichtenwarteschlangen helfen, intensive Kommunikation zu bewältigen. Ein weiteres Problem der direkten API-Invokation ist jedoch, dass Services bei Verkehrsstoßzeiten schnell überfordert sind. Asynchrone Kommunikationskanäle können als Puffer agieren, der beim Bewältigen von Spitzenzeiten helfen kann. Der Bearbeitungsdurchsatz muss jedoch robust genug sein, um normalem Traffic gewachsen zu sein. Andernfalls bricht das System zusammen und der Puffer muss ins Unendliche wachsen.

In Redis Stream kann der Bearbeitungsdurchsatz erhöht werden, indem ein Stream durch eine Verbrauchergruppe gelesen wird. Leser, die Teil derselben Verbrauchergruppe sind, können Nachrichten sehen, die anderen nicht zugänglich sind. Natürlich kann ein einziger Stream auch mehrere Verbrauchergruppen beinhalten. Es ist jedoch sinnvoll eine separate Verbrauchergruppe für jeden Service zu erstellen. So kann jeder Service mehrere Leseinstanzen durchgehen, und der Parallelismus kann nach Bedarf erhöht werden.

3. Die Semantik der Nachrichtenübermittlung muss klar sein

Bei asynchroner Kommunikation ist es wichtig, mögliche Ausfallsituationen im Auge zu behalten. Eine Service-Instanz kann zum Beispiel beim Bearbeiten einer Nachricht abstürzen oder die Konnektivität verlieren. Weil Kommunikationsausfälle unvermeidbar sind, sind Nachrichtenübermittlungssysteme in zwei Kategorien aufgeteilt: At-Most-Once- und At-Least-Once- Lieferungen. (Einige Nachrichtenübermittlungssysteme behaupten eine exactly-once Lieferung anzubieten, aber das ist nicht ganz korrekt. In jedem zuverlässigen Nachrichtenübermittlungssystem müssen Nachrichten gelegentlich mehr als einmal gesendet werden, um Ausfälle zu bewältigen. Das ist unvermeidlich bei der Kommunikation über unzuverlässige Netzwerke.)

Um Ausfälle korrekt zu bewältigen, müssen alle in diesem System beteiligten Services in der Lage sein, eine idempotente Nachrichtenbearbeitung auszuführen. ‘Idempotent’ bedeutet, dass der Status des Systems sich im Fall einer doppelten Nachrichtenlieferung nicht ändert. Idempotenz wird durch die Annahme jeder erforderlichen Statusänderung und durch die Speicherung der letzten atomar bearbeiteten Nachricht (z. B. in einer Transaktion) erreicht. Auf diese Weise führt ein Ausfall nicht zu einem inkonsistenten Zustand. Der Leser kann sehen, ob eine bestimmte Nachricht bereits bearbeitet wurde oder nicht. Dafür überprüft er, ob die neue Nachrichtenerkennung der letzten bearbeiteten Nachricht vorangeht.

Redis Streams ist ein zuverlässiger Streaming-Kommunikationskanal und ein  At-Least-Once- System. Beim Lesen eines Streams durch eine Verbrauchergruppe erinnert sich Redis daran, welches Ereignis an welchen Verbraucher gesendet wurde. Der Verbraucher hat dann die Pflicht, ordnungsgemäß zu bestätigen, dass die Nachricht erfolgreich bearbeitet wurde. Wenn ein Verbraucher wegfällt, kann es passieren, dass ein Ereignis steckenbleibt. Um dieses Problem zu lösen, bieten Verbrauchergruppen eine Methode, den Zustand offener Nachrichten zu überprüfen und falls erforderlich, ein Ereignis einem anderen Verbraucher zuzuweisen.

Wie oben bereits erwähnt, erreicht man Idempotenz hauptsächlich durch Transaktionen (und durch atomare Vorgänge). Als Hilfe ermöglichen Redis Transactions und Lua Scripting die Zusammensetzung mehrerer Befehle mit transaktionaler All-or-Nothing-Semantik.

Redis Pub/Sub ist ein At-Most-Once- Nachrichtenübermittlungssystem, mit dem an einen oder mehrere Kanäle Nachrichten übermittelt werden können. Redis Pub/Sub ist für die Echtzeit-Kommunikation zwischen Instanzen bestimmt, bei denen die niedrige Latenzzeit von höchster Bedeutung ist. Es bietet daher keine Form von Persistenz oder Bestätigung. Das Ergebnis ist das schlankste Echtzeit-Nachrichtenübermittlungssystem überhaupt. Es eignet sich damit ideal für Finanz- und Gaming-Anwendungen, bei denen jede Millisekunde zählt.

Warum Redis Entreprise für die Nachrichtenübermittlung?

Redis Enterprise basiert auf einer symmetrischen Shared-Nothing Architektur , die für ein lineares und nahtloses Wachstum von Datensatzgrößen sorgt, ohne Änderungen am Anwendungscode zu fordern.

Das Angebot von Redis Enterprise umfasst verschiedene Modelle von großer Verfügbarkeit und geografischer Verteilung, die Ihren Nutzern bei Bedarf lokale Latenzen ermöglichen.

Mehrere Persistenzoptionen (AOF pro Schreibvorgang oder pro Sekunden und Snapshots), die die Leistung nicht beeinflussen, garantieren, dass Sie Ihre Datenbankserver nach Ausfällen nicht wieder neu aufbauen müssen.

Die Unterstützung von extrem großen Datensätzen mit intelligentem, stufenweisem Zugriff auf Speicher (RAM, persistenter Speicher oder Flash) garantiert, dass Sie Ihre Datensätze skalieren können, um die Anfragen Ihrer Benutzer ohne erhebliche Beeinträchtigung der Leistung zu erfüllen.

Anwendung eines Pub/Sub mit Redis Enterprise

Redis Streams und Pub/Sub bieten stabile APIs in verschiedenen Programmiersprachen. Die folgenden Python-Beispiele können also leicht in Ihre gewünschte Sprache übersetzt werden.

Verbindungsherstellung mit Redis:

import redis
# Verbindung mit einer lokalen Redis Instanz
r = redis.Redis(host='localhost', port=6379, db=0)

Writing to a stream:

event = {"eventType": "purchase", "amount": 5, "item_id": "XXX"}
r.xadd("stream_key", '*', event)
# `*` bedeutet, dass Redis automatisch eine Ereignis-ID generiert

Einen Stream direkt lesen:

last_id = '$' # `$` bedeutet nur neue Nachrichten
while True:
events = r.xread({"stream_key": last_id}, block=0, count=10)
for _, e in events:
print(f"new event, amount: {e['amount']}")
last_id = e['id']

Einen Stream durch eine Verbrauchergruppe lesen:

# Beginnen Sie mit dem Lesen aller potentiellen offenen Ereignisse
#, die vorher nicht bestätigt wurden (z. B.,
# aufgrund eines Absturzes). "0" gibt offene Ereignisse an.
pending = r.xreadgroup("service-1", "consumer-A", {"stream_key": "0"})
pending_ids = []
for _, e in pending:
print(f"old event found, amount: {e['amount']}")
pending_ids.append(e['id'])
# mark pending events as processed
r.xack("stream_key", "service-1", *pending_ids)

# Jetzt, da wir alle vorherigen Ereignisse bearbeitet haben,
# beginnen Sie, nach neuen zu fragen. “>” gibt “nur neue Ereignisse” an.
while True:
events = r.xreadgroup(“service-1”, “consumer-A”, {“stream_key”: “>”}, count=10)
event_ids = []
for _, e in events:
print(f”new event, amount: {e[‘amount’]}”)
event_ids.append(e[‘id’])
r.xack(“stream_key”, “service-1”, *event_ids)
# Wenn wir vor `r.xack` beim erneuten Laden abstürzen,
# versuchen wir es bei diesem Nachrichten-Batch erneut.

Einige Ereignisse bearbeiten, Änderungen atomor bestätigen und anwenden:

während Wahr:
events = r.xreadgroup("service-1", "consumer-A", {"stream_key": ">"}, count=10)
event_ids = []

# eine Redis Transaktion initiieren
transaction = r.multi()
für _, e in Ereignissen:
transaction.incrby(f”item:{e[‘item_id’}:total”, e[‘amount’])
event_ids.append(e[‘id’])
transaction.xack(“stream_key”, “service-1”, *event_ids)
transaction.exec()
# Wenn wir vor Ausführen der Transaktion abstürzen, wird aus Konsistenzgründen keine
# der anderen Vorgänge stattfinden.

Veröffentlichen auf Pub/Sub:

# eine Nachricht in dem `Redis`-Kanal veröffentlichen
r.publish("redis", "hello world")

Einen Kanal auf Pub/Sub abonnieren:

sub = r.pubsub()
sub.subscribe("redis")
while True:
msg = sub.get_message()
print(f"new message: {msg['data']}")

Ein Muster auf Pub/Sub abonnieren:

sub = r.pubsub()
# dieses Abonnement sendet Nachrichten
# von allen Kanälen zurück, die mit `red` starten.
sub.psubscribe("red*")
while True:
msg = sub.get_message()
print(f"new message in channel {msg['channel']}: {msg['data']}")


Mehr entdecken


Nächste Schritte