6.6.1 Aggregating users by location

  • Redis in Action – Home
  • Foreword
  • Preface
  • Acknowledgments
  • About this Book
  • About the Cover Illustration
  • Part 1: Getting Started
  • Part 2: Core concepts
  • Part 3: Next steps
  • Appendix A
  • Appendix B
  • Buy the paperback
  • Redis in Action – Home
  • Foreword
  • Preface
  • Acknowledgments
  • About this Book
  • About the Cover Illustration
  • Part 1: Getting Started
  • Part 2: Core concepts
  • Part 3: Next steps
  • Appendix A
  • Appendix B
  • Buy the paperback

    6.6.1 Aggregating users by location

    Let’s take a moment and look back at an earlier problem that we solved for Fake Game Company. With the ability to discover where users are accessing the game from thanks to our IP-to-city lookup in chapter 5, Fake Game Company has found itself needing to reparse many gigabytes of log files. They’re looking to aggregate user visitation patterns over time in a few different dimensions: per country, per region, per city, and more. Because we need this to be run in real time over new data, we’ve already implemented callbacks to perform the aggregate operations.

    As you may remember from chapter 5, Fake Game Company has been around for about 2 years. They have roughly 1,000,000 users per day, but they have roughly 10 events per user per day. That gives us right around 7.3 billion log lines to process. If we were to use one of the earlier methods, we’d copy the log files to various machines that need to process the data, and then go about processing the log files. This works, but then we need to copy the data, potentially delaying processing, and using storage space on every machine that processes the data, which later needs to be cleaned up.

    In this particular case, instead of copying files around, we could write a one-time map-reduce3 process to handle all of this data. But because map-reduces are designed to not share memory between items to be processed (each item is usually one log line), we can end up taking more time with map-reduce than if we spent some time writing it by hand to share memory. More specifically, if we load our IP-to-city lookup table into memory in Python (which we’d only want to do if we had a lot of processing to do, and we do), we can perform about 200k IP-to-city ID lookups per second, which is faster than we could expect a single Redis instance to respond to the same queries. Also, to scale with map-reduce, we’d have to run at least a few instances of Redis to keep up with the map-reduces.

    With the three standard methods of handling this already discounted (NFS/Samba, copying files, map-reduce), let’s look at some other practical pieces that we’ll need to solve to actually perform all of our lookups.

    AGGREGATING DATA LOCALLY

    In order to process that many log entries efficiently, we’ll need to locally cache aggregates before updating Redis in order to minimize round trips. Why? If we have roughly 10 million log lines to process for each day, then that’s roughly 10 million writes to Redis. If we perform the aggregates locally on a per-country basis for the entire day (being that there are around 300 countries), we can instead write 300 values to Redis. This will significantly reduce the number of round trips between Redis, reducing the number of commands processed, which in turn will reduce the total processing time.

    If we don’t do anything intelligent about local caching, and we have 10 aggregates that we want to calculate, we’re looking at around 10 days to process all of the data. But anything on the country or region level can be aggregated completely (for the day) before being sent to Redis. And generally because the top 10% of cities (there are roughly 350,000 cities in our sample dataset) amount for more than 90% of our game’s users, we can also locally cache any city-level aggregates. So by performing local caching of aggregates, we’re not limited by Redis on aggregate throughput.

    Assuming that we’ve already cached a copy of our ZSET and HASH table for IP lookups from section 5.3, we only need to worry about aggregating the data. Let’s start with the log lines that contain an IP address, date, time, and the operation that was performed, similar to the following snippet:

    173.194.38.137 2011-10-10 13:55:36 achievement-762

    Given log lines of that form, let’s aggregate those lines on a daily basis per country. To do this, we’ll receive the line as part of a call and increment the appropriate counter. If the log line is empty, then we’ll say that the day’s worth of data is done, and we should write to Redis. The source code for performing this aggregate is shown next.

    Listing 6.29 A locally aggregating callback for a daily country-level aggregate
    aggregates = defaultdict(lambda: defaultdict(int))
    
    

    Prepare the local aggregate dictionary.

    def daily_country_aggregate(conn, line):
       if line:
          line = line.split()
    
     
          ip = line[0]
          day = line[1]
    

    Extract the information from our log lines.

          country = find_city_by_ip_local(ip)[2]
    

    Find the country from the IP address.

          aggregates[day][country] += 1
    

    Increment our local aggregate.

          return
    
    
     
       for day, aggregate in aggregates.items():
          conn.zadd('daily:country:' + day, **aggregate)
          del aggregates[day]
    

    The day file is done; write our aggregate to Redis.

     

    Now that we’ve written and seen one of these aggregate functions, the rest are fairly similar and just as easy to write. Let’s move on to more interesting topics, like how we’re going to send files through Redis.

    3 MapReduce (or Map/Reduce) is a type of distributed computation popularized by Google, which can offer high performance and simplicity for some problems.