A coleta, o armazenamento e o processamento de grandes volumes de dados em alta variedade e velocidade apresentam vários desafios complexos de design, especialmente em áreas como a Internet das Coisas (IoT), comércio eletrônico, segurança, comunicações, entretenimento, finanças e varejo. Portanto, assim como a tomada de decisões com base em dados sensíveis, oportunos e precisos é fundamental às empresas, a coleta e a análise de dados em tempo real são essenciais da mesma maneira.
Uma primeira etapa muito importante na execução da análise de dados em tempo real é garantir que os recursos adequados estejam disponíveis para capturar com eficiência fluxos de dados rápidos. A infraestrutura física (incluindo rede de alta velocidade, computação, armazenamento e memória) desempenha um papel importante aqui, mas a gama de software deve corresponder ao desempenho de sua camada física ou as organizações podem acabar com um enorme acúmulo de dados, dados ausentes ou dados incompletos e inúteis.
O recebimento de dados em alta velocidade geralmente envolve diferentes tipos de complexidades:
Alto desempenho com o menor número de servidores
Observe que, quando se trata de desempenho, o Redis Enterprise foi testado para lidar com mais de 200 milhões de operações de leitura/gravação por segundo, com latências inferiores a milissegundos, com apenas um cluster de 40 nodes no AWS. É por isso que o Redis Enterprise é o banco de dados NoSQL mais eficiente em termos de recursos do mercado.
Estruturas de dados flexíveis e módulos para análise em tempo real: Redis Streams, Pub/Sub, Lists, Sorted Sets, RedisTimeSeries
O Redis oferece tudo isso: uma variedade de estruturas de dados, como Streams, Lists, Sets, Sorted Sets e Hashes, que fornecem processamento de dados simples e versátil para combinar com eficiência a recepção de dados em alta velocidade e a análise em tempo real.
Os recursos Pub/Sub do Redis permitem que ele atue como um eficiente intermediário de mensagens entre nodes de aninhamento de dados geograficamente distribuídos. Os aplicativos produtores de dados publicam dados de streaming em canais no(s) formato(s) necessário(s), e os aplicativos consumidores se inscrevem nos canais nos quais estão interessados, recebendo mensagens de forma assíncrona à medida que são publicadas e assim por diante em um loop.
Listas e conjuntos ordenados podem ser usados como canais de dados que conectam produtores e consumidores. Você também pode usar essas estruturas de dados para transmitir dados de forma assíncrona. Ao contrário do Pub/Sub, mas as listas e os conjuntos ordenados oferecem persistência.
Os fluxos podem fazer ainda mais, fornecendo um canal de recepção de dados persistente entre produtores e consumidores. Com o Streams, você pode ampliar o número de consumidores usando grupos de consumidores. Esses grupos também implementam a segurança de dados transacionais se algo falhar no meio do consumo e do processamento de dados.
Ainda, o RedisTimeSeries oferece um conjunto aprimorado de recursos de recepção rápida de dados, incluindo amostragem reduzida, operações especiais e compactação delta dupla, combinados com recursos de análise em tempo real, como marcação de dados com pesquisa integrada, agregação, consultas de intervalo e um conector integrado às principais ferramentas de monitoramento e análise, como Grafana e Prometheus.
Implantação de geodistribuição de Ativo para Ativo
A tecnologia Ativo-Ativo do Redis Enterprise, baseada em CRDTs, permite operações complexas de armazenamento de dados e mensagens em locais geográficos e permite que o aplicativo seja implantado de maneira totalmente distribuída para melhorar significativamente a disponibilidade e o tempo de resposta do aplicativo.
Ampliação da DRAM do Redis com SSDs e memória persistente
A tecnologia Redison Flash do Redis Enterprise permite que você estenda a DRAM com SSD e memória persistente, armazenando assim conjuntos de dados muito grandes, de vários terabytes, usando os mesmos custos de infraestrutura que os bancos de dados baseados em disco e mantendo as latências do banco de dados em níveis inferiores a milissegundos, mesmo ao consumir mais de 1 milhão de itens/segundo em cada node do cluster do Redis Enterprise.
Abaixo você pode ver alguns trechos de código escritos em Java, não perca. Todos eles usam a biblioteca Jedis. Antes de tudo, siga as instruções na página inicial do Jedis para fazer download da versão mais recente do Jedis.
import java.util.HashMap;
import java.util.Map;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;public class StreamPublish {
static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception {
try {
Map<String, String> kv = new HashMap<String, String>();
kv.put(“a”, “100”); // key -> a; value -> 100
jedis.xadd(“MyStream”, StreamEntryID.NEW_ENTRY, kv);
}finally {
jedis.close();
}
}
}
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;public class StreamConsumeAsync{static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception{// Start from 0. Início a partir de 0. Para consultas subsequentes, leia a partir do último id + 1
String lastStreamDataId = “0-0”;
int count = 1000;
long waitTimeInMillis = 5000;try {
// Read new data asynchronously in a loop
while(true) {
List next = getNext(“MyStream”, lastStreamDataId,
count, waitTimeInMillis);
if(next != null) {
List<StreamEntry> stList = getStreamEntries(next);
if(stList != null) {
// Process data here
for(int j=0; j<stList.size(); j++) {
StreamEntry streamData = (StreamEntry)stList.get(j); // Ler campos (par chave-valor) do fluxo de dados
Map<String, String> fields = streamData.getFields(); // Ler dados subsequentes do último id + 1
lastStreamDataId = streamData.getID().getTime()
+”-”
+(streamData.getID().getSequence()+1); System.out.println(stList.get(j));
System.out.println(lastStreamDataId);
}
}else{
System.out.println(“No new data in the stream”);
}
}
} }finally {
jedis.close();
}
} // Read the next set of data from the stream
private static List getNext(String streamId, String lastId, int count, long waitTimeInMillis) throws Exception{
HashMap<String, StreamEntryID> map = new HashMap();
String readFrom = lastId;
map.put(streamId, new StreamEntryID(readFrom));
List list = jedis.xread(count, waitTimeInMillis,
(Entry<String, StreamEntryID>)
map.entrySet().toArray()[0]);
return list;
} // Read stream entries
// Assumes streamList has only one stream
private static List<StreamEntry> getStreamEntries(List streamList) throws Exception{
if(streamList.size()>0) {
SimpleEntry stEntry = (SimpleEntry)streamList.get(0);
return (List<StreamEntry>) stEntry.getValue();
} return null;
}
}
import java.util.List;
import java.util.Map;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;public class StreamQuery{static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception{String streamID = “MyStream”;
StreamEntryID start = new StreamEntryID(0,0);
StreamEntryID end = null; // null -> until the last item in the stream
int count = 2;try {
List<StreamEntry> stList = jedis.xrange(streamID, start, end, count);
if(stList != null) {
// Process data here
for(int j=0; j<stList.size(); j++) {
StreamEntry streamData = (StreamEntry)stList.get(j); System.out.println(streamData); // Ler campos (par chave-valor) do fluxo de dados
Map<String, String> fields = streamData.getFields(); // Ler dados subsequentes do último id + 1
StreamEntryID nextStart =
new StreamEntryID(streamData.getID().getTime(),
(streamData.getID().getSequence()+1));
}
}else {
System.out.println(“No new data in the stream”);
}}finally {
jedis.close();
}
}
}
import redis.clients.jedis.Jedis;
public class PubSubPublish {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {try {
String channel = “MyChannel”;
String message = “Hello there!”;
jedis.publish(channel, message);
}finally {
jedis.close();
}
}
}
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class PubSubSubscribe extends JedisPubSub{
static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception {try {
PubSubSubscribe mySubscriber = new PubSubSubscribe();
String channel = “MyChannel”;
jedis.subscribe(mySubscriber, channel);
}finally {
jedis.close();
}
}// Receive messages
@Override
public void onMessage(String channel, String message) {
System.out.println(message);
}
}
import redis.clients.jedis.Jedis;
public class ListPush {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {try {
String list = “MyList”;
String message = “Hello there!”;
jedis.lpush(list, message);
}finally {
jedis.close();
}
}
}
import redis.clients.jedis.Jedis;
public class ListPop {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {try {
String list = “MyList”;
String message = jedis.rpop(list);
System.out.println(message);
}finally {
jedis.close();
}
}
}