Get rid of old code

This commit is contained in:
Tom 2025-02-13 13:48:27 +00:00
parent fbf8a0fcaf
commit 00ea804c35
2 changed files with 0 additions and 547 deletions

View File

@@ -1,218 +0,0 @@
import dataclasses
from collections import defaultdict
from dataclasses import dataclass, field
from frozendict import frozendict
from .Qube import Enum, NodeData, Tree
from .tree_formatters import HTML, node_tree_to_html, node_tree_to_string
NodeId = int
CacheType = dict[NodeId, "CompressedNode"]
@dataclass(frozen=True)
class CompressedNode:
id: NodeId = field(hash=False, compare=False)
data: NodeData
_children: tuple[NodeId, ...]
_cache: CacheType = field(repr=False, hash=False, compare=False)
@property
def children(self) -> tuple["CompressedNode", ...]:
return tuple(self._cache[i] for i in self._children)
    def summary(self, debug: bool = False) -> str:
        if debug:
            return f"{self.data.key}={self.data.values.summary()} ({self.id})"
        return f"{self.data.key}={self.data.values.summary()}" if self.data.key != "root" else "root"
@dataclass(frozen=True)
class CompressedTree:
"""
    This tree is compressed in two distinct ways:
    1. Product compression: nodes have a key and **multiple values**, so each node represents many logical nodes key=value1, key=value2, ...
       Each of these logical nodes has identical children, so we can compress them into one physical node.
       In this way any distinct path through the tree represents a Cartesian product of the values, otherwise known as a datacube.
    2. In order to facilitate the product compression described above we need to know when two nodes have identical children.
       To do this every node is assigned an id, initially computed as a hash of the node's data and its children's ids.
       To avoid hash collisions we increment the initial hash whenever it is already taken by a different node,
       and repeat until we find a unique id.
Crucially this allows us to later determine if a new node is already cached:
        id = hash(node)
        while True:
            if id not in cache: the node is definitely not in the cache, stop
            elif cache[id] != node: hash collision, increment id and try again
            else: the node is already in the cache, stop
This tree can be walked from the root by repeatedly looking up the children of a node in the cache.
This structure facilitates compression because we can look at the children of a node:
    If two children have the same key, metadata and children then we can compress them into a single node.
"""
root: CompressedNode
cache: CacheType
@staticmethod
def add_to_cache(cache : dict[NodeId, CompressedNode], data : NodeData, _children: tuple[NodeId, ...]) -> NodeId:
"""
This function is responsible for adding a new node to the cache and returning its id.
Crucially we need a way to check if new nodes are already in the cache, so we hash them.
But in case of a hash collision we need to increment the id and try again.
        This way we will always eventually find a unique id for the node,
        and we will never store the same node twice under different ids.
"""
_children = tuple(sorted(_children))
id = hash((data, _children))
# To avoid hash collisions, we increment the id until we find a unique one
tries = 0
while True:
tries += 1
if id not in cache:
# The node isn't in the cache and this id is free
cache[id] = CompressedNode(id = id,
data = data,
_children = _children,
_cache = cache)
break
if cache[id].data == data and cache[id]._children == _children:
break # The node is already in the cache
# This id is already in use by a different node so increment it (mod) and try again
id = (id + 1) % (2**64)
if tries > 100:
raise RuntimeError("Too many hash collisions, something is wrong.")
return id
@classmethod
def from_tree(cls, tree : Tree) -> 'CompressedTree':
cache = {}
def cache_tree(level : Tree) -> NodeId:
node_data = NodeData(
key = level.key,
values = level.values,
)
# Recursively cache the children
children = tuple(cache_tree(c) for c in level.children)
# Add the node to the cache and return its id
return cls.add_to_cache(cache, node_data, children)
root = cache_tree(tree)
return cls(cache = cache, root = cache[root])
def __str__(self, depth=None) -> str:
return "".join(node_tree_to_string(self.root, depth = depth))
    def print(self, depth=None):
        print(self.__str__(depth=depth))
def html(self, depth = 2, debug = False) -> HTML:
return HTML(node_tree_to_html(self.root, depth = depth, debug = debug))
def _repr_html_(self) -> str:
return node_tree_to_html(self.root, depth = 2)
def __getitem__(self, args) -> 'CompressedTree':
key, value = args
for c in self.root.children:
if c.data.key == key and value in c.data.values:
data = dataclasses.replace(c.data, values = Enum((value,)))
return CompressedTree(
cache = self.cache,
root = dataclasses.replace(c, data = data)
)
raise KeyError(f"Key {key} not found in children.")
def collapse_children(self, node: "CompressedNode") -> "CompressedNode":
# First perform the collapse on the children
new_children = [self.collapse_children(child) for child in node.children]
        # Now take the set of new children and see if any have identical key, metadata and children;
        # their values may differ and will be merged into a single node
identical_children = defaultdict(set)
for child in new_children:
identical_children[(child.data.key, child.data.metadata, child._children)].add(child)
# Now go through and create new compressed nodes for any groups that need collapsing
new_children = []
for (key, metadata, _children), child_set in identical_children.items():
if len(child_set) > 1:
# Compress the children into a single node
assert all(isinstance(child.data.values, Enum) for child in child_set), "All children must have Enum values"
node_data = NodeData(
key = key,
metadata = frozendict(), # Todo: Implement metadata compression
values = Enum(tuple(v for child in child_set for v in child.data.values.values)),
)
# Add the node to the cache
id = type(self).add_to_cache(self.cache, node_data, _children)
else:
# If the group is size one just keep it
id = child_set.pop().id
new_children.append(id)
id = self.add_to_cache(self.cache, node.data, tuple(sorted(new_children)))
return self.cache[id]
def compress(self) -> 'CompressedTree':
return CompressedTree(cache = self.cache, root = self.collapse_children(self.root))
    def lookup(self, selection: dict[str, str]):
        selection = dict(selection)  # work on a copy so the caller's dict isn't mutated
        nodes = [self.root]
        for _ in range(1000):
            current_node = nodes[-1]
            # Collect every matching child so that overlapping branches are actually detected
            matches = [c for c in current_node.children
                       if selection.get(c.data.key, None) in c.data.values]
            if len(matches) > 1:
                raise RuntimeError("This tree is invalid, because it contains overlapping branches.")
            if not matches:
                return nodes
            nodes.append(matches[0])
            selection.pop(matches[0].data.key)
        raise RuntimeError("Maximum node searches exceeded, the tree contains a loop or something is buggy.")
# def reconstruct(self) -> Tree:
# def reconstruct_node(h : int) -> Tree:
# node = self.cache[h]
# dedup : dict[tuple[int, str], set[NodeId]] = defaultdict(set)
# for index in self.cache[h].children:
# child_node = self.cache[index]
# child_hash = hash(child_node.children)
# assert isinstance(child_node.values, Enum)
# dedup[(child_hash, child_node.key)].add(index)
# children = tuple(
# Tree(key = key, values = Enum(tuple(values)),
# children = tuple(reconstruct_node(i) for i in self.cache[next(indices)].children)
# )
# for (_, key), indices in dedup.items()
# )
# return Tree(
# key = node.key,
# values = node.values,
# children = children,
# )
# return reconstruct_node(self.root)
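
The heart of this first deleted file is the content-addressed node cache in add_to_cache: a node's id is the hash of its data and sorted child ids, with linear probing on collision, so structurally identical subtrees always resolve to the same id and are stored exactly once. Below is a minimal standalone sketch of that scheme; Data, Node and intern are illustrative stand-ins for NodeData, CompressedNode and add_to_cache, not part of the deleted module.

from dataclasses import dataclass

# Illustrative stand-in for NodeData; any frozen (hashable) payload works.
@dataclass(frozen=True)
class Data:
    key: str
    value: str

@dataclass(frozen=True)
class Node:
    data: Data
    children: tuple[int, ...]  # ids of children already interned in the cache

def intern(cache: dict[int, Node], data: Data, children: tuple[int, ...]) -> int:
    "Return the id of (data, children), inserting the node if it is not cached yet."
    children = tuple(sorted(children))
    node_id = hash((data, children)) % (2**64)
    while True:
        cached = cache.get(node_id)
        if cached is None:
            cache[node_id] = Node(data, children)  # free slot: claim it
            return node_id
        if cached.data == data and cached.children == children:
            return node_id  # already interned: reuse the existing id
        node_id = (node_id + 1) % (2**64)  # collision: probe the next id

cache: dict[int, Node] = {}
leaf = intern(cache, Data("param", "2t"), ())
a = intern(cache, Data("date", "20250213"), (leaf,))
b = intern(cache, Data("date", "20250213"), (leaf,))
assert a == b            # identical subtrees resolve to the same id
assert len(cache) == 2   # one leaf plus one parent, stored once each

Because equal (data, children) pairs always map to one id, the deduplication that collapse_children relies on falls out of the cache lookup itself.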

View File

@@ -1,329 +0,0 @@
import json
from collections import defaultdict
from dataclasses import asdict, dataclass
from pathlib import Path
Tree = dict[str, "Tree"]
class RefcountedDict(dict[str, int]):
refcount: int = 1
def __repr__(self):
return f"RefcountedDict(refcount={self.refcount}, {super().__repr__()})"
def __hash__(self):
return hash(tuple(sorted(self.items())))
@dataclass
class JSONNode:
key: str
values: list[str]
children: list["JSONNode"]
class CompressedTree:
    """
    An implementation of a compressed tree that supports lookup, insertion, deletion and caching.
    The caching means that identical subtrees are stored only once, saving memory.
    This is implemented internally by storing all subtrees in a global hash table.
    """
cache: dict[int, RefcountedDict]
tree: RefcountedDict
def _add_to_cache(self, level : RefcountedDict) -> int:
"Add a level {key -> hash} to the cache"
h = hash(level)
if h not in self.cache:
# Increase refcounts of the child nodes
for child_h in level.values():
self.cache[child_h].refcount += 1
self.cache[h] = RefcountedDict(level)
else:
self.cache[h].refcount += 1
return h
def _replace_in_cache(self, old_h, level : RefcountedDict) -> int:
"""
        Replace the object at old_h with the new object `level`.
        If the two objects are identical this is a no-op.
"""
# Start by adding the new object to the cache
new_h = self._add_to_cache(level)
# Now check if the old object needs to be garbage collected
self._decrease_refcount(old_h)
return new_h
def _decrease_refcount(self, h : int):
self.cache[h].refcount -= 1
if self.cache[h].refcount == 0:
# Recursively decrease refcounts of child nodes
for child_h in self.cache[h].values():
self._decrease_refcount(child_h)
del self.cache[h]
def cache_tree(self, tree : Tree) -> int:
"Insert the given tree (dictonary of dictionaries) (all it's children, recursively) into the hash table and return the hash key"
level = RefcountedDict({k : self.cache_tree(v) for k, v in tree.items()})
return self._add_to_cache(level)
def _cache_path(self, path : list[str]) -> int:
"Treat path = [x, y, z...] like {x : {y : {z : ...}}} and cache that"
if not path:
return self.empty_hash
k, *rest = path
return self._add_to_cache(RefcountedDict({k : self._cache_path(rest)}))
def reconstruct(self, max_depth = None) -> dict[str, dict]:
"Reconstruct the tree as a normal nested dictionary"
def reconstruct_node(h : int, depth : int) -> dict[str, dict]:
if max_depth is not None and depth > max_depth:
return {}
return {k : reconstruct_node(v, depth+1) for k, v in self.cache[h].items()}
return reconstruct_node(self.root_hash, 0)
def reconstruct_compressed(self) -> dict[str, dict]:
"Reconstruct the tree as a normal nested dictionary"
def reconstruct_node(h : int) -> dict[str, dict]:
dedup : dict[int, set[str]] = defaultdict(set)
for k, h2 in self.cache[h].items():
dedup[h2].add(k)
return {"/".join(keys) : reconstruct_node(h) for h, keys in dedup.items()}
return reconstruct_node(self.root_hash)
def reconstruct_compressed_ecmwf_style(self, max_depth=None, from_node=None) -> dict[str, dict]:
"Reconstruct the tree as a normal nested dictionary"
def reconstruct_node(h : int, depth : int) -> dict[str, dict]:
if max_depth is not None and depth > max_depth:
return {}
dedup : dict[tuple[int, str], set[str]] = defaultdict(set)
for k, h2 in self.cache[h].items():
key, value = k.split("=")
dedup[(h2, key)].add(value)
return {f"{key}={','.join(values)}" : reconstruct_node(h, depth=depth+1) for (h, key), values in dedup.items()}
return reconstruct_node(from_node or self.root_hash, depth=0)
def to_json(self, max_depth=None, from_node=None) -> dict:
def reconstruct_node(h : int, depth : int) -> list[JSONNode]:
            if max_depth is not None and depth > max_depth:
                return []
dedup : dict[tuple[int, str], set[str]] = defaultdict(set)
for k, h2 in self.cache[h].items():
key, value = k.split("=")
dedup[(h2, key)].add(value)
return [JSONNode(
key = key,
values = list(values),
children = reconstruct_node(h, depth=depth+1),
) for (h, key), values in dedup.items()]
return asdict(reconstruct_node(from_node or self.root_hash, depth=0)[0])
def __init__(self, tree : Tree):
self.cache = {}
self.empty_hash = hash(RefcountedDict({}))
# Recursively cache the tree
self.root_hash = self.cache_tree(tree)
# Keep a reference to the root of the tree
self.tree = self.cache[self.root_hash]
    def lookup(self, keys : tuple[str, ...]) -> tuple[bool, tuple[str, ...]]:
        """
        Look up a path in the tree.
        Returns (success, path):
        if success == True the path got to the bottom of the tree and path will be equal to keys,
        if success == False, path holds the prefix of keys that was found.
        """
        loc = self.tree
        for i, key in enumerate(keys):
            if key in loc:
                h = loc[key] # get the hash of the subtree
                loc = self.cache[h] # get the subtree
            else:
                return False, keys[:i]
        return True, keys
def keys(self, keys : tuple[str, ...] = ()) -> list[str] | None:
loc = self.tree
for i, key in enumerate(keys):
if key in loc:
h = loc[key] # get the hash of the subtree
loc = self.cache[h] # get the subtree
else:
return None
return list(loc.keys())
def multi_match(self, request : dict[str, list[str]], loc = None):
if loc is None: loc = self.tree
if loc == {}: return {"_END_" : {}}
matches = {}
for request_key, request_values in request.items():
for request_value in request_values:
meta_key = f"{request_key}={request_value}"
if meta_key in loc:
new_loc = self.cache[loc[meta_key]]
matches[meta_key] = self.multi_match(request, new_loc)
        if not matches: return {k : {} for k in loc}
return matches
def _insert(self, old_h : int, tree: RefcountedDict, keys : tuple[str, ...]) -> int:
"Insert keys in the subtree and return the new hash of the subtree"
key, *rest = keys
assert old_h in self.cache
# Adding a new branch to the tree
if key not in tree:
new_tree = RefcountedDict(tree | {key : self._cache_path(rest)})
else:
# Make a copy of the tree and update the subtree
new_tree = RefcountedDict(tree.copy())
subtree_h = tree[key]
subtree = self.cache[subtree_h]
new_tree[key] = self._insert(subtree_h, subtree, tuple(rest))
# no-op if the hash hasn't changed
new_h = self._replace_in_cache(old_h, new_tree)
return new_h
def insert(self, keys : tuple[str, ...]):
"""
Insert a new branch into the compressed tree
"""
already_there, path = self.lookup(keys)
if already_there:
return
# Update the tree
self.root_hash = self._insert(self.root_hash, self.tree, keys)
self.tree = self.cache[self.root_hash]
def insert_tree(self, subtree: Tree):
"""
Insert a whole tree into the compressed tree.
"""
self.root_hash = self._insert_tree(self.root_hash, self.tree, subtree)
self.tree = self.cache[self.root_hash]
def _insert_tree(self, old_h: int, tree: RefcountedDict, subtree: Tree) -> int:
"""
Recursively insert a subtree into the compressed tree and return the new hash.
"""
assert old_h in self.cache
# Make a copy of the tree to avoid modifying shared structures
new_tree = RefcountedDict(tree.copy())
for key, sub_subtree in subtree.items():
if key not in tree:
# Key is not in current tree, add the subtree
# Cache the subtree rooted at sub_subtree
subtree_h = self.cache_tree(sub_subtree)
new_tree[key] = subtree_h
else:
# Key is in tree, need to recursively merge
# Get the hash and subtree from the current tree
child_h = tree[key]
child_tree = self.cache[child_h]
# Recursively merge
new_child_h = self._insert_tree(child_h, child_tree, sub_subtree)
new_tree[key] = new_child_h
# Replace the old hash with the new one in the cache
new_h = self._replace_in_cache(old_h, new_tree)
return new_h
def save(self, path : Path):
"Save the compressed tree to a file"
with open(path, "w") as f:
json.dump({
"cache" : {k : {"refcount" : v.refcount, "dict" : v} for k, v in self.cache.items()},
"root_hash": self.root_hash
}, f)
@classmethod
def load(cls, path : Path) -> "CompressedTree":
"Load the compressed tree from a file"
with open(path) as f:
data = json.load(f)
return cls.from_json(data)
@classmethod
def from_json(cls, data : dict) -> "CompressedTree":
c = CompressedTree({})
c.cache = {}
for k, v in data["cache"].items():
c.cache[int(k)] = RefcountedDict(v["dict"])
c.cache[int(k)].refcount = v["refcount"]
c.root_hash = data["root_hash"]
c.tree = c.cache[c.root_hash]
return c
if __name__ == "__main__":
original_tree = {
"a": {
"b1": {
"c": {}
},
"b2" : {
"c": {}
},
"b3*": {
"c*": {}
}
}
}
c_tree = CompressedTree(original_tree)
assert c_tree.lookup(("a", "b1", "c")) == (True, ("a", "b1", "c"))
assert c_tree.lookup(("a", "b1", "d")) == (False, ("a", "b1"))
print(json.dumps(c_tree.reconstruct_compressed(), indent = 4))
assert c_tree.reconstruct() == original_tree
c_tree.insert(("a", "b1", "d"))
c_tree.insert(("a", "b2", "d"))
print(json.dumps(c_tree.reconstruct(), indent = 4))
print(json.dumps(c_tree.reconstruct_compressed(), indent = 4))
print(c_tree.cache)
# test round trip
assert CompressedTree(original_tree).reconstruct() == original_tree
# test adding a key
added_keys_tree = {
"a": {
"b1": {
"c": {}
},
"b2" : {
"c": {},
"d" : {}
},
"b3*": {
"c*": {},
"d*": {}
}
}
}
c_tree = CompressedTree(original_tree)
c_tree.insert(("a", "b2", "d"))
c_tree.insert(("a", "b3*", "d*"))
assert c_tree.reconstruct() == added_keys_tree
print(c_tree.reconstruct_compressed())
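
The second deleted file applies the same hash-consing idea to plain nested dictionaries, adding reference counts so that subtrees orphaned by an insert can be garbage-collected. The sketch below shows just the structural sharing that cache_tree and _add_to_cache provide, assuming structural hashes never collide and omitting the refcounting layered on top; hash_cons is an illustrative name, not the deleted API.

Tree = dict[str, "Tree"]

def hash_cons(tree: Tree, cache: dict[int, dict[str, int]]) -> int:
    "Store `tree` bottom-up in `cache` as {key -> child_hash} levels and return its hash."
    level = {k: hash_cons(v, cache) for k, v in tree.items()}
    h = hash(tuple(sorted(level.items())))
    cache.setdefault(h, level)  # identical levels collapse into a single entry
    return h

cache: dict[int, dict[str, int]] = {}
root_hash = hash_cons({"a": {"b1": {"c": {}}, "b2": {"c": {}}}}, cache)
# The two {"c": {}} subtrees hash identically, so the cache holds only
# four distinct levels: {}, the "c" level, the "b1"/"b2" level and the root.
assert len(cache) == 4

Refcounting then makes deletion cheap: when an insert replaces a level, _replace_in_cache decrements the old level's count and frees it, recursively, once it drops to zero.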