6.5.2 Multiple-recipient publish/subscribe replacement


Single-recipient messaging is useful, but it doesn’t get us far in replacing the PUBLISH and SUBSCRIBE commands when we have multiple recipients. To do that, we need to turn our problem around. In many ways, Redis PUBLISH/SUBSCRIBE is like a group chat in which being connected determines whether you’re in the chat. We want to remove that “need to be connected all the time” requirement, and we’ll implement it in the context of chatting.

Let’s look at Fake Garage Startup’s next problem. After quickly implementing their user-to-user messaging system, Fake Garage Startup realized that replacing SMS is good, but they’ve had many requests to add group chat functionality. Like before, their clients may connect or disconnect at any time, so we can’t use the built-in PUBLISH/SUBSCRIBE method.

Figure 6.12 Some example chat and user data. The chat ZSETs show users and the maximum IDs of messages in that chat that they’ve seen. The seen ZSETs list chat IDs per user, again with the maximum message ID in the given chat that they’ve seen.

Each new group chat will have a set of original recipients of the group messages, and users can join or leave the group if they want. Information about which users are in the chat will be stored as a ZSET with members being the usernames of the recipients, and scores being the highest message ID the user has received in the chat. Which chats an individual user is a part of will also be stored as a ZSET, with members being the groups that the user is a part of, and scores being the highest message ID that the user has received in that chat. Information about some users and chats can be seen in figure 6.12.

As you can see, user jason22 has seen five of six chat messages sent in chat:827, in which jason22 and jeff24 are participating.
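The two ZSET layouts can be mimicked with plain dictionaries that map member to score. The following is only an illustrative sketch: the score used for jeff24 and the helper names are assumptions, not data taken from the figure.

```python
# Illustrative stand-ins for the two kinds of ZSETs (member -> score).
# jeff24's score is an assumption; the text only states jason22's position.
chat_827 = {'jason22': 5, 'jeff24': 6}   # chat:827, user -> highest message ID received
seen_jason22 = {'827': 5}                # seen:jason22, chat ID -> highest message ID received
latest_message_id = 6                    # current value of the chat's message ID counter


def pending_count(seen, chat_id, latest_id):
    """Number of messages in chat_id that the user has not yet received."""
    return latest_id - seen[chat_id]


def received_by_everyone(chat):
    """Highest message ID that every member of the chat has received."""
    return min(chat.values())


print(pending_count(seen_jason22, '827', latest_message_id))
print(received_by_everyone(chat_827))
```

The lowest score in the chat ZSET matters later: messages at or below it have been received by every member and can be deleted.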

CREATING A CHAT SESSION

The content of chat sessions themselves will be stored in ZSETs, with messages as members and message IDs as scores. To create and start a chat, we’ll increment a global counter to get a new chat ID. We’ll then create a ZSET with all of the users that we want to include with seen IDs being 0, and add the group to each user’s group list ZSET. Finally, we’ll send the initial message to the users by placing the message in the chat ZSET. The code to create a chat is shown here.

Listing 6.24 The create_chat() function
def create_chat(conn, sender, recipients, message, chat_id=None):
    # Get a new chat ID.
    chat_id = chat_id or str(conn.incr('ids:chat:'))

    # Set up a dictionary of users-to-scores to add to the chat ZSET.
    recipients.append(sender)
    recipientsd = dict((r, 0) for r in recipients)

    pipeline = conn.pipeline(True)
    # Create the set with the list of people participating.
    # (redis-py 3.0 and later take a {member: score} mapping.)
    pipeline.zadd('chat:' + chat_id, recipientsd)
    # Initialize the seen ZSETs.
    for rec in recipients:
        pipeline.zadd('seen:' + rec, {chat_id: 0})
    pipeline.execute()

    # Send the message.
    return send_message(conn, chat_id, sender, message)

About the only thing that may be surprising is our use of what’s called a generator expression from within a call to the dict() object constructor. This shortcut lets us quickly construct a dictionary that maps users to an initially 0-valued score, which ZADD can accept in a single call.

GENERATOR EXPRESSIONS AND DICTIONARY CONSTRUCTION

Python dictionaries can be easily constructed by passing a sequence of pairs of values. The first item in the pair becomes the key; the second item becomes the value. Listing 6.24 shows some code that looks odd, where we actually generate the sequence to be passed to the dictionary in-line. This type of sequence generation is known as a generator expression, which you can read more about at http://mng.bz/TTKb.
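As a small standalone illustration of that construction (the usernames are just sample values), the generator expression yields (key, value) pairs that dict() consumes one at a time. A dictionary comprehension, shown for comparison, produces the same result.

```python
recipients = ['jeff24', 'jason22']

# A generator expression producing (user, score) pairs lazily.
pairs = ((r, 0) for r in recipients)

# dict() consumes the pairs: the first item is the key, the second the value.
scores = dict(pairs)

# An equivalent dictionary comprehension.
same_scores = {r: 0 for r in recipients}

print(scores)
print(scores == same_scores)
```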

SENDING MESSAGES

To send a message, we must get a new message ID, and then add the message to the chat’s messages ZSET. Unfortunately, there’s a race condition in sending messages, but it’s easily handled with the use of a lock from section 6.2. Our function for sending a message using a lock is shown next.

Listing 6.25 The send_message() function
import json
import time

# acquire_lock() and release_lock() are the lock helpers from section 6.2.
def send_message(conn, chat_id, sender, message):
    identifier = acquire_lock(conn, 'chat:' + chat_id)
    if not identifier:
        raise Exception("Couldn't get the lock")
    try:
        # Prepare the message.
        mid = conn.incr('ids:' + chat_id)
        ts = time.time()
        packed = json.dumps({
            'id': mid,
            'ts': ts,
            'sender': sender,
            'message': message,
        })

        # Send the message to the chat.
        # (redis-py 3.0 and later take a {member: score} mapping.)
        conn.zadd('msgs:' + chat_id, {packed: mid})
    finally:
        release_lock(conn, 'chat:' + chat_id, identifier)
    return chat_id
 

 

Most of the work involved in sending a chat message is preparing the information to be sent; actually sending the message involves adding it to a ZSET. We use locking around the packed message construction and addition to the ZSET for the same reasons that we needed a lock for our counting semaphore earlier. Generally, when we use a value from Redis in the construction of another value we need to add to Redis, we’ll either need to use a WATCH/MULTI/EXEC transaction or a lock to remove race conditions. We use a lock here rather than a transaction for the same performance reasons that led us to develop the lock in the first place.
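To make the race concrete, here is a purely in-memory simulation of one possible bad interleaving when no lock is held. It is a sketch, not Redis code: the dictionary stands in for the messages ZSET, and fetch_new() is a hypothetical helper that mimics a reader fetching everything above its last seen ID.

```python
def fetch_new(messages, seen_id):
    """Return (new messages with ID > seen_id, updated seen ID)."""
    new = sorted((mid, text) for mid, text in messages.items() if mid > seen_id)
    if new:
        seen_id = new[-1][0]
    return new, seen_id


messages = {}                # stand-in for the msgs: ZSET (message ID -> text)

id_a = 1                     # sender A increments the counter and gets ID 1 ...
id_b = 2                     # ... then sender B increments it and gets ID 2

messages[id_b] = 'from B'    # B adds its message before A does
got_first, seen = fetch_new(messages, 0)      # a reader fetches now; seen becomes 2

messages[id_a] = 'from A'    # A's message arrives late, with the lower ID
got_second, seen = fetch_new(messages, seen)  # only IDs above 2 are requested

print(got_first)
print(got_second)            # A's message is never delivered to this reader
```

Holding the lock across both the ID increment and the ZADD means messages enter the ZSET in ID order, so a reader can't record a seen ID that is higher than a message still waiting to be added.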

Now that we’ve created the chat and sent the initial message, users need to find out information about the chats they’re a part of and how many messages are pending, and they need to actually receive the messages.

FETCHING MESSAGES

To fetch all pending messages for a user, we need to fetch group IDs and message IDs seen from the user’s ZSET with ZRANGE. When we have the group IDs and the messages that the user has seen, we can perform ZRANGEBYSCORE operations on all of the message ZSETs. After we’ve fetched the messages for the chat, we update the seen ZSET with the proper ID and the user entry in the group ZSET, and we go ahead and clean out any messages from the group chat that have been received by everyone in the chat, as shown in the following listing.

Listing 6.26 The fetch_pending_messages() function
def fetch_pending_messages(conn, recipient):
    # Get the last message IDs received.
    # (On Python 3, this assumes a connection created with
    # decode_responses=True so that chat IDs come back as str.)
    seen = conn.zrange('seen:' + recipient, 0, -1, withscores=True)

    pipeline = conn.pipeline(True)

    # Fetch all new messages.
    for chat_id, seen_id in seen:
        pipeline.zrangebyscore(
            'msgs:' + chat_id, seen_id+1, 'inf')

    # Prepare information about the data to be returned.
    # list() is needed on Python 3, where zip() returns an iterator.
    chat_info = list(zip(seen, pipeline.execute()))

    for i, ((chat_id, seen_id), messages) in enumerate(chat_info):
        if not messages:
            continue
        messages[:] = map(json.loads, messages)

        # Update the "chat" ZSET with the most recently received message.
        seen_id = messages[-1]['id']
        conn.zadd('chat:' + chat_id, {recipient: seen_id})

        # Discover messages that have been seen by all users.
        min_id = conn.zrange(
            'chat:' + chat_id, 0, 0, withscores=True)

        # Update the "seen" ZSET.
        pipeline.zadd('seen:' + recipient, {chat_id: seen_id})
        if min_id:
            # Clean out messages that have been seen by all users.
            pipeline.zremrangebyscore(
                'msgs:' + chat_id, 0, min_id[0][1])
        chat_info[i] = (chat_id, messages)
    pipeline.execute()

    return chat_info
 

 

Fetching pending messages is primarily a matter of iterating through all of the chats for the user, pulling the messages, and cleaning up messages that have been seen by all users in a chat.
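The fetch-and-clean-up logic can be traced without a Redis server by using nested dictionaries in place of the ZSETs. This is a simplified sketch under stated assumptions: the function and parameter names are hypothetical, messages are plain strings rather than JSON, and unlike the listing it returns only chats that have new messages.

```python
def fetch_pending(chats, msgs, seen, recipient):
    """In-memory analogue of fetching pending messages.

    chats: chat_id -> {user: highest message ID received}
    msgs:  chat_id -> {message ID: message}
    seen:  user -> {chat_id: highest message ID received}
    """
    result = []
    for chat_id, seen_id in sorted(seen[recipient].items()):
        # Everything above the last seen ID is new for this recipient.
        new = sorted((mid, m) for mid, m in msgs[chat_id].items() if mid > seen_id)
        if not new:
            continue
        last_id = new[-1][0]
        # Record the new position in both structures.
        chats[chat_id][recipient] = last_id
        seen[recipient][chat_id] = last_id
        # Messages at or below the lowest position were received by everyone.
        min_id = min(chats[chat_id].values())
        for mid in [m for m in msgs[chat_id] if m <= min_id]:
            del msgs[chat_id][mid]
        result.append((chat_id, [m for _, m in new]))
    return result


chats = {'827': {'jason22': 0, 'jeff24': 0}}
msgs = {'827': {1: 'hi', 2: 'hello'}}
seen = {'jason22': {'827': 0}, 'jeff24': {'827': 0}}

first = fetch_pending(chats, msgs, seen, 'jason22')
remaining_after_first = dict(msgs['827'])   # jeff24 hasn't fetched, so nothing is deleted
second = fetch_pending(chats, msgs, seen, 'jeff24')
print(first, second, msgs)
```

After the second fetch, both users are at message 2, so both messages are removed from the stand-in for the messages ZSET.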

JOINING AND LEAVING THE CHAT

We’ve sent and fetched messages from group chats; all that remains is joining and leaving the group chat. To join a group chat, we fetch the most recent message ID for the chat, and we add the chat information to the user’s seen ZSET with the score being the most recent message ID. We also add the user to the group’s member list, again with the score being the most recent message ID. See the next listing for the code for joining a group.

Listing 6.27 The join_chat() function
def join_chat(conn, chat_id, user):
    # Get the most recent message ID for the chat.
    message_id = int(conn.get('ids:' + chat_id))

    pipeline = conn.pipeline(True)
    # Add the user to the chat member list.
    # (redis-py 3.0 and later take a {member: score} mapping.)
    pipeline.zadd('chat:' + chat_id, {user: message_id})
    # Add the chat to the user's seen list.
    pipeline.zadd('seen:' + user, {chat_id: message_id})
    pipeline.execute()
 

 

Joining a chat only requires adding the proper references to the user to the chat, and the chat to the user’s seen ZSET.
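One consequence of using the most recent message ID as the initial score is that a new member starts with nothing pending and doesn't lower the minimum score used for cleanup. A minimal in-memory sketch follows; the join() helper, the counters dictionary, and the user amy31 are all hypothetical.

```python
def join(chats, seen, counters, chat_id, user):
    """In-memory analogue of joining a chat at the current message ID."""
    message_id = counters[chat_id]
    chats[chat_id][user] = message_id
    seen.setdefault(user, {})[chat_id] = message_id


chats = {'827': {'jason22': 5, 'jeff24': 6}}   # user -> highest message ID received
seen = {'jason22': {'827': 5}, 'jeff24': {'827': 6}}
counters = {'827': 6}                          # stand-in for the ids:827 counter

join(chats, seen, counters, '827', 'amy31')

print(chats['827'])
print(min(chats['827'].values()))   # the cleanup threshold is unchanged
```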

To remove a user from the group chat, we remove the user ID from the chat ZSET, and we remove the chat from the user’s seen ZSET. If there are no more users in the chat ZSET, we delete the messages ZSET and the message ID counter. If there are users remaining, we’ll again take a pass and clean out any old messages that have been seen by all users. The function to leave a chat is shown in the following listing.

Listing 6.28 The leave_chat() function
def leave_chat(conn, chat_id, user):
    pipeline = conn.pipeline(True)
    # Remove the user from the chat.
    pipeline.zrem('chat:' + chat_id, user)
    pipeline.zrem('seen:' + user, chat_id)
    # Find the number of remaining group members.
    pipeline.zcard('chat:' + chat_id)

    if not pipeline.execute()[-1]:
        # Delete the chat.
        pipeline.delete('msgs:' + chat_id)
        pipeline.delete('ids:' + chat_id)
        pipeline.execute()
    else:
        # Find the oldest message seen by all users.
        oldest = conn.zrange(
            'chat:' + chat_id, 0, 0, withscores=True)
        # Delete old messages from the messages ZSET (not the chat ZSET),
        # using the score of the lowest-scored member.
        conn.zremrangebyscore('msgs:' + chat_id, 0, oldest[0][1])

Delete old messages from the chat.

 

Cleaning up after a user when they leave a chat isn’t that difficult, but requires taking care of a lot of little details to ensure that we don’t end up leaking a ZSET or ID somewhere.
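The two branches of leaving a chat can be walked through with the same kind of in-memory stand-ins. This is a sketch under assumptions: leave() and its dictionary arguments are hypothetical, and the explicit deletion of the empty member dictionary mirrors the fact that Redis removes a ZSET once its last member is gone.

```python
def leave(chats, msgs, seen, counters, chat_id, user):
    """In-memory analogue of leaving a chat."""
    chats[chat_id].pop(user, None)
    seen.get(user, {}).pop(chat_id, None)
    if not chats[chat_id]:
        # Last member left: drop the chat, its messages, and its ID counter.
        del chats[chat_id]
        msgs.pop(chat_id, None)
        counters.pop(chat_id, None)
    else:
        # Members remain: delete messages that everyone left has received.
        oldest = min(chats[chat_id].values())
        for mid in [m for m in msgs[chat_id] if m <= oldest]:
            del msgs[chat_id][mid]


chats = {'827': {'jason22': 5, 'jeff24': 6}}
msgs = {'827': {6: 'latest'}}          # only message 6 is still waiting on jason22
seen = {'jason22': {'827': 5}, 'jeff24': {'827': 6}}
counters = {'827': 6}

leave(chats, msgs, seen, counters, '827', 'jason22')
after_first = dict(msgs['827'])        # message 6 is now seen by all remaining members

leave(chats, msgs, seen, counters, '827', 'jeff24')
print(after_first, chats, msgs, counters)
```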

We’ve now finished creating a complete multiple-recipient pull messaging system in Redis. Though we’re looking at it in terms of chat, this same method can be used to replace the PUBLISH/SUBSCRIBE functions when you want your recipients to be able to receive messages that were sent while they were disconnected. With a bit of work, we could replace the ZSET with a LIST, and we could move our lock use from sending a message to old message cleanup. We used a ZSET instead, because it saves us from having to fetch the current message ID for every chat. Also, by making the sender do more work (locking around sending a message), the multiple recipients are saved from having to request more data and to lock during cleanup, which will improve performance overall.

We now have a multiple-recipient messaging system to replace PUBLISH and SUBSCRIBE for group chat. In the next section, we’ll use it as a way of sending information about key names available in Redis.