e-Book - Redis in Action

This book covers the use of Redis, an in-memory database/data structure server

  • Redis in Action – Home
  • Foreword
  • Preface
  • Acknowledgments
  • About this Book
  • About the Cover Illustration
  • Part 1: Getting Started
  • Part 2: Core concepts
  • Part 3: Next steps
  • Appendix A
  • Appendix B
  • Buy the paperback

    8.5.3 Filtering streamed messages

    So far we’ve built a server to serve the streamed messages; now it’s time to filter the
    messages for streaming. We filter the messages so that a client making a request
    only sees the messages they’re interested in. Though our social network may not have
    a lot of traffic, sites like Twitter, Facebook, or even Google+ will see tens to hundreds of
    thousands of events every second. And for both third parties and ourselves, the cost of
    bandwidth to send all of that information can be quite high, so sending only the messages
    that match is important.

    In this section, we’ll write functions and classes that will filter posted messages to
    be streamed to clients. These filters will plug into the streaming web server we wrote
    in section 8.5.2. As I mentioned at the beginning of section 8.5, we’ll support random
    sampling of all messages and access to the full firehose, as well as filtering for specific
    users, words, and the location of messages.

    As mentioned way back in chapter 3, we’ll use Redis PUBLISH and SUBSCRIBE to implement at least part of the streaming functionality. More specifically, when users post messages, we’ll PUBLISH the posted message information to a channel in Redis.
    Our filters will SUBSCRIBE to that same channel, receive the message, and yield messages that match the filters back to the web server for sending to the client.


    Before we get ahead of ourselves, let’s first update our message posting function from
    section 8.1.2 and message deletion function from section 8.4 to start producing messages
    to filter. We’ll start with posting in the next listing, which shows that we’ve added
    a line to our function that sends messages out to be filtered.

    Listing 8.13 Updated create_status() from listing 8.2 to support streaming filters
    import json
    import time

    def create_status(conn, uid, message, **data):
        pipeline = conn.pipeline(True)
        # Fetch the poster's login name and reserve a new status ID
        # (the ID counter key is the one used in listing 8.2).
        pipeline.hget('user:%s'%uid, 'login')
        pipeline.incr('status:id:')
        login, id = pipeline.execute()

        if not login:
            return None

        data.update({
            'message': message,
            'posted': time.time(),
            'id': id,
            'uid': uid,
            'login': login,
        })
        pipeline.hmset('status:%s'%id, data)
        pipeline.hincrby('user:%s'%uid, 'posts')
        # The added line to send a message to streaming filters.
        pipeline.publish('streaming:status:', json.dumps(data))
        pipeline.execute()
        return id

    All it took was one more line to add streaming support on the posting side. But what about
    deletion? The update to status message deletion is shown in the following listing.

    Listing 8.14 Updated delete_status() from listing 8.8 to support streaming filters
    def delete_status(conn, uid, status_id):
        key = 'status:%s'%status_id
        lock = acquire_lock_with_timeout(conn, key, 1)
        if not lock:
            return None

        # Only the user who posted the message may delete it.
        if conn.hget(key, 'uid') != str(uid):
            release_lock(conn, key, lock)
            return None

        pipeline = conn.pipeline(True)
        # Fetch the status message so that streaming filters can apply the same
        # filters to determine whether the deletion should be passed to the client.
        status = conn.hgetall(key)
        # Mark the status message as deleted.
        status['deleted'] = True
        # Publish the deleted status message to the stream.
        pipeline.publish('streaming:status:', json.dumps(status))
        pipeline.delete(key)
        pipeline.zrem('profile:%s'%uid, status_id)
        pipeline.zrem('home:%s'%uid, status_id)
        pipeline.hincrby('user:%s'%uid, 'posts', -1)
        pipeline.execute()

        release_lock(conn, key, lock)
        return True

    At first glance, you’re probably wondering why we’d want to send the entire status
    message that’s to be deleted to the channel for filtering. Conceptually, we should only
    need to send message-deleted information to clients that received the status message
    when it was posted. If we perform the same filtering on deleted messages as we do on
    newly posted messages, then we can always send message-deleted notifications to
    those clients that would’ve received the original message. This ensures that we don’t
    need to keep a record of the status IDs for messages sent to all clients, which simplifies
    our server and reduces memory use.
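
    To make that reasoning concrete, here’s a minimal sketch of a stateless filter being
    applied to both a posted and a deleted payload. The helper names make_word_filter()
    and to_client() are hypothetical stand-ins for the real filters and web server, and no
    Redis connection is involved; the point is only that the same filter reaches the same
    decision both times, so no per-client record of delivered IDs is needed.

```python
import json


def make_word_filter(word):
    # Hypothetical stand-in for a streaming filter: matches when the
    # given word appears in the whitespace-separated message.
    word = word.lower()

    def check(status):
        return word in status['message'].lower().split()
    return check


def to_client(raw, match):
    """Return what a client using this filter would receive, or None."""
    status = json.loads(raw)
    if not match(status):
        return None
    if status.get('deleted'):
        # Deleted messages are reduced to a small placeholder.
        return {'id': status['id'], 'deleted': True}
    return status


base = {'id': 7, 'message': 'redis streams rock', 'login': 'alice'}
posted = json.dumps(base)
deleted = json.dumps(dict(base, deleted=True))

redis_fan = make_word_filter('redis')
cat_fan = make_word_filter('cats')

# The client that saw the post also sees the deletion...
saw_post = to_client(posted, redis_fan)
saw_delete = to_client(deleted, redis_fan)
# ...and the client that never saw the post hears nothing about it.
missed_post = to_client(posted, cat_fan)
missed_delete = to_client(deleted, cat_fan)
```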


    Now that we’re sending information about status messages being posted and deleted to
    a channel in Redis, we only need to subscribe to that channel to start receiving messages
    to filter. As was the case in chapter 3, we’ll need to construct a special pubsub object in
    order to subscribe to a channel. When we’ve subscribed to the channel, we’ll perform
    our filtering, and produce one of two different messages depending on whether the
    message was posted or deleted. The code for handling these operations is next.

    Listing 8.15 A function to receive and process streamed messages
    import json

    # Use our automatic connection decorator from chapter 5.
    @redis_connection('social-network')
    def filter_content(conn, id, method, name, args, quit):
        # Create the filter that will determine whether a message should be sent to the client.
        match = create_filters(id, method, name, args)

        # Prepare the subscription.
        pubsub = conn.pubsub()
        pubsub.subscribe(['streaming:status:'])

        # Receive messages from the subscription.
        for item in pubsub.listen():
            # Get the status message information from the subscription structure.
            message = item['data']
            decoded = json.loads(message)

            # Check if the status message matched the filter.
            if match(decoded):
                if decoded.get('deleted'):
                    # For deleted messages, send a special "deleted" placeholder for the message.
                    yield json.dumps({
                        'id': decoded['id'], 'deleted': True})
                else:
                    # For matched status messages that are not deleted, send the message itself.
                    yield message

            # If the web server no longer has a connection to the client, stop filtering messages.
            if quit[0]:
                break

        # Reset the Redis connection to ensure that the Redis server clears its
        # outgoing buffers if this wasn't fast enough.
        pubsub.reset()

    As I said before, this function needs to subscribe to a channel in Redis in order to
    receive posted/deleted notifications for status messages. But it also needs to handle
    cases where the streaming client has disconnected, and it needs to properly clean up
    the connection if Redis has been trying to send it too much data.
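
    The control flow described here can be tried out without a Redis server. The following
    is only a sketch under stated assumptions: FakePubSub is a hypothetical in-memory
    stand-in that mimics the listen() and reset() methods of a redis-py pubsub object, and
    stream_matches() is an illustrative name, not the chapter’s function. It also skips items
    whose type isn’t 'message', on the assumption that subscription confirmations carry a
    counter rather than a JSON payload.

```python
import json


class FakePubSub(object):
    """Hypothetical in-memory stand-in for a redis-py pubsub object."""

    def __init__(self, items):
        self.items = items
        self.was_reset = False

    def listen(self):
        for item in self.items:
            yield item

    def reset(self):
        self.was_reset = True


def stream_matches(pubsub, match, quit):
    try:
        for item in pubsub.listen():
            # Skip subscription confirmations; only real messages hold JSON.
            if item['type'] != 'message':
                continue
            decoded = json.loads(item['data'])
            if match(decoded):
                if decoded.get('deleted'):
                    yield json.dumps({'id': decoded['id'], 'deleted': True})
                else:
                    yield item['data']
            # The flag is checked once per message, so a disconnect is only
            # noticed when the next message arrives.
            if quit[0]:
                break
    finally:
        # Clean up the subscription however the loop ends.
        pubsub.reset()


items = [
    {'type': 'subscribe', 'data': 1},
    {'type': 'message', 'data': json.dumps({'id': 1, 'message': 'hello'})},
    {'type': 'message',
     'data': json.dumps({'id': 2, 'message': 'bye', 'deleted': True})},
    {'type': 'message', 'data': json.dumps({'id': 3, 'message': 'unseen'})},
]
quit = [False]
pubsub = FakePubSub(items)
received = []
for payload in stream_matches(pubsub, lambda status: True, quit):
    received.append(json.loads(payload))
    if len(received) == 2:
        quit[0] = True  # simulate the web server noticing a disconnect
```

    Wrapping the loop in try/finally is a design choice of this sketch: it resets the
    subscription even if the consumer abandons the generator early.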

    As we covered in chapter 3, there’s a Redis server setting that determines the maximum
    outgoing buffer for subscriptions. To ensure that our Redis server
    stays up even under heavy load, we’ll probably want to set client-output-buffer-limit
    pubsub to lower than the default 32 megabytes per connection. Where to set
    the limit will depend on how many clients we expect to support and how much other
    data is in Redis.
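
    As a rough illustration, the setting takes a client class followed by a hard limit, a soft
    limit, and the number of seconds the soft limit may be exceeded. The helper below is a
    hypothetical convenience (pubsub_buffer_limit() is not part of Redis or redis-py), and
    the 8 MB and 2 MB figures are placeholders rather than recommendations.

```python
def pubsub_buffer_limit(hard_mb, soft_mb, soft_seconds):
    """Build a value for the client-output-buffer-limit setting (pubsub class)."""
    if soft_mb > hard_mb:
        raise ValueError('the soft limit should not exceed the hard limit')
    mb = 1024 * 1024
    # Limits are expressed in bytes to avoid relying on unit suffixes.
    return 'pubsub %d %d %d' % (hard_mb * mb, soft_mb * mb, soft_seconds)


# Illustrative numbers only: disconnect a subscriber once 8 MB is queued,
# or once more than 2 MB has been queued for 60 seconds straight.
value = pubsub_buffer_limit(8, 2, 60)

# With a live redis-py connection, this could be applied at runtime with:
# conn.config_set('client-output-buffer-limit', value)
```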


    At this point we’ve built every other layer; it now remains to actually write filtering. I
    know, there was a lot of build-up, but you may be surprised to find out that actually filtering
    messages isn’t difficult for any of our cases. To create filters, we’ll first define
    our create_filters() function in listing 8.16, which will delegate off to one of a variety
    of filtering classes, depending on the filter that we want to build. We’ll assume that
    clients are sending reasonable arguments, but if you’re considering using any of this
    in a production scenario, you’ll want to add validation and verification.

    Listing 8.16 A factory function to dispatch to the actual filter creation
    def create_filters(id, method, name, args):
        # For the "sample" method, we don't need to worry about names, just the arguments.
        if method == 'sample':
            return SampleFilter(id, args)
        # For the "filter" method, we actually worry about which of the filters we
        # want to apply, so return the specific filters for them.
        elif name == 'track':
            return TrackFilter(args)
        elif name == 'follow':
            return FollowFilter(args)
        elif name == 'location':
            return LocationFilter(args)
        # If no filter matches, then raise an exception.
        raise Exception("Unknown filter")

    Nothing surprising there: we’re distinguishing the different kinds of filters. The first
    filter we’ll create will be the sample filter, which will actually implement the functionality
    of the Twitter-style firehose, gardenhose, and spritzer access levels, and anything
    in between. The implementation of the sampling filter is shown next.

    Listing 8.17 The function to handle firehose, gardenhose, and spritzer
    import random

    # SampleFilter builds a filter from the client's "id" and "args" parameters.
    def SampleFilter(id, args):
        # "args" is a dictionary based on the parameters passed as part of the
        # GET request; each value is a list of strings.
        percent = int(args.get('percent', ['10'])[0], 10)
        # Use "id" to seed the shuffle that chooses a subset of IDs, the count
        # of which is determined by the "percent" argument passed.
        ids = list(range(100))
        shuffler = random.Random(id)
        shuffler.shuffle(ids)
        # A Python set lets us quickly determine whether a status message
        # matches our criteria.
        keep = set(ids[:max(percent, 1)])

        # The returned closure is called once for each status message.
        def check(status):
            # Fetch the status ID, find its value modulo 100, and return whether
            # it's in the IDs that we want to accept. IDs read back from a Redis
            # hash are strings, so convert before taking the modulo.
            return (int(status['id']) % 100) in keep
        return check

    As you can see, each filter is a function that returns a closure, which lets us keep
    data and behavior together without defining a class. This first filter does one interesting
    thing: it uses a random number generator seeded with the user-provided
    identifier to choose the IDs of status messages that it should accept. Because the same
    identifier always selects the same IDs, a sampling filter can receive a deleted
    notification for a message even if the client had
    disconnected (as long as the client reconnected before the delete notification came
    through). We use Python sets here to quickly determine whether the ID modulo 100 is
    in the group that we want to accept, as Python sets offer O(1) lookup time, compared
    to O(n) for a Python list.
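
    The reconnect behavior is easy to check in isolation. The sketch below restates the
    sampling idea for Python 3 under a hypothetical name, sample_filter(), and takes the
    percentage directly instead of parsing request arguments; 'client-42' is a made-up
    identifier.

```python
import random


def sample_filter(id, percent):
    # Shuffle the 100 possible residues with a generator seeded by the
    # client-provided identifier, then keep the first "percent" of them.
    ids = list(range(100))
    random.Random(id).shuffle(ids)
    keep = set(ids[:max(percent, 1)])

    def check(status):
        return (int(status['id']) % 100) in keep
    return check


statuses = [{'id': n} for n in range(1000)]

before = sample_filter('client-42', 10)
after = sample_filter('client-42', 10)  # the same client reconnecting

accepted_before = [s['id'] for s in statuses if before(s)]
accepted_after = [s['id'] for s in statuses if after(s)]
```

    Both filters accept exactly the same 100 of the 1,000 IDs, which is why a delete
    notification for a previously delivered message still gets through after a reconnect.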

    Continuing on, we’ll now build the track filter, which will allow users to track
    words or phrases in status messages. Similar to our sample filter in listing 8.17, we’ll
    use a closure to keep the data and filtering functionality together. The filter
    definition is shown in the following listing.

    Listing 8.18 A filter that matches groups of words that are posted in status messages
    def TrackFilter(list_of_strings):
        # The filter has been provided with a list of word groups, and the filter
        # matches if a message has all of the words in any of the groups.
        groups = []
        for group in list_of_strings:
            group = set(group.lower().split())
            # We'll only keep groups that have at least 1 word.
            if group:
                groups.append(group)

        def check(status):
            # We'll split words in the message on whitespace.
            message_words = set(status['message'].lower().split())
            # Then we'll iterate over all of the groups.
            for group in groups:
                # If all of the words in any of the groups match, we'll accept
                # the message with this filter.
                if len(group & message_words) == len(group):
                    return True
            return False
        return check

    About the only interesting thing about the tracking filter is making sure that if someone
    wants to match a group of words, the message contains all of the words in that group
    and not just some of them. We again use Python sets, which, like Redis SETs, offer
    the ability to calculate intersections.
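
    Here is a small, self-contained sketch of the all-words-in-a-group rule. The name
    track_filter() is illustrative, and the subset operator (<=) is used in place of comparing
    intersection sizes; the two tests are equivalent for sets.

```python
def track_filter(list_of_strings):
    # Each string is one group of words; empty groups are dropped.
    groups = [set(s.lower().split()) for s in list_of_strings]
    groups = [g for g in groups if g]

    def check(status):
        words = set(status['message'].lower().split())
        # A group matches only when every one of its words is present.
        return any(group <= words for group in groups)
    return check


check = track_filter(['redis pubsub', 'python'])

both_words = check({'message': 'Redis PubSub is neat'})   # True
one_word = check({'message': 'redis is neat'})            # False
other_group = check({'message': 'I like Python'})         # True
# Splitting on whitespace leaves punctuation attached, so 'redis,' is not
# the same word as 'redis'.
punctuated = check({'message': 'redis, pubsub rock'})     # False
```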

    Moving on to the follow filter, we’re trying to match status messages that were
    posted by one of a group of users, or where one of the users is mentioned in the message.
    The function that implements user matching is shown here.

    Listing 8.19 Messages posted by or mentioning any one of a list of users
    def FollowFilter(names):
        # We'll match login names against posters and messages. A separate set
        # is used so that the "names" argument isn't overwritten before we read it.
        nset = set()
        # Store all names consistently as '@username'.
        for name in names:
            nset.add('@' + name.lower().lstrip('@'))

        def check(status):
            # Construct a set of words from the message and the poster's name.
            message_words = set(status['message'].lower().split())
            message_words.add('@' + status['login'].lower())
            # Consider the message a match if any of the usernames provided match
            # any of the whitespace-separated words in the message.
            return message_words & nset
        return check

    As before, we continue to use Python sets as a fast way to check whether a name is in
    the set of names that we’re looking for, or whether any of the names to match are also
    contained in a status message.
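
    A quick sketch of how the name normalization plays out, using the hypothetical name
    follow_filter() and made-up logins. It returns a plain boolean rather than the
    intersection itself, which is a choice of this example.

```python
def follow_filter(names):
    # Normalize every requested name to the form '@username'.
    wanted = set('@' + name.lower().lstrip('@') for name in names)

    def check(status):
        words = set(status['message'].lower().split())
        # Adding the poster as '@login' lets one intersection cover both
        # "posted by" and "mentioned in".
        words.add('@' + status['login'].lower())
        return bool(words & wanted)
    return check


check = follow_filter(['Alice', '@bob'])

posted_by = check({'login': 'alice', 'message': 'hello world'})    # True
mentioned = check({'login': 'carol', 'message': 'thanks @Bob'})    # True
# A bare word without the '@' prefix is not treated as a mention.
bare_name = check({'login': 'carol', 'message': 'bob who?'})       # False
```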

    We finally get to the location filter. This filter is different from the others in that we
    didn’t explicitly talk about adding location information to our status messages. But
    because of the way we wrote our create_status() and post_status() functions to
    take additional optional keyword arguments, we can add additional information without
    altering our status creation and posting functions. The location filter for this
    optional data is shown next.

    Listing 8.20 Messages within boxes defined by ranges of latitudes and longitudes
    def LocationFilter(list_of_boxes):
        # We'll create a set of boxes that define the regions that should return messages.
        boxes = []
        for start in range(0, len(list_of_boxes)-3, 4):
            boxes.append([float(v) for v in list_of_boxes[start:start+4]])

        def check(status):
            # Try to fetch "location" data from a status message.
            location = status.get('location')
            # If the message has no location information, then it can't be inside the boxes.
            if not location:
                return False
            # Otherwise, extract the latitude and longitude of the location.
            lat, lon = [float(v) for v in location.split(',')]
            # To match one of the boxes, we need to iterate over all boxes.
            for box in boxes:
                # If the message status location is within the required latitude and
                # longitude range, then the status message matches the filter.
                if (box[1] <= lat <= box[3] and
                    box[0] <= lon <= box[2]):
                    return True
            return False
        return check

    About the only thing that may surprise you about this particular filter is how we’re preparing
    the boxes for filtering. We expect that requests will provide location boxes as
    comma-separated sequences of numbers, where each chunk of four numbers defines
    latitude and longitude ranges (minimum longitude, minimum latitude, maximum
    longitude, maximum latitude, which is the same order as Twitter’s API). Note that a
    status message’s own location is read the other way around: latitude first, then longitude.
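
    The box handling is the part most likely to trip people up, so here is a standalone
    sketch. The name location_filter() and the coordinates are illustrative (two rough
    boxes around San Francisco and New York), and the status location is assumed to be a
    'latitude,longitude' string, as in the listing.

```python
def location_filter(list_of_boxes):
    # Consume the flat list four numbers at a time; a trailing partial
    # chunk of fewer than four values is ignored.
    boxes = []
    for start in range(0, len(list_of_boxes) - 3, 4):
        boxes.append([float(v) for v in list_of_boxes[start:start + 4]])

    def check(status):
        location = status.get('location')
        if not location:
            return False
        lat, lon = [float(v) for v in location.split(',')]
        # Boxes are stored longitude-first; locations are latitude-first.
        for min_lon, min_lat, max_lon, max_lat in boxes:
            if min_lat <= lat <= max_lat and min_lon <= lon <= max_lon:
                return True
        return False
    return check


raw = '-122.75,36.8,-121.75,37.8,-74,40,-73,41'
check = location_filter(raw.split(','))

in_first_box = check({'location': '37.77,-122.42'})   # True
in_second_box = check({'location': '40.71,-74.0'})    # True
outside = check({'location': '51.5,-0.12'})           # False
no_location = check({'message': 'hello'})             # False
```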

    With all of our filters built, a working web server, and the back-end API for everything
    else, it’s now up to you to get traffic!