6.6.1 Aggregating users by location

  • Redis in Action – Home
  • Foreword
  • Preface
  • Part 1: Getting Started
  • Part 2: Core concepts
  • 1.3.1 Voting on articles
  • 1.3.2 Posting and fetching articles
  • 1.3.3 Grouping articles
  • 4.2.1 Configuring Redis for replication
  • 4.2.2 Redis replication startup process
  • 4.2.3 Master/slave chains
  • 4.2.4 Verifying disk writes
  • 5.1 Logging to Redis
  • 5.2 Counters and statistics
  • 5.3 IP-to-city and -country lookup
  • 5.4 Service discovery and configuration
  • 5.1.1 Recent logs
  • 5.1.2 Common logs
  • 5.2.2 Storing statistics in Redis
  • 5.3.1 Loading the location tables
  • 5.3.2 Looking up cities
  • 5.4.1 Using Redis to store configuration information
  • 5.4.2 One Redis server per application component
  • 5.4.3 Automatic Redis connection management
  • 8.1.1 User information
  • 8.1.2 Status messages
  • 9.1.1 The ziplist representation
  • 9.1.2 The intset encoding for SETs
  • Chapter 10: Scaling Redis
  • Chapter 11: Scripting Redis with Lua
  • 10.1 Scaling reads
  • 10.2 Scaling writes and memory capacity
  • 10.3 Scaling complex queries
  • 10.2.2 Creating a server-sharded connection decorator
  • 10.3.1 Scaling search query volume
  • 10.3.2 Scaling search index size
  • 10.3.3 Scaling a social network
  • 11.1.1 Loading Lua scripts into Redis
  • 11.1.2 Creating a new status message
  • 11.2 Rewriting locks and semaphores with Lua
  • 11.3 Doing away with WATCH/MULTI/EXEC
  • 11.4 Sharding LISTs with Lua
  • 11.5 Summary
  • 11.2.1 Why locks in Lua?
  • 11.2.2 Rewriting our lock
  • 11.2.3 Counting semaphores in Lua
  • 11.4.1 Structuring a sharded LIST
  • 11.4.2 Pushing items onto the sharded LIST
  • 11.4.4 Performing blocking pops from the sharded LIST
  • A.1 Installation on Debian or Ubuntu Linux
  • A.2 Installing on OS X
  • B.1 Forums for help
  • B.4 Data visualization and recording
  • Buy the paperback
  • Redis in Action – Home
  • Foreword
  • Preface
  • Part 1: Getting Started
  • Part 2: Core concepts
  • 1.3.1 Voting on articles
  • 1.3.2 Posting and fetching articles
  • 1.3.3 Grouping articles
  • 4.2.1 Configuring Redis for replication
  • 4.2.2 Redis replication startup process
  • 4.2.3 Master/slave chains
  • 4.2.4 Verifying disk writes
  • 5.1 Logging to Redis
  • 5.2 Counters and statistics
  • 5.3 IP-to-city and -country lookup
  • 5.4 Service discovery and configuration
  • 5.1.1 Recent logs
  • 5.1.2 Common logs
  • 5.2.2 Storing statistics in Redis
  • 5.3.1 Loading the location tables
  • 5.3.2 Looking up cities
  • 5.4.1 Using Redis to store configuration information
  • 5.4.2 One Redis server per application component
  • 5.4.3 Automatic Redis connection management
  • 8.1.1 User information
  • 8.1.2 Status messages
  • 9.1.1 The ziplist representation
  • 9.1.2 The intset encoding for SETs
  • Chapter 10: Scaling Redis
  • Chapter 11: Scripting Redis with Lua
  • 10.1 Scaling reads
  • 10.2 Scaling writes and memory capacity
  • 10.3 Scaling complex queries
  • 10.2.2 Creating a server-sharded connection decorator
  • 10.3.1 Scaling search query volume
  • 10.3.2 Scaling search index size
  • 10.3.3 Scaling a social network
  • 11.1.1 Loading Lua scripts into Redis
  • 11.1.2 Creating a new status message
  • 11.2 Rewriting locks and semaphores with Lua
  • 11.3 Doing away with WATCH/MULTI/EXEC
  • 11.4 Sharding LISTs with Lua
  • 11.5 Summary
  • 11.2.1 Why locks in Lua?
  • 11.2.2 Rewriting our lock
  • 11.2.3 Counting semaphores in Lua
  • 11.4.1 Structuring a sharded LIST
  • 11.4.2 Pushing items onto the sharded LIST
  • 11.4.4 Performing blocking pops from the sharded LIST
  • A.1 Installation on Debian or Ubuntu Linux
  • A.2 Installing on OS X
  • B.1 Forums for help
  • B.4 Data visualization and recording
  • Buy the paperback

    6.6.1 Aggregating users by location

    Let’s take a moment and look back at an earlier problem that we solved for Fake Game Company. With the ability to discover where users are accessing the game from thanks to our IP-to-city lookup in chapter 5, Fake Game Company has found itself needing to reparse many gigabytes of log files. They’re looking to aggregate user visitation patterns over time in a few different dimensions: per country, per region, per city, and more. Because we need this to be run in real time over new data, we’ve already implemented callbacks to perform the aggregate operations.

    As you may remember from chapter 5, Fake Game Company has been around for about 2 years. They have roughly 1,000,000 users per day, but they have roughly 10 events per user per day. That gives us right around 7.3 billion log lines to process. If we were to use one of the earlier methods, we’d copy the log files to various machines that need to process the data, and then go about processing the log files. This works, but then we need to copy the data, potentially delaying processing, and using storage space on every machine that processes the data, which later needs to be cleaned up.

    In this particular case, instead of copying files around, we could write a one-time map-reduce3 process to handle all of this data. But because map-reduces are designed to not share memory between items to be processed (each item is usually one log line), we can end up taking more time with map-reduce than if we spent some time writing it by hand to share memory. More specifically, if we load our IP-to-city lookup table into memory in Python (which we’d only want to do if we had a lot of processing to do, and we do), we can perform about 200k IP-to-city ID lookups per second, which is faster than we could expect a single Redis instance to respond to the same queries. Also, to scale with map-reduce, we’d have to run at least a few instances of Redis to keep up with the map-reduces.

    With the three standard methods of handling this already discounted (NFS/Samba, copying files, map-reduce), let’s look at some other practical pieces that we’ll need to solve to actually perform all of our lookups.

    AGGREGATING DATA LOCALLY

    In order to process that many log entries efficiently, we’ll need to locally cache aggregates before updating Redis in order to minimize round trips. Why? If we have roughly 10 million log lines to process for each day, then that’s roughly 10 million writes to Redis. If we perform the aggregates locally on a per-country basis for the entire day (being that there are around 300 countries), we can instead write 300 values to Redis. This will significantly reduce the number of round trips between Redis, reducing the number of commands processed, which in turn will reduce the total processing time.

    If we don’t do anything intelligent about local caching, and we have 10 aggregates that we want to calculate, we’re looking at around 10 days to process all of the data. But anything on the country or region level can be aggregated completely (for the day) before being sent to Redis. And generally because the top 10% of cities (there are roughly 350,000 cities in our sample dataset) amount for more than 90% of our game’s users, we can also locally cache any city-level aggregates. So by performing local caching of aggregates, we’re not limited by Redis on aggregate throughput.

    Assuming that we’ve already cached a copy of our ZSET and HASH table for IP lookups from section 5.3, we only need to worry about aggregating the data. Let’s start with the log lines that contain an IP address, date, time, and the operation that was performed, similar to the following snippet:

    173.194.38.137 2011-10-10 13:55:36 achievement-762

    Given log lines of that form, let’s aggregate those lines on a daily basis per country. To do this, we’ll receive the line as part of a call and increment the appropriate counter. If the log line is empty, then we’ll say that the day’s worth of data is done, and we should write to Redis. The source code for performing this aggregate is shown next.

    Listing 6.29 A locally aggregating callback for a daily country-level aggregate
    aggregates = defaultdict(lambda: defaultdict(int))
    
    

    Prepare the local aggregate dictionary.

    def daily_country_aggregate(conn, line):
       if line:
          line = line.split()
    
     
          ip = line[0]
          day = line[1]
    

    Extract the information from our log lines.

          country = find_city_by_ip_local(ip)[2]
    

    Find the country from the IP address.

          aggregates[day][country] += 1
    

    Increment our local aggregate.

          return
    
    
     
       for day, aggregate in aggregates.items():
          conn.zadd('daily:country:' + day, **aggregate)
          del aggregates[day]
    

    The day file is done; write our aggregate to Redis.

     

    Now that we’ve written and seen one of these aggregate functions, the rest are fairly similar and just as easy to write. Let’s move on to more interesting topics, like how we’re going to send files through Redis.

    3 MapReduce (or Map/Reduce) is a type of distributed computation popularized by Google, which can offer high performance and simplicity for some problems.