diff --git a/src/python/qubed/CompressedDataCubeTree.py b/src/python/qubed/CompressedDataCubeTree.py
deleted file mode 100644
index d8d224b..0000000
--- a/src/python/qubed/CompressedDataCubeTree.py
+++ /dev/null
@@ -1,218 +0,0 @@
-import dataclasses
-from collections import defaultdict
-from dataclasses import dataclass, field
-
-from frozendict import frozendict
-
-from .Qube import Enum, NodeData, Tree
-from .tree_formatters import HTML, node_tree_to_html, node_tree_to_string
-
-NodeId = int
-CacheType = dict[NodeId, "CompressedNode"]
-
-@dataclass(frozen=True)
-class CompressedNode:
-    id: NodeId = field(hash=False, compare=False)
-    data: NodeData
-
-    _children: tuple[NodeId, ...]
-    _cache: CacheType = field(repr=False, hash=False, compare=False)
-
-    @property
-    def children(self) -> tuple["CompressedNode", ...]:
-        return tuple(self._cache[i] for i in self._children)
-
-    def summary(self, debug=False) -> str:
-        if debug: return f"{self.data.key}={self.data.values.summary()} ({self.id})"
-        return f"{self.data.key}={self.data.values.summary()}" if self.data.key != "root" else "root"
-
-
-@dataclass(frozen=True)
-class CompressedTree:
-    """
-    This tree is compressed in two distinct ways:
-
-    1. Product compression: nodes have a key and **multiple values**, so each node
-       represents many logical nodes key=value1, key=value2, ...
-       Each of these logical nodes has identical children, so we can compress them
-       into one physical node. Any distinct path through the tree then represents a
-       cartesian product of the values, otherwise known as a datacube.
-
-    2. To perform the product compression we need to know when two nodes have
-       identical children. To do this every node is assigned an id, initially
-       computed as a hash of the node's data and its children's ids. To avoid hash
-       collisions we increment the initial hash while it is already taken by a
-       different node, until we find a unique id.
-
-       Crucially this lets us later determine whether a new node is already cached:
-           id = hash(node)
-           while True:
-               if id not in cache: the node is definitely not in the cache
-               elif cache[id] != node: hash collision, so increment id and try again
-               else: the node is already in the cache
-
-    The tree can be walked from the root by repeatedly looking up the children of a
-    node in the cache.
-
-    This structure facilitates compression because we can look at the children of a
-    node: if two children have the same key, metadata and children then we can
-    merge them into a single node.
-    """
-    root: CompressedNode
-    cache: CacheType
-
-    @staticmethod
-    def add_to_cache(cache: dict[NodeId, CompressedNode], data: NodeData, _children: tuple[NodeId, ...]) -> NodeId:
-        """
-        Add a new node to the cache and return its id.
-        We need a way to check whether new nodes are already in the cache, so we hash them;
-        in case of a hash collision we increment the id and try again.
-        This way we always eventually find a unique id for the node,
-        and we never store the same node twice under different ids.
- """ - _children = tuple(sorted(_children)) - id = hash((data, _children)) - - # To avoid hash collisions, we increment the id until we find a unique one - tries = 0 - while True: - tries += 1 - if id not in cache: - # The node isn't in the cache and this id is free - cache[id] = CompressedNode(id = id, - data = data, - _children = _children, - _cache = cache) - break - - if cache[id].data == data and cache[id]._children == _children: - break # The node is already in the cache - - # This id is already in use by a different node so increment it (mod) and try again - id = (id + 1) % (2**64) - - if tries > 100: - raise RuntimeError("Too many hash collisions, something is wrong.") - - return id - - - @classmethod - def from_tree(cls, tree : Tree) -> 'CompressedTree': - cache = {} - - def cache_tree(level : Tree) -> NodeId: - node_data = NodeData( - key = level.key, - values = level.values, - ) - - # Recursively cache the children - children = tuple(cache_tree(c) for c in level.children) - - # Add the node to the cache and return its id - return cls.add_to_cache(cache, node_data, children) - - root = cache_tree(tree) - return cls(cache = cache, root = cache[root]) - - def __str__(self, depth=None) -> str: - return "".join(node_tree_to_string(self.root, depth = depth)) - - def print(self, depth = None): print(self.__str__(depth = depth)) - - def html(self, depth = 2, debug = False) -> HTML: - return HTML(node_tree_to_html(self.root, depth = depth, debug = debug)) - - def _repr_html_(self) -> str: - return node_tree_to_html(self.root, depth = 2) - - def __getitem__(self, args) -> 'CompressedTree': - key, value = args - for c in self.root.children: - if c.data.key == key and value in c.data.values: - data = dataclasses.replace(c.data, values = Enum((value,))) - return CompressedTree( - cache = self.cache, - root = dataclasses.replace(c, data = data) - ) - raise KeyError(f"Key {key} not found in children.") - - def collapse_children(self, node: "CompressedNode") -> "CompressedNode": - # First perform the collapse on the children - new_children = [self.collapse_children(child) for child in node.children] - - # Now take the set of new children and see if any have identical key, metadata and children - # the values may different and will be collapsed into a single node - identical_children = defaultdict(set) - for child in new_children: - identical_children[(child.data.key, child.data.metadata, child._children)].add(child) - - # Now go through and create new compressed nodes for any groups that need collapsing - new_children = [] - for (key, metadata, _children), child_set in identical_children.items(): - if len(child_set) > 1: - # Compress the children into a single node - assert all(isinstance(child.data.values, Enum) for child in child_set), "All children must have Enum values" - node_data = NodeData( - key = key, - metadata = frozendict(), # Todo: Implement metadata compression - values = Enum(tuple(v for child in child_set for v in child.data.values.values)), - ) - - # Add the node to the cache - id = type(self).add_to_cache(self.cache, node_data, _children) - else: - # If the group is size one just keep it - id = child_set.pop().id - - new_children.append(id) - - id = self.add_to_cache(self.cache, node.data, tuple(sorted(new_children))) - return self.cache[id] - - - def compress(self) -> 'CompressedTree': - return CompressedTree(cache = self.cache, root = self.collapse_children(self.root)) - - def lookup(self, selection : dict[str, str]): - nodes = [self.root] - for _ in range(1000): - found = 
-            current_node = nodes[-1]
-            for c in current_node.children:
-                if selection.get(c.data.key, None) in c.data.values:
-                    if found:
-                        raise RuntimeError("This tree is invalid, because it contains overlapping branches.")
-                    nodes.append(c)
-                    selection.pop(c.data.key)
-                    found = True
-
-            if not found:
-                return nodes
-
-        raise RuntimeError("Maximum node searches exceeded, the tree contains a loop or something is buggy.")
-
-
-    # def reconstruct(self) -> Tree:
-    #     def reconstruct_node(h: int) -> Tree:
-    #         node = self.cache[h]
-    #         dedup: dict[tuple[int, str], set[NodeId]] = defaultdict(set)
-    #         for index in self.cache[h].children:
-    #             child_node = self.cache[index]
-    #             child_hash = hash(child_node.children)
-    #             assert isinstance(child_node.values, Enum)
-    #             dedup[(child_hash, child_node.key)].add(index)
-
-    #         children = tuple(
-    #             Tree(key=key, values=Enum(tuple(values)),
-    #                  children=tuple(reconstruct_node(i) for i in self.cache[next(indices)].children)
-    #             )
-    #             for (_, key), indices in dedup.items()
-    #         )
-
-    #         return Tree(
-    #             key=node.key,
-    #             values=node.values,
-    #             children=children,
-    #         )
-    #     return reconstruct_node(self.root)
\ No newline at end of file
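Reviewer note: the open-addressed id scheme documented in the deleted CompressedDataCubeTree.py is worth keeping on record. Below is a minimal, self-contained Python sketch of the same idea, including the mod-2**64 probing that add_to_cache used; Node and intern are illustrative names, not the deleted API.

# Minimal sketch of the collision-resolving hash-consing described in the
# CompressedTree docstring above: hash the node, then on a collision probe
# the next id until we find either a free slot or the identical node.
# `Node` and `intern` are hypothetical names, not the deleted API.
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    key: str
    values: tuple[str, ...]
    children: tuple[int, ...]  # ids of already-interned child nodes


def intern(cache: dict[int, Node], node: Node) -> int:
    """Return a stable unique id for node, reusing the cached copy if present."""
    node_id = hash((node.key, node.values, node.children))
    while True:
        cached = cache.get(node_id)
        if cached is None:  # this id is free, so the node is definitely new
            cache[node_id] = node
            return node_id
        if cached == node:  # the node is already interned under this id
            return node_id
        node_id = (node_id + 1) % (2**64)  # collision: try the next id


cache: dict[int, Node] = {}
a = intern(cache, Node("param", ("t", "q"), ()))
b = intern(cache, Node("param", ("t", "q"), ()))
assert a == b and len(cache) == 1  # identical subtrees share one cache entry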
- The caching means that identical subtrees are stored only once, saving memory - This is implemented internal by storing all subtrees in a global hash table - - """ - cache: dict[int, RefcountedDict] - tree: RefcountedDict - - def _add_to_cache(self, level : RefcountedDict) -> int: - "Add a level {key -> hash} to the cache" - h = hash(level) - if h not in self.cache: - # Increase refcounts of the child nodes - for child_h in level.values(): - self.cache[child_h].refcount += 1 - self.cache[h] = RefcountedDict(level) - else: - self.cache[h].refcount += 1 - return h - - def _replace_in_cache(self, old_h, level : RefcountedDict) -> int: - """ - Replace the object at old_h with a different object level - If the objects this is a no-op - """ - # Start by adding the new object to the cache - new_h = self._add_to_cache(level) - - # Now check if the old object needs to be garbage collected - self._decrease_refcount(old_h) - - return new_h - - def _decrease_refcount(self, h : int): - self.cache[h].refcount -= 1 - if self.cache[h].refcount == 0: - # Recursively decrease refcounts of child nodes - for child_h in self.cache[h].values(): - self._decrease_refcount(child_h) - del self.cache[h] - - def cache_tree(self, tree : Tree) -> int: - "Insert the given tree (dictonary of dictionaries) (all it's children, recursively) into the hash table and return the hash key" - level = RefcountedDict({k : self.cache_tree(v) for k, v in tree.items()}) - return self._add_to_cache(level) - - - def _cache_path(self, path : list[str]) -> int: - "Treat path = [x, y, z...] like {x : {y : {z : ...}}} and cache that" - if not path: - return self.empty_hash - k, *rest = path - return self._add_to_cache(RefcountedDict({k : self._cache_path(rest)})) - - def reconstruct(self, max_depth = None) -> dict[str, dict]: - "Reconstruct the tree as a normal nested dictionary" - def reconstruct_node(h : int, depth : int) -> dict[str, dict]: - if max_depth is not None and depth > max_depth: - return {} - return {k : reconstruct_node(v, depth+1) for k, v in self.cache[h].items()} - return reconstruct_node(self.root_hash, 0) - - def reconstruct_compressed(self) -> dict[str, dict]: - "Reconstruct the tree as a normal nested dictionary" - def reconstruct_node(h : int) -> dict[str, dict]: - dedup : dict[int, set[str]] = defaultdict(set) - for k, h2 in self.cache[h].items(): - dedup[h2].add(k) - - return {"/".join(keys) : reconstruct_node(h) for h, keys in dedup.items()} - return reconstruct_node(self.root_hash) - - def reconstruct_compressed_ecmwf_style(self, max_depth=None, from_node=None) -> dict[str, dict]: - "Reconstruct the tree as a normal nested dictionary" - def reconstruct_node(h : int, depth : int) -> dict[str, dict]: - if max_depth is not None and depth > max_depth: - return {} - dedup : dict[tuple[int, str], set[str]] = defaultdict(set) - for k, h2 in self.cache[h].items(): - key, value = k.split("=") - dedup[(h2, key)].add(value) - - return {f"{key}={','.join(values)}" : reconstruct_node(h, depth=depth+1) for (h, key), values in dedup.items()} - return reconstruct_node(from_node or self.root_hash, depth=0) - - def to_json(self, max_depth=None, from_node=None) -> dict: - def reconstruct_node(h : int, depth : int) -> list[JSONNode]: - if max_depth is not None and depth > max_depth: - return {} - dedup : dict[tuple[int, str], set[str]] = defaultdict(set) - for k, h2 in self.cache[h].items(): - key, value = k.split("=") - dedup[(h2, key)].add(value) - - return [JSONNode( - key = key, - values = list(values), - children = 
-                children=reconstruct_node(h, depth=depth + 1),
-            ) for (h, key), values in dedup.items()]
-
-        return asdict(reconstruct_node(from_node or self.root_hash, depth=0)[0])
-
-    def __init__(self, tree: Tree):
-        self.cache = {}
-        self.empty_hash = hash(RefcountedDict({}))
-
-        # Recursively cache the tree
-        self.root_hash = self.cache_tree(tree)
-
-        # Keep a reference to the root of the tree
-        self.tree = self.cache[self.root_hash]
-
-    def lookup(self, keys: tuple[str, ...]) -> tuple[bool, tuple[str, ...]]:
-        """
-        Look up a subtree in the tree.
-        Returns (success, path):
-        if success == True the lookup reached the bottom of the tree and path is equal to keys;
-        if success == False, path holds the keys that were found.
-        """
-        loc = self.tree
-        for i, key in enumerate(keys):
-            if key in loc:
-                h = loc[key]  # get the hash of the subtree
-                loc = self.cache[h]  # get the subtree
-            else:
-                return False, keys[:i]
-        return True, keys
-
-    def keys(self, keys: tuple[str, ...] = ()) -> list[str] | None:
-        loc = self.tree
-        for key in keys:
-            if key in loc:
-                h = loc[key]  # get the hash of the subtree
-                loc = self.cache[h]  # get the subtree
-            else:
-                return None
-        return list(loc.keys())
-
-    def multi_match(self, request: dict[str, list[str]], loc=None):
-        if loc is None: loc = self.tree
-        if loc == {}: return {"_END_": {}}
-        matches = {}
-        for request_key, request_values in request.items():
-            for request_value in request_values:
-                meta_key = f"{request_key}={request_value}"
-                if meta_key in loc:
-                    new_loc = self.cache[loc[meta_key]]
-                    matches[meta_key] = self.multi_match(request, new_loc)
-
-        if not matches: return {k: {} for k in loc.keys()}
-        return matches
-
-    def _insert(self, old_h: int, tree: RefcountedDict, keys: tuple[str, ...]) -> int:
-        "Insert keys into the subtree and return the new hash of the subtree"
-        key, *rest = keys
-        assert old_h in self.cache
-
-        # Adding a new branch to the tree
-        if key not in tree:
-            new_tree = RefcountedDict(tree | {key: self._cache_path(rest)})
-
-        else:
-            # Make a copy of the tree and update the subtree
-            new_tree = RefcountedDict(tree.copy())
-            subtree_h = tree[key]
-            subtree = self.cache[subtree_h]
-            new_tree[key] = self._insert(subtree_h, subtree, tuple(rest))
-
-        # no-op if the hash hasn't changed
-        new_h = self._replace_in_cache(old_h, new_tree)
-        return new_h
-
-    def insert(self, keys: tuple[str, ...]):
-        """
-        Insert a new branch into the compressed tree.
-        """
-        already_there, path = self.lookup(keys)
-        if already_there:
-            return
-        # Update the tree
-        self.root_hash = self._insert(self.root_hash, self.tree, keys)
-        self.tree = self.cache[self.root_hash]
-
-    def insert_tree(self, subtree: Tree):
-        """
-        Insert a whole tree into the compressed tree.
-        """
-        self.root_hash = self._insert_tree(self.root_hash, self.tree, subtree)
-        self.tree = self.cache[self.root_hash]
-
-    def _insert_tree(self, old_h: int, tree: RefcountedDict, subtree: Tree) -> int:
-        """
-        Recursively insert a subtree into the compressed tree and return the new hash.
- """ - assert old_h in self.cache - - # Make a copy of the tree to avoid modifying shared structures - new_tree = RefcountedDict(tree.copy()) - for key, sub_subtree in subtree.items(): - if key not in tree: - # Key is not in current tree, add the subtree - # Cache the subtree rooted at sub_subtree - subtree_h = self.cache_tree(sub_subtree) - new_tree[key] = subtree_h - else: - # Key is in tree, need to recursively merge - # Get the hash and subtree from the current tree - child_h = tree[key] - child_tree = self.cache[child_h] - # Recursively merge - new_child_h = self._insert_tree(child_h, child_tree, sub_subtree) - new_tree[key] = new_child_h - - # Replace the old hash with the new one in the cache - new_h = self._replace_in_cache(old_h, new_tree) - return new_h - - def save(self, path : Path): - "Save the compressed tree to a file" - with open(path, "w") as f: - json.dump({ - "cache" : {k : {"refcount" : v.refcount, "dict" : v} for k, v in self.cache.items()}, - "root_hash": self.root_hash - }, f) - - @classmethod - def load(cls, path : Path) -> "CompressedTree": - "Load the compressed tree from a file" - with open(path) as f: - data = json.load(f) - return cls.from_json(data) - - - @classmethod - def from_json(cls, data : dict) -> "CompressedTree": - c = CompressedTree({}) - c.cache = {} - for k, v in data["cache"].items(): - c.cache[int(k)] = RefcountedDict(v["dict"]) - c.cache[int(k)].refcount = v["refcount"] - - c.root_hash = data["root_hash"] - c.tree = c.cache[c.root_hash] - return c - - -if __name__ == "__main__": - original_tree = { - "a": { - "b1": { - "c": {} - }, - "b2" : { - "c": {} - }, - "b3*": { - "c*": {} - } - } - } - - c_tree = CompressedTree(original_tree) - - assert c_tree.lookup(("a", "b1", "c")) == (True, ("a", "b1", "c")) - assert c_tree.lookup(("a", "b1", "d")) == (False, ("a", "b1")) - - print(json.dumps(c_tree.reconstruct_compressed(), indent = 4)) - - assert c_tree.reconstruct() == original_tree - - c_tree.insert(("a", "b1", "d")) - c_tree.insert(("a", "b2", "d")) - print(json.dumps(c_tree.reconstruct(), indent = 4)) - - print(json.dumps(c_tree.reconstruct_compressed(), indent = 4)) - print(c_tree.cache) - - # test round trip - assert CompressedTree(original_tree).reconstruct() == original_tree - - # test adding a key - added_keys_tree = { - "a": { - "b1": { - "c": {} - }, - "b2" : { - "c": {}, - "d" : {} - }, - "b3*": { - "c*": {}, - "d*": {} - } - } - } - c_tree = CompressedTree(original_tree) - c_tree.insert(("a", "b2", "d")) - c_tree.insert(("a", "b3*", "d*")) - assert c_tree.reconstruct() == added_keys_tree - - print(c_tree.reconstruct_compressed()) \ No newline at end of file