Q-Digest

Continuing on the same note as the Greenwald-Khanna post, we discuss another novel data structure for order statistics called Q-Digest. Compared to GK, Q-Digest is much simpler, easier to implement, and extends very easily to a distributed setting. The data structure was designed with sensor networks in mind, where minimizing radio transmission overhead is of paramount importance. It originally appeared in this paper by Shrivastava et al.

Q-Digest is simply a complete binary tree over the range of values, where the leaf nodes are the values themselves. In the example below, we have a digest over the values 1 to 4 inclusive, and we have seen the value 1 once and the value 4 seven times.

A Q-Digest over the values [1..4]
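To make the tree layout concrete, here is a minimal sketch of how values map to nodes when the tree is numbered heap-style (root is 1, children of node i are 2i and 2i+1), which is the numbering the source code further down relies on. The helper names `tree_height`, `leaf_id` and `node_range` are my own, chosen for illustration; they do not come from the paper.

```python
def tree_height(universe_size):
    """Number of levels below the root so that every value gets its own leaf."""
    return (universe_size - 1).bit_length()


def leaf_id(value, universe_size):
    """Id of the leaf that stores `value` (values start at 1)."""
    return 2 ** tree_height(universe_size) + value - 1


def node_range(node_id, universe_size):
    """Inclusive (min, max) range of values covered by a node."""
    height = tree_height(universe_size)
    level = node_id.bit_length() - 1      # the root sits on level 0
    span = 2 ** (height - level)          # number of leaves below this node
    first = (node_id - 2 ** level) * span + 1
    return (first, first + span - 1)


if __name__ == "__main__":
    # Universe [1..4]: nodes 1..3 are internal, nodes 4..7 are the leaves
    for node_id in range(1, 8):
        print(node_id, node_range(node_id, 4))
```

For the universe [1..4], the root covers (1, 4), its children cover (1, 2) and (3, 4), and the leaves 4 to 7 hold the individual values 1 to 4.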

The novelty is the compression algorithm, which allows values with low frequencies to propagate up the tree and be combined. In the example below, we have some information loss. We know that there are 10 values between 1 and 2 inclusive, but we don’t know the exact count of each.

Compressed Q-Digest over the values 1 through 4 inclusive
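As a rough illustration of how low counts travel up the tree, the sketch below compresses a digest stored as a plain `{node_id: count}` dictionary. The dictionary representation and the function name `compress` are assumptions made for this example only. It uses the threshold floor(n/k), with n the total count and k the compression factor, and it scans every sibling pair on each level, which is simple but not efficient for a large universe.

```python
def compress(counts, height, k):
    """Bottom-up compression of a {node_id: count} q-digest; returns a new dict."""
    counts = dict(counts)
    # floor(n/k); n stays the same because merging only moves counts around
    threshold = sum(counts.values()) // k
    for level in range(height, 0, -1):
        # visit each (left, right) sibling pair on this level
        for left in range(2 ** level, 2 ** (level + 1), 2):
            right, parent = left + 1, left // 2
            family = (counts.get(left, 0) + counts.get(right, 0)
                      + counts.get(parent, 0))
            if 0 < family <= threshold:
                # the family is too light: fold both children into the parent
                counts[parent] = family
                counts.pop(left, None)
                counts.pop(right, None)
    return counts


if __name__ == "__main__":
    # Universe [1..8] (height 3, leaves are ids 8..15), k = 5:
    # value 1 once, 3 four times, 4 six times, and 5, 6, 7, 8 once each
    leaves = {8: 1, 10: 4, 11: 6, 12: 1, 13: 1, 14: 1, 15: 1}
    print(compress(leaves, height=3, k=5))
```

In this run the single occurrence of value 1 ends up at the root, and the pairs (5, 6) and (7, 8) are each folded into their parent, so only the heavy values 3 and 4 keep exact counts.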

When building the q-digest, there is one invariant to keep in mind. The combined count of a node, its sibling and its parent must be more than floor(n/k), where n is the sum of all counts and k is a compression factor of our choosing. The root node is an exception, since it has neither a parent nor a sibling. In the paper, this is referred to as property 2. Property 1 states that the count of a node must be at most floor(n/k) unless it is a leaf node. Property 1 is used in the original paper to prove some guarantees about the q-digest.
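The two properties can be checked mechanically. The following is a small sketch, again assuming a digest stored as a `{node_id: count}` dictionary with heap-style node ids; `check_properties` is a hypothetical helper written for this post, not something taken from the paper.

```python
def check_properties(counts, height, k):
    """Return (prop1_ok, prop2_ok) for a {node_id: count} q-digest."""
    threshold = sum(counts.values()) // k     # floor(n/k)
    first_leaf = 2 ** height                  # ids >= first_leaf are leaves

    # Property 1: every non-leaf node has count <= floor(n/k)
    prop1_ok = all(count <= threshold
                   for node_id, count in counts.items()
                   if node_id < first_leaf)

    # Property 2: node + sibling + parent > floor(n/k), except for the root.
    # node_id ^ 1 flips the lowest bit, which gives the sibling's id.
    prop2_ok = all(count
                   + counts.get(node_id ^ 1, 0)
                   + counts.get(node_id // 2, 0) > threshold
                   for node_id, count in counts.items()
                   if node_id != 1)
    return prop1_ok, prop2_ok


if __name__ == "__main__":
    uncompressed = {8: 1, 10: 4, 11: 6, 12: 1, 13: 1, 14: 1, 15: 1}
    compressed = {10: 4, 11: 6, 6: 2, 7: 2, 1: 1}
    print(check_properties(uncompressed, height=3, k=5))
    print(check_properties(compressed, height=3, k=5))
```

With n = 15 and k = 5 the threshold is 3. The uncompressed digest fails property 2 because the leaf for value 1 and its empty neighbourhood only add up to 1, while the compressed digest satisfies both properties.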

The data structure provides some nice guarantees that make it very practical:

  • Given a compression factor k, the size of the digest is never more than 3k
  • When merging two q-digests, as would be common in a distributed setting, all one has to do is take the union of the two and run the compress algorithm. This is much simpler than the distributed extensions to the GK algorithm.
  • When answering quantile queries, the relative rank error is bounded by log(σ)/k, where σ is the size of the range of values covered by the digest. Thus the rank of our answer to a median query will always be between 0.5n and (0.5 + log(σ)/k)*n.
  • We can maintain the same relative errors when merging digests
  • As we’ll see in future posts, we can augment the data structure to create a lot of fun possibilities
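To get a feel for these guarantees, here is a small back-of-the-envelope sketch that plugs numbers into the bounds listed above. The function name `qdigest_bounds` and its dictionary keys are made up for this example, and I am assuming the logarithm is base 2, since log(σ) plays the role of the height of the binary tree.

```python
from math import log2


def qdigest_bounds(sigma, k, n, q):
    """Worst-case digest size and rank bounds for the answer to a quantile query."""
    eps = log2(sigma) / k                 # relative rank error, log(sigma)/k
    return {
        "max_nodes": 3 * k,               # the digest never holds more than 3k nodes
        "relative_error": eps,
        "rank_low": q * n,                # the true rank of the answer is at least q*n
        "rank_high": (q + eps) * n,       # and at most (q + eps)*n
    }


if __name__ == "__main__":
    # 10-bit values, compression factor 100, ten thousand observations, median query
    for name, value in qdigest_bounds(sigma=1024, k=100, n=10000, q=0.5).items():
        print(name, value)
```

With σ = 1024 and k = 100 the relative error is 10/100 = 0.1, so the digest holds at most 300 nodes and the answer to a median query has a rank somewhere between the 50th and the 60th percentile. Raising k tightens the error at the cost of a proportionally larger digest.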

I’m providing the source code in Python. Since I’m new to Python, any non-performance related comments on the code itself are much appreciated.

from collections import namedtuple
from math import floor, pow, log, ceil
import random

### Two properties
### 1) node.count <= floor(n/k) where n = sum(f_i \all i), meaning the sum of all frequencies and k is a compression factor
### 2) node.count + node.parent.count + node.sibling.count > floor(n/k)
### root and leaf nodes are exceptions
### Property #1 means that no node should have a high count
### Property #2 means that two children (siblings) that have low count should be merged with their parent

class QDigest():
    """
    Implements a tree without wasting space by assuming that children can be found at index 2i, 2i+1 and parent
    is at floor(i / 2)
    """

    class Node():
        def __init__(self, id, is_leaf=False, initial_count=1):
            self.id = id
            self.count = initial_count
            self.is_leaf = is_leaf

        def inc(self, amount=1):
            self.count+=amount

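        # nodes compare by id (rich comparisons work on both Python 2 and 3)
        def __eq__(self, other):
            return self.id == other.id

        def __lt__(self, other):
            return self.id < other.id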

        def __repr__(self):
            return "{id=%d, cnt=%d}" % (self.id, self.count)

        @property
        def is_root(self):
            return self.id == 1

        def parent_id(self):
            """ Returns the id of the parent or None if this is the root node
            """
            if self.is_root:
                return None
            return self.id // 2

        def children(self):
            """ Returns the ids of the left child and the right child respectively
            """
            return (2*self.id, 2*self.id + 1)

        def sibling_id(self):
            """ Returns the id of the sibling or None if this is the root node
            """
            if self.is_root:
                return None

            if self.id % 2 == 0:
                return self.id + 1
            else:
                return self.id - 1

    empty_node = Node(id=-1, initial_count=0)

    def __init__(self, universe_size, compression_factor):
        """ Defines a q-digest of specified size. Size here defines the universe, not the sum of frequencies.
         It is assumed that the first value is 1
        """
        self.size = universe_size
        self.digest = []
        self.id = 1
        self.k = compression_factor

    def __add__(self, other):
        """
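        Operator overloading for merging two q-digests together. The two digests must have the same compression
        factor and the same tree height, since node ids only refer to the same value ranges when the heights match
        """
        if self.k != other.k:
            raise ValueError("Compression factors of two digests not the same")
        if self.height != other.height:
            raise ValueError("Tree heights of two digests not the same")

        digest = QDigest(max(self.size, other.size), self.k)
        # copy the nodes so that merging does not modify the counts stored in self
        digest.digest = [QDigest.Node(x.id, x.is_leaf, x.count) for x in self.digest]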
        for node in other.digest:
            digest._insert_or_modify_node(node.id, node.count)

        digest.compress()

        return digest

    @property
    def n(self):
        """ Sum of all frequencies stored in this q-digest
        """
        return sum(x.count for x in self.digest)

    @property
    def height(self):
        """
        The height of the tree, used when assuming that all leaf nodes have a range [value, value]
        """
        # same as ceil(log2(size)) for integer sizes, computed without floating point rounding
        return (self.size - 1).bit_length()

    def _get_node(self, node_id):
        """
        Returns the node object having the specified id. If that id does not exist
        return a special node with id=-1 and count=0
        """
        # next() with a default works on both Python 2.6+ and Python 3
        return next((x for x in self.digest if x.id == node_id), QDigest.empty_node)

    def _remove_node(self, node_id):
        """
        Removed the node with the specified id from the digest
        """
        node = self._get_node(node_id)
        if node is not QDigest.empty_node:
            self.digest.remove(node)

    def violates_prop_1(self, node):
        """
        Checks if a node violates property 1, namely node.count <= floor(n/k)
        This property is not used in the code, but it is used in the paper to prove error guarantees
        """
        if node.is_root or node.is_leaf:
            return False
        else:
            return node.count > int(floor(self.n/self.k))

    def violates_prop_2(self, node):
        """ Checks if a node violates node.count + node.sibling.count + node.parent.count > floor(n/k)
        We assume that the node passed in is one of the two children nodes
        """
        sibling_count = self._get_node(node.sibling_id()).count
        parent_count = self._get_node(node.parent_id()).count

        return node.count + sibling_count + parent_count <= int(floor(self.n/self.k))

    def _insert_or_modify_node(self, node_id, inc_by=1):
        """
        Helper function to do the actual insertion
        """
        current = self._get_node(node_id)
        if current is not QDigest.empty_node:
            current.inc(inc_by)
        else:
            current = QDigest.Node(id=node_id, initial_count=inc_by)
            self.digest.append(current)

    def insert(self, value):
        """
        Insert a new element into the q-digest or increase the count if we have already seen that
        value before
        """
        if value < 1 or value > self.size:
            raise ValueError("value must be between 1 and %d inclusive" % self.size)

        id_for_leaf_node = int(pow(2, self.height) + value - 1)
        self._insert_or_modify_node(node_id=id_for_leaf_node, inc_by=1)

    def compress(self):
        """
        Starting from the bottom, go over each node and at each step check if prop2 is violated
        """
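        for l in range(self.height, 0,-1): # start from bottom
            # process the nodes of this level in order of node id
            level_l_nodes = sorted((x for x in self.digest if x.id >= pow(2,l) and x.id < pow(2,l+1)), key=lambda x: x.id)
            for node in level_l_nodes:
                # a node may already have been removed while merging its sibling; skip it so it is not counted twice
                if self._get_node(node.id) is QDigest.empty_node:
                    continue
                if self.violates_prop_2(node):
                    merged_count = node.count + self._get_node(node.sibling_id()).count
                    self._insert_or_modify_node(node_id=node.parent_id(), inc_by=merged_count)
                    self._remove_node(node.id)
                    self._remove_node(node.sibling_id())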

    def quantile_query(self, fractions):
        """
        Given a list of fractions in (0, 1), find the value in the digest closest to each quantile. For example,
        pass 0.5 and 0.75 to get the median and the 75th percentile. The algorithm as described in the paper
        goes as follows: traverse the tree in postorder fashion, adding up the counts. Once the running sum exceeds
        q*n, where q is our fraction, report the upper end of the range of the node that was just added.
        """

        # setup arrays for results
        running_sums = {}
        results = {}
        for fraction in fractions:
            if fraction <= 0.0 or fraction >= 1.0:
                raise ValueError("Fraction should be between 0 and 1 exclusive")
            else:
                running_sums[str(fraction)] = 0
                results[str(fraction)] = None

        def _get_node_for_traversal(node_id):
            """
            helper fcn that will create a pseudo-node if one does not exist, just so we can
            easily fake post-order traversal the traditional way
            """
            current = self._get_node(node_id)
            if current.id == -1:
                current = QDigest.Node(id=node_id, initial_count=0)
            return current

        def basic_dfs(node):
            """
            generator fcn that does a postorder traversal of the tree, summing up the counts for each
            node and stores the result when it exceeds q*n for each desired fraction passed in the outer fcn
            """
            if node and node.id < self.size * 2 :
                left_node, right_node = (_get_node_for_traversal(x) for x in node.children())

                for node in basic_dfs(left_node):
                    yield node
                for node in basic_dfs(right_node):
                    yield node

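                for fraction in fractions:
                    key = str(fraction)
                    if results[key] is not None:
                        continue  # this quantile has already been answered
                    running_sums[key] += node.count
                    if running_sums[key] > fraction*self.n:
                        # report the upper end of this node's value range (v.max in the paper)
                        level = node.id.bit_length() - 1
                        span = 2 ** (self.height - level)
                        results[key] = min((node.id - 2 ** level + 1) * span, self.size)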

        #TODO: stop programming with side-effects, shitty habit
        [x for x in basic_dfs(_get_node_for_traversal(1))]
        return results


if __name__ == '__main__':
    # the example from the paper
    digest = QDigest(8,5)
    digest.insert(1)
    digest.insert(3)
    digest.insert(3)
    digest.insert(3)
    digest.insert(3)
    digest.insert(4)
    digest.insert(4)
    digest.insert(4)
    digest.insert(4)
    digest.insert(4)
    digest.insert(4)
    digest.insert(5)
    digest.insert(6)
    digest.insert(7)
    digest.insert(8)
    digest.compress()
    print(digest.digest)
    print(digest.quantile_query((0.25, 0.5, 0.75)))

9 thoughts on “Q-Digest”

  1. papercruncher Post author

    I’ve been looking at the picture you posted on your G+ and I’m having a hard time figuring out how you would end up with that initial digest to begin with. While you are right that the specific snapshot does not violate any of the two properties, the authors only examine building the q-digest in the case where all the data is available. They handle the case of more incoming data by merging Q-Digest as they say in the first paragraph in section 3.3. There is no INSERT() operation. Even if that element in the first top diagram came via MERGE(), it wouldn’t be there as a COMPRESS() operation would have moved it to the root.

    I apologize if you already covered this in your post, I failed at visualizing the other tree q-digest you described.

  2. ekirpichov

    I’m not sure if that particular digest can indeed be the result of a sequence of inserts; this is a crafted example, simplified from a more complex example that actually resulted from a sequence of inserts (I found that in the debugger and then hand-simplified it while preserving the bug).

  3. Anonymous

    Just a note: the __add__ method won't work for digests that have a different universe_size. Merging is done on a value range basis (from the paper, v.min and v.max), not by id, which will not be an issue if the universe_size is the same between digests, but will become problematic otherwise.

  4. MuffinSandwich

    Line 170 is incorrect, since it will increment the parent node even when the child node under consideration has already been removed in a previous iteration of the loop. To fix this, line 168 should be “if self.violates_prop_2(node) and self._get_node(node.id) is not QDigest.empty_node:”
