6.6.4 Processing files


    We’re deferring some of the work of decoding our files to functions that return generators over data. The readlines() function takes a connection, a key, and a block-iterating callback. It iterates over the blocks of data yielded by that callback, discovers line breaks, and yields complete lines. For each block, as in listing 6.32, it finds the last line ending in the buffered data, splits everything up to that point on line breaks, and yields the lines one by one, keeping any trailing partial line to prepend onto the next block. When there’s no more data, it yields whatever remains as the final line. There are other ways of finding line breaks and extracting lines in Python, but the rfind()/split() combination is faster than the alternatives we tried.

    Listing 6.32 The readlines() function

    def readlines(conn, key, rblocks):
       out = ''
       for block in rblocks(conn, key):
          out += block
          # Find the rightmost line break, if any
          # (rfind() returns -1 on failure).
          posn = out.rfind('\n')
          if posn >= 0:
             # We found a line break; split on all of the line breaks.
             for line in out[:posn].split('\n'):
                # Yield each line.
                yield line + '\n'
             # Keep track of the trailing data.
             out = out[posn+1:]
          if not block:
             # We're out of data, so yield any remaining partial line.
             yield out
             break

    For our higher-level line-generating function, we’re iterating over blocks produced by one of two readers, which allows us to focus on finding line breaks.

    GENERATORS WITH YIELD Listing 6.32 offers our first real use of Python generators with the yield statement. A generator lets Python suspend and resume execution of a function, primarily to allow for easy iteration over sequences or pseudo-sequences of data. For more details on how generators work, you can visit the Python language tutorial with this short URL: http://mng.bz/Z2b1.
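    To make that suspend-and-resume behavior concrete, here's a minimal sketch (not part of the book's code) of a generator whose body only runs up to each yield, picking up where it left off when the next value is requested:

    def countdown(n):
       # Execution pauses at each yield and resumes here
       # when the next value is requested.
       while n > 0:
          yield n
          n -= 1

    list(countdown(3))   # Returns [3, 2, 1], resuming the generator three times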

    Each of the two block-yielding callbacks, readblocks() and readblocks_gz(), will read blocks of data from Redis. The first yields the blocks directly, whereas the second automatically decompresses gzip files. We separate these two layers so that each reader stays as useful and reusable as possible. The following listing shows the readblocks() generator.

    Listing 6.33 The readblocks() generator

    def readblocks(conn, key, blocksize=2**17):
       lb = blocksize
       pos = 0
       # Keep fetching more data as long as we don't have a partial read.
       while lb == blocksize:
          # Fetch the block.
          block = conn.substr(key, pos, pos + blocksize - 1)
          yield block
          # Prepare for the next pass.
          lb = len(block)
          pos += lb
       yield ''

    The readblocks() generator is primarily meant to offer an abstraction over our block reading, which allows us to replace it later with other types of readers, like maybe a filesystem reader, a memcached reader, a ZSET reader, or in our case, a block reader that handles gzip files in Redis. The next listing shows the readblocks_gz() generator.
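    As a rough illustration of that flexibility, here's a sketch (not from the book) of a hypothetical filesystem-based reader. The readblocks_file() name is our own; conn is accepted and ignored purely to match the callback signature that readlines() expects, and key is treated as a local file path:

    def readblocks_file(conn, key, blocksize=2**17):
       # 'conn' is unused; 'key' is a path on the local filesystem.
       fp = open(key, 'rb')
       try:
          lb = blocksize
          # Keep reading as long as we get full-sized blocks.
          while lb == blocksize:
             block = fp.read(blocksize)
             yield block
             lb = len(block)
          # Like readblocks(), finish with an empty block so that
          # readlines() knows the data is done.
          yield ''
       finally:
          fp.close()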

    Listing 6.34 The readblocks_gz() generator

    import zlib

    def readblocks_gz(conn, key):
       inp = ''
       decoder = None
       # Read the raw data from Redis.
       for block in readblocks(conn, key, 2**17):
          if not decoder:
             inp += block
             try:
                # Parse the gzip header to find where the
                # compressed data begins.
                if inp[:3] != "\x1f\x8b\x08":
                   raise IOError("invalid gzip data")
                i = 10
                flag = ord(inp[3])
                if flag & 4:
                   i += 2 + ord(inp[i]) + 256*ord(inp[i+1])
                if flag & 8:
                   i = inp.index('\0', i) + 1
                if flag & 16:
                   i = inp.index('\0', i) + 1
                if flag & 2:
                   i += 2
                if i > len(inp):
                   raise IndexError("not enough data")
             except (IndexError, ValueError):
                # We haven't read the full header yet.
                continue
             else:
                # We found the header; prepare the decompressor.
                block = inp[i:]
                inp = None
                decoder = zlib.decompressobj(-zlib.MAX_WBITS)
                if not block:
                   continue
          if not block:
             # We're out of data; yield the last chunk.
             yield decoder.flush()
             break
          # Yield a decompressed block.
          yield decoder.decompress(block)

    Much of the body of readblocks_gz() is gzip header parsing code, which is unfortunately necessary. For log files (like we’re parsing), gzip can offer a reduction of 2–5 times in storage space requirements, while offering fairly high-speed decompression. Though more modern compression methods are able to compress better (bzip2, lzma/xz, and many others) or faster (lz4, lzop, snappy, QuickLZ, and many others), no other method is as widely available (bzip2 comes close) or has such a useful range of compression ratio and CPU utilization trade-off options.
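
    To see how the pieces fit together, here's a brief usage sketch; the log key name and the process_line() handler are hypothetical stand-ins for your own key and processing code:

    # Stream a gzip-compressed log file out of Redis line by line,
    # assuming 'conn' is a redis-py client as in earlier listings.
    for line in readlines(conn, 'logs:example.gz', readblocks_gz):
       process_line(line)   # Hypothetical per-line handler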