6.6.2 Sending files


    To get the log data to our log processors, we’ll have two different components operating on the data. The first is a script that takes the log files and stores them in Redis under named keys, publishes the names of those keys to a chat channel using our group chat method from section 6.5.2, and waits for notification that each file has been fully processed (so that we don’t use more memory than our Redis machine has). Specifically, it waits until a companion key, named after the file’s key with :done appended, has a value equal to 10, which is our number of aggregation processes. The function that copies logs and cleans up after itself is shown in the following listing.
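The completion handshake described above can be sketched without a Redis server. This is a minimal sketch, assuming each aggregation process increments a per-file counter once it has finished reading the file. The text only states that the counter must reach the number of processes, so the use of an increment, the `FakeConn` stand-in, the helpers `ack_file` and `is_finished`, and the sample channel and file names are all illustrative assumptions.

```python
class FakeConn:
    """Hypothetical in-memory stand-in for a Redis connection (illustration only)."""

    def __init__(self):
        self.data = {}

    def incr(self, key):
        # Mirrors Redis INCR: a missing key counts as 0 before incrementing.
        self.data[key] = str(int(self.data.get(key, '0')) + 1)
        return int(self.data[key])

    def get(self, key):
        return self.data.get(key)


def ack_file(conn, channel, logfile):
    # Assumed consumer-side step: record that one process finished the file.
    return conn.incr(channel + logfile + ':done')


def is_finished(conn, channel, logfile, count):
    # The sender's check: has every one of `count` processes acknowledged?
    return conn.get(channel + logfile + ':done') == str(count)


conn = FakeConn()
channel, logfile, count = 'logs:', 'access-01.log', 10

# Nine of the ten processes report in; the file must stay in Redis.
for _ in range(count - 1):
    ack_file(conn, channel, logfile)
partially_done = is_finished(conn, channel, logfile, count)

# The tenth acknowledgment makes the file eligible for cleanup.
ack_file(conn, channel, logfile)
fully_done = is_finished(conn, channel, logfile, count)
```

The counter lives under its own key rather than inside the file's key, so the sender can poll it cheaply without reading the file contents back.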

    Listing 6.30 The copy_logs_to_redis() function
    import os
    import time
    from collections import deque

    # create_chat() and send_message() are the group chat functions
    # from section 6.5.2.

    def copy_logs_to_redis(conn, path, channel, count=10,
                           limit=2**30, quit_when_done=True):
        bytes_in_redis = 0
        waiting = deque()

        # Create the chat that will be used to send messages to clients.
        create_chat(conn, 'source', map(str, range(count)), '', channel)
        # Keep count as a string so it can be compared with values read
        # back from Redis in _clean().
        count = str(count)

        # Iterate over all of the log files.
        for logfile in sorted(os.listdir(path)):
            full_path = os.path.join(path, logfile)
            fsize = os.stat(full_path).st_size

            # Clean out finished files if we need more room.
            while bytes_in_redis + fsize > limit:
                cleaned = _clean(conn, channel, waiting, count)
                if cleaned:
                    bytes_in_redis -= cleaned
                else:
                    time.sleep(.25)

            # Upload the file to Redis in 128 KB blocks.
            with open(full_path, 'rb') as inp:
                block = ' '
                while block:
                    block = inp.read(2**17)
                    conn.append(channel+logfile, block)

            # Notify the listeners that the file is ready.
            send_message(conn, channel, 'source', logfile)

            # Update our local information about Redis’ memory use.
            bytes_in_redis += fsize
            waiting.append((logfile, fsize))

        # We are out of files, so signal that it’s done.
        if quit_when_done:
            send_message(conn, channel, 'source', ':done')

        # Clean up the remaining files as the clients finish with them.
        while waiting:
            cleaned = _clean(conn, channel, waiting, count)
            if cleaned:
                bytes_in_redis -= cleaned
            else:
                time.sleep(.25)

    # How we actually perform the cleanup from Redis.
    def _clean(conn, channel, waiting, count):
        if not waiting:
            return 0
        # Only the oldest uploaded file is checked.
        w0 = waiting[0][0]
        # GET returns a str under Python 2; under Python 3, redis-py returns
        # bytes unless the connection uses decode_responses=True.
        if conn.get(channel + w0 + ':done') == count:
            conn.delete(channel + w0, channel + w0 + ':done')
            return waiting.popleft()[1]
        return 0
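One consequence of this cleanup routine is that files are released strictly in upload order: only the file at the head of the queue is checked, so a later file that is already finished stays in Redis until the earlier ones are done. The following is a minimal sketch of that behavior. The dictionary-backed `FakeConn` class is a hypothetical stand-in for a Redis connection, `clean_head` is a copy of the cleanup logic renamed for illustration, and the file names and sizes are made up.

```python
from collections import deque


class FakeConn:
    """Hypothetical in-memory stand-in for a Redis connection (illustration only)."""

    def __init__(self, data):
        self.data = dict(data)

    def get(self, key):
        return self.data.get(key)

    def delete(self, *keys):
        for key in keys:
            self.data.pop(key, None)


def clean_head(conn, channel, waiting, count):
    # Same logic as the cleanup step: inspect only the oldest uploaded file.
    if not waiting:
        return 0
    name = waiting[0][0]
    if conn.get(channel + name + ':done') == count:
        conn.delete(channel + name, channel + name + ':done')
        return waiting.popleft()[1]
    return 0


channel, count = 'logs:', '2'
conn = FakeConn({
    'logs:a.log': 'x' * 100, 'logs:a.log:done': '1',  # one reader still busy
    'logs:b.log': 'y' * 50, 'logs:b.log:done': '2',   # fully processed
})
waiting = deque([('a.log', 100), ('b.log', 50)])

# b.log is finished, but a.log is at the head of the queue and is not.
freed_first = clean_head(conn, channel, waiting, count)

# Once the last reader finishes a.log, both files can be released in order.
conn.data['logs:a.log:done'] = '2'
freed_second = clean_head(conn, channel, waiting, count)
freed_third = clean_head(conn, channel, waiting, count)
```

Checking only the head keeps each cleanup call to a single GET, at the cost of holding finished files a little longer when an earlier file is slow to process.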

     

    Copying logs to Redis requires a lot of detailed steps, mostly involving being careful not to put too much data into Redis at one time and properly cleaning up after ourselves once a file has been read by all clients. Notifying the log processors that a new file is ready is the easy part; the setup, sending, and cleanup account for most of the code.
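To make the memory bookkeeping concrete, here is a small simulation of the byte budget with no Redis involved. The `plan_uploads` helper, the file sizes, and the limit are hypothetical. The sketch also assumes the oldest file is always finished by the time space is needed, whereas a real sender has to wait for the clients.

```python
from collections import deque


def plan_uploads(sizes, limit):
    """Return the bytes freed before each upload and the final resident total."""
    bytes_in_redis = 0
    waiting = deque()
    freed_per_file = []
    for fsize in sizes:
        freed = 0
        # Release the oldest files until the new one fits under the limit.
        while bytes_in_redis + fsize > limit and waiting:
            oldest = waiting.popleft()
            bytes_in_redis -= oldest
            freed += oldest
        bytes_in_redis += fsize
        waiting.append(fsize)
        freed_per_file.append(freed)
    return freed_per_file, bytes_in_redis


freed, resident = plan_uploads([400, 300, 500, 200], limit=1000)
```

With these numbers, only the third file forces a cleanup: 400 + 300 + 500 would exceed the limit of 1000, so the oldest 400-byte file is released first. The `and waiting` guard is an addition in this sketch so that a single file larger than the limit is still admitted instead of waiting indefinitely.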