We are now, simply, Redis

Learn More

e-Book - Redis in Action

This book covers the use of Redis, an in-memory database/data structure server

  • Redis in Action – Home
  • Foreword
  • Preface
  • Acknowledgments
  • About this Book
  • About the Cover Illustration
  • Part 1: Getting Started
  • Part 2: Core concepts
  • Part 3: Next steps
  • Appendix A
  • Appendix B
  • Buy the paperback
  • Redis in Action – Home
  • Foreword
  • Preface
  • Acknowledgments
  • About this Book
  • About the Cover Illustration
  • Part 1: Getting Started
  • Part 2: Core concepts
  • Part 3: Next steps
  • Appendix A
  • Appendix B
  • Buy the paperback

    6.5.2 Multiple-recipient publish/subscribe replacement

    Single-recipient messaging is useful, but it doesn’t get us far in replacing the PUBLISH
    and SUBSCRIBE commands when we have multiple recipients. To do that, we need to
    turn our problem around. In many ways, Redis PUBLISH/SUBSCRIBE is like group chat
    where whether someone’s connected determines whether they’re in the group chat.
    We want to remove that “need to be connected all the time” requirement, and we’ll
    implement it in the context of chatting.

    Let’s look at Fake Garage Startup’s next problem. After quickly implementing
    their user-to-user messaging system, Fake Garage Startup realized that replacing SMS is good, but they’ve had many requests to add group chat functionality. Like before,
    their clients may connect or disconnect at any time, so we can’t use the built-in PUBLISH/SUBSCRIBE method.

    Figure 6.12Some example chat and user data. The chat ZSETs show users and the maximum IDs
    of messages in that chat that they’ve seen. The seen ZSETs list chat IDs per user, again with the
    maximum message ID in the given chat that they’ve seen.

    Each new group chat will have a set of original recipients of the group messages,
    and users can join or leave the group if they want. Information about what users are in
    the chat will be stored as a ZSET with members being the usernames of the recipients,
    and values being the highest message ID the user has received in the chat. Which
    chats an individual user is a part of will also be stored as a ZSET, with members being
    the groups that the user is a part of, and scores being the highest message ID that the
    user has received in that chat. Information about some users and chats can be seen in
    figure 6.12.

    As you can see, user jason22 has seen five of six chat messages sent in chat:827, in
    which jason22 and jeff24 are participating.


    The content of chat sessions themselves will be stored in ZSETs, with messages as members
    and message IDs as scores. To create and start a chat, we’ll increment a global
    counter to get a new chat ID. We’ll then create a ZSET with all of the users that we want
    to include with seen IDs being 0, and add the group to each user’s group list ZSET.
    Finally, we’ll send the initial message to the users by placing the message in the chat
    ZSET. The code to create a chat is shown here.

    Listing 6.24The create_chat() function
    def create_chat(conn, sender, recipients, message, chat_id=None):
       chat_id = chat_id or str(conn.incr('ids:chat:'))

    Get a new chat ID.

       recipientsd = dict((r, 0) for r in recipients)

    Set up a dictionary of users-toscores to add to the chat ZSET.

          pipeline = conn.pipeline(True)
          pipeline.zadd('chat:' + chat_id, **recipientsd)

    Create the set with the list of people participating.

          for rec in recipients:
             pipeline.zadd('seen:' + rec, chat_id, 0)

    Initialize the seen ZSETs.

          return send_message(conn, chat_id, sender, message)

    Send the message.

    About the only thing that may be surprising is our use of what’s called a generator expression
    from within a call to the dict() object constructor. This shortcut lets us
    quickly construct a dictionary that maps users to an initially 0-valued score, which
    ZADD can accept in a single call.

    can be easily constructed by passing a sequence of pairs of values. The first item
    in the pair becomes the key; the second item becomes the value. Listing 6.24
    shows some code that looks odd, where we actually generate the sequence to be
    passed to the dictionary in-line. This type of sequence generation is known as
    a generator expression, which you can read more about at http://mng.bz/TTKb.


    To send a message, we must get a new message ID, and then add the message to the
    chat’s messages ZSET. Unfortunately, there’s a race condition in sending messages, but
    it’s easily handled with the use of a lock from section 6.2. Our function for sending a
    message using a lock is shown next.

    Listing 6.25The send_message() function
    def send_message(conn, chat_id, sender, message):
       identifier = acquire_lock(conn, 'chat:' + chat_id)
       if not identifier:
          raise Exception("Couldn't get the lock")
          mid = conn.incr('ids:' + chat_id)
          ts = time.time()
          packed = json.dumps({
             'id': mid,
             'ts': ts,
             'sender': sender,
             'message': message,

    Prepare the message.

          conn.zadd('msgs:' + chat_id, packed, mid)

    Send the message to the chat.

          release_lock(conn, 'chat:' + chat_id, identifier)
       return chat_id

    Most of the work involved in sending a chat message is preparing the information to
    be sent itself; actually sending the message involves adding it to a ZSET. We use locking
    around the packed message construction and addition to the ZSET for the same reasons
    that we needed a lock for our counting semaphore earlier. Generally, when we
    use a value from Redis in the construction of another value we need to add to Redis, we’ll either need to use a WATCH/MULTI/EXEC transaction or a lock to remove race conditions.
    We use a lock here for the same performance reasons that we developed it in the first place.

    Now that we’ve created the chat and sent the initial message, users need to find
    out information about the chats they’re a part of and how many messages are pending,
    and they need to actually receive the messages.


    To fetch all pending messages for a user, we need to fetch group IDs and message IDs
    seen from the user’s ZSET with ZRANGE. When we have the group IDs and the messages
    that the user has seen, we can perform ZRANGEBYSCORE operations on all of the message
    ZSETs. After we’ve fetched the messages for the chat, we update the seen ZSET
    with the proper ID and the user entry in the group ZSET, and we go ahead and clean
    out any messages from the group chat that have been received by everyone in the
    chat, as shown in the following listing.

    Listing 6.26The fetch_pending_messages() function
    def fetch_pending_messages(conn, recipient):
       seen = conn.zrange('seen:' + recipient, 0, -1, withscores=True)

    Get the last message IDs received.

       pipeline = conn.pipeline(True)
       for chat_id, seen_id in seen:
             'msgs:' + chat_id, seen_id+1, 'inf')

    Fetch all new messages.

       chat_info = zip(seen, pipeline.execute())

    Prepare information about the data to be returned.

       for i, ((chat_id, seen_id), messages) in enumerate(chat_info):
          if not messages:
          messages[:] = map(json.loads, messages)
          seen_id = messages[-1]['id']
          conn.zadd('chat:' + chat_id, recipient, seen_id)

    Update the “chat” ZSET with the most recently received message.

          min_id = conn.zrange(
             'chat:' + chat_id, 0, 0, withscores=True)

    Discover messages that have been seen by all users.

          pipeline.zadd('seen:' + recipient, chat_id, seen_id)

    Update the “seen” ZSET.

          if min_id:
                'msgs:' + chat_id, 0, min_id[0][1])

    Clean out messages that have been seen by all users.

          chat_info[i] = (chat_id, messages)
       return chat_info

    Fetching pending messages is primarily a matter of iterating through all of the chats
    for the user, pulling the messages, and cleaning up messages that have been seen by
    all users in a chat.


    We’ve sent and fetched messages from group chats; all that remains is joining and
    leaving the group chat. To join a group chat, we fetch the most recent message ID for the chat, and we add the chat information to the user’s seen ZSET with the score being
    the most recent message ID. We also add the user to the group’s member list, again
    with the score being the most recent message ID. See the next listing for the code for
    joining a group.

    Listing 6.27The join_chat() function
    def join_chat(conn, chat_id, user):
       message_id = int(conn.get('ids:' + chat_id))

    Get the most recent message ID for the chat.

       pipeline = conn.pipeline(True)
       pipeline.zadd('chat:' + chat_id, user, message_id)

    Add the user to the chat member list.

       pipeline.zadd('seen:' + user, chat_id, message_id)

    Add the chat to the user’s seen list.


    Joining a chat only requires adding the proper references to the user to the chat, and
    the chat to the user’s seen ZSET.

    To remove a user from the group chat, we remove the user ID from the chat ZSET,
    and we remove the chat from the user’s seen ZSET. If there are no more users in the
    chat ZSET, we delete the messages ZSET and the message ID counter. If there are users
    remaining, we’ll again take a pass and clean out any old messages that have been seen
    by all users. The function to leave a chat is shown in the following listing.

    Listing 6.28The leave_chat() function
    def leave_chat(conn, chat_id, user):
       pipeline = conn.pipeline(True)
       pipeline.zrem('chat:' + chat_id, user)
       pipeline.zrem('seen:' + user, chat_id)

    Remove the user from the chat.

       pipeline.zcard('chat:' + chat_id)

    Find the number of remaining group members.

       if not pipeline.execute()[-1]:
          pipeline.delete('msgs:' + chat_id)
          pipeline.delete('ids:' + chat_id)

    Delete the chat.

          oldest = conn.zrange(
             'chat:' + chat_id, 0, 0, withscores=True)

    Find the oldest message seen by all users.

          conn.zremrangebyscore('chat:' + chat_id, 0, oldest)

    Delete old messages from the chat.

    Cleaning up after a user when they leave a chat isn’t that difficult, but requires taking
    care of a lot of little details to ensure that we don’t end up leaking a ZSET or ID somewhere.

    We’ve now finished creating a complete multiple-recipient pull messaging system
    in Redis. Though we’re looking at it in terms of chat, this same method can be used to
    replace the PUBLISH/SUBSCRIBE functions when you want your recipients to be able to
    receive messages that were sent while they were disconnected. With a bit of work, we
    could replace the ZSET with a LIST, and we could move our lock use from sending a
    message to old message cleanup. We used a ZSET instead, because it saves us from having
    to fetch the current message ID for every chat. Also, by making the sender do more work (locking around sending a message), the multiple recipients are saved
    from having to request more data and to lock during cleanup, which will improve performance

    We now have a multiple-recipient messaging system to replace PUBLISH and SUBSCRIBE
    for group chat. In the next section, we’ll use it as a way of sending information
    about key names available in Redis.