Schnelles Einlesen von Daten

Was ist Fast Data Ingestion?

Das Sammeln, Speichern und Bearbeiten von großen Mengen unterschiedlicher Hochgeschwindigkeitsdaten ist eine komplexe Herausforderungen. Das gilt besonders in Bereichen wie Internet of Things (IoT), e-Commerce, Sicherheit, Kommunikationen, Unterhaltung, Finanzwesen und Einzelhandel. In diesen Bereichen ist reaktionsschnelle, zeitgerechte und genaue Entscheidungsfindung von zentraler Bedeutung. Die Sammlung und Analyse von Daten in Echtzeit ist für diese Unternehmen überlebenswichtig.

Für die Echtzeit-Datenanalyse ist es wichtig, dass genug Ressourcen verfügbar sind, um schnelle Datenstreams effektiv zu erfassen. Die physische Infrastruktur (Hochgeschwindigkeits-Netzwerke, Rechenleistung, Speicher und Arbeitsspeicher) spielt dafür eine wichtige Rolle. Es muss jedoch auch darauf geachtet werden, dass die angewandte Software den Leistung der physischen Infrastruktur entspricht. Andernfalls droht den Firmen Datenstau, oder fehlende beziehungsweise unvollständige Daten.

Herausforderungen und bewährte Praktiken des Fast Data Ingest

Eine Hochgeschwindigkeits-Datenaufnahme bringt häufig verschiedene Arten von Komplexitäten mit sich:

  1. Große Datenvolumen, die schubweise ankommen:  Eine gute Lösung muss auch große Datenvolumen, die schubweise ankommen, mit minimaler Latenzzeit bearbeiten können. Sie sollte Millionen von Schreibvorgängen pro Sekunden mit einer Sub-Millisekunden-Latenzzeit mithilfe minimaler Ressourcen realisieren können.
  2. Daten aus mehreren Quellen/Formaten: Lösungen für die Datenaufnahme müssen flexibel genug sein, Daten in verschiedenen Formaten zu handhaben. Dabei sollten sie die Identität der Quelle wenn nötig behalten und in Echtzeit umwandeln oder normalisieren.
  3. Daten, die gefiltert, analysiert oder weitergeleitet werden müssen: Die meisten Lösungen für Datenaufnahme haben einen oder mehrere Empfänger, die Daten verarbeiten. Dabei handelt es sich oft um verschiedene Anwendungen, die an denselben oder an anderen Standorten mit unterschiedlichen Annahmen fungieren. In solchen Fällen muss die Datenbank nicht nur die Daten umwandeln, sondern auch je nach den Anforderungen der konsumierenden Anwendungen die Daten filtern oder zusammenfassen.
  4. Einen stetigen Datenkanal zwischen Datenquellen und verschiedenen Arten von Datensenkenn verwalten: Wenn Daten in keinem kontinuierlichen Rhythmus ankommen, benötigen Datenquellen und Datensenken einen Kanal, über den die Daten asynchron übertragen werden können. Der Kanal muss auch gegen Verbindungsverlust und Hardwarefehler gewappnet sein. Es kommt oft vor, dass Produzent und Konsument nicht in der gleichen Geschwindigkeit operieren. Das kann zu Datenrückständen führen, durch die Konsumenten erst verspätet mit den Daten arbeiten können.
  5. Daten aus geografisch verteilten Quellen: Hier ist es für die grundlegende Architektur häufig sinnvoll, Datenerfassungsknoten nahe der Quelle zu verteilen. Auf diese Weise werden die Knoten selbst Teil der Lösung zur schnellen Datenaufnahme, um Daten zu sammeln, zu bearbeiten, zu übertragen oder umzuleiten.

Schnelle Datenaufnahme – Ein Kinderspiel für Redis Enterprise

Hohe Leistung mit der kleinsten Anzahl an Servern
Redis Enterprise kann mehr als 200 Millionen Lese-/Schreibvorgänge pro Sekunde handhaben, mit Latenzzeiten von Sub-Millisekunden mit nur einem 40-Knoten-Cluster auf AWS . Das macht Redis Enterprise zur Ressourcen-effizientesten NoSQL-Datenbank auf dem Markt.

Flexible Datenstrukturen und Module für Echtzeitanalysen: Redis Streams, Pub/Sub, Listen, Sorted Sets, RedisTimeSeries
Redis bietet eine Vielfalt von Datenstrukturen wie Streams, Listen, Mengen, Sorted Sets und Hashes, die eine einfache und vielseitige Datenbearbeitung ermöglichen. So wird Hochleistungs-Datenaufnahme mit Echtzeitanalysen effizient kombiniert.

Redis’ Pub/Sub-Fähigkeiten ermöglichen eine Handlung als effizienter Message-Broker zwischen geografisch verteilten Datenaufnahmeknoten. Daten-produzierende Anwendungen speichern Streaming-Daten in Kanälen in dem/den geforderten Format(en). Konsumierende Anwendungen abonnieren diese Kanäle, die für sie relevant sind. Sie empfangen Nachrichten asynchron, im Rhythmus in dem sie gespeichert werden.

Listen und Sorted Sets können als Datenkanäle eingesetzt werden, die Produzenten und Konsumenten miteinander verbinden. Sie können diese Datenstrukturen auch nutzen, um Daten asynchron zu übermitteln. Im Gegensatz zu Pub/Sub bieten Listen und Sorted Sets Persistenz.

Streams können sogar noch mehr: Sie bieten einen persistenten Datenaufnahmekanal zwischen sProduzenten und Konsumenten. Mit Streams können Sie die Anzahl der Konsumenten, die Konsumentenrgruppen nutzen, anpassen. Konsumentengruppen implementieren zudem transaktionsähnliche Datensicherheit, wenn Konsumenten beim Konsumieren und Bearbeiten von Daten scheitern.

Außerdem bietet RedisTimeSeries verbesserte Funktionen für eine schnelle Datenaufnahme, wie Downsampling, spezielle Zählvorgänge zum letzten gespeicherten Wert und eine doppelte Delta-Kompression in Kombination mit Echtzeit-Analysefähigkeiten wie für Datenkennzeichnung mit integrierter Suche, Aggregation, Bereichsanfragen und einen integrierten Konnektor mit Best-of-Class Überwachungs- und Analyse-Tools wie Grafana und Prometheus.

Active-Active Geo-Verteilung
Redis Enterprises CRDTs-basierte Active-Active-Technologie  ermöglicht eine komplexe Datenaufnahme und Nachrichtenübermittlungsvorgänge an geografisch verteilten -Standorten. Die Anwendung kann komplett verteilt eingesetzt werden, um die Verfügbarkeit und Reaktionszeit der Anwendung zu verbessern.

Redis DRAM mit SSD und persistentem Speicher erweitern
Redis Enterprises Redis on Flash -Technologie ermöglicht die Erweiterung von DRAM mit SSD und persistentem Speicher. Dadurch ist wiederum die Speicherung sehr großer Datenmengen von mehreren Terabytes mit denselben Infrastrukturkosten einer Festplatten-basierten Datenbank möglich. Gleichzeitig werden Latenzzeiten im Sub-Millisekunden-Bereich gehalten, selbst bei der Aufnahme von mehr als 1 Million Elementen/Sek auf jedem Knoten des Redis Enterprise Clusters.

Implementierung von schneller Datenaufnahme mit Redis

Hier sind ein paar in Java geschriebene Code-Snippets. Sie alle nutzen die Jedis-Bibliothek. Folgen Sie zuerst den Anweisungen auf Jedis’  Erste-Schritte-Seite  zum Downloaden der neuesten Version von Jedis.

  1. Schnelle Datenaufnahme mit Redis Streams
    1. Schicken Sie eine Nachricht auf einer Stream-Datenstruktur. Dieses Programm nutzt XADD, um neue Elemente zum Stream hinzuzufügen. Dateiname: StreamPublish.java. import java.util.HashMap;
      import java.util.Map;

      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.StreamEntryID;public class StreamPublish {
      static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception {
      try {
      Map<String, String> kv = new HashMap<String, String>();
      kv.put(“a”, “100”); // key -> a; value -> 100
      jedis.xadd(“MyStream”, StreamEntryID.NEW_ENTRY, kv);
      }finally {
      jedis.close();
      }
      }
      }
    2. Verarbeiten Sie Daten aus einem Stream asynchron. Warten Sie auf die Nachricht, wenn der Stream leer ist. Dieses Programm verwendet den XREAD-Befehl. Dateiname: StreamConsumeAsync.java.import java.util.AbstractMap.SimpleEntry;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      import java.util.Map.Entry;

      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.StreamEntry;
      import redis.clients.jedis.StreamEntryID;public class StreamConsumeAsync{static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception{// Start from 0. For subsequent queries, read from last id + 1
      String lastStreamDataId = “0-0”;
      int count = 1000;
      long waitTimeInMillis = 5000;try {
      // Read new data asynchronously in a loop
      while(true) {
      List next = getNext(“MyStream”, lastStreamDataId,
      count, waitTimeInMillis);
      if(next != null) {
      List<StreamEntry> stList = getStreamEntries(next);
      if(stList != null) {
      // Process data here
      for(int j=0; j<stList.size(); j++) {
      StreamEntry streamData = (StreamEntry)stList.get(j); // Read the fields (key-value pairs) of data stream
      Map<String, String> fields = streamData.getFields(); // Read subsequent data from last id + 1
      lastStreamDataId = streamData.getID().getTime()
      +”-”
      +(streamData.getID().getSequence()+1); System.out.println(stList.get(j));
      System.out.println(lastStreamDataId);
      }
      }else{
      System.out.println(“No new data in the stream”);
      }
      }
      } }finally {
      jedis.close();
      }
      } // Read the next set of data from the stream
      private static List getNext(String streamId, String lastId, int count, long waitTimeInMillis) throws Exception{
      HashMap<String, StreamEntryID> map = new HashMap();
      String readFrom = lastId;
      map.put(streamId, new StreamEntryID(readFrom));
      List list = jedis.xread(count, waitTimeInMillis,
      (Entry<String, StreamEntryID>)
      map.entrySet().toArray()[0]);
      return list;
      } // Read stream entries
      // Assumes streamList has only one stream
      private static List<StreamEntry> getStreamEntries(List streamList) throws Exception{
      if(streamList.size()>0) {
      SimpleEntry stEntry = (SimpleEntry)streamList.get(0);
      return (List<StreamEntry>) stEntry.getValue();
      } return null;
      }
      }
    3. Fragen Sie einen Stream mit XRANGE-Befehl ab. Dateiname: StreamQuery.java import java.util.List;
      import java.util.Map;

      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.StreamEntry;
      import redis.clients.jedis.StreamEntryID;public class StreamQuery{static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception{String streamID = “MyStream”;
      StreamEntryID start = new StreamEntryID(0,0);
      StreamEntryID end = null; // null -> until the last item in the stream
      int count = 2;try {
      List<StreamEntry> stList = jedis.xrange(streamID, start, end, count);
      if(stList != null) {
      // Process data here
      for(int j=0; j<stList.size(); j++) {
      StreamEntry streamData = (StreamEntry)stList.get(j); System.out.println(streamData); // Read the fields (key-value pairs) of data stream
      Map<String, String> fields = streamData.getFields(); // Read subsequent data from last id + 1
      StreamEntryID nextStart =
      new StreamEntryID(streamData.getID().getTime(),
      (streamData.getID().getSequence()+1));
      }
      }else {
      System.out.println(“No new data in the stream”);
      }
      }finally {
      jedis.close();
      }
      }
      }
  2. Schnelle Datenaufnahme mit Pub/Sub
    1. Speichern Sie in einem Kanal. Dateiname: PubSubPublish.java import redis.clients.jedis.Jedis;

      public class PubSubPublish {
      static Jedis jedis = new Jedis(“localhost”, 6379);
      public static void main(String[] args) throws Exception {try {
      String channel = “MyChannel”;
      String message = “Hello there!”;
      jedis.publish(channel, message);
      }finally {
      jedis.close();
      }
      }
      }
    2. Abonnieren Sie einen Kanal. Dateiname: PubSubPublish.java  import redis.clients.jedis.Jedis;
      import redis.clients.jedis.JedisPubSub;

      public class PubSubSubscribe extends JedisPubSub{
      static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception {try {
      PubSubSubscribe mySubscriber = new PubSubSubscribe();
      String channel = “MyChannel”;
      jedis.subscribe(mySubscriber, channel);
      }finally {
      jedis.close();
      }
      }// Receive messages
      @Override
      public void onMessage(String channel, String message) {
      System.out.println(message);
      }
      }
  3. Schnelle Datenaufnahme mit Listen
    1. Daten-Push in eine Liste. Dateiname: ListPush.java import redis.clients.jedis.Jedis;

      public class ListPush {
      static Jedis jedis = new Jedis(“localhost”, 6379);
      public static void main(String[] args) throws Exception {try {
      String list = “MyList”;
      String message = “Hello there!”;
      jedis.lpush(list, message);
      }finally {
      jedis.close();
      }
      }
      }
    2. Daten-Pop aus einer Liste. Dateiname: ListPop.java import redis.clients.jedis.Jedis;

      public class ListPop {
      static Jedis jedis = new Jedis(“localhost”, 6379);
      public static void main(String[] args) throws Exception {try {
      String list = “MyList”;
      String message = jedis.rpop(list);
      System.out.println(message);
      }finally {
      jedis.close();
      }
      }
      }

Nächste Schritte