
Collecting, storing, and processing large volumes of high-variety, high-velocity data presents several complex design challenges, especially in fields like Internet of Things (IoT), e-commerce, security, communications, entertainment, finance, and retail. Because responsive, timely, and accurate data-driven decision making is core to these businesses, real-time data collection and analysis are critical.
An important first step in delivering real-time data analysis is ensuring adequate resources are available to effectively capture fast data streams. While the physical infrastructure (including a high-speed network, computation, storage, and memory) plays an important role here, the software stack must match the performance of its physical layer or organizations may end up with a massive backlog of data, missing data, or incomplete, misleading data.
High-speed data ingestion often involves different types of complexities:
High performance with the fewest number of servers
When it comes to performance, Redis Enterprise has been benchmarked to handle more than 200 million read/write operations per second, at sub-millisecond latencies with only a 40-node cluster on AWS. This makes Redis Enterprise the most resource-efficient NoSQL database in the market.
Flexible data structures and modules for real-time analytics: Redis Streams, Pub/Sub, Lists, Sorted Sets, RedisTimeSeries
Redis offers a variety of data structures such as Streams, Lists, Sets, Sorted Sets, and Hashes that provide simple and versatile data processing in order to efficiently combine high-speed data ingest and real-time analytics.
Redis’ Pub/Sub capabilities allow it to act as an efficient message broker between geographically distributed data-ingest nodes. Data-producing applications publish streaming data to channels in the format(s) required, and consuming applications subscribe to those channels that are relevant to them, receiving messages asynchronously as they are published.
Lists and Sorted Sets can be used as data channels connecting producers and consumers. You can also use these data structures to transmit data asynchronously. Unlike Pub/Sub, Lists and Sorted Sets offer persistence.
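The difference between fire-and-forget delivery and a channel that retains data can be illustrated without a Redis server. The following is a minimal in-memory sketch in plain Java, not Redis or Jedis code; the class ChannelSketch and its methods are hypothetical names chosen only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model; it only mimics the delivery semantics
// described above and does not talk to Redis.
class ChannelSketch {
    // Pub/Sub-style channel: messages reach only the subscribers present right now.
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    // List-style channel: messages wait in the structure until a consumer pops them.
    private final Deque<String> list = new ArrayDeque<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers to current subscribers and returns how many received the message.
    int publish(String message) {
        for (Consumer<String> s : subscribers) {
            s.accept(message);
        }
        return subscribers.size();
    }

    // Producer side of the list channel (comparable to LPUSH).
    void push(String message) {
        list.addFirst(message);
    }

    // Consumer side of the list channel (comparable to RPOP); null when empty.
    String pop() {
        return list.pollLast();
    }

    public static void main(String[] args) {
        ChannelSketch ch = new ChannelSketch();
        List<String> received = new ArrayList<>();

        // No subscriber yet: the published message is not retained anywhere.
        ch.publish("event-1");
        // The pushed message stays queued even though nobody is consuming yet.
        ch.push("event-1");

        ch.subscribe(received::add);
        ch.publish("event-2");

        System.out.println("Pub/Sub subscriber saw: " + received);
        System.out.println("List consumer pops: " + ch.pop());
    }
}
```

In this model the late subscriber only sees event-2, while the list consumer still gets event-1, which is the practical meaning of a channel that holds data until it is consumed.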
Streams can do even more, offering a persistent data ingest channel between producers and consumers. With Streams, you can scale out the number of consumers using consumer groups. Consumer groups also implement transaction-like data safety when consumers fail in the midst of consuming and processing data.
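To make the consumer-group safety property more concrete, here is a minimal in-memory sketch in plain Java. It is a simplified model, not Redis or Jedis code: in Redis the corresponding roles are played by the XREADGROUP, XACK, and XCLAIM commands, and Redis claims pending entries by idle time rather than by consumer name as this sketch does. The class ConsumerGroupSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a stream with a single consumer group.
class ConsumerGroupSketch {
    private final List<String> entries = new ArrayList<>();            // the stream itself
    private int nextUndelivered = 0;                                    // the group's read position
    private final Map<Integer, String> pending = new LinkedHashMap<>(); // entry index -> owning consumer

    // Producer: append an entry and return its index (stands in for a stream ID).
    int add(String payload) {
        entries.add(payload);
        return entries.size() - 1;
    }

    // Deliver the next never-delivered entry to a consumer and mark it pending.
    // Returns the entry index, or -1 when nothing new is available.
    int read(String consumer) {
        if (nextUndelivered >= entries.size()) {
            return -1;
        }
        int index = nextUndelivered++;
        pending.put(index, consumer);
        return index;
    }

    // Acknowledge successful processing; the entry leaves the pending list.
    boolean ack(int index) {
        return pending.remove(index) != null;
    }

    // Reassign every entry a failed consumer left unacknowledged.
    List<Integer> claim(String failedConsumer, String newOwner) {
        List<Integer> claimed = new ArrayList<>();
        for (Map.Entry<Integer, String> e : pending.entrySet()) {
            if (e.getValue().equals(failedConsumer)) {
                e.setValue(newOwner);
                claimed.add(e.getKey());
            }
        }
        return claimed;
    }

    String payload(int index) {
        return entries.get(index);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ConsumerGroupSketch group = new ConsumerGroupSketch();
        group.add("reading-1");
        group.add("reading-2");

        int first = group.read("consumer-A");
        group.ack(first);                      // consumer-A finishes its entry

        group.read("consumer-B");              // consumer-B fails before acknowledging

        // consumer-A takes over the unacknowledged work, so nothing is lost.
        for (int index : group.claim("consumer-B", "consumer-A")) {
            System.out.println("Reprocessing " + group.payload(index));
            group.ack(index);
        }
        System.out.println("Pending entries left: " + group.pendingCount());
    }
}
```

Each entry is delivered to exactly one consumer, and it stays in the pending list until acknowledged, so work held by a failed consumer can be handed to another one.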
Finally, RedisTimeSeries provides an enhanced fast data ingest feature set, including downsampling, special counter operations on the last ingested value, and double delta compression. It combines these with real-time analytics capabilities such as data labeling with built-in search, aggregation, range queries, and a built-in connector to leading monitoring and analytics tools such as Grafana and Prometheus.
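The general idea behind double delta (delta-of-delta) encoding can be shown with a few lines of arithmetic. The sketch below is plain Java and only illustrates the principle on timestamps; it makes no claim about the actual bit-level format RedisTimeSeries uses, and the class DoubleDeltaSketch is a hypothetical name.

```java
import java.util.Arrays;

// Illustration of delta-of-delta encoding for a series of timestamps.
class DoubleDeltaSketch {
    // Output layout: [first value, first delta, delta-of-delta, delta-of-delta, ...]
    static long[] encode(long[] ts) {
        long[] out = new long[ts.length];
        long prevDelta = 0;
        for (int i = 0; i < ts.length; i++) {
            if (i == 0) {
                out[0] = ts[0];                  // first value is stored as-is
            } else {
                long delta = ts[i] - ts[i - 1];
                out[i] = delta - prevDelta;      // for i == 1 this is the first delta itself
                prevDelta = delta;
            }
        }
        return out;
    }

    // Inverse of encode: rebuild the running delta, then the timestamps.
    static long[] decode(long[] enc) {
        long[] ts = new long[enc.length];
        long delta = 0;
        for (int i = 0; i < enc.length; i++) {
            if (i == 0) {
                ts[0] = enc[0];
            } else {
                delta += enc[i];
                ts[i] = ts[i - 1] + delta;
            }
        }
        return ts;
    }

    public static void main(String[] args) {
        // Samples arriving roughly every 10 time units.
        long[] timestamps = {1000, 1010, 1020, 1031, 1041};
        long[] encoded = encode(timestamps);
        System.out.println(Arrays.toString(encoded));
        System.out.println(Arrays.toString(decode(encoded)));
    }
}
```

For the sample input the encoded form is [1000, 10, 0, 1, -1]: once the sampling interval is steady, the stored values cluster around zero, which is what lets a compact variable-length encoding save space.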
Active-Active Geo-Distribution deployment
Redis Enterprise’s CRDT-based Active-Active technology enables complex data-ingest and messaging operations across geographic locations and lets applications be deployed in a completely distributed manner to significantly improve availability and application response time.
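For readers unfamiliar with CRDTs (conflict-free replicated data types), the sketch below shows one of the simplest, a grow-only counter, in plain Java. It is a textbook illustration of how replicas can accept writes independently and still converge; it is not Redis Enterprise's implementation, and the class GCounterSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Textbook grow-only counter (G-Counter): one slot per replica.
class GCounterSketch {
    private final String replicaId;
    private final Map<String, Long> counts = new HashMap<>();

    GCounterSketch(String replicaId) {
        this.replicaId = replicaId;
    }

    // A replica only ever increases its own slot (n must be non-negative).
    void increment(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("grow-only counter: n must be >= 0");
        }
        counts.merge(replicaId, n, Long::sum);
    }

    // The counter's value is the sum of all replica slots.
    long value() {
        long total = 0;
        for (long c : counts.values()) {
            total += c;
        }
        return total;
    }

    // Merge takes the per-slot maximum, so it is commutative,
    // associative, and idempotent: replicas converge in any sync order.
    void merge(GCounterSketch other) {
        for (Map.Entry<String, Long> e : other.counts.entrySet()) {
            counts.merge(e.getKey(), e.getValue(), Long::max);
        }
    }

    public static void main(String[] args) {
        GCounterSketch east = new GCounterSketch("us-east");
        GCounterSketch west = new GCounterSketch("us-west");

        // Both regions ingest events locally without coordinating.
        east.increment(3);
        west.increment(5);

        // Later the replicas exchange state in both directions.
        east.merge(west);
        west.merge(east);

        System.out.println("east=" + east.value() + ", west=" + west.value());
    }
}
```

Because merging is order-independent and safe to repeat, each region can keep ingesting locally and synchronize whenever the link allows.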
Extend Redis DRAM with SSD and persistent memory
Redis Enterprise’s Redis on Flash technology extends DRAM with SSD and persistent memory. This allows storing very large, multi-terabyte datasets at the infrastructure cost of a disk-based database while keeping database latencies at sub-millisecond levels, even when ingesting more than 1M items/sec on each node of the Redis Enterprise cluster.
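The idea of extending a fast tier with a larger, slower one can be sketched generically. The plain-Java example below keeps recently used values in a small "RAM" map and spills the least recently used ones to a "flash" map. It is only an illustration of hot/cold tiering under that assumed policy; how Redis on Flash actually places data is not described here, and TieredStoreSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical two-tier store: a small "RAM" map backed by a larger "flash" map.
class TieredStoreSketch {
    private final int ramCapacity;
    // Access-ordered map: iteration starts at the least recently used entry.
    private final LinkedHashMap<String, String> ram = new LinkedHashMap<>(16, 0.75f, true);
    private final Map<String, String> flash = new HashMap<>();

    TieredStoreSketch(int ramCapacity) {
        this.ramCapacity = ramCapacity;
    }

    void put(String key, String value) {
        flash.remove(key);                 // a fresh write always lands in the fast tier
        ram.put(key, value);
        spillColdEntries();
    }

    String get(String key) {
        String value = ram.get(key);
        if (value == null) {
            value = flash.remove(key);     // slow-tier hit: promote the value back to RAM
            if (value != null) {
                ram.put(key, value);
                spillColdEntries();
            }
        }
        return value;
    }

    boolean inRam(String key) {
        return ram.containsKey(key);
    }

    boolean inFlash(String key) {
        return flash.containsKey(key);
    }

    // Move least recently used entries to the slow tier until RAM fits again.
    private void spillColdEntries() {
        while (ram.size() > ramCapacity) {
            Iterator<Map.Entry<String, String>> it = ram.entrySet().iterator();
            Map.Entry<String, String> coldest = it.next();
            flash.put(coldest.getKey(), coldest.getValue());
            it.remove();
        }
    }

    public static void main(String[] args) {
        TieredStoreSketch store = new TieredStoreSketch(2);
        store.put("sensor:1", "20.5");
        store.put("sensor:2", "21.0");
        store.put("sensor:3", "19.8");     // sensor:1 is now the coldest and spills over

        System.out.println("sensor:1 in flash? " + store.inFlash("sensor:1"));
        System.out.println("sensor:1 = " + store.get("sensor:1"));
        System.out.println("sensor:1 in RAM again? " + store.inRam("sensor:1"));
    }
}
```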
Here are a few code snippets written in Java. They all use the Jedis library. First, follow the instructions on Jedis’ getting started page to download the latest version of Jedis.
1. Fast data ingest using Redis Streams
A. Publish a message to a stream data structure. This program uses XADD to add new items to the stream. File name: StreamPublish.java.
import java.util.HashMap;
import java.util.Map;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
public class StreamPublish {
    static Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) throws Exception {
        try {
            Map<String, String> kv = new HashMap<String, String>();
            kv.put("a", "100"); // key -> a; value -> 100
            jedis.xadd("MyStream", StreamEntryID.NEW_ENTRY, kv);
        } finally {
            jedis.close();
        }
    }
}
B. Consume data from a stream asynchronously. Wait for the message if the stream is empty. This program uses the XREAD command. File name: StreamConsumeAsync.java.
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
public class StreamConsumeAsync {
    static Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) throws Exception {
        // Start from 0-0. XREAD returns only entries with IDs greater than
        // the one passed in, so later reads pass the last ID seen as-is.
        String lastStreamDataId = "0-0";
        int count = 1000;
        long waitTimeInMillis = 5000;
        try {
            // Read new data in a loop, blocking up to waitTimeInMillis per call
            while (true) {
                List next = getNext("MyStream", lastStreamDataId,
                        count, waitTimeInMillis);
                if (next != null) {
                    List<StreamEntry> stList = getStreamEntries(next);
                    if (stList != null) {
                        // Process data here
                        for (int j = 0; j < stList.size(); j++) {
                            StreamEntry streamData = stList.get(j);
                            // Read the fields (key-value pairs) of the stream entry
                            Map<String, String> fields = streamData.getFields();
                            // Remember the last ID seen; the next XREAD resumes after it
                            lastStreamDataId = streamData.getID().getTime()
                                    + "-"
                                    + streamData.getID().getSequence();
                            System.out.println(streamData);
                            System.out.println(lastStreamDataId);
                        }
                    } else {
                        System.out.println("No new data in the stream");
                    }
                }
            }
        } finally {
            jedis.close();
        }
    }

    // Read the next set of data from the stream
    private static List getNext(String streamId, String lastId, int count, long waitTimeInMillis) throws Exception {
        HashMap<String, StreamEntryID> map = new HashMap<>();
        String readFrom = lastId;
        map.put(streamId, new StreamEntryID(readFrom));
        List list = jedis.xread(count, waitTimeInMillis,
                (Entry<String, StreamEntryID>)
                        map.entrySet().toArray()[0]);
        return list;
    }

    // Read stream entries
    // Assumes streamList has only one stream
    private static List<StreamEntry> getStreamEntries(List streamList) throws Exception {
        if (streamList.size() > 0) {
            SimpleEntry stEntry = (SimpleEntry) streamList.get(0);
            return (List<StreamEntry>) stEntry.getValue();
        }
        return null;
    }
}
C. Query a stream using the XRANGE command. File name: StreamQuery.java
import java.util.List;
import java.util.Map;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
public class StreamQuery {
    static Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) throws Exception {
        String streamID = "MyStream";
        StreamEntryID start = new StreamEntryID(0, 0);
        StreamEntryID end = null; // null -> until the last item in the stream
        int count = 2;
        try {
            List<StreamEntry> stList = jedis.xrange(streamID, start, end, count);
            if (stList != null) {
                // Process data here
                for (int j = 0; j < stList.size(); j++) {
                    StreamEntry streamData = stList.get(j);
                    System.out.println(streamData);
                    // Read the fields (key-value pairs) of data stream
                    Map<String, String> fields = streamData.getFields();
                    // Read subsequent data from last id + 1
                    StreamEntryID nextStart =
                            new StreamEntryID(streamData.getID().getTime(),
                                    (streamData.getID().getSequence() + 1));
                }
            } else {
                System.out.println("No new data in the stream");
            }
        } finally {
            jedis.close();
        }
    }
}
2. Fast data ingest using Pub/Sub
A. Publish to a channel. File name: PubSubPublish.java
import redis.clients.jedis.Jedis;
public class PubSubPublish {
    static Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) throws Exception {
        try {
            String channel = "MyChannel";
            String message = "Hello there!";
            jedis.publish(channel, message);
        } finally {
            jedis.close();
        }
    }
}
B. Subscribe to a channel. File name: PubSubSubscribe.java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class PubSubSubscribe extends JedisPubSub {
    static Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) throws Exception {
        try {
            PubSubSubscribe mySubscriber = new PubSubSubscribe();
            String channel = "MyChannel";
            jedis.subscribe(mySubscriber, channel);
        } finally {
            jedis.close();
        }
    }

    // Receive messages
    @Override
    public void onMessage(String channel, String message) {
        System.out.println(message);
    }
}
3. Fast data ingest using Lists
A. Push data to a list. File name: ListPush.java
import redis.clients.jedis.Jedis;
public class ListPush {
    static Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) throws Exception {
        try {
            String list = "MyList";
            String message = "Hello there!";
            jedis.lpush(list, message);
        } finally {
            jedis.close();
        }
    }
}
B. Pop data from a list. File name: ListPop.java
import redis.clients.jedis.Jedis;
public class ListPop {
    static Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) throws Exception {
        try {
            String list = "MyList";
            String message = jedis.rpop(list);
            System.out.println(message);
        } finally {
            jedis.close();
        }
    }
}