Ingestión rápida de datos

¿Qué es la ingestion rápida de datos?

La recopilación, el almacenamiento y el procesamiento de grandes volúmenes de datos de gran variedad y velocidad presentan varios retos de diseño complejos, especialmente en campos como el Internet de las cosas (IoT), el comercio electrónico, la seguridad, las comunicaciones, el entretenimiento, las finanzas y el comercio minorista. Por eso, dado que la toma de decisiones basada en datos sensibles, oportunos y precisos es fundamental para estas empresas, la recopilación y el análisis de datos en tiempo real son fundamentales.

Un primer paso muy importante para realizar análisis de datos en tiempo real es garantizar la disponibilidad de recursos adecuados para capturar eficazmente flujos de datos rápidos. La infraestructura física (que incluye una red de alta velocidad, computación, almacenamiento y memoria) desempeña un papel importante en este caso, pero la pila de software debe estar a la altura del rendimiento de su capa física o las organizaciones pueden acabar con una acumulación masiva de datos, con datos que faltan o con datos incompletos y engañosos.

Cuáles desafíos y mejores prácticas para la ingestión rápida de datos

La ingestión de datos a alta velocidad suele implicar diferentes tipos de complejidades:

  1. Grandes volúmenes de datos que llegan a ráfagas: la llegada de enormes volúmenes de datos en ráfagas requiere una solución capaz de procesar grandes volúmenes de datos con la mínima latencia. Lo ideal es que pueda realizar millones de escrituras por segundo con una latencia inferior a un milisegundo, utilizando recursos mínimos.
  2. Datos de múltiples fuentes/formatos: Las soluciones de ingestión de datos también deben ser lo suficientemente flexibles como para manejar datos en muchos formatos diferentes, conservando así la identidad de la fuente si es necesario y transformando o normalizando en tiempo real.
  3. Qué datos es necesario filtrar, analizar o reenviar: La mayoría de las soluciones de ingestión de datos tienen uno o más suscriptores que consumen los datos. Son a menudo aplicaciones diferentes que funcionan en el mismo o en diferentes lugares con un conjunto variado de supuestos. Si es tu caso, la base de datos no solo debe transformar los datos, sino también filtrarlos o agregarlos en función de las necesidades de las aplicaciones consumidoras.
  4. Gestión de un canal de datos constante entre productores y varios tipos de consumidores: Si el patrón de llegada de datos no es continuo, entonces los productores y consumidores necesitan un canal que les permita transferir datos de forma asíncrona. El canal, por supuesto, también debe ser resistente a la pérdida de conexión y a los fallos de hardware. Sabemos que en muchos casos prácticos, los productores y los consumidores no operan al mismo ritmo, y así hay retrasos en los datos y hace que los consumidores se retrasen todavía más a la hora de actuar sobre los datos, que es lo que no queremos.
  5. Datos procedentes de fuentes distribuidas geográficamente: En este escenario, es mejor que la arquitectura subyacente distribuya los nodos de recogida de datos cerca de la fuente. De este modo, hcemos que los propios nodos se conviertan en parte de la solución de ingestión rápida de datos, para recoger, procesar, reenviar o redirigir los datos ingeridos.

Cómo facilita Redis Enterprise la rápida ingestión de datos, ¡descúbrelo!

Alto rendimiento con el menor número de servidores
Toma nota, en lo que respecta a rendimiento, Redis Enterprise ha sido evaluada para manejar más de 200 millones de operaciones de lectura/escritura por segundo, con latencias de menos de un milisegundo con solo un clúster de 40 nodos en AWS. Razón por la cual, Redis Enterprise es la base de datos NoSQL más eficiente en cuanto a recursos del mercado.

Estructuras de datos y módulos flexibles para el análisis en tiempo real: Redis Streams, Pub/Sub, Lists, Sorted Sets, RedisTimeSeries
Redis te lo ofrece todo: una variedad de estructuras de datos como Streams, Listas, Conjuntos, Conjuntos Ordenados y Hashes que proporcionan un procesamiento de datos simple y versátil con el fin de combinar eficientemente la ingestión de datos de alta velocidad y el análisis en tiempo real.

Las capacidades Pub/Sub de Redis le permiten actuar como un eficiente intermediario de mensajes entre nodos de anidación de datos distribuidos geográficamente. Las aplicaciones productoras de datos publican los datos en streaming en los canales en el formato o formatos requeridos, y las aplicaciones consumidoras se suscriben a los canales que les interesan, recibiendo los mensajes de forma asíncrona a medida que se publican y así en círculo.

Las listas y los conjuntos ordenados pueden utilizarse como canales de datos que conectan a productores y consumidores. También puedes utilizar estas estructuras de datos para transmitir datos de forma asíncrona. A diferencia de Pub/Sub, pero las listas y los conjuntos ordenados ofrecen persistencia.

Los flujos pueden hacer todavía más, ofreciendo un canal de ingestión de datos persistente entre productores y consumidores. Con Streams, puedes ampliar el número de consumidores utilizando grupos de consumidores. Los grupos de consumidores también implementan la seguridad de los datos de tipo transaccional si los consumidores fallan en medio del consumo y el procesamiento de los datos.

Y, por último, pero no menos importante RedisTimeSeries proporciona un conjunto mejorado de funciones de ingestión rápida de datos que incluye el muestreo descendente, operaciones de contador especiales en el último valor ingerido y la compresión delta doble, combinadas con capacidades de análisis en tiempo real como para el etiquetado de datos con búsqueda incorporada, agregación, consultas de rango y un conector incorporado a las principales herramientas de supervisión y análisis como Grafana y Prometheus.

Despliegue de geodistribución de activo a activo
Latecnología Activo-Activobasada en CRDTs de Redis Enterprise permite realizar operaciones complejas de almacenamiento de datos y mensajería a través de ubicaciones geográficas y permite que la aplicación se despliegue de forma completamente distribuida así mejora significativamente la disponibilidad y el tiempo de respuesta de la aplicación.

Amplía la DRAM de Redis con SSD y memoria persistente
La tecnología Redison Flashde Redis Enterprise te permite ampliar la DRAM con SSD y memoria persistente, y así almacenar conjuntos de datos muy grandes de varios terabytes utilizando los mismos costes de infraestructura de las bases de datos basadas en disco y manteniendo las latencias de las bases de datos a niveles inferiores al milisegundo, incluso cuando se ingieren más de 1 millón de elementos/segundo en cada nodo del clúster de Redis Enterprise.

Descubre cómo implementar la ingestión rápida de datos con Redis

A continuación podrás ver algunos fragmentos de código escritos en Java, no te lo pierdas. Todos utilizan la biblioteca de Jedis. Antes de todo, sigue las instrucciones de la  página de inicio de Jedis para descargar la última versión de Jedis.

  1. Ingestión rápida de datos con Redis Streams
    1. Publicar un mensaje en una estructura de datos de flujo. Este programa utiliza XADD para añadir nuevos elementos al flujo. Nombre del archivo: StreamPublish.java. import java.util.HashMap;
      import java.util.Map;

      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.StreamEntryID;public class StreamPublish {
      static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception {
      try {
      Map<String, String> kv = new HashMap<String, String>();
      kv.put(“a”, “100”); // key -> a; value -> 100
      jedis.xadd(“MyStream”, StreamEntryID.NEW_ENTRY, kv);
      }finally {
      jedis.close();
      }
      }
      }
    2. Consumir datos de un flujo de forma asíncrona. Espere al mensaje si el flujo está vacío. Este programa utiliza el comando XREAD. Nombre del archivo: StreamConsumeAsync.java. import java.util.AbstractMap.SimpleEntry;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      import java.util.Map.Entry;

      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.StreamEntry;
      import redis.clients.jedis.StreamEntryID;public class StreamConsumeAsync{static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception{// Start from 0. Para las consultas posteriores, lee desde el último id + 1
      String lastStreamDataId = “0-0”;
      int count = 1000;
      long waitTimeInMillis = 5000;try {
      // Read new data asynchronously in a loop
      while(true) {
      List next = getNext(“MyStream”, lastStreamDataId,
      count, waitTimeInMillis);
      if(next != null) {
      List<StreamEntry> stList = getStreamEntries(next);
      if(stList != null) {
      // Process data here
      for(int j=0; j<stList.size(); j++) {
      StreamEntry streamData = (StreamEntry)stList.get(j); // Lee los campos (pareja key-value) del flujo de datos
      Map<String, String> fields = streamData.getFields(); // Lee los datos posteriores de la última id + 1
      lastStreamDataId = streamData.getID().getTime()
      +”-”
      +(streamData.getID().getSequence()+1); System.out.println(stList.get(j));
      System.out.println(lastStreamDataId);
      }
      }else{
      System.out.println(“No new data in the stream”);
      }
      }
      } }finally {
      jedis.close();
      }
      } // Read the next set of data from the stream
      private static List getNext(String streamId, String lastId, int count, long waitTimeInMillis) throws Exception{
      HashMap<String, StreamEntryID> map = new HashMap();
      String readFrom = lastId;
      map.put(streamId, new StreamEntryID(readFrom));
      List list = jedis.xread(count, waitTimeInMillis,
      (Entry<String, StreamEntryID>)
      map.entrySet().toArray()[0]);
      return list;
      } // Read stream entries
      // Assumes streamList has only one stream
      private static List<StreamEntry> getStreamEntries(List streamList) throws Exception{
      if(streamList.size()>0) {
      SimpleEntry stEntry = (SimpleEntry)streamList.get(0);
      return (List<StreamEntry>) stEntry.getValue();
      } return null;
      }
      }
    3. Consulte un flujo mediante el comando XRANGE. Nombre del archivo: StreamQuery.java import java.util.List;
      import java.util.Map;

      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.StreamEntry;
      import redis.clients.jedis.StreamEntryID;public class StreamQuery{static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception{String streamID = “MyStream”;
      StreamEntryID start = new StreamEntryID(0,0);
      StreamEntryID end = null; // null -> until the last item in the stream
      int count = 2;try {
      List<StreamEntry> stList = jedis.xrange(streamID, start, end, count);
      if(stList != null) {
      // Process data here
      for(int j=0; j<stList.size(); j++) {
      StreamEntry streamData = (StreamEntry)stList.get(j); System.out.println(streamData); // Lee los campos (pareja key-value) del flujo de datos

      Map<String, String> fields = streamData.getFields(); // Lee los datos posteriores de la última id + 1
      StreamEntryID nextStart =
      new StreamEntryID(streamData.getID().getTime(),
      (streamData.getID().getSequence()+1));
      }
      }else {
      System.out.println(“No new data in the stream”);
      }}finally {
      jedis.close();
      }
      }
      }
  2. Recepción rápida de datos mediante Pub/Sub
    1. Publicar en un canal. Nombre del archivo: PubSubPublish.java import redis.clients.jedis.Jedis;

      public class PubSubPublish {
      static Jedis jedis = new Jedis(“localhost”, 6379);
      public static void main(String[] args) throws Exception {try {
      String channel = “MyChannel”;
      String message = “Hello there!”;
      jedis.publish(channel, message);
      }finally {
      jedis.close();
      }
      }
      }
    2. Suscripción a un canal. Nombre del archivo: PubSubPublish.java import redis.clients.jedis.Jedis;
      import redis.clients.jedis.JedisPubSub;

      public class PubSubSubscribe extends JedisPubSub{
      static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception {try {
      PubSubSubscribe mySubscriber = new PubSubSubscribe();
      String channel = “MyChannel”;
      jedis.subscribe(mySubscriber, channel);
      }finally {
      jedis.close();
      }
      }// Receive messages
      @Override
      public void onMessage(String channel, String message) {
      System.out.println(message);
      }
      }
  3. Recepción rápida de datos mediante listas
    1. Empujar los datos a una lista. Nombre del archivo: ListPush.java import redis.clients.jedis.Jedis;

      public class ListPush {
      static Jedis jedis = new Jedis(“localhost”, 6379);
      public static void main(String[] args) throws Exception {try {
      String list = “MyList”;
      String message = “Hello there!”;
      jedis.lpush(list, message);
      }finally {
      jedis.close();
      }
      }
      }
    2. Extraer datos de una lista. Nombre del archivo: ListPop.java import redis.clients.jedis.Jedis;

      public class ListPop {
      static Jedis jedis = new Jedis(“localhost”, 6379);
      public static void main(String[] args) throws Exception {try {
      String list = “MyList”;
      String message = jedis.rpop(list);
      System.out.println(message);
      }finally {
      jedis.close();
      }
      }
      }