Ingestion rapide des données

Qu’est-ce que l’ingestion de données ?

La collecte, le stockage et le traitement de grand volumes de données diverses à grande vitesse s’accompagne de défis de conception complexes, spécialement dans des domaines comme l’Internet des objets (IoT), l’e-commerce, la sécurité, les communications, les loisirs, la finance et la distribution de détail. La prise de décision réactive, à temps et précise sur la base des données est un élément essentiel de ces secteurs d’activité, ce qui fait de la collecte de données et de leur analyse en temps réel un aspect critique.

Une première étape dans la fourniture d’analyse de données en temps réel consiste à s’assurer que des ressources adéquates sont disponibles pour capturer de manière efficace des flux de données rapides. Alors que l’infrastructure physique (y compris un réseau à haute vitesse, le calcul, le stockage et la mémoire) joue un rôle important à ce niveau, la pile logicielle doit être en accord avec la performance de sa couche physique. Sinon, les organisations peuvent se trouver confrontées à une quantité importante de tâches en retard, des données manquantes, incomplètes ou fausses.

Les défis et les meilleures pratiques relatives à l’ingestion de données

L’ingestion de données à haute vitesse implique souvent différents types de complexités:

  1. Large volumes de données arrivant par salves: les salves de données requièrent une solution capable de traiter des volumes de données avec une latence minimale. Dans l’idéal, cette solution doit pouvoir réaliser des millions d’écriture par seconde avec une latence inférieure au millième de seconde avec un minimum de ressources.
  2. Données provenant de plusieurs sources/formats: les solutions d’ingestion de données doivent aussi être assez flexibles pour gérer les données d’un grand nombre de formats, en conservant l’identité de la source si nécessaire et en transformant ou en normalisant en temps réel.
  3. Données devant être filtrées, analysées ou transférées: la plupart des solutions d’ingestion de données ont un ou plusieurs souscripteurs consommant les données. Ce sont souvent des applications différentes fonctionnant sur le même site ou sur des sites différents avec un ensemble variés de suppositions. Dans de tels cas, la base de données ne doit pas seulement transformer les données, mais elle doit aussi les filtrer ou les agréger selon les exigences des applications consommatrices.
  4. Gérer un canal de données stable entre les producteurs et plusieurs types de consommateurs: si la structure d’arrivée des données n’est pas continue, les producteurs et les consommateurs ont besoin d’un canal leur permettant le transférer les données de manière asynchrone. Le canal doit aussi pouvoir supporter les pertes de connexion tout comme les pannes matérielles. Dans un grand nombre de cas d’utilisation, les producteurs et les consommateurs ne fonctionnent pas avec le même débit. Ceci peut entraîner des retards de données retardant encore plus l’action des consommateurs sur les données.
  5. Données de sources géographiquement distribuées: dans ce scénario, il est souvent pratique pour l’architecture sous-jacente de distribuer les nœuds de collection de données près de la surface. De cette façon, les nœuds eux-mêmes sont intégrés à la solution d’ingestion rapide des données afin de collecter, traiter, transférer ou rediriger les données d’ingestion.

Comment Redis Enterprise facilite l’ingestion rapide de données

Une performance élevée avec le plus petit nombre de serveurs possible:
Quand il s’agit de performance, Redis Enterprise a été soumis à un banc d’essai de traitement de plus de 200 millions d’opérations de lecture/écriture par seconde avec des latences inférieures au millième de seconde, avec seulement une grappe de 40 nœuds sur AWS. Ceci fait de Redis Enterprise la base de données NoSQL la plus efficace en matière de ressources du marché.

Structures et modules de données polyvalents pour des analyses en temps réel: Redis Streams, Pub/Sub, listes, Sorted Sets (ensembles triés), RedisTimeSeries
Redis propose une variété de structures tels que Streams, Lists, Sets, Sorted Sets et Hashes qui assurent un traitement de données simple et flexible afin de combiner efficacement l’ingestion de données à haute vitesse et les analyses en temps réel.

Les capacités Pub/Sub de Redis lui permettent d’agir en tant que « message broker » efficace entre les nœuds d’ingestion de données géographiquement distribués. Les applications produisant des données publient des données en flux vers les canaux au(x) format(s) requis alors que les applications consommatrices souscrivent aux canaux dont elles ont besoin et reçoivent des messages de manière asynchrones dès qu’elles sont publiées.

Les listes et les ensembles triés peuvent être utilisés comme canaux de données connectant les producteurs et les consommateurs. Vous pouvez aussi utiliser ces structures de données pour transmettre les données de mode asynchrone. Contrairement à Pub/Sub, les listes et les ensembles triés sont persistants.

Les streams peuvent faire encore plus et offrent un canal d’ingestion de données continu entre les producteurs et les consommateurs. Avec les streams, vous pouvez diminuer le nombre de consommateurs à l’aide des groupes de consommateurs. Les groupes de consommateurs mettent également en œuvre la sécurité de données similaire à celle des transactions lorsque les consommateurs ne répondent pas pendant la consommation et le traitement des données.

Pour terminer, RedisTimeSeries fournit un jeu de fonctions rapide et avancé d’ingestion de données incluant le sous-échantillonnage, les contre-opérations spéciales sur la dernière valeur ingérée et la double compression delta combinée aux capacités d’analyse en temps réel similaire à celle de l’étiquetage des données avec recherche intégrée, agrégation, requête sur plage de données ainsi qu’un connecteur intégré pour les outils leaders de surveillance et d’analyse comme Grafana et Prometheus.

Déploiement avec géodistribution active-active
La technologie active-active basée sur les CRDT de Redis Enterprise permet l’ingestion complexe de données et les opérations de messagerie sur des sites géographiquement distants et rend possible le déploiement des applications d’une façon totalement distribuée pour améliorer de manière significative la disponibilité et le temps de réponse de l’application.

Extension de la DRAM de Redis avec du SSD et une mémoire permanente
La technologie de Redis Enterprise Redis on Flash permet l’extension de la DRAM avec du SSD et une mémoire permanente, permet de stocker de très grands ensembles de données de plusieurs téraoctets avec les mêmes coûts d’infrastructure que les bases de données à bases de disques, avec des latences de base de données inférieures au millième de seconde même lors de l’ingestion de plus de 1M d’éléments/sec sur chacun des nœuds de la grappe de Redis Enterprise.

Comment mettre en œuvre l’ingestion rapide de données avec Redis

Voici quelques bouts de codes écrits en Java. Ils utilisent tous la bibliothèque Jedis. Tout d’abord, suivez les instructions sur la page Commencer de Jedis pour télécharger la dernière version de Jedis.

  1. Ingestion rapide de données à l’aide de Redis Streams
    1. Publier un message sur une structure de données de stream. Ce programme utilise XADD pour ajouter de nouveaux éléments au stream. Nom de fichier: StreamPublish.java. import java.util.HashMap;
      import java.util.Map;

      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.StreamEntryID;public class StreamPublish {
      static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception {
      try {
      Map<String, String> kv = new HashMap<String, String>();
      kv.put(“a”, “100”); // key -> a; value -> 100
      jedis.xadd(“MyStream”, StreamEntryID.NEW_ENTRY, kv);
      }finally {
      jedis.close();
      }
      }
      }
    2. Consommation de données d’un stream de manière asynchrone. Attendre le message si le stream est vide. Ce programme utilise la commande XREAD. Nom de fichier: StreamConsumeAsync.java.  import java.util.AbstractMap.SimpleEntry;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      import java.util.Map.Entry;

      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.StreamEntry;
      import redis.clients.jedis.StreamEntryID;public class StreamConsumeAsync{static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception{// Start from 0. For subsequent queries, read from last id + 1
      String lastStreamDataId = “0-0”;
      int count = 1000;
      long waitTimeInMillis = 5000;try {
      // Read new data asynchronously in a loop
      while(true) {
      List next = getNext(“MyStream”, lastStreamDataId,
      count, waitTimeInMillis);
      if(next != null) {
      List<StreamEntry> stList = getStreamEntries(next);
      if(stList != null) {
      // Process data here
      for(int j=0; j<stList.size(); j++) {
      StreamEntry streamData = (StreamEntry)stList.get(j); // Read the fields (key-value pairs) of data stream
      Map<String, String> fields = streamData.getFields(); // Read subsequent data from last id + 1
      lastStreamDataId = streamData.getID().getTime()
      +”-”
      +(streamData.getID().getSequence()+1); System.out.println(stList.get(j));
      System.out.println(lastStreamDataId);
      }
      }else{
      System.out.println(“No new data in the stream”);
      }
      }
      } }finally {
      jedis.close();
      }
      } // Read the next set of data from the stream
      private static List getNext(String streamId, String lastId, int count, long waitTimeInMillis) throws Exception{
      HashMap<String, StreamEntryID> map = new HashMap();
      String readFrom = lastId;
      map.put(streamId, new StreamEntryID(readFrom));
      List list = jedis.xread(count, waitTimeInMillis,
      (Entry<String, StreamEntryID>)
      map.entrySet().toArray()[0]);
      return list;
      } // Read stream entries
      // Assumes streamList has only one stream
      private static List<StreamEntry> getStreamEntries(List streamList) throws Exception{
      if(streamList.size()>0) {
      SimpleEntry stEntry = (SimpleEntry)streamList.get(0);
      return (List<StreamEntry>) stEntry.getValue();
      } return null;
      }
      }
    3. Interroger un stream à l’aide de la commande XRANGE. Nom de fichier: StreamQuery.java. import java.util.List;
      import java.util.Map;

      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.StreamEntry;
      import redis.clients.jedis.StreamEntryID;public class StreamQuery{static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception{String streamID = “MyStream”;
      StreamEntryID start = new StreamEntryID(0,0);
      StreamEntryID end = null; // null -> until the last item in the stream
      int count = 2;try {
      List<StreamEntry> stList = jedis.xrange(streamID, start, end, count);
      if(stList != null) {
      // Process data here
      for(int j=0; j<stList.size(); j++) {
      StreamEntry streamData = (StreamEntry)stList.get(j); System.out.println(streamData); // Read the fields (key-value pairs) of data stream
      Map<String, String> fields = streamData.getFields(); // Read subsequent data from last id + 1
      StreamEntryID nextStart =
      new StreamEntryID(streamData.getID().getTime(),
      (streamData.getID().getSequence()+1));
      }
      }else {
      System.out.println(“No new data in the stream”);
      }
      }finally {
      jedis.close();
      }
      }
      }
  2. Ingestion rapide de données à l’aide de Pub/Sub
    1. Publier sur un canal. Nom de fichier: PubSubPublish.java import redis.clients.jedis.Jedis;

      public class PubSubPublish {
      static Jedis jedis = new Jedis(“localhost”, 6379);
      public static void main(String[] args) throws Exception {try {
      String channel = “MyChannel”;
      String message = “Hello there!”;
      jedis.publish(channel, message);
      }finally {
      jedis.close();
      }
      }
      }
    2. Souscrire à un canal. Nom de fichier: PubSubPublish.java  import redis.clients.jedis.Jedis;
      import redis.clients.jedis.JedisPubSub;

      public class PubSubSubscribe extends JedisPubSub{
      static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception {try {
      PubSubSubscribe mySubscriber = new PubSubSubscribe();
      String channel = “MyChannel”;
      jedis.subscribe(mySubscriber, channel);
      }finally {
      jedis.close();
      }
      }// Receive messages
      @Override
      public void onMessage(String channel, String message) {
      System.out.println(message);
      }
      }
  3. Ingestion rapide des données à l’aide de listes
    1. Pousse les données vers une liste. Nom de fichier: ListPush.java import redis.clients.jedis.Jedis;

      public class ListPush {
      static Jedis jedis = new Jedis(“localhost”, 6379);
      public static void main(String[] args) throws Exception {try {
      String list = “MyList”;
      String message = “Hello there!”;
      jedis.lpush(list, message);
      }finally {
      jedis.close();
      }
      }
      }
    2. Données émergentes d’une liste. Nom de fichier: ListPop.java import redis.clients.jedis.Jedis;

      public class ListPop {
      static Jedis jedis = new Jedis(“localhost”, 6379);
      public static void main(String[] args) throws Exception {try {
      String list = “MyList”;
      String message = jedis.rpop(list);
      System.out.println(message);
      }finally {
      jedis.close();
      }
      }
      }

Les prochaines étapes