6.4.2 Delayed tasks



    With list-based queues, we can handle a single callback per queue, multiple callbacks per queue, and simple priorities. But sometimes, we need a bit more. Fake Game Company has decided that they’re going to add a new feature in their game: delayed selling. Rather than putting an item up for sale now, players can tell the game to put an item up for sale in the future. It’s our job to change or replace our task queue with something that can offer this feature.

    There are a few different ways that we could potentially add delays to our queue items. Here are the three most straightforward ones:

    • We could include an execution time as part of queue items, and if a worker process sees an item with an execution time later than now, it can wait for a brief period and then re-enqueue the item.
    • The worker process could have a local waiting list for any items it has seen that need to be executed in the future, and every time it makes a pass through its while loop, it could check that list for any outstanding items that need to be executed.
    • When we start talking about times, we usually start thinking about ZSETs. What if, for any item we wanted to execute in the future, we added it to a ZSET instead of a LIST, with its score being the time when we want it to execute? We then have a process that checks for items that should be executed now, and if there are any, the process removes them from the ZSET and adds them to the proper LIST queue.

    We can’t wait/re-enqueue items as described in the first option, because that’ll waste the worker process’s time. We also can’t create a local waiting list as described in the second option, because if the worker process crashes for an unrelated reason, we lose any pending work items it knew about. We’ll instead use a secondary ZSET as described in the third option, because it’s simple, straightforward, and we can use a lock from section 6.2 to ensure that the move is safe.

    Each delayed item in the ZSET queue will be a JSON-encoded list of four items: a unique identifier, the queue where the item should be inserted, the name of the callback to call, and the arguments to pass to the callback. We include the unique identifier in order to differentiate all calls easily, and to allow us to add possible reporting features later if we so choose. The score of the item will be the time when the item should be executed. If the item can be executed immediately, we’ll insert the item into the list queue instead. For our unique identifier, we’ll again use a 128-bit randomly generated UUID. The code to create an (optionally) delayed task can be seen next.
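    The four-field item format just described can be sketched as a quick round trip. The queue name, callback name, and arguments below are illustrative assumptions, not values from the book's code:

```python
import json
import time
import uuid

# Build a delayed-task item: [identifier, queue, callback name, args].
# The specific queue/callback/args here are made-up examples.
identifier = str(uuid.uuid4())
item = json.dumps([identifier, "high", "send_sold_email", ["user17", "item4398"]])

# The score stored with the item in the ZSET is its execution time,
# e.g. 30 seconds from now.
score = time.time() + 30

# A worker later unpacks the same four fields.
ident, queue, name, args = json.loads(item)
assert ident == identifier
```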

    Listing 6.22 The execute_later() function
    def execute_later(conn, queue, name, args, delay=0):
       # Generate a unique identifier.
       identifier = str(uuid.uuid4())
       # Prepare the item for the queue.
       item = json.dumps([identifier, queue, name, args])
       if delay > 0:
          # Delay the item. (This is the redis-py 2.x call signature;
          # redis-py 3.0+ expects a mapping instead:
          # conn.zadd('delayed:', {item: time.time() + delay}).)
          conn.zadd('delayed:', item, time.time() + delay)
       else:
          # Execute the item immediately.
          conn.rpush('queue:' + queue, item)
       # Return the identifier.
       return identifier

    When the queue item is to be executed without delay, we continue to use the old list-based queue. But if we need to delay the item, we add it to the delayed ZSET. An example of the delayed queue holding some emails to be sent can be seen in figure 6.10.

    Figure 6.10 A delayed task queue using a ZSET

    Unfortunately, there isn’t a convenient method in Redis to block on ZSETs until a score is lower than the current Unix timestamp, so we need to manually poll. Because delayed items are only going into a single ZSET, we can just fetch the first item along with its score. If there’s no item, or if the item still needs to wait, we’ll wait a brief period and try again. If there is an item, we’ll acquire a lock based on the identifier in the item (a fine-grained lock), remove the item from the ZSET, and add the item to the proper queue. By moving items into queues instead of executing them directly, we only need to have one or two of these running at any time (instead of as many as we have workers), so our polling overhead is kept low. The code for polling our delayed queue is in the following listing.

    Listing 6.23 The poll_queue() function
    def poll_queue(conn):
       while not QUIT:
          # Get the first item in the queue, with its score.
          item = conn.zrange('delayed:', 0, 0, withscores=True)
          # No item, or the item is still to be executed in the future.
          if not item or item[0][1] > time.time():
             time.sleep(.01)
             continue

          # Unpack the item so that we know where it should go.
          item = item[0][0]
          identifier, queue, function, args = json.loads(item)

          # Get the lock for the item.
          locked = acquire_lock(conn, identifier)
          if not locked:
             # We couldn't get the lock, so skip it and try again.
             continue

          # Move the item to the proper list queue.
          if conn.zrem('delayed:', item):
             conn.rpush('queue:' + queue, item)

          # Release the lock.
          release_lock(conn, identifier, locked)

    As is clear from listing 6.23, because ZSETs don’t have a blocking pop mechanism like LISTs do, we need to loop and retry fetching items from the queue. This can increase load on the network and on the processors performing the work, but because we’re only using one or two of these pollers to move items from the ZSET to the LIST queues, we won’t waste too many resources. If we further wanted to reduce overhead, we could add an adaptive method that increases the sleep time when it hasn’t seen any items in a while, or we could use the time when the next item was scheduled to help determine how long to sleep, capping it at 100 milliseconds to ensure that tasks scheduled only slightly in the future are executed in a timely fashion.
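    The capped-sleep idea just described can be sketched as a small helper that decides how long to sleep based on the next item's scheduled time. The function name and default cap are assumptions for illustration, not part of the book's code:

```python
import time

def poll_sleep_time(next_score, cap=.1):
    """Sketch of the adaptive sleep described above: sleep until the next
    scheduled item is due, but never longer than `cap` seconds (100ms by
    default), so tasks scheduled only slightly in the future still run
    promptly. Name and cap are illustrative assumptions."""
    if next_score is None:
        # Empty delayed ZSET: just sleep the full cap.
        return cap
    # Time remaining until the next item is due, clamped to [0, cap].
    return min(max(next_score - time.time(), 0), cap)
```

A poller would call this with the score returned by `zrange(..., withscores=True)` (or `None` when the ZSET is empty) in place of the fixed `time.sleep(.01)`.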

    Respecting Priorities

    In the basic sense, delayed tasks have the same sort of priorities that our first-in, first-out queue had. Because they’ll go back on their original destination queues, they’ll be executed with the same sort of priority. But what if we wanted delayed tasks to execute as soon as possible after their time to execute has come up?

    The simplest way to do this is to add some extra queues to make scheduled tasks jump to the front of the queue. If we have our high-, medium-, and low-priority queues, we can also create high-delayed, medium-delayed, and low-delayed queues, which are passed to the worker_watch_queues() function as ["high-delayed", "high", "medium-delayed", "medium", "low-delayed", "low"]. Each of the delayed queues comes just before its nondelayed equivalent.
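    Building that interleaved list of queue names can be sketched as a one-liner helper (the helper name is an assumption; only worker_watch_queues() comes from the text):

```python
def delayed_watch_order(priorities):
    # For each priority, put its "-delayed" queue just before the
    # non-delayed equivalent, matching the ordering described above.
    queues = []
    for p in priorities:
        queues.append(p + '-delayed')
        queues.append(p)
    return queues

# delayed_watch_order(['high', 'medium', 'low']) produces the six-queue
# ordering that would be passed to worker_watch_queues().
```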

    Some of you may be wondering, “If we’re having them jump to the front of the queue, why not just use LPUSH instead of RPUSH?” Suppose that all of our workers are working on tasks for the medium queue, and will take a few seconds to finish. Suppose also that we have three delayed tasks that are found and LPUSHed onto the front of the medium queue. The first is pushed, then the second, and then the third. But on the medium queue, the third task to be pushed will be executed first, which violates our expectations that things that we want to execute earlier should be executed earlier.
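    The ordering problem can be demonstrated with plain Python lists standing in for a Redis LIST (a sketch, not Redis itself; index 0 plays the role of the left end that workers BLPOP from):

```python
# Stand-ins for Redis LIST operations on a plain Python list.
def lpush(lst, item): lst.insert(0, item)
def rpush(lst, item): lst.append(item)
def lpop(lst): return lst.pop(0)

tasks = ['task1', 'task2', 'task3']   # task1 was scheduled earliest

lpushed = []
for t in tasks:
    lpush(lpushed, t)
# After LPUSHing in order, the most recently pushed task is leftmost,
# so the *last* scheduled task would be popped and executed first.

rpushed = []
for t in tasks:
    rpush(rpushed, t)
# RPUSH preserves scheduling order, but lands behind existing work --
# hence the separate "-delayed" queues instead.
```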

    If you use Python and you’re interested in a queue like this, I’ve written a package called RPQueue that offers delayed task execution semantics similar to the preceding code snippets. It does include more functionality, so if you want a queue and are already using Redis, give RPQueue a look at http://github.com/josiahcarlson/rpqueue/.

    When we use task queues, sometimes we need our tasks to report back to other parts of our application with some sort of messaging system. In the next section, we’ll talk about creating message queues that can be used to send to a single recipient, or to communicate between many senders and receivers.