Continuing on the same note as the Greenwald-Khanna post, we discuss another novel data structure for order statistics called Q-Digest. Compared to GK, Q-Digest is much simpler and easier to implement, and it extends to a distributed solution very easily. The data structure was designed with sensor networks in mind, where minimizing radio transmission overhead is of paramount importance. It originally appeared in a paper by Shrivastava et al.

Q-Digest is simply a complete binary tree over the range of values, where the leaf nodes are the values themselves. In the example below, we have a digest over the values 1 to 4 inclusive, and we have seen the value 1 once and the value 4 seven times.
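The implementation below stores this tree implicitly: node i has children 2i and 2i+1, and the leaf for value v sits at id 2^height + v - 1. A quick sketch of that numbering (the helper names here are mine, not from the paper):

```python
from math import ceil, log2

def leaf_id(value, universe_size):
    """Implicit-heap id of the leaf node covering [value, value]."""
    height = ceil(log2(universe_size))
    return 2 ** height + value - 1

def parent_id(node_id):
    """Implicit-heap id of a node's parent (the root is id 1)."""
    return node_id // 2

# Universe 1..4: the leaves occupy ids 4..7.
print([leaf_id(v, 4) for v in (1, 2, 3, 4)])  # → [4, 5, 6, 7]
print(parent_id(leaf_id(1, 4)))               # → 2
```

This is why no child/parent pointers are needed anywhere in the code: a node's neighbors are pure arithmetic on its id.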

The novelty is the compression algorithm, which allows values with low frequencies to propagate up the tree and be combined. In the example below, we have some information loss: we know that there are 10 values between 1 and 2 inclusive, but we don’t know the exact count of each.

When building the q-digest, there is one invariant to keep in mind: the combined count of a node, its sibling and its parent must be more than floor(n/k), where n is the total sum of all counts and k is a compression factor of our choosing. The root node is an exception. In the paper, this is referred to as property 2. Property 1 states that the count of a node must be at most floor(n/k) unless it is a leaf node; it is not needed to build the digest, but the original paper uses it to prove some guarantees about the q-digest.
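As a quick numeric sanity check of the invariant (a sketch with helper names of my own; n = 15 and k = 5, as in the paper's running example):

```python
from math import floor

n, k = 15, 5               # total count and compression factor
threshold = floor(n / k)   # = 3

def should_merge(node_count, sibling_count, parent_count):
    """Property 2 is violated (so the two children should be merged into
    their parent) when the combined count of the node, its sibling and its
    parent is NOT more than floor(n/k)."""
    return node_count + sibling_count + parent_count <= threshold

print(should_merge(1, 1, 0))  # → True: 1+1+0 <= 3, push the counts upward
print(should_merge(4, 0, 0))  # → False: 4 > 3, the node stays where it is
```

Intuitively, the merge condition sweeps rare values up the tree until enough of them pool together to be worth their storage cost.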

The data structure provides some nice guarantees that make it very practical:

- Given a compression factor k, the size of the digest is never more than 3k nodes
- When merging two q-digests, as would be common in a distributed setting, all one has to do is take the union of the two and run the compress algorithm. This is much simpler than the distributed extensions to the GK algorithm.
- When answering quantile queries, the error is bounded by log(σ)/k, where σ is the size of the range of values stored in the digest. Thus our answer to a median query will always be a value whose rank lies between 0.5n and (0.5 + log(σ)/k)*n.
- We can maintain the same relative errors when merging digests
- As we’ll see in future posts, we can augment the data structure to create a lot of fun possibilities
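To get a feel for the third guarantee, here is a back-of-the-envelope computation of the rank error bound log(σ)/k (a sketch; the universe size and k below are illustrative numbers of my own, not from the paper):

```python
from math import log2

def rank_error_bound(universe_size, k):
    """Maximum relative rank error of a quantile answer: log2(sigma) / k."""
    return log2(universe_size) / k

# A 16-bit universe with k = 1000 keeps the digest at <= 3000 nodes while
# bounding a median answer's rank between 0.5n and (0.5 + 0.016)n.
print(rank_error_bound(2 ** 16, 1000))  # → 0.016
```

Note the trade-off: the error shrinks linearly in k, while the space bound of 3k grows linearly, so you buy accuracy with memory at a fixed exchange rate.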

I’m providing the source code in Python. Since I’m new to Python, any non-performance-related comments on the code itself are much appreciated.

from math import floor, log, ceil

### Two properties:
### 1) node.count <= floor(n/k), where n = sum(f_i) over all i (the sum of all
###    frequencies) and k is a compression factor; root and leaf nodes are exceptions
### 2) node.count + node.parent.count + node.sibling.count > floor(n/k),
###    with the root node as an exception
### Property 1 means that no node should have a high count.
### Property 2 means that two children (siblings) with low counts should be
### merged into their parent.

class QDigest(object):
    """
    Implements a tree without wasting space, by assuming that the children of
    node i can be found at indices 2i and 2i+1, and its parent at floor(i/2)
    """

    class Node(object):
        def __init__(self, id, is_leaf=False, initial_count=1):
            self.id = id
            self.count = initial_count
            self.is_leaf = is_leaf

        def inc(self, amount=1):
            self.count += amount

        def __repr__(self):
            return "{id=%d, cnt=%d}" % (self.id, self.count)

        @property
        def is_root(self):
            return self.id == 1

        def parent_id(self):
            """ Returns the id of the parent, or None if this is the root node """
            if self.is_root:
                return None
            return self.id // 2

        def children(self):
            """ Returns the ids of the left child and the right child, respectively """
            return (2 * self.id, 2 * self.id + 1)

        def sibling_id(self):
            """ Returns the id of the sibling, or None if this is the root node """
            if self.is_root:
                return None
            if self.id % 2 == 0:
                return self.id + 1
            else:
                return self.id - 1

    empty_node = Node(id=-1, initial_count=0)

    def __init__(self, universe_size, compression_factor):
        """
        Defines a q-digest of the specified size. Size here defines the
        universe, not the sum of frequencies. It is assumed that the first
        value is 1.
        """
        self.size = universe_size
        self.digest = []
        self.k = compression_factor

    def __add__(self, other):
        """
        Operator overloading for merging two q-digests together.
        The two digests must have the same compression factor.
        """
        if self.k != other.k:
            raise ValueError("Compression factors of the two digests are not the same")
        digest = QDigest(max(self.size, other.size), self.k)
        # copy the nodes themselves, not just the list, so that merging does
        # not mutate self
        digest.digest = [QDigest.Node(id=x.id, initial_count=x.count) for x in self.digest]
        for node in other.digest:
            digest._insert_or_modify_node(node.id, node.count)
        digest.compress()
        return digest

    @property
    def n(self):
        """ Sum of all frequencies stored in this q-digest """
        return sum(x.count for x in self.digest)

    @property
    def height(self):
        """
        The height of the tree, assuming that each leaf node covers a range
        [value, value]
        """
        return int(ceil(log(self.size, 2)))

    def _get_node(self, node_id):
        """
        Returns the node object having the specified id. If that id does not
        exist, returns a special node with id=-1 and count=0.
        """
        try:
            return next(x for x in self.digest if x.id == node_id)
        except StopIteration:
            return QDigest.empty_node

    def _remove_node(self, node_id):
        """ Removes the node with the specified id from the digest """
        node = self._get_node(node_id)
        if node is not QDigest.empty_node:
            self.digest.remove(node)

    def violates_prop_1(self, node):
        """
        Checks if a node violates property 1, namely node.count <= floor(n/k).
        This property is not used in the code, but it is used in the paper to
        prove the error guarantees.
        """
        if node.is_root or node.is_leaf:
            return False
        return node.count > int(floor(self.n / self.k))

    def violates_prop_2(self, node):
        """
        Checks if a node violates
        node.count + node.sibling.count + node.parent.count > floor(n/k).
        We assume that the node passed in is one of the two children.
        """
        sibling_count = self._get_node(node.sibling_id()).count
        parent_count = self._get_node(node.parent_id()).count
        return node.count + sibling_count + parent_count <= int(floor(self.n / self.k))

    def _insert_or_modify_node(self, node_id, inc_by=1):
        """ Helper function to do the actual insertion """
        current = self._get_node(node_id)
        if current is not QDigest.empty_node:
            current.inc(inc_by)
        else:
            current = QDigest.Node(id=node_id, initial_count=inc_by)
            self.digest.append(current)

    def insert(self, value):
        """
        Inserts a new element into the q-digest, or increases the count if we
        have already seen that value before
        """
        if value > self.size:
            raise ValueError("Value is outside the universe of this digest")
        id_for_leaf_node = 2 ** self.height + value - 1
        self._insert_or_modify_node(node_id=id_for_leaf_node, inc_by=1)

    def compress(self):
        """
        Starting from the bottom, go over each node and at each step check
        whether property 2 is violated; if so, merge the node and its sibling
        into their parent
        """
        for l in range(self.height, 0, -1):  # start from the bottom level
            level_l_nodes = sorted(
                (x for x in self.digest if 2 ** l <= x.id < 2 ** (l + 1)),
                key=lambda x: x.id)
            for node in level_l_nodes:
                if self._get_node(node.id) is QDigest.empty_node:
                    continue  # already merged away as the sibling of an earlier node
                if self.violates_prop_2(node):
                    merged_count = node.count + self._get_node(node.sibling_id()).count
                    self._insert_or_modify_node(node_id=node.parent_id(), inc_by=merged_count)
                    self._remove_node(node.id)
                    self._remove_node(node.sibling_id())

    def quantile_query(self, fractions):
        """
        Given a list of fractions in (0, 1), find the value in the digest
        closest to each quantile. For example, pass 0.5 and 0.75 to get the
        median and the 75th percentile. The algorithm as described in the
        paper: traverse the tree in postorder fashion, adding up the counts;
        once the running sum exceeds q*n, where q is our fraction, report the
        answer.
        """
        # set up dicts for the running sums and the results
        running_sums = {}
        results = {}
        for fraction in fractions:
            if fraction <= 0.0 or fraction >= 1.0:
                raise ValueError("Fraction should be between 0 and 1 exclusive")
            running_sums[str(fraction)] = 0
            results[str(fraction)] = None

        def _get_node_for_traversal(node_id):
            """
            Helper that creates a pseudo-node if one does not exist, just so
            we can fake a postorder traversal the traditional way
            """
            current = self._get_node(node_id)
            if current.id == -1:
                current = QDigest.Node(id=node_id, initial_count=0)
            return current

        def basic_dfs(node):
            """
            Generator that does a postorder traversal of the tree, summing up
            the counts and storing the result once the running sum exceeds
            q*n for each desired fraction passed into the outer function
            """
            if node and node.id < self.size * 2:
                left_node, right_node = (_get_node_for_traversal(x) for x in node.children())
                for child in basic_dfs(left_node):
                    yield child
                for child in basic_dfs(right_node):
                    yield child
                for fraction in fractions:
                    if results[str(fraction)] is None and \
                            node.count + running_sums[str(fraction)] < fraction * self.n:
                        running_sums[str(fraction)] += node.count
                    else:
                        results[str(fraction)] = running_sums[str(fraction)]

        # TODO: stop programming with side effects
        for _ in basic_dfs(_get_node_for_traversal(1)):
            pass
        return results

if __name__ == '__main__':
    # the example from the paper
    digest = QDigest(8, 5)
    for value in (1, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 5, 6, 7, 8):
        digest.insert(value)
    digest.compress()
    print(digest.digest)
    print(digest.quantile_query((0.25, 0.5, 0.75)))

ekirpichov: Hi!

I found a problem in the algorithm (your implementation has that problem as well) and fixed it. See https://plus.google.com/u/0/109909935680879695595/posts/eTnoqB62iya

ekirpichov: Sorry, the correct link to the G+ post is https://plus.google.com/u/0/109909935680879695595/posts/768ZZ9Euqz6

papercruncher (post author): Thanks Eugene, I’ll look into it. Did the authors get back to you?

Eugene Kirpichov: They promised to, but eventually didn’t 😐 In the meantime, I had a few discussions about this with my friends and ended up with this implementation: https://github.com/clearspring/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/quantile/QDigest.java

papercruncher (post author): I’ve been looking at the picture you posted on your G+ and I’m having a hard time figuring out how you would end up with that initial digest in the first place. While you are right that the specific snapshot does not violate either of the two properties, the authors only examine building the q-digest when all the data is available. They handle the case of more incoming data by merging q-digests, as they say in the first paragraph of section 3.3; there is no INSERT() operation. Even if that element in the first top diagram arrived via MERGE(), it wouldn’t stay there, since a COMPRESS() operation would have moved it to the root.

I apologize if you already covered this in your post; I had trouble visualizing the other q-digest tree you described.

ekirpichov: I’m not sure whether that particular digest can indeed be the result of a sequence of inserts; it is a crafted example, simplified from a more complex one that actually did result from a sequence of inserts (I found it in the debugger and then hand-simplified it while preserving the bug).


Anonymous: Just a note: the __add__ method won’t work for digests that have different universe_size. Merging is done on a value-range basis (in the paper, v.min and v.max), not by id. This is not an issue when universe_size is the same between digests, but it becomes problematic otherwise.
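One way to address this (a sketch of my own, not from the paper or the post's code): before taking the union, remap the ids of the smaller digest into the larger tree. Each time the universe doubles, the old tree becomes the left subtree under the new root, so a node at depth d moves from id i to id i + 2^d. Repeating this once per doubling aligns the two digests by value range:

```python
from math import log2

def promote(node_id):
    """Map a node id into a tree whose universe is twice as large: the old
    tree becomes the left subtree under the new root, so a node at depth d
    (where d = node_id.bit_length() - 1) moves from id i to id i + 2**d."""
    depth = node_id.bit_length() - 1
    return node_id + 2 ** depth

def align_id(node_id, from_size, to_size):
    """Remap an id from a digest over `from_size` values into one over
    `to_size` values (both assumed to be powers of two, from_size <= to_size)."""
    doublings = int(log2(to_size) - log2(from_size))
    for _ in range(doublings):
        node_id = promote(node_id)
    return node_id

# The leaf for value 1 in a universe of 2 (id 2) lands on the leaf for
# value 1 in a universe of 8 (id 8).
print(align_id(2, 2, 8))  # → 8
```

With ids aligned this way, the union-then-compress merge described in the post should apply unchanged; I have not tested this against the paper's formal merge definition, so treat it as a starting point.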