6.4.1 First-in, first-out queues

    Beyond task queues, discussions of queues usually cover a few different kinds: first-in, first-out (FIFO), last-in, first-out (LIFO), and priority queues. We’ll look first at a first-in, first-out queue, because it offers the most reasonable semantics for our first pass at a queue, can be implemented easily, and is fast. Later, we’ll talk about adding a method for coarse-grained priorities, and even later, time-based queues.

    Let’s again look back to an example from Fake Game Company. To encourage users to play the game when they don’t normally do so, Fake Game Company has decided to add the option for users to opt in to emails about marketplace sales that have completed or that have timed out. Because outgoing email is an internet service that can have very high latencies and can fail, we need to keep the act of sending emails for completed or timed-out sales out of the typical code flow for those operations. To do this, we’ll use a task queue to keep a record of people who need to be emailed and why, and we’ll implement a worker process that can be run in parallel to send multiple emails at a time if outgoing mail servers become slow.

    The queue that we’ll write only needs to send emails out in a first-come, first-served manner, and will log both successes and failures. As we talked about in chapters 3 and 5, Redis LISTs let us push and pop items from both ends with RPUSH/LPUSH and RPOP/LPOP. For our email queue, we’ll push emails to send onto the right end of the queue with RPUSH and pop them off the left end with LPOP. (We do this because it makes sense visually for readers of left-to-right languages.) Because our worker processes are only going to be performing this emailing operation, we’ll use the blocking version of our list pop, BLPOP, with a timeout of 30 seconds. For the sake of simplicity, we’ll only handle item-sold messages in this version, but adding support for sending timeout emails is also easy.
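To see why RPUSH plus LPOP gives first-in, first-out order, here is a minimal sketch that models a Redis LIST with Python's collections.deque. This is an assumption made so the example runs without a Redis server: append() stands in for RPUSH, appendleft() for LPUSH, and popleft() for LPOP.

```python
from collections import deque


def drain_left(items):
    """Pop everything from the left end, like repeated LPOP calls."""
    out = []
    while items:
        out.append(items.popleft())
    return out


# FIFO: push on the right (RPUSH), pop from the left (LPOP).
fifo = deque()
for email in ['first', 'second', 'third']:
    fifo.append(email)          # stands in for RPUSH
fifo_order = drain_left(fifo)

# LIFO for comparison: push on the left (LPUSH), pop from the left (LPOP).
lifo = deque()
for email in ['first', 'second', 'third']:
    lifo.appendleft(email)      # stands in for LPUSH
lifo_order = drain_left(lifo)

print(fifo_order)  # ['first', 'second', 'third']
print(lifo_order)  # ['third', 'second', 'first']
```

With a real redis-py connection, the FIFO pattern would use conn.rpush('queue:email', ...) to add items and conn.blpop(['queue:email'], 30) to remove them.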

    Figure 6.9 A first-in, first-out queue using a LIST

    Our queue will simply be a list of JSON-encoded blobs of data, which will look like figure 6.9.

    To add an item to the queue, we’ll get all of the necessary information together, serialize it with JSON, and RPUSH the result onto our email queue. As in previous chapters, we use JSON because it’s human readable and because there are fast libraries for translation to/from JSON in most languages. The function that pushes an email onto the item-sold email task queue appears in the next listing.
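As a quick check of the encoding step, the sketch below serializes a payload shaped like the one in the next listing and decodes it again. Only the field names come from the text; the values are made up for illustration. It decodes from bytes because redis-py under Python 3 returns list values as bytes unless the client is created with decode_responses=True, and json.loads() has accepted bytes since Python 3.6.

```python
import json

# Hypothetical sample values; the field names match the queued email data.
data = {
    'seller_id': 17,
    'item_id': 'ItemM',
    'price': 97,
    'buyer_id': 27,
    'time': 1700000000.0,
}

# What would be stored in the LIST: a JSON string.
blob = json.dumps(data)

# What a worker would typically get back from redis-py: bytes.
raw = blob.encode('utf-8')
decoded = json.loads(raw)

print(decoded == data)  # True
```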

    Listing 6.18 The send_sold_email_via_queue() function
    import json
    import time

    def send_sold_email_via_queue(conn, seller, item, price, buyer):
        # Prepare the item.
        data = {
            'seller_id': seller,
            'item_id': item,
            'price': price,
            'buyer_id': buyer,
            'time': time.time(),
        }
        # Push the item onto the queue.
        conn.rpush('queue:email', json.dumps(data))

    Adding a message to a LIST queue shouldn’t be surprising.

    Sending emails from the queue is easy. We use BLPOP to pull items from the email queue, prepare the email, and finally send it. The next listing shows our function for doing so.
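BLPOP returns either a (key, value) pair or, in redis-py, None once the timeout expires with nothing to pop; that is why the worker checks `if not packed` before decoding. The sketch below imitates that contract in memory with threading.Condition so it runs without a server. The BlockingList class is hypothetical and models only a single list.

```python
import threading
import time
from collections import deque


class BlockingList:
    """Hypothetical in-memory stand-in for one Redis LIST (RPUSH / BLPOP)."""

    def __init__(self, key):
        self.key = key
        self.items = deque()
        self.cond = threading.Condition()

    def rpush(self, value):
        with self.cond:
            self.items.append(value)
            self.cond.notify()          # wake one waiting popper

    def blpop(self, timeout):
        """Return (key, value) from the left end, or None after `timeout` seconds.

        Unlike real BLPOP, timeout=0 here returns None immediately on an
        empty list instead of blocking forever.
        """
        deadline = time.monotonic() + timeout
        with self.cond:
            while not self.items:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return None
                self.cond.wait(remaining)
            return (self.key, self.items.popleft())


email_queue = BlockingList('queue:email')

# Nothing queued: the pop gives up after the timeout.
empty_result = email_queue.blpop(0.1)

# A producer thread pushes while the consumer is blocked.
threading.Timer(0.05, email_queue.rpush, args=('{"item_id": "ItemM"}',)).start()
later_result = email_queue.blpop(2)

print(empty_result)   # None
print(later_result)   # ('queue:email', '{"item_id": "ItemM"}')
```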

    Listing 6.19 The process_sold_email_queue() function
    import json

    def process_sold_email_queue(conn):
        # QUIT, fetch_data_and_send_sold_email(), EmailSendError, log_error(),
        # and log_success() are assumed to be defined elsewhere.
        while not QUIT:
            # Try to get a message to send.
            packed = conn.blpop(['queue:email'], 30)
            if not packed:
                # No message to send; try again.
                continue

            # Load the packed email information.
            to_send = json.loads(packed[1])
            try:
                # Send the email using our prewritten emailing function.
                fetch_data_and_send_sold_email(to_send)
            except EmailSendError as err:
                log_error("Failed to send sold email", err, to_send)
            else:
                log_success("Sent sold email", to_send)

    Similarly, actually sending the email after pulling the message from the queue is also not surprising. But what about executing more than one type of task?

    Multiple Executable Tasks

    Because Redis gives a popped item to only a single caller, we can be sure that no email is duplicated and sent twice. Because we only put email messages to send in the queue, our worker process was simple. Having a single queue for each type of message is common in some situations, but in others, a single queue able to handle many different types of tasks can be much more convenient. Take the worker process in listing 6.20: it watches the provided queue and dispatches the JSON-encoded function call to one of a set of known registered callbacks. The item to be executed will be of the form ['FUNCTION_NAME', [ARG1, ARG2, …]].
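The message format is a two-element JSON array: a callback name and a list of positional arguments. The sketch below shows a single decode-and-dispatch step in isolation with a plain dict of callbacks. The dispatch_one() name and the sample 'add' callback are hypothetical, and unknown names are reported through the return value rather than through log_error().

```python
import json


def dispatch_one(packed_value, callbacks):
    """Decode one ['FUNCTION_NAME', [ARG1, ARG2, ...]] item and run its callback.

    Returns (True, result) on success, or (False, name) for an unknown name.
    """
    name, args = json.loads(packed_value)
    if name not in callbacks:
        return False, name              # a worker would log this and move on
    return True, callbacks[name](*args)


# A hypothetical registry with a single callback.
callbacks = {'add': lambda a, b: a + b}

message = json.dumps(['add', [2, 3]])
print(message)                                            # ["add", [2, 3]]
print(dispatch_one(message, callbacks))                   # (True, 5)
print(dispatch_one(json.dumps(['nope', []]), callbacks))  # (False, 'nope')
```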

    Listing 6.20 The worker_watch_queue() function
    import json

    def worker_watch_queue(conn, queue, callbacks):
        # QUIT and log_error() are assumed to be defined elsewhere.
        while not QUIT:
            # Try to get an item from the queue.
            packed = conn.blpop([queue], 30)
            if not packed:
                # There's nothing to work on; try again.
                continue

            # Unpack the work item.
            name, args = json.loads(packed[1])
            if name not in callbacks:
                # The function is unknown; log the error and try again.
                log_error("Unknown callback %s" % name)
                continue

            # Execute the task.
            callbacks[name](*args)

    With this generic worker process, our email sender could be written as a callback and passed with other callbacks.
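For example, a producer could enqueue a sold-email task in the [name, args] format, and the worker could be given a registry that maps that name to the sending function. The sketch below uses hypothetical names (send_sold_email, enqueue_task, run_pending) and replaces both Redis and the real email call with in-memory stand-ins so it can run on its own.

```python
import json
from collections import deque

sent = []  # records what the stand-in email function "sent"


def send_sold_email(seller, item, price, buyer):
    """Hypothetical stand-in for the real sold-email sender."""
    sent.append((seller, item, price, buyer))


def enqueue_task(queue, name, *args):
    """Append a JSON-encoded [name, args] item; the deque plays the role of RPUSH."""
    queue.append(json.dumps([name, list(args)]))


def run_pending(queue, callbacks):
    """Process everything currently queued, oldest first, like repeated BLPOP."""
    while queue:
        name, args = json.loads(queue.popleft())
        if name in callbacks:
            callbacks[name](*args)


callbacks = {'send_sold_email': send_sold_email}
email_queue = deque()
enqueue_task(email_queue, 'send_sold_email', 17, 'ItemM', 97, 27)
run_pending(email_queue, callbacks)
print(sent)  # [(17, 'ItemM', 97, 27)]
```

Other task types would be added by registering more entries in the same dict, without changing the worker loop.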

    Task Priorities

    Sometimes when working with queues, it’s necessary to prioritize certain operations over others. In our case, maybe we want to send emails about sales that completed before we send emails about sales that expired. Or maybe we want to send password reset emails before we send out emails for an upcoming special event. Remember that the BLPOP/BRPOP commands accept multiple LISTs from which to pop an item: Redis checks the LISTs in the order they’re given, and the first one that has any items will have its first item popped (or its last item, if we’re using BRPOP).

    Let’s say that we want to have three priority levels: high, medium, and low. High-priority items should be executed if they’re available. If there are no high-priority items, then items in the medium-priority level should be executed. If there are neither high- nor medium-priority items, then items in the low-priority level should be executed. We can support this by changing just two lines of our earlier code, as the next listing shows.
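Because BLPOP scans its keys in the order they are given, the order of the queue list is what encodes priority. This sketch models that scan over in-memory deques. It is a non-blocking simplification, and the queue names are hypothetical; it shows that high-priority items drain first even when they were queued after lower-priority ones. With redis-py, the equivalent call would be conn.blpop(['queue:high', 'queue:medium', 'queue:low'], 30).

```python
from collections import deque


def pop_first_nonempty(lists, keys):
    """Mimic multi-key BLPOP without blocking: return (key, item) or None."""
    for key in keys:                 # keys are checked in the order given
        if lists[key]:
            return key, lists[key].popleft()
    return None


# Hypothetical priority queue names, highest priority first.
PRIORITIES = ['queue:high', 'queue:medium', 'queue:low']
lists = {key: deque() for key in PRIORITIES}

lists['queue:low'].append('newsletter')
lists['queue:medium'].append('sale expired')
lists['queue:high'].append('password reset')
lists['queue:low'].append('event reminder')

order = []
while True:
    popped = pop_first_nonempty(lists, PRIORITIES)
    if popped is None:
        break
    order.append(popped[1])

print(order)
# ['password reset', 'sale expired', 'newsletter', 'event reminder']
```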

    Listing 6.21 The worker_watch_queues() function
    import json

    # This is the first changed line to add priority support.
    def worker_watch_queues(conn, queues, callbacks):
        # QUIT and log_error() are assumed to be defined elsewhere.
        while not QUIT:
            # This is the second changed line to add priority support.
            packed = conn.blpop(queues, 30)
            if not packed:
                continue

            name, args = json.loads(packed[1])
            if name not in callbacks:
                log_error("Unknown callback %s" % name)
                continue

            callbacks[name](*args)

    By using multiple queues, priorities can be implemented easily. Multiple queues are sometimes used only to separate different kinds of queue items (announcement emails, notification emails, and so forth), with no intention of favoring one kind over another. Because BLPOP always checks the queues in the order given, the first queue listed is still served first in that case, so it can make sense to reorder the queue list occasionally to be fairer to all of the queues, especially when one queue can grow quickly relative to the others.
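One simple way to reorder the list is to rotate it by one position after each pop, so every queue takes a turn at the front. The sketch below compares a fixed order with a rotating order when one queue is much longer than the other. The queue names and the rotate-after-every-pop policy are assumptions chosen for illustration, not the only reasonable option.

```python
from collections import deque


def drain(lists, keys, rotate):
    """Pop items one at a time from the first non-empty list in the key order.

    When `rotate` is True, the key order shifts by one after each pop,
    which approximates round-robin service across the queues.
    """
    order = deque(keys)
    taken = []
    while any(lists[k] for k in order):
        for key in order:
            if lists[key]:
                taken.append(lists[key].popleft())
                break
        if rotate:
            order.rotate(-1)     # move the front key to the back


    return taken


def make_lists():
    return {
        'queue:announce': deque(['a1', 'a2', 'a3', 'a4']),
        'queue:notify': deque(['n1']),
    }


keys = ['queue:announce', 'queue:notify']
fixed = drain(make_lists(), keys, rotate=False)
rotating = drain(make_lists(), keys, rotate=True)

print(fixed)     # ['a1', 'a2', 'a3', 'a4', 'n1']
print(rotating)  # ['a1', 'n1', 'a2', 'a3', 'a4']
```

In a real worker, the same idea would mean keeping the queue names in a deque, passing list(order) to conn.blpop(), and calling order.rotate(-1) after each successful pop.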

    If you’re using Ruby, you can use an open source package called Resque that was released by the programmers at GitHub. It builds Ruby-based queues on Redis LISTs, much like what we’ve talked about here. Resque offers many additional features beyond the 11-line function that we provided here, so if you’re using Ruby, you should check it out. Either way, there are many more options for queues in Redis, so keep reading.