Recepção rápida de dados

O que é recepção rápida de dados?

A coleta, o armazenamento e o processamento de grandes volumes de dados em alta variedade e velocidade apresentam vários desafios complexos de design, especialmente em áreas como a Internet das Coisas (IoT), comércio eletrônico, segurança, comunicações, entretenimento, finanças e varejo. Portanto, assim como a tomada de decisões com base em dados sensíveis, oportunos e precisos é fundamental às empresas, a coleta e a análise de dados em tempo real são essenciais da mesma maneira.

Uma primeira etapa muito importante na execução da análise de dados em tempo real é garantir que os recursos adequados estejam disponíveis para capturar com eficiência fluxos de dados rápidos. A infraestrutura física (incluindo rede de alta velocidade, computação, armazenamento e memória) desempenha um papel importante aqui, mas a gama de software deve corresponder ao desempenho de sua camada física ou as organizações podem acabar com um enorme acúmulo de dados, dados ausentes ou dados incompletos e inúteis.

Quais são os desafios e as práticas recomendadas para a recepção rápida de dados?

O recebimento de dados em alta velocidade geralmente envolve diferentes tipos de complexidades:

  1. Grandes volumes de dados que chegam de forma intensa: essa demanda exige uma solução capaz de processar grandes volumes de dados com latência mínima. Idealmente, ela deve ser capaz de realizar milhões de gravações por segundo com latência inferior a um milissegundo, usando o mínimo de recursos.
  2. Dados de várias fontes/formatos: As soluções de recepção de dados também devem ser flexíveis o suficiente para lidar com dados em formatos diferentes, preservando assim a identidade da fonte, se necessário, e transformando se for o caso – em tempo real.
  3. Quais dados precisam ser filtrados, analisados ou encaminhados: a maioria das soluções de dados de entrada tem um ou mais assinantes que consomem os dados. Geralmente, são aplicativos diversos executados no mesmo local ou em locais diferentes com um conjunto variado de suposições. Se esse for o caso, o banco de dados deve não apenas transformar os dados, mas também filtrá-los ou agregá-los de acordo com as necessidades dos aplicativos dos consumidores.
  4. Gerenciamento de um canal de dados constante entre produtores e vários tipos de consumidores: se o padrão de chegada de dados não for contínuo, os produtores e os consumidores precisarão de um canal que lhes permita transferir dados de forma assíncrona. O canal, é claro, também deve ser resiliente à perda de conexão e à falha de hardware. Sabemos que, em muitos casos práticos, os produtores e os consumidores não operam no mesmo ritmo e, portanto, há atrasos nos dados, o que faz com que os consumidores sejam ainda mais lentos para agir com base nos dados – exatamente o que não queremos.
  5. Dados de fontes distribuídas geograficamente: Nesse cenário, é melhor que a arquitetura subjacente distribua os nodes de coleta de dados próximos à fonte. Dessa forma, tornamos os próprios nodes parte da solução de recepção rápida de dados, para coletar, processar, encaminhar ou redirecionar os dados recebidos.

Como o Redis Enterprise facilita o recebimento rápido de dados, venha descobrir!

Alto desempenho com o menor número de servidores
Observe que, quando se trata de desempenho, o Redis Enterprise foi testado para lidar com mais de 200 milhões de operações de leitura/gravação por segundo, com latências inferiores a milissegundos, com apenas um cluster de 40 nodes no AWS. É por isso que o Redis Enterprise é o banco de dados NoSQL mais eficiente em termos de recursos do mercado.

Estruturas de dados flexíveis e módulos para análise em tempo real: Redis Streams, Pub/Sub, Lists, Sorted Sets, RedisTimeSeries
O Redis oferece tudo isso: uma variedade de estruturas de dados, como Streams, Lists, Sets, Sorted Sets e Hashes, que fornecem processamento de dados simples e versátil para combinar com eficiência a recepção de dados em alta velocidade e a análise em tempo real.

Os recursos Pub/Sub do Redis permitem que ele atue como um eficiente intermediário de mensagens entre nodes de aninhamento de dados geograficamente distribuídos. Os aplicativos produtores de dados publicam dados de streaming em canais no(s) formato(s) necessário(s), e os aplicativos consumidores se inscrevem nos canais nos quais estão interessados, recebendo mensagens de forma assíncrona à medida que são publicadas e assim por diante em um loop.

Listas e conjuntos ordenados podem ser usados como canais de dados que conectam produtores e consumidores. Você também pode usar essas estruturas de dados para transmitir dados de forma assíncrona. Ao contrário do Pub/Sub, mas as listas e os conjuntos ordenados oferecem persistência.

Os fluxos podem fazer ainda mais, fornecendo um canal de recepção de dados persistente entre produtores e consumidores. Com o Streams, você pode ampliar o número de consumidores usando grupos de consumidores. Esses grupos também implementam a segurança de dados transacionais se algo falhar no meio do consumo e do processamento de dados.

Ainda, o RedisTimeSeries oferece um conjunto aprimorado de recursos de recepção rápida de dados, incluindo amostragem reduzida, operações especiais e compactação delta dupla, combinados com recursos de análise em tempo real, como marcação de dados com pesquisa integrada, agregação, consultas de intervalo e um conector integrado às principais ferramentas de monitoramento e análise, como Grafana e Prometheus.

Implantação de geodistribuição de Ativo para Ativo
A tecnologia Ativo-Ativo do Redis Enterprise, baseada em CRDTs, permite operações complexas de armazenamento de dados e mensagens em locais geográficos e permite que o aplicativo seja implantado de maneira totalmente distribuída para melhorar significativamente a disponibilidade e o tempo de resposta do aplicativo.

Ampliação da DRAM do Redis com SSDs e memória persistente
A tecnologia Redison Flash do Redis Enterprise permite que você estenda a DRAM com SSD e memória persistente, armazenando assim conjuntos de dados muito grandes, de vários terabytes, usando os mesmos custos de infraestrutura que os bancos de dados baseados em disco e mantendo as latências do banco de dados em níveis inferiores a milissegundos, mesmo ao consumir mais de 1 milhão de itens/segundo em cada node do cluster do Redis Enterprise.

Descubra como implementar o consumo rápido de dados com o Redis

Abaixo você pode ver alguns trechos de código escritos em Java, não perca. Todos eles usam a biblioteca Jedis. Antes de tudo, siga as instruções na página inicial do Jedis para fazer download da versão mais recente do Jedis.

  1. Recepção rápida de dados com Redis Streams
    1. Publique uma mensagem em uma estrutura de dados de fluxo. Este programa usa o  XADD para adicionar novos elementos ao fluxo. Nome do arquivo: StreamPublish.java.import java.util.HashMap;
      import java.util.Map;

      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.StreamEntryID;public class StreamPublish {
      static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception {
      try {
      Map<String, String> kv = new HashMap<String, String>();
      kv.put(“a”, “100”); // key -> a; value -> 100
      jedis.xadd(“MyStream”, StreamEntryID.NEW_ENTRY, kv);
      }finally {
      jedis.close();
      }
      }
      }
    2. Consumir dados de um fluxo de forma assíncrona. Aguarde a mensagem se o fluxo estiver vazio. Esse programa usa o comando XREAD. Nome do arquivo: StreamConsumeAsync.java.import java.util.AbstractMap.SimpleEntry;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      import java.util.Map.Entry;

      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.StreamEntry;
      import redis.clients.jedis.StreamEntryID;public class StreamConsumeAsync{static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception{// Start from 0. Início a partir de 0. Para consultas subsequentes, leia a partir do último id + 1
      String lastStreamDataId = “0-0”;
      int count = 1000;
      long waitTimeInMillis = 5000;try {
      // Read new data asynchronously in a loop
      while(true) {
      List next = getNext(“MyStream”, lastStreamDataId,
      count, waitTimeInMillis);
      if(next != null) {
      List<StreamEntry> stList = getStreamEntries(next);
      if(stList != null) {
      // Process data here
      for(int j=0; j<stList.size(); j++) {
      StreamEntry streamData = (StreamEntry)stList.get(j); // Ler campos (par chave-valor) do fluxo de dados
      Map<String, String> fields = streamData.getFields(); // Ler dados subsequentes do último id + 1
      lastStreamDataId = streamData.getID().getTime()
      +”-”
      +(streamData.getID().getSequence()+1); System.out.println(stList.get(j));
      System.out.println(lastStreamDataId);
      }
      }else{
      System.out.println(“No new data in the stream”);
      }
      }
      } }finally {
      jedis.close();
      }
      } // Read the next set of data from the stream
      private static List getNext(String streamId, String lastId, int count, long waitTimeInMillis) throws Exception{
      HashMap<String, StreamEntryID> map = new HashMap();
      String readFrom = lastId;
      map.put(streamId, new StreamEntryID(readFrom));
      List list = jedis.xread(count, waitTimeInMillis,
      (Entry<String, StreamEntryID>)
      map.entrySet().toArray()[0]);
      return list;
      } // Read stream entries
      // Assumes streamList has only one stream
      private static List<StreamEntry> getStreamEntries(List streamList) throws Exception{
      if(streamList.size()>0) {
      SimpleEntry stEntry = (SimpleEntry)streamList.get(0);
      return (List<StreamEntry>) stEntry.getValue();
      } return null;
      }
      }
    3. Consultar um fluxo usando o comando XRANGE. Nome do arquivo: StreamQuery.javaimport java.util.List;
      import java.util.Map;

      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.StreamEntry;
      import redis.clients.jedis.StreamEntryID;public class StreamQuery{static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception{String streamID = “MyStream”;
      StreamEntryID start = new StreamEntryID(0,0);
      StreamEntryID end = null; // null -> until the last item in the stream
      int count = 2;try {
      List<StreamEntry> stList = jedis.xrange(streamID, start, end, count);
      if(stList != null) {
      // Process data here
      for(int j=0; j<stList.size(); j++) {
      StreamEntry streamData = (StreamEntry)stList.get(j); System.out.println(streamData); // Ler campos (par chave-valor) do fluxo de dados

      Map<String, String> fields = streamData.getFields(); // Ler dados subsequentes do último id + 1
      StreamEntryID nextStart =
      new StreamEntryID(streamData.getID().getTime(),
      (streamData.getID().getSequence()+1));
      }
      }else {
      System.out.println(“No new data in the stream”);
      }}finally {
      jedis.close();
      }
      }
      }
  2. Recepção rápida de dados via Pub/Sub
    1. Publicação em um canal. Nome do arquivo: PubSubPublish.javaimport redis.clients.jedis.Jedis;

      public class PubSubPublish {
      static Jedis jedis = new Jedis(“localhost”, 6379);
      public static void main(String[] args) throws Exception {try {
      String channel = “MyChannel”;
      String message = “Hello there!”;
      jedis.publish(channel, message);
      }finally {
      jedis.close();
      }
      }
      }
    2. Assinatura de um canal. Nome do arquivo: PubSubPublish.javaimport redis.clients.jedis.Jedis;
      import redis.clients.jedis.JedisPubSub;

      public class PubSubSubscribe extends JedisPubSub{
      static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception {try {
      PubSubSubscribe mySubscriber = new PubSubSubscribe();
      String channel = “MyChannel”;
      jedis.subscribe(mySubscriber, channel);
      }finally {
      jedis.close();
      }
      }// Receive messages
      @Override
      public void onMessage(String channel, String message) {
      System.out.println(message);
      }
      }
  3. Recepção rápida de dados usando listas
    1. Migrando (empurrando) dados para uma lista. Nome do arquivo: ListPush.javaimport redis.clients.jedis.Jedis;

      public class ListPush {
      static Jedis jedis = new Jedis(“localhost”, 6379);
      public static void main(String[] args) throws Exception {try {
      String list = “MyList”;
      String message = “Hello there!”;
      jedis.lpush(list, message);
      }finally {
      jedis.close();
      }
      }
      }
    2. Extraindo dados de uma lista. Nome do arquivo: ListPop.javaimport redis.clients.jedis.Jedis;

      public class ListPop {
      static Jedis jedis = new Jedis(“localhost”, 6379);
      public static void main(String[] args) throws Exception {try {
      String list = “MyList”;
      String message = jedis.rpop(list);
      System.out.println(message);
      }finally {
      jedis.close();
      }
      }
      }