10.3.3 Scaling a social network

  • Redis in Action – Home
  • Foreword
  • Preface
  • Acknowledgments
  • About this Book
  • About the Cover Illustration
  • Part 1: Getting Started
  • Part 2: Core concepts
  • Part 3: Next steps
  • Appendix A
  • Appendix B
  • Buy the paperback


    As we built our social network in chapter 8, I pointed out that it wasn’t designed to
    scale to the size of a social network like Twitter, but that it was primarily meant to help
    you understand what structures and methods it takes to build a social network. In this
    section, I’ll describe a few methods that can let us scale a social networking site with
    sharding, almost without bounds (mostly limited by our budget, which is always the
    case with large projects).

    One of the first steps in helping a social network scale is figuring out
    what data is read often, what data is written often, and whether it’s possible to
    separate often-used data from rarely used data. To start, say that we’ve already
    pulled our posted message data out into a separate Redis server, which has read
    slaves to handle the moderately high volume of reads that occurs on that data.
    That leaves two major types of data to scale: timelines and follower/following lists.

    SCALING POSTED MESSAGE DATABASE SIZE If you actually built this system out,
    and you had any sort of success, at some point you’d need to further scale the
    posted message database beyond just read slaves. Because each message is
    completely contained within a single HASH, messages can be easily sharded onto a
    cluster of Redis servers based on the key where the HASH is stored. Because
    this data is easily sharded, and because we’ve already worked through how to
    fetch data from multiple shards as part of our search scaling in section 10.3.2,
    you shouldn’t have any difficulty here. Alternatively, you can use Redis as
    a cache, storing recently posted messages in Redis, and older (rarely read)
    messages in a primarily on-disk storage server (like PostgreSQL, MySQL, Riak,
    MongoDB, and so on). If you’re finding yourself challenged, please feel free
    to post on the message board or on the Redis mailing list.

    As you may remember, we had three primary types of timelines: home timelines, profile
    timelines, and list timelines. Timelines themselves are all similar, though both list
    timelines and home timelines are limited to 1,000 items. Similarly, followers, following,
    list followers, and list following are all essentially the same, so we’ll also handle
    them the same. First, let’s look at how we can scale timelines with sharding.


    When we say that we’re sharding timelines, it’s a bit of a bait-and-switch. Because
    home and list timelines are short (1,000 entries max, which we may want to use to
    inform how large to set zset-max-ziplist-entries),1 there’s really no need to shard the
    contents of the ZSETs; we merely need to place those timelines on different shards
    based on their key names.
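
As a reminder of how that works, here’s a minimal sketch of key-name-based shard selection. This is a simplified, hypothetical stand-in for the shard_key() helper from earlier chapters (whose details differ); only the core idea is shown: hashing the key name deterministically picks a shard.

```python
import binascii

def shard_key(component, key, total_shards):
    # Hash the key name and take the result modulo the shard count,
    # so a given key always maps to the same shard.
    shard_id = binascii.crc32(key.encode('utf-8')) % total_shards
    return "%s:%s" % (component, shard_id)

# The shard a timeline lives on is fully determined by its key name.
home_shard = shard_key('timelines', 'home:27', 8)
```

Because the mapping is deterministic, every reader and writer of `home:27` agrees on which shard holds it, without any central lookup table.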

    On the other hand, the size that profile timelines can grow to is currently unlimited.
    Though the vast majority of users will probably only be posting a few times a day
    at most, there can be situations where someone is posting significantly more often. As
    an example of this, the top 1,000 posters on Twitter have all posted more than 150,000
    status messages, with the top 15 all posting more than a million messages.

    On a practical level, it wouldn’t be unreasonable to cap the number of messages
    that are kept in the timeline for an individual user to 20,000 or so (the oldest being
    hidden or deleted), which would handle 99.999% of Twitter users generally. We’ll
    assume that this is our plan for scaling profile timelines. If not, we can use the technique
    we cover for scaling follower/following lists later in this section for scaling profile
    timelines instead.
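
To make the rank arithmetic behind that cap concrete, here’s a server-free sketch. The actual trim would be a single call, pipe.zremrangebyrank('profile:%s' % uid, 0, -cap - 1); surviving_entries() below is a hypothetical helper that mirrors what that call would leave behind, and the 20,000 figure is the cap suggested above.

```python
def surviving_entries(entries, cap):
    # `entries` are (member, score) pairs ordered oldest-first, mirroring
    # a profile timeline ZSET ordered by posting timestamp.
    # ZREMRANGEBYRANK key 0 (-cap-1) removes ranks 0 through len-cap-1,
    # i.e., everything older than the newest `cap` entries.
    last_removed = len(entries) - cap - 1
    if last_removed < 0:
        return list(entries)
    return entries[last_removed + 1:]

posts = [('status:%d' % i, float(i)) for i in range(25)]
kept = surviving_entries(posts, 20)
```

With 25 posts and a cap of 20, the five oldest entries (ranks 0 through 4) are dropped and the newest 20 remain.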

    In order to shard our timelines based on key name, we could write a set of functions
    that handle sharded versions of ZADD, ZREM, and ZRANGE, along with others, all of which
    would be short three-line functions that would quickly get boring. Instead, let’s write a
    class that uses Python dictionary lookups to automatically create connections to shards.

    First, let’s start with what we want our API to look like by updating our
    follow_user() function from chapter 8. We’ll create a generic sharded connection
    object that will let us create a connection to a given shard, based on a key that we want
    to access in that shard. After we have that connection, we can call all of the standard
    Redis methods to do whatever we want on that shard. We can see what we want our API
    to look like, and how we need to update our function, in the next listing.

    Listing 10.10 An example of how we want our API for accessing shards to work

    # Create a connection object that knows about the sharding
    # information for a given component with a number of shards.
    sharded_timelines = KeyShardedConnection('timelines', 8)

    def follow_user(conn, uid, other_uid):
       fkey1 = 'following:%s'%uid
       fkey2 = 'followers:%s'%other_uid
       if conn.zscore(fkey1, other_uid):
          print "already followed", uid, other_uid
          return None
       now = time.time()
       pipeline = conn.pipeline(True)
       pipeline.zadd(fkey1, other_uid, now)
       pipeline.zadd(fkey2, uid, now)
       following, followers = pipeline.execute()[-2:]
       # Update the follower and following counts by the number
       # of entries actually added.
       pipeline.hincrby('user:%s'%uid, 'following', int(following))
       pipeline.hincrby('user:%s'%other_uid, 'followers', int(followers))
       pipeline.execute()
       # Fetch the recent status messages from the profile
       # timeline of the now-followed user.
       pkey = 'profile:%s'%other_uid
       status_and_score = sharded_timelines[pkey].zrevrange(
          pkey, 0, HOME_TIMELINE_SIZE-1, withscores=True)
       if status_and_score:
          hkey = 'home:%s'%uid
          # Get a connection based on the shard key provided,
          # and fetch a pipeline from that connection.
          pipe = sharded_timelines[hkey].pipeline(True)
          # Add the statuses to the home timeline ZSET
          # on the shard, and then trim it.
          pipe.zadd(hkey, **dict(status_and_score))
          pipe.zremrangebyrank(hkey, 0, -HOME_TIMELINE_SIZE-1)
          # Execute the transaction.
          pipe.execute()
       return True

    Now that we have an idea of what we want our API to look like, let’s build it. We first need
    an object that takes the component and number of shards. When a key is referenced via
    dictionary lookup on the object, we need to return a connection to the shard that the
    provided key should be stored on. The class that implements this follows.

    Listing 10.11 A class that implements sharded connection resolution based on key

    class KeyShardedConnection(object):
       # The object is initialized with the component
       # name and number of shards.
       def __init__(self, component, shards):
          self.component = component
          self.shards = shards
       # When an item is fetched from the object, this method
       # is called with the key that was requested.
       def __getitem__(self, key):
          # Use the passed key along with the previously known
          # component and shard count to fetch the sharded connection.
          return get_sharded_connection(
             self.component, key, self.shards)
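
To see the dictionary-lookup dispatch work without a Redis cluster, here’s a self-contained sketch in which get_sharded_connection() is stubbed out to just report the shard it would connect to (the real version, defined earlier in chapter 10, returns a Redis connection object).

```python
import binascii

def get_sharded_connection(component, key, shards):
    # Stub for illustration: return the name of the shard the real
    # helper would connect to, instead of a live Redis connection.
    shard_id = binascii.crc32(str(key).encode('utf-8')) % shards
    return "%s:%s" % (component, shard_id)

class KeyShardedConnection(object):
    def __init__(self, component, shards):
        self.component = component
        self.shards = shards
    def __getitem__(self, key):
        # Dictionary-style lookup resolves a key to its shard.
        return get_sharded_connection(
            self.component, key, self.shards)

sharded_timelines = KeyShardedConnection('timelines', 8)
conn_name = sharded_timelines['home:27']
```

The same key always resolves to the same shard, which is what lets callers treat `sharded_timelines[key]` as if it were a single connection.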

    For simple key-based sharding, this is all that’s necessary to support almost every call
    that we’d perform in Redis. All that remains is to update the remainder of
    unfollow_user(), refill_timeline(), and the rest of the functions that access home
    timelines and list timelines. If you intend to scale this social network, go ahead and
    update those functions yourself. For those of us who aren’t scaling the social network,
    we’ll continue on.

    Exercise: Syndicating posts to home and list timelines

    With the update to where data is stored for both home and list timelines, can you
    update your syndication task from chapter 8 (the one that supports list timelines)
    to work with sharded profiles? Can you keep it almost as fast as the original
    version? Hint: If you’re stuck, we include a fully updated version that supports
    sharded follower lists in listing 10.15.

    Up next is scaling follower and following lists.


    Though our scaling of timelines is pretty straightforward, scaling followers, following,
    and the equivalent “list” ZSETs is more difficult. The vast majority of these ZSETs will
    be short (99.99% of users on Twitter have fewer than 1,000 followers), but there may
    be a few users who are following a large number of users, or who have a large number
    of followers. As a practical matter, it wouldn’t be unreasonable to limit the number of
    users that a given user or list can follow to be somewhat small (perhaps up to 1,000, to
    match the limits on home and list timelines), forcing them to create lists if they really
    want to follow more people. But we still run into issues when the number of followers
    of a given user grows substantially.

    To handle the situation where follower/following lists can grow to be very large,
    we’ll shard these ZSETs across multiple shards. To be more specific, a user’s followers
    will be broken up into as many pieces as we have shards. For reasons we’ll get
    into in a moment, we only need to implement specific sharded versions of ZADD,
    ZREM, and ZRANGEBYSCORE.

    I know what you’re thinking: since we just built a method to handle sharding automatically,
    we could use that. We will (to some extent), but because we’re sharding data
    and not just keys, we can’t just use our earlier class directly. Also, in order to reduce
    the number of connections we need to create and call, it makes a lot of sense to have
    data for both sides of the follower/following link on the same shard, so we can’t just
    shard by data like we did in chapter 9 and in section 10.2.

    In order to shard our follower/following data such that both sides of the follower/
    following relationship are on the same shard, we’ll use both IDs as part of the key to look
    up a shard. Like we did for sharding timelines, let’s update follow_user() to show the
    API that we’d like to use, and then we’ll create the class that’s necessary to implement
    the functionality. The updated follow_user() with our desired API is next.

    Listing 10.12 Access follower/following ZSET shards

    # Create connection objects that know about the sharding
    # information for a given component with a number of shards.
    sharded_timelines = KeyShardedConnection('timelines', 8)
    sharded_followers = KeyDataShardedConnection('followers', 16)

    def follow_user(conn, uid, other_uid):
       fkey1 = 'following:%s'%uid
       fkey2 = 'followers:%s'%other_uid
       # Fetch the connection object for the uid, other_uid pair.
       sconn = sharded_followers[uid, other_uid]
       # Check to see whether other_uid is already followed.
       if sconn.zscore(fkey1, other_uid):
          return None
       now = time.time()
       spipe = sconn.pipeline(True)
       # Add the follower/following information to the ZSETs.
       spipe.zadd(fkey1, other_uid, now)
       spipe.zadd(fkey2, uid, now)
       following, followers = spipe.execute()
       # Update the follower and following counts for both users.
       pipeline = conn.pipeline(True)
       pipeline.hincrby('user:%s'%uid, 'following', int(following))
       pipeline.hincrby('user:%s'%other_uid, 'followers', int(followers))
       pipeline.execute()
       pkey = 'profile:%s'%other_uid
       status_and_score = sharded_timelines[pkey].zrevrange(
          pkey, 0, HOME_TIMELINE_SIZE-1, withscores=True)
       if status_and_score:
          hkey = 'home:%s'%uid
          pipe = sharded_timelines[hkey].pipeline(True)
          pipe.zadd(hkey, **dict(status_and_score))
          pipe.zremrangebyrank(hkey, 0, -HOME_TIMELINE_SIZE-1)
          pipe.execute()
       return True

    Aside from a bit of rearrangement and code updating, the only difference between
    this change and the change we made earlier for sharding timelines is that instead of
    passing a specific key to look up the shard, we pass a pair of IDs. From these two IDs,
    we’ll calculate the proper shard that data involving both IDs should be stored on. The
    class that implements this API appears next.

    Listing 10.13 Sharded connection resolution based on ID pairs

    class KeyDataShardedConnection(object):
       # The object is initialized with the component
       # name and number of shards.
       def __init__(self, component, shards):
          self.component = component
          self.shards = shards
       # When the pair of IDs is passed as part of the
       # dictionary lookup, this method is called.
       def __getitem__(self, ids):
          # Unpack the pair of IDs, and ensure that they are integers.
          id1, id2 = map(int, ids)
          # If the second is less than the first, swap them so that
          # the first ID is less than or equal to the second.
          if id2 < id1:
             id1, id2 = id2, id1
          # Construct a key based on the two IDs.
          key = "%s:%s"%(id1, id2)
          # Use the computed key along with the previously known
          # component and shard count to fetch the sharded connection.
          return get_sharded_connection(
             self.component, key, self.shards)

    The only difference between this sharded connection generator and listing 10.11
    is that it takes a pair of IDs instead of a key. From those two IDs, we generate a
    key where the lower of the two IDs is first, and the higher is second. By constructing
    the key in this way, we ensure that whenever we reference the same two IDs,
    regardless of their initial order, we always end up on the same shard.
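
That order-independence is easy to check in isolation; pair_shard_key() below is just the key-construction step of listing 10.13 pulled out as a hypothetical standalone helper.

```python
def pair_shard_key(id1, id2):
    # Order the two IDs so that (a, b) and (b, a) build the same key,
    # and therefore resolve to the same shard.
    id1, id2 = map(int, (id1, id2))
    if id2 < id1:
        id1, id2 = id2, id1
    return "%s:%s" % (id1, id2)

same = pair_shard_key(27, 1339) == pair_shard_key(1339, 27)
```

Whichever direction the follow relationship is referenced from, both sides hash to the same shard key.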

    With this sharded connection generator, we can update almost all of the remaining
    follower/following ZSET operations. The one operation left to handle is ZRANGEBYSCORE,
    which we use in a few places to fetch a “page” of followers, usually to syndicate
    messages out to home and list timelines when an update is posted. When syndicating
    to timelines, we could scan through all of one shard’s ZSET and then move on to the
    next. But with a little extra work, we can instead pass through all of the ZSETs
    simultaneously, which gives us a useful sharded ZRANGEBYSCORE operation that can
    be used in other situations.

    As we saw in section 10.3.2, in order to fetch items 100–109 from sharded ZSETs,
    we needed to fetch items 0–109 from all ZSETs and merge them together. This is
    because we only knew the index that we wanted to start at. Because we have the opportunity
    to scan based on score instead, when we want to fetch the next 10 items with
    scores greater than X, we only need to fetch the next 10 items with scores greater than
    X from all shards, followed by a merge. A function that implements ZRANGEBYSCORE
    across multiple shards is shown in the following listing.

    Listing 10.14 A function that implements a sharded ZRANGEBYSCORE

    # We take arguments for the component and number of shards, and we
    # limit the arguments passed on to only those that ensure correct
    # behavior in sharded situations.
    def sharded_zrangebyscore(component, shards, key, min, max, num):
       data = []
       for shard in xrange(shards):
          # Fetch the sharded connection for the current shard.
          conn = get_redis_connection("%s:%s"%(component, shard))
          # Get the data from Redis for this shard.
          data.extend(conn.zrangebyscore(
             key, min, max, start=0, num=num, withscores=True))

       # Sort the data based on score, and then by member.
       def key(pair):
          return pair[1], pair[0]
       data.sort(key=key)

       # Return only the number of items requested.
       return data[:num]

    This function works a lot like the query/merge that we did in section 10.3.2, only we
    can start in the middle of the ZSET because we have scores (and not indexes).
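
The query/merge step can be simulated without a server; in this hypothetical sketch, each input list stands in for one shard’s ZRANGEBYSCORE reply (up to num (member, score) pairs at or above the minimum score, already score-ordered, as Redis returns them).

```python
def merge_shard_results(per_shard_results, num):
    # Concatenate each shard's reply, then re-sort by (score, member).
    # Because every shard returned its lowest-scoring matches, the
    # first `num` merged items match what a single unsharded
    # ZRANGEBYSCORE would have produced.
    data = []
    for result in per_shard_results:
        data.extend(result)
    data.sort(key=lambda pair: (pair[1], pair[0]))
    return data[:num]

shard0 = [('a', 1.0), ('d', 4.0)]
shard1 = [('b', 2.0), ('c', 3.0)]
merged = merge_shard_results([shard0, shard1], 3)
```

Here the three lowest-scored members across both shards (`a`, `b`, `c`) come back in score order, even though `b` and `c` live on a different shard than `a`.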

    NOTE Because we used timestamps as scores for follower/following lists, we
    avoided some of the drawbacks of paginating over sharded ZSETs that we covered
    in section 10.3.2. If you’d planned on using this method for sharding profile
    timelines, you’ll need to go back and update your code to use timestamps instead
    of offsets, and you’ll need to implement a ZREVRANGEBYSCORE version of listing
    10.14, which should be straightforward.

    With this new sharded ZRANGEBYSCORE function, let’s update our function that
    syndicates posts to home and list timelines in the next listing. While we’re at it,
    we may as well add support for sharded home timelines.

    Listing 10.15 Updated syndicate status function

    def syndicate_status(uid, post, start=0, on_lists=False):
       root = 'followers'
       key = 'followers:%s'%uid
       base = 'home:%s'
       if on_lists:
          root = 'list:out'
          key = 'list:out:%s'%uid
          base = 'list:statuses:%s'

       # Fetch the next group of followers using the
       # sharded ZRANGEBYSCORE call.
       followers = sharded_zrangebyscore(root,
          sharded_followers.shards, key, start, 'inf', POSTS_PER_PASS)

       # Prepare a structure that will group timeline
       # keys on a per-shard basis.
       to_send = defaultdict(list)
       for follower, start in followers:
          # Calculate the key for the timeline.
          timeline = base % follower
          # Find the shard where this timeline would go.
          shard = shard_key('timelines',
             timeline, sharded_timelines.shards, 2)
          # Add the timeline key to the rest of the
          # timelines on the same shard.
          to_send[shard].append(timeline)

       for timelines in to_send.itervalues():
          # Get a connection to the server for the group
          # of timelines, and create a pipeline.
          pipe = sharded_timelines[timelines[0]].pipeline(False)
          for timeline in timelines:
             # Add the post to the timeline, and remove
             # any posts that are too old.
             pipe.zadd(timeline, **post)
             pipe.zremrangebyrank(
                timeline, 0, -HOME_TIMELINE_SIZE-1)
          pipe.execute()

       conn = redis.Redis()
       if len(followers) >= POSTS_PER_PASS:
          # More followers may remain: continue from where we left off.
          execute_later(conn, 'default', 'syndicate_status',
             [uid, post, start, on_lists])
       elif not on_lists:
          # Done with home timelines; now syndicate to list timelines.
          execute_later(conn, 'default', 'syndicate_status',
             [uid, post, 0, True])

    As you can see from the code, we use the sharded ZRANGEBYSCORE function to fetch
    those users who are interested in this user’s posts. Also, in order to keep the syndication
    process fast, we group requests that are to be sent to each home or list timeline
    shard server together. Later, after we’ve grouped all of the writes together, we add the
    post to all of the timelines on a given shard server with a pipeline. Though this may be
    slower than the nonsharded version, this does allow us to scale our social network
    much larger than before.
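
The grouping step that makes this fast can be sketched in isolation. In this hypothetical sketch, shard_for() is a simplified stand-in for the shard_key() call in listing 10.15, and group_by_shard() mirrors the defaultdict grouping.

```python
import binascii
from collections import defaultdict

def shard_for(key, shards):
    # Simplified stand-in for shard_key(): map a timeline key
    # deterministically to a shard ID.
    return binascii.crc32(key.encode('utf-8')) % shards

def group_by_shard(timeline_keys, shards):
    # Batch all timeline keys that live on the same shard together,
    # so each shard server receives one pipelined request instead of
    # one round trip per timeline.
    to_send = defaultdict(list)
    for key in timeline_keys:
        to_send[shard_for(key, shards)].append(key)
    return to_send

groups = group_by_shard(['home:%d' % i for i in range(100)], 8)
```

With 100 followers spread over 8 shards, this turns up to 100 round trips into at most 8 pipelined requests.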

    All that remains is to finish updating the rest of the functions to support the
    sharding that we’ve done throughout section 10.3.3. Again, if you’re going to scale
    this social network, feel free to do so. But if you have some nonsharded code that
    you want to shard, you can compare the earlier version of syndicate_status() from
    section 8.4 with our new version to get an idea of how to update your code.

    1 Because of the way we add items to home and list timelines, they can actually grow to roughly 2,000 entries
    for a short time. And because Redis doesn’t turn structures back into ziplist-encoded versions of themselves
    when they’ve gotten too large, setting zset-max-ziplist-entries to be a little over 2,000 entries can keep
    these two timelines encoded efficiently.