
e-Book - Redis in Action

This book covers the use of Redis, an in-memory database/data structure server

  • Redis in Action – Home
  • Foreword
  • Preface
  • Acknowledgments
  • About this Book
  • About the Cover Illustration
  • Part 1: Getting Started
  • Part 2: Core concepts
  • Part 3: Next steps
  • Appendix A
  • Appendix B
  • Buy the paperback

    6.4.2 Delayed tasks

    With list-based queues, we can handle a single callback per queue, multiple callbacks per
    queue, and simple priorities. But sometimes, we need a bit more. Fake
    Game Company has decided that they’re going to add a new feature in their game:
    delayed selling. Rather than putting an item up for sale now, players can tell the game
    to put an item up for sale in the future. It’s our job to change or replace our task
    queue with something that can offer this feature.

    There are a few different ways that we could potentially add delays to our queue
    items. Here are the three most straightforward ones:

    • We could include an execution time as part of queue items, and if a worker process
      sees an item with an execution time later than now, it can wait for a brief
      period and then re-enqueue the item.
    • The worker process could have a local waiting list for any items it has seen
      that need to be executed in the future, and every time it makes a pass through its while loop, it could check that list for any outstanding items that need to
      be executed.
    • Normally when we talk about times, we start talking about ZSETs. What if, for
      any item we wanted to execute in the future, we added it to a ZSET instead of
      a LIST, with its score being the time when we want it to execute? We'd then have
      a process that checks for items that should be executed now, and if there are
      any, removes them from the ZSET and adds them to the proper LIST queue.

    We can’t wait/re-enqueue items as described in the first option, because that’ll waste the
    worker process’s time. We also can’t create a local waiting list as described in the second
    option, because if the worker process crashes for an unrelated reason, we lose any
    pending work items it knew about. We’ll instead use a secondary ZSET as described in
    the third option, because it’s simple and straightforward, and we can use a lock from
    section 6.2 to ensure that the move is safe.

    Each delayed item in the ZSET queue will be a JSON-encoded list of four items: a
    unique identifier, the queue where the item should be inserted, the name of the callback
    to call, and the arguments to pass to the callback. We include the unique identifier
    in order to differentiate all calls easily, and to allow us to add possible reporting features
    later if we so choose. The score of the item will be the time when the item should be executed.
    If the item can be executed immediately, we’ll insert the item into the list queue
    instead. For our unique identifier, we’ll again use a 128-bit randomly generated UUID.
    The code to create an (optionally) delayed task can be seen next.

    Listing 6.22 The execute_later() function
    def execute_later(conn, queue, name, args, delay=0):
       identifier = str(uuid.uuid4())                       # Generate a unique identifier.
       item = json.dumps([identifier, queue, name, args])   # Prepare the item for the queue.
       if delay > 0:
          conn.zadd('delayed:', item, time.time() + delay)  # Delay the item.
       else:
          conn.rpush('queue:' + queue, item)                # Execute the item immediately.
       return identifier                                    # Return the identifier.
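To see both paths in action, here's a hedged usage sketch. It uses a tiny in-memory stand-in for the two Redis calls the function makes (the `FakeConn` class is hypothetical, not part of the book's code), and follows the redis-py 2.x `zadd(key, member, score)` argument order shown in the listing:

```python
import json
import time
import uuid

class FakeConn:
    """Hypothetical in-memory stand-in for the two Redis calls used below."""
    def __init__(self):
        self.zsets = {}   # key -> {member: score}
        self.lists = {}   # key -> [values]
    def zadd(self, key, member, score):   # redis-py 2.x argument order
        self.zsets.setdefault(key, {})[member] = score
    def rpush(self, key, value):
        self.lists.setdefault(key, []).append(value)

def execute_later(conn, queue, name, args, delay=0):
    identifier = str(uuid.uuid4())
    item = json.dumps([identifier, queue, name, args])
    if delay > 0:
        conn.zadd('delayed:', item, time.time() + delay)   # schedule for later
    else:
        conn.rpush('queue:' + queue, item)                 # run as soon as a worker is free
    return identifier

conn = FakeConn()
execute_later(conn, 'email', 'send_sold_notification', ['itemX'])            # immediate
execute_later(conn, 'email', 'send_sold_notification', ['itemY'], delay=60)  # delayed 60s
```

The immediate item lands directly on the `queue:email` list, while the delayed item sits in the `delayed:` ZSET with a score 60 seconds in the future.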

    When the queue item is to be executed without delay, we continue to use the old list-based
    queue. But if we need to delay the item, we add it to the delayed ZSET. An example
    of the delayed queue holding emails to be sent can be seen in figure 6.10.

    Figure 6.10A delayed task queue using a ZSET

    Unfortunately, there isn’t a convenient method in Redis to block on ZSETs until a
    score is lower than the current Unix timestamp, so we need to poll manually. Because
    delayed items are only going into a single queue, we can just fetch the first item
    along with its score. If there’s no item, or if the item still needs to wait, we’ll
    sleep for a brief period and try again. If there is an item, we’ll acquire a lock
    based on the identifier in the item (a fine-grained lock), remove the item from the
    ZSET, and add the item to the proper queue. By moving items into queues instead of
    executing them directly, we only need to have one or two of these processes running
    at any time (instead of as many as we have workers), so our polling overhead is kept
    low. The code for polling our delayed queue is in the following listing.

    Listing 6.23 The poll_queue() function
    def poll_queue(conn):
       while not QUIT:
          item = conn.zrange('delayed:', 0, 0, withscores=True)  # Get the first item in the queue.
          if not item or item[0][1] > time.time():
             time.sleep(.01)     # No item, or the item is still to be executed in the future.
             continue

          item = item[0][0]
          identifier, queue, function, args = json.loads(item)   # Unpack the item so that we know where it should go.

          locked = acquire_lock(conn, identifier)                # Get the lock for the item.
          if not locked:
             continue            # We couldn't get the lock, so skip it and try again.

          if conn.zrem('delayed:', item):                        # Move the item to the proper list queue.
             conn.rpush('queue:' + queue, item)

          release_lock(conn, identifier, locked)                 # Release the lock.

    As is clear from listing 6.23, because ZSETs don’t have a blocking pop mechanism like
    LISTs do, we need to loop and retry fetching items from the queue. This can increase
    load on the network and on the processors performing the work, but because we’re
    only using one or two of these pollers to move items from the ZSET to the LIST
    queues, we won’t waste too many resources. If we further wanted to reduce overhead,
    we could add an adaptive method that increases the sleep time when it hasn’t seen any
    items in a while, or we could use the time when the next item was scheduled to help
    determine how long to sleep, capping it at 100 milliseconds to ensure that tasks scheduled
    only slightly in the future are executed in a timely fashion.
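    That score-based sleep could be sketched as follows. The compute_sleep helper is hypothetical, not one of the book's listings; it assumes the caller has already fetched the score of the earliest delayed item:

```python
import time

def compute_sleep(next_score, now=None, cap=0.1):
    """Sleep until the earliest delayed item is due, capped at 100 ms.

    next_score is the ZSET score (a Unix timestamp) of the first delayed
    item, or None when the ZSET is empty. The cap keeps a fresh item
    scheduled slightly in the future from waiting a full polling interval.
    """
    if now is None:
        now = time.time()
    if next_score is None:
        return cap                        # nothing scheduled; sleep the maximum
    return min(max(next_score - now, 0.0), cap)
```

    The poller would then call time.sleep(compute_sleep(score)) in place of a fixed time.sleep(.01).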

    Respecting Priorities

    In the basic sense, delayed tasks have the same sort of priorities that our first-in,
    first-out queue had. Because they’ll go back on their original destination queues, they’ll be
    executed with the same sort of priority. But what if we wanted delayed tasks to execute
    as soon as possible after their time to execute has come up?

    The simplest way to do this is to add some extra queues to make scheduled tasks
    jump to the front of the queue. If we have our high-, medium-, and low-priority
    queues, we can also create high-delayed, medium-delayed, and low-delayed queues,
    which are passed to the worker_watch_queues() function as ["high-delayed",
    "high", "medium-delayed", "medium", "low-delayed", "low"]. Each of the delayed
    queues comes just before its nondelayed equivalent.
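    One way to build that watch list programmatically (watch_order is a hypothetical helper; the text simply writes the list out by hand):

```python
def watch_order(priorities):
    """Place each priority's delayed queue immediately before the queue
    itself, so freshly moved delayed tasks are picked up first."""
    order = []
    for name in priorities:
        order.append(name + '-delayed')
        order.append(name)
    return order

queues = watch_order(['high', 'medium', 'low'])
```

    The poller would also need a small change: items originally destined for a priority queue would be RPUSHed onto its '-delayed' variant instead, so they sit ahead of nondelayed work.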

    Some of you may be wondering, “If we’re having them jump to the front of the
    queue, why not just use LPUSH instead of RPUSH?” Suppose that all of our workers are
    working on tasks for the medium queue, and will take a few seconds to finish. Suppose
    also that we have three delayed tasks that are found and LPUSHed onto the front of the
    medium queue. The first is pushed, then the second, and then the third. But on the
    medium queue, the third task to be pushed will be executed first, which violates our
    expectations that things that we want to execute earlier should be executed earlier.
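    The reversal is easy to see with a plain double-ended queue standing in for the Redis LIST (a sketch, not actual Redis calls):

```python
from collections import deque

queue = deque()                        # stands in for the Redis LIST
for task in ['task-1', 'task-2', 'task-3']:
    queue.appendleft(task)             # LPUSH: each push lands at the head

# Workers pop from the left (as BLPOP does), so the most recently
# pushed task runs first:
run_order = [queue.popleft() for _ in range(3)]
```

    After the three LPUSHes, the workers see task-3 first and task-1 last, the opposite of the order in which the tasks became due.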

    If you use Python and you’re interested in a queue like this, I’ve written a package
    called RPQueue that offers delayed task execution semantics similar to the preceding
    code snippets. It does include more functionality, so if you want a queue and are already
    using Redis, give RPQueue a look at http://github.com/josiahcarlson/rpqueue/.

    When we use task queues, sometimes we need our tasks to report back to other
    parts of our application with some sort of messaging system. In the next section, we’ll
    talk about creating message queues that can be used to send to a single recipient, or
    to communicate between many senders and receivers.