10.3.2 Scaling search index size


    If there’s one thing we can expect of a search engine, it’s that the search index will
    grow over time. As search indexes grow, the memory used by those search indexes also
    grows. Depending on the speed of that growth, we may or may not be able to keep buying
    or renting ever-larger machines to run our index on. But for many of us, getting bigger
    and bigger machines is just not possible.

    In this section, we’ll talk about how to structure our data to support sharded
    search queries, including code to execute sharded search queries against a collection
    of sharded Redis masters (or slaves of sharded masters, if you followed the
    instructions in section 10.3.1).

    In order to shard our search queries, we must first shard our indexes so that for
    each document that we index, all of the data about that document is on the same
    shard. It turns out that our index_document() function from chapter 7 takes a connection
    object, which we can shard by hand with the docid that’s passed. Or, because
    index_document() takes a connection followed by the docid, we can use our automatic
    sharding decorator from listing 10.3 to handle sharding for us.
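    Listing 10.3 isn’t reproduced here, so the following is only a minimal sketch of the idea, not the book’s actual implementation: a hypothetical `shard_key()` helper that maps a docid to a shard name, and a `sharded_connection()` decorator that routes each `index_document()` call to the right shard. The `connect` factory argument stands in for the book’s connection lookup.

```python
import binascii

def shard_key(base, key, total_shards):
    # Hypothetical stand-in for the book's listing 10.3 helper: numeric
    # ids shard directly by modulo; other keys shard by CRC32 of the key.
    key = str(key)
    if key.isdigit():
        shard_id = int(key) % total_shards
    else:
        shard_id = binascii.crc32(key.encode('utf-8')) % total_shards
    return "%s:%s" % (base, shard_id)

def sharded_connection(component, shards, connect):
    # Decorator sketch: pick the shard for the docid (the wrapped
    # function's first argument after the connection), open a connection
    # to it via the supplied connect() factory, and pass that connection
    # on to the wrapped function.
    def wrapper(function):
        def call(docid, *args, **kwargs):
            conn = connect(shard_key(component, docid, shards))
            return function(conn, docid, *args, **kwargs)
        return call
    return wrapper
```

    Because every key for a given document is derived from its docid, this routing keeps all of a document’s index data on a single shard, which is the property the sharded queries below rely on.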

    When we have our documents indexed across shards, we only need to perform
    queries against the shards to get the results. The details of what we need to do will
    depend on our type of index—whether it’s SORT-based or ZSET-based. Let’s first update
    our SORT-based index for sharded searches.


    As is the case with all sharded searches, we need a way to combine the results of the
    sharded searches. In our implementation of search_and_sort() from chapter 7, we
    received a total count of results and the document IDs that were the result of the
    required query. This is a great building block to start from, but overall we’ll need to
    write functions to perform the following steps:

    1. Perform the search and fetch the values to sort on for a query against a single shard.
    2. Execute the search on all shards.
    3. Merge the results of the queries, and choose the subset desired.
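    The merge in step 3 is why each shard must over-fetch: any portion of a global page could live on any shard, so every shard has to return its first start+num results. A quick sketch with two hypothetical shards and made-up scores:

```python
# Two shards, each already sorted descending by a sort value.  To serve
# the global page start=2, num=2 (items 3-4 overall), each shard must
# contribute its first start+num = 4 results, because in the worst case
# the entire global page could come from a single shard.
shard_results = [
    [('docA', 99), ('docB', 90), ('docC', 50), ('docD', 40)],
    [('docE', 95), ('docF', 80), ('docG', 70), ('docH', 10)],
]

# Merge everything the shards returned, re-sort globally, then slice.
merged = sorted(
    (pair for shard in shard_results for pair in shard),
    key=lambda pair: pair[1], reverse=True)
start, num = 2, 2
page = [docid for docid, score in merged[start:start + num]]
# page is ['docB', 'docF']: one result from each shard.
```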

    First, let’s look at what it takes to perform the search and fetch the values from a single shard.

    Because we already have search_and_sort() from chapter 7, we can start by using
    that to fetch the result of a search. After we have the results, we can then fetch the
    data associated with each search result. But we have to be careful about pagination,
    because we don’t know which shard each result from a previous search came from. So,
    in order to always return the correct search results for items 91–100, we need to fetch
    the first 100 search results from every shard. Our code for fetching all of the necessary
    results and data can be seen in the next listing.

    Listing 10.5 SORT-based search that fetches the values that were sorted

    def search_get_values(conn, query, id=None, ttl=300, sort="-updated",
                          start=0, num=20):
        # Take all of the same parameters to pass on to search_and_sort().
        # First get the results of a search and sort, fetching everything
        # from the first result through the end of the requested page.
        count, docids, id = search_and_sort(
            conn, query, id, ttl, sort, 0, start+num)

        # Fetch the data that the results were sorted by.
        key = "kb:doc:%s"
        sort = sort.lstrip('-')
        pipe = conn.pipeline(False)
        for docid in docids:
            pipe.hget(key % docid, sort)
        sort_column = pipe.execute()

        # Pair up each document ID with the data it was sorted by.
        data_pairs = zip(docids, sort_column)

        # Return the count, data, and cache ID of the results.
        return count, data_pairs, id

    This function fetches all of the information necessary from a single shard in preparation
    for the final merge. Our next step is to execute the query on all of the shards.

    To execute a query on all of our shards, we have two options. We can either run
    each query on each shard one by one, or we can execute our queries across all of our
    shards simultaneously. To keep it simple, we’ll execute our queries one by one on
    each shard, collecting the results together in the next listing.

    Listing 10.6 A function to perform queries against all shards

    def get_shard_results(component, shards, query, ids=None, ttl=300,
                          sort="-updated", start=0, num=20, wait=1):
        # Prepare structures to hold all of our fetched data.
        count = 0
        data = []

        # Use cached results if we have any; otherwise, start over.
        ids = ids or shards * [None]

        for shard in xrange(shards):
            # Get or create a connection to the desired shard; to know
            # what servers to connect to, we assume that all of our shard
            # information is kept in the standard configuration location.
            conn = get_redis_connection('%s:%s' % (component, shard), wait)

            # Fetch the search results and their sort values.
            c, d, i = search_get_values(
                conn, query, ids[shard], ttl, sort, start, num)

            # Combine this shard's results with all of the other results.
            count += c
            data.extend(d)
            ids[shard] = i

        # Return the raw results from all of the shards.
        return count, data, ids

    This function works as explained: we execute queries against each shard one at a time
    until we have results from all shards. Remember that in order to perform queries against
    all shards, we must pass the proper shard count to the get_shard_results() function.

    Exercise: Run queries in parallel

    Python includes a variety of methods to run calls against Redis servers in parallel.
    Because most of the work with performing a query is actually just waiting for Redis
    to respond, we can easily use Python’s built-in threading and queue libraries to send
    requests to the sharded Redis servers and wait for a response. Can you write a version
    of get_shard_results() that uses threads to fetch results from all shards in parallel?
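    One possible shape for an answer to this exercise, as a minimal sketch: a generic fan-out helper that runs one worker thread per shard and merges the (count, data, id) triples. The `fetch` callable is a stand-in for the combination of `get_redis_connection()` and `search_get_values()`; this helper only demonstrates the parallel fan-out itself.

```python
import threading

def get_shard_results_threaded(shards, fetch, ids=None):
    # fetch(shard, id) must return a (count, data, id) triple, playing
    # the role that get_redis_connection() plus search_get_values() play
    # inside get_shard_results().
    ids = list(ids) if ids else shards * [None]
    counts = shards * [0]
    data_parts = [[] for _ in range(shards)]

    def worker(shard):
        # Each worker writes only to its own slot, so no locking is
        # needed when the results are combined afterward.
        c, d, i = fetch(shard, ids[shard])
        counts[shard] = c
        data_parts[shard] = d
        ids[shard] = i

    threads = [threading.Thread(target=worker, args=(s,))
               for s in range(shards)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # Flatten the per-shard results in shard order, as before.
    data = []
    for part in data_parts:
        data.extend(part)
    return sum(counts), data, ids
```

    Because each query thread spends most of its time waiting on the network, this can cut total latency to roughly that of the slowest shard instead of the sum of all shards.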

    Now that we have all of the results from all of the queries, we only need to re-sort our
    results so that we can get an ordering on all of the results that we fetched. This isn’t
    terribly complicated, but we have to be careful about numeric and non-numeric sorts,
    handling missing values, and handling non-numeric values during numeric sorts. Our
    function for merging results and returning only the requested results is shown in the
    next listing.

    Listing 10.7 A function to merge sharded search results

    def to_numeric_key(data):
        # Use the Decimal numeric type because it transparently handles
        # both integers and floats reasonably, defaulting to 0 if the
        # value wasn't numeric or was missing.
        try:
            return Decimal(data[1] or '0')
        except:
            return Decimal('0')

    def to_string_key(data):
        # Always return a string, even if there was no value stored.
        return data[1] or ''

    def search_shards(component, shards, query, ids=None, ttl=300,
                      sort="-updated", start=0, num=20, wait=1):
        # Take all of the sharding and searching arguments, mostly to
        # pass on to lower-level functions; we use the sort and search
        # offsets ourselves.

        # Fetch the results of the unsorted sharded search.
        count, data, ids = get_shard_results(
            component, shards, query, ids, ttl, sort, start, num, wait)

        # Prepare all of our sorting options.
        reversed = sort.startswith('-')
        sort = sort.strip('-')
        key = to_numeric_key
        if sort not in ('updated', 'id', 'created'):
            key = to_string_key

        # Actually sort our results based on the sort parameter.
        data.sort(key=key, reverse=reversed)

        # Fetch just the page of results that we want, keeping only the
        # document IDs.
        results = []
        for docid, score in data[start:start+num]:
            results.append(docid)

        # Return the results, including the sequence of cache IDs for
        # each shard.
        return count, results, ids

    In order to handle sorting properly, we needed to write two functions to convert data
    returned by Redis into values that could be consistently sorted against each other.
    You’ll notice that we chose to use Python Decimal values for sorting numerically. This
    is because we get the same sorted results with less code, along with transparent support
    for handling infinity correctly. From there, all of our code does exactly what’s expected:
    we fetch the results, prepare to sort the results, sort the results, and then return only
    those document IDs from the search that are in the requested range.
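    The behavior of the numeric key function can be checked in isolation. The sketch below restates the same idea as the `to_numeric_key()` helper in listing 10.7 and runs it over a few made-up (docid, value) pairs, including a missing value, an infinity, and a non-numeric string:

```python
from decimal import Decimal

def to_numeric_key(data):
    # Same idea as listing 10.7: coerce the fetched sort column to a
    # Decimal, treating missing or non-numeric values as 0.  Decimal
    # parses ints, floats, and 'inf' uniformly.
    try:
        return Decimal(data[1] or '0')
    except:
        return Decimal('0')

rows = [('a', '10'), ('b', '2.5'), ('c', None), ('d', 'inf'), ('e', 'junk')]
rows.sort(key=to_numeric_key)
# Ascending order: 'c' and 'e' sort as 0, then 'b' (2.5), 'a' (10),
# and 'd' (infinity) last.
```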

    Now that we have a version of our SORT-based search that works across sharded
    Redis servers, it only remains to shard searching on ZSET-based sharded indexes.


    Like a SORT-based search, a ZSET-based search requires running our queries against
    all shards, fetching the scores to sort by, and merging the results properly. We’ll go
    through the same steps that we did for SORT-based search in this section:
    search on one shard, search on all shards, and then merge the results.

    To search on one shard, we’ll wrap the chapter 7 search_and_zsort() function on
    ZSETs, fetching the results and scores from the cached ZSET in the next listing.

    Listing 10.8 ZSET-based search that returns scores for each result

    def search_get_zset_values(conn, query, id=None, ttl=300, update=1,
                               vote=0, start=0, num=20, desc=True):
        # Accept all of the standard arguments for search_and_zsort().

        # Call the underlying search_and_zsort() function to get the
        # cached result ID and total number of results.
        count, r, id = search_and_zsort(
            conn, query, id, ttl, update, vote, 0, 1, desc)

        # Fetch all of the results we need, including their scores.
        if desc:
            data = conn.zrevrange(id, 0, start + num - 1, withscores=True)
        else:
            data = conn.zrange(id, 0, start + num - 1, withscores=True)

        # Return the count, results with scores, and the cache ID.
        return count, data, id

    Compared to the SORT-based search that does similar work, this function tries to keep
    things simple by ignoring the returned results without scores, and just fetches the
    results with scores directly from the cached ZSET. Because we have our scores already
    as floating-point numbers for easy sorting, we’ll combine the function to search on all
    shards with the function that merges and sorts the results.

    As before, we’ll perform searches for each shard one at a time, combining the
    results. When we have the results, we’ll sort them based on the scores that were
    returned. After the sort, we’ll return the results to the caller. The function that implements
    this is shown next.

    Listing 10.9 Sharded search query over ZSETs that returns paginated results

    def search_shards_zset(component, shards, query, ids=None, ttl=300,
                           update=1, vote=0, start=0, num=20, desc=True,
                           wait=1):
        # Take all of the sharding arguments along with all of the
        # search arguments.

        # Prepare structures for data to be returned.
        count = 0
        data = []

        # Use cached results, if any; otherwise, start from scratch.
        ids = ids or shards * [None]
        for shard in xrange(shards):
            # Fetch or create a connection to each shard.
            conn = get_redis_connection('%s:%s' % (component, shard), wait)

            # Perform the search on a shard and fetch the scores.
            c, d, i = search_get_zset_values(conn, query, ids[shard],
                ttl, update, vote, start, num, desc)

            # Merge this shard's results together with the others.
            count += c
            data.extend(d)
            ids[shard] = i

        # A simple sort helper that returns only the score.
        def key(result):
            return result[1]

        # Sort all of the results together.
        data.sort(key=key, reverse=desc)

        # Extract the document IDs from the results, removing the scores.
        results = []
        for docid, score in data[start:start+num]:
            results.append(docid)

        # Return the search results to the caller.
        return count, results, ids

    With this code, you should have a good idea of the kinds of things necessary for handling
    sharded search queries. Generally, when confronted with a situation like this, I
    find myself questioning whether it’s worth attempting to scale these queries in this
    way. Given that we now have working sharded search code, the question is easier
    to answer. Note that as our number of shards increases, we’ll need to fetch more
    and more data in order to satisfy our queries. At some point, we may even need to delegate
    fetching and merging to other processes, or even merge in a tree-like structure.
    At that point, we may want to consider other solutions that were purpose-built
    for search (like Lucene, Solr, Elasticsearch, or even Amazon’s CloudSearch).

    Now that you know how to scale a second type of search, only one other problem
    from earlier sections is likely to reach the point of needing to be scaled.
    Let’s take a look at what it would take to scale our social network from chapter 8.