Add alt-click copy of nodes paths, flesh out range types

This commit is contained in:
Tom 2025-02-24 11:06:11 +00:00
parent 1f7c5dfecd
commit ef844c9b57
6 changed files with 263 additions and 143 deletions

View File

@ -42,7 +42,34 @@ print(f"{cq.n_leaves = }, {cq.n_nodes = }")
cq cq
``` ```
### Quick Tree Construction
With the HTML representation you can click on the leaves to expand them. You can copy a path representation of a node to the clipboard by alt/option/⌥ clicking on it. You can then extract that node in code using `[]`:
```{code-cell} python3
cq["class=rd,expver=0001"]
```
Select a subtree:
```{code-cell} python3
cq["class", "od"]["expver", "0001"]
```
Intersect with a dense datacube:
```{code-cell} python3
dq = Qube.from_datacube({
"class": ["od", "rd", "cd"],
"expver": ["0001", "0002", "0003"],
"param": "2",
})
(cq & dq).print()
```
### Tree Construction
One of the quickest ways to construct non-trivial trees is to use the `Qube.from_datacube` method to construct dense trees and then use the set operations to combine or intersect them: One of the quickest ways to construct non-trivial trees is to use the `Qube.from_datacube` method to construct dense trees and then use the set operations to combine or intersect them:
@ -154,3 +181,11 @@ Symmetric Difference:
(A ^ B).print(); (A ^ B).print();
``` ```
### Transformations
`q.transform` takes a Python function from one node to one or more nodes and uses this to build a new tree. This can be used for simple operations on the key or values but also to split or remove nodes. Note that you can't use it to merge nodes because it's only allowed to see one node at a time.
```{code-cell} python3
def capitalize(node): return node.replace(key = node.key.capitalize())
climate_dt.transform(capitalize).html(depth=1)
```

View File

@ -8,7 +8,11 @@ from frozendict import frozendict
from . import set_operations from . import set_operations
from .node_types import NodeData, RootNodeData from .node_types import NodeData, RootNodeData
from .tree_formatters import HTML, node_tree_to_html, node_tree_to_string from .tree_formatters import (
HTML,
node_tree_to_html,
node_tree_to_string,
)
from .value_types import QEnum, Values, values_from_json from .value_types import QEnum, Values, values_from_json
@ -29,6 +33,16 @@ class Qube:
def metadata(self) -> frozendict[str, Any]: def metadata(self) -> frozendict[str, Any]:
return self.data.metadata return self.data.metadata
def replace(self, **kwargs) -> 'Qube':
    """
    Return a copy of this node with the given fields replaced.

    `key`, `values` and `metadata` live on `self.data`, so they are applied
    to a copy of the data object; `children` is a field of the node itself.
    When called without arguments the node itself is returned (no copy).

    Raises:
        TypeError: if a keyword is not one of `key`, `values`, `metadata`
            or `children`. Such keywords used to be dropped silently, which
            hid typos like `childern=...`.
    """
    data_fields = ("key", "values", "metadata")
    node_fields = ("children",)

    unknown = sorted(k for k in kwargs if k not in data_fields and k not in node_fields)
    if unknown:
        raise TypeError(f"replace() got unexpected keyword argument(s): {', '.join(unknown)}")

    if not kwargs:
        return self

    node_changes = {k: v for k, v in kwargs.items() if k in node_fields}
    data_changes = {k: v for k, v in kwargs.items() if k in data_fields}
    if data_changes:
        # Only rebuild the data object when one of its fields actually changes.
        node_changes["data"] = dataclasses.replace(self.data, **data_changes)
    return dataclasses.replace(self, **node_changes)
def summary(self) -> str: def summary(self) -> str:
return self.data.summary() return self.data.summary()
@ -155,12 +169,28 @@ class Qube:
return Qube.root_node((q for c in self.children for q in to_list_of_cubes(c))) return Qube.root_node((q for c in self.children for q in to_list_of_cubes(c)))
def __getitem__(self, args) -> 'Qube': def __getitem__(self, args) -> 'Qube':
if isinstance(args, str):
specifiers = args.split(",")
current = self
for specifier in specifiers:
key, values = specifier.split("=")
values = values.split("/")
for c in current.children:
if c.key == key and set(values) == set(c.values):
current = c
break
else:
raise KeyError(f"Key '{key}' not found in children of '{current.key}'")
return Qube.root_node(current.children)
elif isinstance(args, tuple) and len(args) == 2:
key, value = args key, value = args
for c in self.children: for c in self.children:
if c.key == key and value in c.values: if c.key == key and value in c.values:
data = dataclasses.replace(c.data, values = QEnum((value,))) return Qube.root_node(c.children)
return dataclasses.replace(c, data = data)
raise KeyError(f"Key {key} not found in children of {self.key}") raise KeyError(f"Key {key} not found in children of {self.key}")
else:
raise ValueError("Unknown key type")
@cached_property @cached_property
def n_leaves(self) -> int: def n_leaves(self) -> int:
@ -173,7 +203,7 @@ class Qube:
if self.key == "root" and not self.children: return 0 if self.key == "root" and not self.children: return 0
return 1 + sum(c.n_nodes for c in self.children) return 1 + sum(c.n_nodes for c in self.children)
def transform(self, func: 'Callable[[Qube], Qube | list[Qube]]') -> 'Qube': def transform(self, func: 'Callable[[Qube], Qube | Iterable[Qube]]') -> 'Qube':
""" """
Call a function on every node of the Qube, return one or more nodes. Call a function on every node of the Qube, return one or more nodes.
If multiple nodes are returned they each get a copy of the (transformed) children of the original node. If multiple nodes are returned they each get a copy of the (transformed) children of the original node.
@ -185,7 +215,7 @@ class Qube:
if isinstance(new_nodes, Qube): if isinstance(new_nodes, Qube):
new_nodes = [new_nodes] new_nodes = [new_nodes]
return [dataclasses.replace(new_node, children = children) return [new_node.replace(children = children)
for new_node in new_nodes] for new_node in new_nodes]
children = tuple(cc for c in self.children for cc in transform(c)) children = tuple(cc for c in self.children for cc in transform(c))
@ -243,87 +273,6 @@ class Qube:
axes[self.key].update(self.values) axes[self.key].update(self.values)
return dict(axes) return dict(axes)
@staticmethod
def _insert(position: "Qube", identifier : list[tuple[str, list[str]]]):
"""
This algorithm goes as follows:
We're at a particular node in the Qube, and we have a list of key-values pairs that we want to insert.
We take the first key values pair
key, values = identifier.pop(0)
The general idea is to insert key, values into the current node and use recursion to handle the rest of the identifier.
We have two sources of values with possible overlap. The values to insert and the values attached to the children of this node.
For each value coming from either source we put it in one of three categories:
1) Values that exist only in the already existing child. (Coming exclusively from position.children)
2) Values that exist in both a child and the new values.
3) Values that exist only in the new values.
Thus we add the values to insert to a set, and loop over the children.
For each child we partition its values into the three categories.
For 1) we create a new child node with the key, reduced set of values and the same children.
For 2)
Create a new child node with the key, and the values in group 2
Recurse to compute the children
Once we have finished looping over children we know all the values left over came exclusively from the new values.
So we:
Create a new node with these values.
Recurse to compute the children
Finally we return the node with all these new children.
"""
pass
# if not identifier:
# return position
# key, values = identifier.pop(0)
# # print(f"Inserting {key}={values} into {position.summary()}")
# # Only the children with the matching key are relevant.
# source_children = {c : [] for c in position.children if c.key == key}
# new_children = []
# values = set(values)
# for c in source_children:
# values_set = set(c.values)
# group_1 = values_set - values
# group_2 = values_set & values
# values = values - values_set # At the end of this loop values will contain only the new values
# if group_1:
# group_1_node = Qube.make(c.key, QEnum((group_1)), c.children)
# new_children.append(group_1_node) # Add the unaffected part of this child
# if group_2:
# new_node = Qube.make(key, QEnum((affected)), [])
# new_node = Qube._insert(new_node, identifier)
# new_children.append(new_node) # Add the affected part of this child
# unaffected = [x for x in c.values if x not in affected]
# if affected: # This check is not technically necessary, but it makes the code more readable
# # If there are any values not in any of the existing children, add them as a new child
# if entirely_new_values:
# new_node = Qube.make(key, QEnum((entirely_new_values)), [])
# new_children.append(Qube._insert(new_node, identifier))
return Qube.make(position.key, position.values, new_children)
def insert(self, identifier : dict[str, list[str]]) -> 'Qube':
insertion = [(k, v) for k, v in identifier.items()]
return Qube._insert(self, insertion)
def info(self):
cubes = self.to_list_of_cubes()
print(f"Number of distinct paths: {len(cubes)}")
@cached_property @cached_property
def structural_hash(self) -> int: def structural_hash(self) -> int:
""" """
@ -343,7 +292,4 @@ class Qube:
new_children = set_operations.compress_children(new_children) new_children = set_operations.compress_children(new_children)
# Return the now compressed node # Return the now compressed node
return Qube( return Qube.make(self.key, self.values, new_children)
data = self.data,
children = tuple(sorted(new_children))
)

View File

@ -16,15 +16,17 @@ class HTML():
def _repr_html_(self): def _repr_html_(self):
return self.html return self.html
def summarize_node(node: TreeLike, collapse = False, **kwargs) -> tuple[str, TreeLike]: def summarize_node(node: TreeLike, collapse = False, **kwargs) -> tuple[str, str, TreeLike]:
""" """
Extracts a summarized representation of the node while collapsing single-child paths. Extracts a summarized representation of the node while collapsing single-child paths.
Returns the summary string and the last node in the chain that has multiple children. Returns the summary string and the last node in the chain that has multiple children.
""" """
summaries = [] summaries = []
paths = []
while True: while True:
summary = node.summary(**kwargs) summary = node.summary(**kwargs)
paths.append(summary)
if len(summary) > 50: if len(summary) > 50:
summary = summary[:50] + "..." summary = summary[:50] + "..."
summaries.append(summary) summaries.append(summary)
@ -36,10 +38,10 @@ def summarize_node(node: TreeLike, collapse = False, **kwargs) -> tuple[str, Tre
break break
node = node.children[0] node = node.children[0]
return ", ".join(summaries), node return ", ".join(summaries), ",".join(paths), node
def node_tree_to_string(node : TreeLike, prefix : str = "", depth = None) -> Iterable[str]: def node_tree_to_string(node : TreeLike, prefix : str = "", depth = None) -> Iterable[str]:
summary, node = summarize_node(node) summary, path, node = summarize_node(node)
if depth is not None and depth <= 0: if depth is not None and depth <= 0:
yield summary + " - ...\n" yield summary + " - ...\n"
@ -59,14 +61,14 @@ def node_tree_to_string(node : TreeLike, prefix : str = "", depth = None) -> Ite
yield from node_tree_to_string(child, prefix + extension, depth = depth - 1 if depth is not None else None) yield from node_tree_to_string(child, prefix + extension, depth = depth - 1 if depth is not None else None)
def _node_tree_to_html(node : TreeLike, prefix : str = "", depth = 1, connector = "", **kwargs) -> Iterable[str]: def _node_tree_to_html(node : TreeLike, prefix : str = "", depth = 1, connector = "", **kwargs) -> Iterable[str]:
summary, node = summarize_node(node, **kwargs) summary, path, node = summarize_node(node, **kwargs)
if len(node.children) == 0: if len(node.children) == 0:
yield f'<span class="leaf">{connector}{summary}</span>' yield f'<span class="qubed-node leaf" data-path="{path}">{connector}{summary}</span>'
return return
else: else:
open = "open" if depth > 0 else "" open = "open" if depth > 0 else ""
yield f"<details {open}><summary>{connector}{summary}</summary>" yield f'<details {open} data-path="{path}"><summary class="qubed-node">{connector}{summary}</summary>'
for index, child in enumerate(node.children): for index, child in enumerate(node.children):
connector = "└── " if index == len(node.children) - 1 else "├── " connector = "└── " if index == len(node.children) - 1 else "├── "
@ -76,10 +78,12 @@ def _node_tree_to_html(node : TreeLike, prefix : str = "", depth = 1, connector
def node_tree_to_html(node : TreeLike, depth = 1, **kwargs) -> str: def node_tree_to_html(node : TreeLike, depth = 1, **kwargs) -> str:
css_id = f"qubed-tree-{random.randint(0, 1000000)}" css_id = f"qubed-tree-{random.randint(0, 1000000)}"
css = f"""
# It's ugly to use an f-string here because CSS uses {} so much, so instead
# we use CSS_ID as a placeholder and replace it later
css = """
<style> <style>
pre#{css_id} """ \ pre#CSS_ID {
"""{
font-family: monospace; font-family: monospace;
white-space: pre; white-space: pre;
font-family: SFMono-Regular,Menlo,Monaco,Consolas,Liberation Mono,Courier New,Courier,monospace; font-family: SFMono-Regular,Menlo,Monaco,Consolas,Liberation Mono,Courier New,Courier,monospace;
@ -90,6 +94,11 @@ def node_tree_to_html(node : TreeLike, depth = 1, **kwargs) -> str:
margin-left: 0; margin-left: 0;
} }
.qubed-node a {
margin-left: 10px;
text-decoration: none;
}
summary { summary {
list-style: none; list-style: none;
cursor: pointer; cursor: pointer;
@ -125,6 +134,30 @@ def node_tree_to_html(node : TreeLike, depth = 1, **kwargs) -> str:
} }
</style> </style>
""" """.replace("CSS_ID", css_id)
# This JS snippet copies the path of a node to the clipboard when it is alt-clicked
js = """
<script type="module" defer>
// Join the data-path of `element` and of each of its ancestors, outermost
// first, stopping at the <pre> that wraps the tree. The outermost entry (the
// root node) is dropped from the result.
function collectNodePath(element) {
    const paths = [];
    let current = element;
    // Also stop when we run out of ancestors, so a node that is not inside a
    // <pre> cannot cause a null dereference.
    while (current && current.tagName !== "PRE") {
        if (current.dataset && current.dataset.path) {
            paths.push(current.dataset.path);
        }
        current = current.parentElement;
    }
    return paths.reverse().slice(1).join(",");
}

// Click handler: on alt/option-click, copy the clicked node's path to the clipboard.
async function nodeOnClick(event) {
    if (!event.altKey) return;
    event.preventDefault();
    // Start at the clicked element itself: a leaf <span> carries its own
    // data-path, which was skipped when the walk began at its parent. A
    // <summary> has no data-path, so its <details> parent is picked up next.
    const path = collectNodePath(this);
    await navigator.clipboard.writeText(path);
}
const nodes = document.querySelectorAll("#CSS_ID .qubed-node");
nodes.forEach(n => n.addEventListener("click", nodeOnClick));
</script>
""".replace("CSS_ID", css_id)
nodes = "".join(_node_tree_to_html(node=node, depth=depth, **kwargs)) nodes = "".join(_node_tree_to_html(node=node, depth=depth, **kwargs))
return f"{css}<pre class='qubed-tree' id='{css_id}'>{nodes}</pre>" return f"{js}{css}<pre class='qubed-tree' id='{css_id}'>{nodes}</pre>"

View File

@ -1,9 +1,11 @@
import dataclasses import dataclasses
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass from dataclasses import dataclass, replace
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from typing import Any, FrozenSet, Iterable, Literal, TypeVar from typing import TYPE_CHECKING, Any, FrozenSet, Iterable, Literal, TypeVar
if TYPE_CHECKING:
from .Qube import Qube
@dataclass(frozen=True) @dataclass(frozen=True)
class Values(ABC): class Values(ABC):
@ -68,10 +70,31 @@ class QEnum(Values):
def to_json(self): def to_json(self):
return list(self.values) return list(self.values)
class DateEnum(QEnum):
    """Enumeration of date values whose summary renders them as YYYYMMDD strings."""

    def summary(self) -> str:
        """Return the values in sorted order, each formatted as `%Y%m%d`, joined by `/`."""
        ordered = sorted(self.values)
        return "/".join(day.strftime("%Y%m%d") for day in ordered)
@dataclass(frozen=True) @dataclass(frozen=True)
class Range(Values, ABC): class Range(Values, ABC):
dtype: str = dataclasses.field(kw_only=True) dtype: str = dataclasses.field(kw_only=True)
start: Any
end: Any
step: Any
def min(self):
    """Return the lower bound of the range, i.e. `self.start`."""
    return self.start
def __iter__(self) -> Iterable[Any]:
    """
    Yield the values of the range from `start` up to and including `end`.

    A step that is falsy (e.g. `0` or `timedelta(0)`) can never advance the
    cursor, so in that case `start` is yielded at most once instead of
    looping forever.
    """
    current = self.start
    zero_step = not self.step
    while current <= self.end:
        yield current
        if zero_step:
            # Degenerate single-value range: stop after the first value.
            return
        current += self.step
def to_json(self):
    """Return the fields of this range as a plain dict via `dataclasses.asdict`."""
    # NOTE(review): field values are not converted any further here, so
    # non-JSON types (e.g. dates) presumably stay as-is — confirm callers handle that.
    return dataclasses.asdict(self)
@dataclass(frozen=True) @dataclass(frozen=True)
class DateRange(Range): class DateRange(Range):
start: date start: date
@ -89,36 +112,58 @@ class DateRange(Range):
current += self.step current += self.step
@classmethod @classmethod
def from_strings(self, values: Iterable[str]) -> list['DateRange']: def from_strings(cls, values: Iterable[str]) -> "list[DateRange | QEnum]":
dates = sorted([datetime.strptime(v, "%Y%m%d") for v in values]) dates = sorted([datetime.strptime(v, "%Y%m%d") for v in values])
if len(dates) < 2: if len(dates) < 2:
return [DateRange( return [DateEnum(dates)]
start=dates[0],
end=dates[0],
step=timedelta(days=0)
)]
ranges = [] ranges = []
current_range, dates = [dates[0],], dates[1:] current_group, dates = [dates[0],], dates[1:]
current_type : Literal["enum", "range"] = "enum"
while len(dates) > 1: while len(dates) > 1:
if dates[0] - current_range[-1] == timedelta(days=1): if current_type == "range":
current_range.append(dates.pop(0))
elif len(current_range) == 1: # If the next date fits then add it to the current range
ranges.append(DateRange( if dates[0] - current_group[-1] == timedelta(days=1):
start=current_range[0], current_group.append(dates.pop(0))
end=current_range[0],
step=timedelta(days=0)
))
current_range = [dates.pop(0),]
# Emit the current range and start a new one
else:
if len(current_group) == 1:
ranges.append(DateEnum(current_group))
else: else:
ranges.append(DateRange( ranges.append(DateRange(
start=current_range[0], start=current_group[0],
end=current_range[-1], end=current_group[-1],
step=timedelta(days=1) step=timedelta(days=1)
)) ))
current_range = [dates.pop(0),] current_group = [dates.pop(0),]
current_type = "enum"
if current_type == "enum":
# If the next date is one more than the last then switch to range mode
if dates[0] - current_group[-1] == timedelta(days=1):
last = current_group.pop()
if current_group:
ranges.append(DateEnum(current_group))
current_group = [last, dates.pop(0)]
current_type = "range"
else:
current_group.append(dates.pop(0))
# Handle remaining `current_group`
if current_group:
if current_type == "range":
ranges.append(DateRange(
start=current_group[0],
end=current_group[-1],
step=timedelta(days=1)
))
else:
ranges.append(DateEnum(current_group))
return ranges return ranges
def __contains__(self, value: Any) -> bool: def __contains__(self, value: Any) -> bool:
@ -141,6 +186,11 @@ class TimeRange(Range):
step: int step: int
dtype: Literal["time"] = dataclasses.field(kw_only=True, default="time") dtype: Literal["time"] = dataclasses.field(kw_only=True, default="time")
def min(self):
    """Return the first value of the range, i.e. `self.start`."""
    return self.start
def __iter__(self) -> Iterable[Any]:
    """Iterate over the range by delegating to the parent class's `__iter__`."""
    return super().__iter__()
@classmethod @classmethod
def from_strings(self, values: Iterable[str]) -> list['TimeRange']: def from_strings(self, values: Iterable[str]) -> list['TimeRange']:
times = sorted([int(v) for v in values]) times = sorted([int(v) for v in values])
@ -198,7 +248,9 @@ class IntRange(Range):
return (self.end - self.start) // self.step return (self.end - self.start) // self.step
def summary(self) -> str: def summary(self) -> str:
def fmt(d): return d.strftime("%Y%m%d") def fmt(d): return d
if self.step == 0:
return f"{fmt(self.start)}"
return f"{fmt(self.start)}/to/{fmt(self.end)}/by/{self.step}" return f"{fmt(self.start)}/to/{fmt(self.end)}/by/{self.step}"
def __contains__(self, value: Any) -> bool: def __contains__(self, value: Any) -> bool:
@ -247,3 +299,17 @@ def values_from_json(obj) -> Values:
case "time": return TimeRange(**obj) case "time": return TimeRange(**obj)
case "int": return IntRange(**obj) case "int": return IntRange(**obj)
case _: raise ValueError(f"Unknown dtype {obj['dtype']}") case _: raise ValueError(f"Unknown dtype {obj['dtype']}")
def convert_datatypes(q: "Qube", conversions: dict[str, type[Values]]) -> "Qube":
    """
    Re-encode the values of selected keys using other value types.

    Args:
        q: The tree to convert.
        conversions: Maps a node key to the class whose `from_strings`
            classmethod turns that node's values into one or more value groups.

    Returns:
        The tree built by `q.transform`: every node whose key appears in
        `conversions` is replaced by one node per value group, and all other
        nodes are passed through unchanged.

    Raises:
        TypeError: if a node selected for conversion does not hold `QEnum`
            values. This is raised explicitly rather than asserted so the
            check still runs under `python -O`.
    """
    def _convert(node: "Qube") -> Iterable["Qube"]:
        if node.key not in conversions:
            yield node
            return

        if not isinstance(node.values, QEnum):
            raise TypeError("Only QEnum values can be converted to other datatypes.")

        target_type = conversions[node.key]
        for values_group in target_type.from_strings(node.values):
            # Each group becomes its own node; only the values on the data object change.
            yield replace(node, data=replace(node.data, values=values_group))

    return q.transform(_convert)

View File

@ -1,8 +1,6 @@
from qubed import Qube from qubed import Qube
d = {
def test_eq():
d = {
"class=od" : { "class=od" : {
"expver=0001": {"param=1":{}, "param=2":{}}, "expver=0001": {"param=1":{}, "param=2":{}},
"expver=0002": {"param=1":{}, "param=2":{}}, "expver=0002": {"param=1":{}, "param=2":{}},
@ -11,12 +9,23 @@ def test_eq():
"expver=0001": {"param=1":{}, "param=2":{}, "param=3":{}}, "expver=0001": {"param=1":{}, "param=2":{}, "param=3":{}},
"expver=0002": {"param=1":{}, "param=2":{}}, "expver=0002": {"param=1":{}, "param=2":{}},
}, },
} }
q = Qube.from_dict(d) q = Qube.from_dict(d)
r = Qube.from_dict(d)
def test_eq():
r = Qube.from_dict(d)
assert q == r assert q == r
def test_getitem():
    """Indexing with a (key, value) tuple returns the subtree below the matching child."""
    od_subtree = Qube.from_dict({
        "expver=0001": {"param=1":{}, "param=2":{}},
        "expver=0002": {"param=1":{}, "param=2":{}},
    })
    assert q["class", "od"] == od_subtree

    # Chained lookups descend one level at a time.
    od_0001_subtree = Qube.from_dict({
        "param=1":{}, "param=2":{},
    })
    assert q["class", "od"]["expver", "0001"] == od_0001_subtree
def test_n_leaves(): def test_n_leaves():
q = Qube.from_dict({ q = Qube.from_dict({
"a=1/2/3" : {"b=1/2/3" : {"c=1/2/3" : {}}}, "a=1/2/3" : {"b=1/2/3" : {"c=1/2/3" : {}}},

31
tests/test_formatters.py Normal file
View File

@ -0,0 +1,31 @@
from qubed import Qube
d = {
"class=od" : {
"expver=0001": {"param=1":{}, "param=2":{}},
"expver=0002": {"param=1":{}, "param=2":{}},
},
"class=rd" : {
"expver=0001": {"param=1":{}, "param=2":{}, "param=3":{}},
"expver=0002": {"param=1":{}, "param=2":{}},
},
}
q = Qube.from_dict(d).compress()
as_string= """
root
class=od, expver=0001/0002, param=1/2
class=rd
expver=0001, param=1/2/3
expver=0002, param=1/2
""".strip()
as_html = """
<details open data-path="root"><summary class="qubed-node">root</summary><span class="qubed-node leaf" data-path="class=od,expver=0001/0002,param=1/2"> class=od, expver=0001/0002, param=1/2</span><details open data-path="class=rd"><summary class="qubed-node"> class=rd</summary><span class="qubed-node leaf" data-path="expver=0001,param=1/2/3"> expver=0001, param=1/2/3</span><span class="qubed-node leaf" data-path="expver=0002,param=1/2"> expver=0002, param=1/2</span></details></details>
""".strip()
def test_string():
    """The plain-text rendering of the compressed qube matches `as_string`."""
    assert str(q).strip() == as_string
def test_html():
    """The HTML representation contains the expected node markup from `as_html`."""
    # Substring check: the full output also carries other markup around the tree.
    assert as_html in q._repr_html_()