8.5.3 Filtering streamed messages



    So far we’ve built a server to serve the streamed messages; now it’s time to filter the messages that will be streamed. We filter the messages so that a client making a request only sees the messages they’re interested in. Though our social network may not have a lot of traffic, sites like Twitter, Facebook, or even Google+ will see tens to hundreds of thousands of events every second. For both third parties and ourselves, the cost of bandwidth to send all of that information can be quite high, so it’s important to send only the messages that match a client’s filters.

    In this section, we’ll write functions and classes that will filter posted messages to be streamed to clients. These filters will plug into the streaming web server we wrote in section 8.5.2. As I mentioned at the beginning of section 8.5, we’ll support random sampling of all messages and access to the full firehose, as well as filtering for specific users, words, and the location of messages.

    As mentioned way back in chapter 3, we’ll use Redis PUBLISH and SUBSCRIBE to implement at least part of the streaming functionality. More specifically, when users post messages, we’ll PUBLISH the posted message information to a channel in Redis. Our filters will SUBSCRIBE to that same channel, receive the message, and yield messages that match the filters back to the web server for sending to the client.


    Before we get ahead of ourselves, let’s first update our message posting function from section 8.1.2 and message deletion function from section 8.4 to start producing messages to filter. We’ll start with posting in the next listing, which shows that we’ve added a line to our function that sends messages out to be filtered.

    Listing 8.13 Updated create_status() from listing 8.2 to support streaming filters
    import json
    import time

    def create_status(conn, uid, message, **data):
       pipeline = conn.pipeline(True)
       pipeline.hget('user:%s'%uid, 'login')
       # Reserve a new status ID in the same round trip (as in listing 8.2).
       pipeline.incr('status:id:')
       login, id = pipeline.execute()
       if not login:
          return None
       data.update({
          'message': message,
          'posted': time.time(),
          'id': id,
          'uid': uid,
          'login': login,
       })
       pipeline.hmset('status:%s'%id, data)
       pipeline.hincrby('user:%s'%uid, 'posts')
       # The added line to send a message to streaming filters.
       pipeline.publish('streaming:status:', json.dumps(data))
       pipeline.execute()
       return id

    All it took was one more line to add streaming support on the posting side. But what about deletion? The update to status message deletion is shown in the following listing.

    Listing 8.14 Updated delete_status() from listing 8.8 to support streaming filters
    def delete_status(conn, uid, status_id):
       key = 'status:%s'%status_id
       lock = acquire_lock_with_timeout(conn, key, 1)
       if not lock:
          return None
       if conn.hget(key, 'uid') != str(uid):
          # Not the owner: release the lock before bailing out.
          release_lock(conn, key, lock)
          return None
       pipeline = conn.pipeline(True)
       # Fetch the status message so that streaming filters can apply the
       # same filters to decide whether the deletion should be passed to
       # the client.
       status = conn.hgetall(key)
       # Mark the status message as deleted.
       status['deleted'] = True
       # Publish the deleted status message to the stream.
       pipeline.publish('streaming:status:', json.dumps(status))
       pipeline.delete(key)
       pipeline.zrem('profile:%s'%uid, status_id)
       pipeline.zrem('home:%s'%uid, status_id)
       pipeline.hincrby('user:%s'%uid, 'posts', -1)
       pipeline.execute()
       release_lock(conn, key, lock)
       return True

    At first glance, you’re probably wondering why we’d want to send the entire status message that’s to be deleted to the channel for filtering. Conceptually, we should only need to send message-deleted information to clients that received the status message when it was posted. If we perform the same filtering on deleted messages as we do on newly posted messages, then we can always send message-deleted notifications to those clients that would’ve received the original message. This ensures that we don’t need to keep a record of the status IDs for messages sent to all clients, which simplifies our server and reduces memory use.
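
    To make this concrete, here is a minimal sketch that runs one filter over both a posted and a deleted copy of the same status. It assumes a hypothetical mentions_redis() predicate standing in for one of our filters and a hypothetical to_client() helper that mirrors the posted/deleted distinction; nothing here talks to Redis, and the payloads are plain dictionaries.

```python
import json

def mentions_redis(status):
    # Hypothetical filter used only for this illustration.
    return 'redis' in status['message'].lower().split()

def to_client(status, match):
    # Return what a client would receive, or None if the filter rejects it.
    if not match(status):
        return None
    if status.get('deleted'):
        # Deletions are reduced to a small placeholder.
        return json.dumps({'id': status['id'], 'deleted': True})
    return json.dumps(status)

posted = {'id': 7, 'uid': 1, 'login': 'alice', 'message': 'Redis is fast'}
deleted = dict(posted, deleted=True)   # same fields, plus the deleted marker
other = {'id': 8, 'uid': 2, 'login': 'bob', 'message': 'hello world'}

sent_post = to_client(posted, mentions_redis)
sent_delete = to_client(deleted, mentions_redis)
skipped = to_client(other, mentions_redis)
```

    Because the deleted copy still carries the original message text, the filter reaches the same decision both times, and the server never needs to remember which client saw which status.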


    Now that we’re sending information about status messages being posted and deleted to a channel in Redis, we only need to subscribe to that channel to start receiving messages to filter. As was the case in chapter 3, we’ll need to construct a special pubsub object in order to subscribe to a channel. When we’ve subscribed to the channel, we’ll perform our filtering, and produce one of two different messages depending on whether the message was posted or deleted. The code for handling these operations is next.

    Listing 8.15 A function to receive and process streamed messages

    # Use our automatic connection decorator from chapter 5.
    @redis_connection('social-network')
    def filter_content(conn, id, method, name, args, quit):
       # Create the filter that will determine whether a message should be
       # sent to the client.
       match = create_filters(id, method, name, args)
       # Prepare the subscription.
       pubsub = conn.pubsub()
       pubsub.subscribe(['streaming:status:'])
       # Receive messages from the subscription.
       for item in pubsub.listen():
          # Skip subscribe confirmations, whose data isn't a JSON payload.
          if item['type'] != 'message':
             continue
          # Get the status message information from the subscription
          # structure.
          message = item['data']
          decoded = json.loads(message)
          # Check if the status message matched the filter.
          if match(decoded):
             if decoded.get('deleted'):
                # For deleted messages, send a special "deleted"
                # placeholder for the message.
                yield json.dumps({
                   'id': decoded['id'], 'deleted': True})
             else:
                # For matched status messages that are not deleted, send
                # the message itself.
                yield message
          # If the web server no longer has a connection to the client,
          # stop filtering messages.
          if quit[0]:
             break
       # Reset the Redis connection to ensure that the Redis server clears
       # its outgoing buffers if this wasn't fast enough.
       pubsub.reset()

    As I said before, this function needs to subscribe to a channel in Redis in order to receive posted/deleted notifications for status messages. But it also needs to handle cases where the streaming client has disconnected, and it needs to properly clean up the connection if Redis has been trying to send it too much data.
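
    The control flow of that loop can be tried without a Redis server. The sketch below is an assumption-laden stand-in: a hypothetical filter_messages() generator reads from a plain list of dictionaries shaped like the items redis-py yields from a subscription (a type field and a data field), and a one-element quit list plays the role of the flag that the web server sets when the client goes away.

```python
import json

def filter_messages(items, match, quit):
    # `items` stands in for pubsub.listen() in this illustration.
    for item in items:
        if item['type'] == 'message':
            decoded = json.loads(item['data'])
            if match(decoded):
                if decoded.get('deleted'):
                    yield json.dumps({'id': decoded['id'], 'deleted': True})
                else:
                    yield item['data']
        # Checked after every item, matched or not.
        if quit[0]:
            break

quit = [False]
items = [
    {'type': 'subscribe', 'data': 1},
    {'type': 'message', 'data': json.dumps({'id': 1, 'message': 'a'})},
    {'type': 'message',
     'data': json.dumps({'id': 2, 'message': 'b', 'deleted': True})},
    {'type': 'message', 'data': json.dumps({'id': 3, 'message': 'c'})},
]

received = []
for out in filter_messages(items, lambda status: True, quit):
    received.append(json.loads(out))
    if len(received) == 2:
        quit[0] = True   # simulate the client disconnecting
```

    Once the flag is set, the generator stops at its next check, so the third status is never sent.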

    As we covered in chapter 3, Redis has a server setting that caps the outgoing buffer for each subscribed client. To ensure that our Redis server stays up even under heavy load, we’ll probably want to set client-output-buffer-limit pubsub to lower than the default 32 megabytes per connection. Where to set the limit will depend on how many clients we expect to support and how much other data is in Redis.
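
    As a rough back-of-the-envelope aid, the sketch below multiplies a per-connection hard limit by the number of streaming clients to estimate the worst-case memory that subscription buffers could consume. The function name and the client counts are illustrative assumptions, not measurements.

```python
def worst_case_buffer_bytes(clients, hard_limit_mb):
    # In the worst case, every subscriber fills its buffer up to the hard
    # limit before Redis disconnects it.
    return clients * hard_limit_mb * 1024 * 1024

default_total = worst_case_buffer_bytes(1000, 32)   # 1,000 slow clients at 32 MB
lowered_total = worst_case_buffer_bytes(1000, 4)    # same clients at 4 MB
```

    With these example numbers, the default limit allows roughly 31 GiB of buffered output across 1,000 stalled clients, while a 4 MB limit caps it near 4 GiB. The directive itself takes a hard limit, a soft limit, and a number of seconds for the soft limit, in that order.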


    At this point we’ve built every other layer; all that remains is to write the filtering itself. I know, there was a lot of build-up, but you may be surprised to find out that actually filtering messages isn’t difficult for any of our cases. To create filters, we’ll first define our create_filters() function in listing 8.16, which will delegate to one of several filter-building functions, depending on the filter that we want to build. We’ll assume that clients are sending reasonable arguments, but if you’re considering using any of this in a production scenario, you’ll want to add validation and verification.

    Listing 8.16 A factory function to dispatch to the actual filter creation
    def create_filters(id, method, name, args):
       # For the "sample" method, we don't need to worry about names, just
       # the arguments.
       if method == 'sample':
          return SampleFilter(id, args)
       # For the "filter" method, we actually worry about which of the
       # filters we want to apply, so return the specific filters for them.
       elif name == 'track':
          return TrackFilter(args)
       elif name == 'follow':
          return FollowFilter(args)
       elif name == 'location':
          return LocationFilter(args)
       # If no filter matches, then raise an exception.
       raise Exception("Unknown filter")

    Nothing surprising there: we’re distinguishing the different kinds of filters. The first filter we’ll create will be the sample filter, which will actually implement the functionality of the Twitter-style firehose, gardenhose, and spritzer access levels, and anything in between. The implementation of the sampling filter is shown next.

    Listing 8.17 The function to handle firehose, gardenhose, and spritzer
    import random

    def SampleFilter(id, args):
       # SampleFilter is a factory: given "id" and "args", it returns the
       # check() function that does the actual filtering.
       # The "args" parameter is a dictionary based on the parameters
       # passed as part of the GET request.
       percent = int(args.get('percent', ['10'])[0], 10)
       # We use the "id" parameter to seed the shuffle that chooses a
       # subset of IDs, the count of which is determined by "percent".
       ids = list(range(100))
       shuffler = random.Random(id)
       shuffler.shuffle(ids)
       # We'll use a Python set to allow us to quickly determine whether a
       # status message matches our criteria.
       keep = set(ids[:max(percent, 1)])
       def check(status):
          # Take the status ID modulo 100 and return whether it's among
          # the values we want to accept. Deleted statuses carry the ID
          # as a string, so convert it first.
          return (int(status['id']) % 100) in keep
       return check

    As you can see, each filter is a function that builds and returns a check() function, because we need to encapsulate data and behavior together. The sampling filter does one interesting thing: it uses a random number generator seeded with the user-provided identifier to choose the IDs of status messages that it should accept. Because the same identifier always selects the same IDs, a sampling filter can receive a deleted notification for a message even if the client had disconnected (as long as the client reconnected before the delete notification came through). We use Python sets here to quickly determine whether the ID modulo 100 is in the group that we want to accept, as Python sets offer O(1) lookup time, compared to O(n) for a Python list.
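
    The effect of seeding can be checked in isolation. The following sketch is written for Python 3 and assumes the GET parameters arrive as the dictionary of lists that urllib.parse.parse_qs produces; choose_buckets() and the identifier 'client-42' are hypothetical names used only for illustration.

```python
import random
from urllib.parse import parse_qs

def choose_buckets(identifier, percent):
    # Shuffle the 100 possible values of (status id % 100) with a generator
    # seeded by the client's identifier, then keep the first `percent`.
    ids = list(range(100))
    random.Random(identifier).shuffle(ids)
    return set(ids[:max(percent, 1)])

args = parse_qs('percent=25')                       # {'percent': ['25']}
percent = int(args.get('percent', ['10'])[0], 10)

first = choose_buckets('client-42', percent)
again = choose_buckets('client-42', percent)        # e.g. after a reconnect

# With no "percent" parameter, the default of 10 applies.
fallback = int(parse_qs('').get('percent', ['10'])[0], 10)
smaller = choose_buckets('client-42', fallback)
```

    The two calls with the same identifier produce the same set, which is what lets a reconnecting client keep seeing deletions for the statuses it was sampling. A smaller percentage for the same identifier selects a subset of the larger sample, because the shuffle order is identical.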

    Continuing on, we’ll now build the track filter, which will allow users to track words or phrases in status messages. Similar to our sample filter in listing 8.17, we’ll use a closure to encapsulate the data and filtering functionality together. The filter definition is shown in the following listing.

    Listing 8.18 A filter that matches groups of words that are posted in status messages
    def TrackFilter(list_of_strings):
       # The filter has been provided with a list of word groups, and the
       # filter matches if a message has all of the words in any of the
       # groups.
       groups = []
       for group in list_of_strings:
          group = set(group.lower().split())
          # We'll only keep groups that have at least 1 word.
          if group:
             groups.append(group)
       def check(status):
          # We'll split words in the message on whitespace.
          message_words = set(status['message'].lower().split())
          # Then we'll iterate over all of the groups.
          for group in groups:
             # If all of the words in any of the groups match, we'll
             # accept the message with this filter.
             if len(group & message_words) == len(group):
                return True
          return False
       return check

    About the only interesting thing about the tracking filter is making sure that when someone wants to match a group of words, the message contains all of the words in the group and not just some of them. We again use Python sets, which, like Redis SETs, offer the ability to calculate intersections.
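
    A few sample messages show how the all-words rule behaves. This is a self-contained sketch rather than the listing itself: make_track_filter() is a hypothetical name, and it uses Python’s subset operator, which gives the same answer as comparing the size of the intersection with the size of the group.

```python
def make_track_filter(list_of_strings):
    groups = [set(s.lower().split()) for s in list_of_strings]
    groups = [g for g in groups if g]      # drop empty groups

    def check(status):
        words = set(status['message'].lower().split())
        # `g <= words` is a subset test: every word of the group is present.
        return any(g <= words for g in groups)
    return check

check = make_track_filter(['redis streaming', 'python', '   '])

both_words = check({'message': 'Streaming data with Redis'})
one_word = check({'message': 'I like Redis'})
other_group = check({'message': 'Python rocks'})
punctuated = check({'message': 'I love redis, streaming'})
```

    Word order and case don’t matter, but because messages are split on whitespace only, a word with trailing punctuation such as "redis," does not count as "redis".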

    Moving on to the follow filter, we’re trying to match status messages that were posted by one of a group of users, or where one of the users is mentioned in the message. The function that implements user matching is shown here.

    Listing 8.19 Messages posted by or mentioning any one of a list of users
    def FollowFilter(names):
       # We'll match login names against posters and messages. A separate
       # name keeps this set from shadowing the "names" argument.
       nset = set()
       # Store all names consistently as '@username'.
       for name in names:
          nset.add('@' + name.lower().lstrip('@'))
       def check(status):
          # Construct a set of words from the message and the poster's
          # name.
          message_words = set(status['message'].lower().split())
          message_words.add('@' + status['login'].lower())
          # Consider the message a match if any of the usernames provided
          # match any of the whitespace-separated words in the message.
          return message_words & nset
       return check

    As before, we continue to use Python sets as a fast way to check whether a name is in the set of names that we’re looking for, or whether any of the names to match are also contained in a status message.
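
    The normalization to ‘@username’ is easiest to see with a couple of inputs. The sketch below is a self-contained illustration with a hypothetical make_follow_filter() name; it returns a plain boolean for readability, which is an assumption of this example rather than something the listing requires.

```python
def make_follow_filter(names):
    # 'Alice' and '@bob' both end up stored as '@alice' and '@bob'.
    wanted = set('@' + n.lower().lstrip('@') for n in names)

    def check(status):
        words = set(status['message'].lower().split())
        words.add('@' + status['login'].lower())
        return bool(words & wanted)
    return check

check = make_follow_filter(['Alice', '@bob'])

by_poster = check({'login': 'alice', 'message': 'hello'})
by_mention = check({'login': 'carol', 'message': 'thanks @Bob'})
bare_name = check({'login': 'carol', 'message': 'bob is here'})
```

    A message matches when a followed user posted it or when it contains an explicit @mention; a bare name without the @ prefix is not treated as a mention.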

    We finally get to the location filter. This filter is different from the others in that we didn’t explicitly talk about adding location information to our status messages. But because of the way we wrote our create_status() and post_status() functions to take additional optional keyword arguments, we can add additional information without altering our status creation and posting functions. The location filter for this optional data is shown next.

    Listing 8.20 Messages within boxes defined by ranges of latitudes and longitudes
    def LocationFilter(list_of_boxes):
       # We'll create a set of boxes that define the regions that should
       # return messages.
       boxes = []
       for start in range(0, len(list_of_boxes)-3, 4):
          boxes.append(list(map(float, list_of_boxes[start:start+4])))
       def check(status):
          # Try to fetch "location" data from a status message.
          location = status.get('location')
          # If the message has no location information, then it can't be
          # inside the boxes.
          if not location:
             return False
          # Otherwise, extract the latitude and longitude of the location.
          lat, lon = map(float, location.split(','))
          # To match one of the boxes, we need to iterate over all boxes.
          for box in boxes:
             # If the location is within the required latitude and
             # longitude range, the status message matches the filter.
             if (box[1] <= lat <= box[3] and
                box[0] <= lon <= box[2]):
                return True
          return False
       return check

    About the only thing that may surprise you about this particular filter is how we’re preparing the boxes for filtering. We expect that requests will provide location boxes as comma-separated sequences of numbers, where each chunk of four numbers defines latitude and longitude ranges (minimum longitude, minimum latitude, maximum longitude, maximum latitude—the same order as Twitter’s API).
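
    The chunking into groups of four can be sketched on its own. In this illustration, parse_boxes() and in_any_box() are hypothetical helper names, the coordinates are example values, and a status location is assumed to be a 'latitude,longitude' string as in the listing.

```python
def parse_boxes(values):
    # Each complete chunk of four values is
    # [min longitude, min latitude, max longitude, max latitude];
    # an incomplete trailing chunk is ignored.
    boxes = []
    for start in range(0, len(values) - 3, 4):
        boxes.append([float(v) for v in values[start:start + 4]])
    return boxes

def in_any_box(boxes, location):
    lat, lon = [float(v) for v in location.split(',')]
    return any(box[1] <= lat <= box[3] and box[0] <= lon <= box[2]
               for box in boxes)

raw = '-122.75,36.8,-121.75,37.8,-74,40,-73,41'.split(',')
boxes = parse_boxes(raw)

inside = in_any_box(boxes, '37.77,-122.42')    # latitude first, then longitude
swapped = in_any_box(boxes, '-122.42,37.77')   # same point, wrong order
```

    Note the asymmetry: boxes list longitude before latitude, while the status location lists latitude first, so swapping the two in either place makes the point fall outside every box.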

    With all of our filters built, a working web server, and the back-end API for everything else, it’s now up to you to get traffic!