Start fleshing out set operations

This commit is contained in:
Tom 2025-02-14 15:34:11 +00:00
parent 62c7a49c59
commit af69d2fe00
8 changed files with 186 additions and 95 deletions

View File

@ -97,11 +97,6 @@ but we do not allow this because it would mean we would have to take multiple br
What we have now is a tree of dense datacubes which represents a single larger sparse datacube in a more compact manner. For want of a better word we'll call it a Qube.
## HTML Output
```{code-cell} python3
q.compress().html()
````
## API

View File

@ -74,4 +74,14 @@ A.print(name="A"), B.print(name="B");
A | B
```
### Command Line Usage
```bash
fdb list class=rd,expver=0001,... | qubed --from=fdblist --to=text
```
`--from` options include: `fdblist`, `json`, `protobuf`, `marslist`, `constraints`.
`--to` options include: `text`, `html`, `json`, `datacubes`, `constraints`.
Use `--input` and `--output` to specify input and output files respectively.

View File

@ -1,44 +1,17 @@
import dataclasses
from collections import defaultdict
from dataclasses import dataclass, field
from dataclasses import dataclass
from functools import cached_property
from typing import Any, Callable, Hashable, Literal, Mapping
from typing import Any, Callable, Literal
from frozendict import frozendict
from . import set_operations
from .node_types import NodeData, RootNodeData
from .tree_formatters import HTML, node_tree_to_html, node_tree_to_string
from .value_types import DateRange, Enum, IntRange, TimeRange, Values
from .value_types import QEnum, Values, values_from_json
def values_from_json(obj) -> Values:
    """
    Build a Values object from its decoded JSON form.

    A JSON list becomes an Enum of its items. Anything else is indexed with
    "dtype" and passed, as keyword arguments, to the matching range class.

    Raises:
        ValueError: if the "dtype" entry is not "date", "time" or "int".
    """
    # A bare list is the enumeration form.
    if isinstance(obj, list):
        return Enum(tuple(obj))

    dtype = obj["dtype"]
    if dtype == "date":
        return DateRange(**obj)
    if dtype == "time":
        return TimeRange(**obj)
    if dtype == "int":
        return IntRange(**obj)
    raise ValueError(f"Unknown dtype {obj['dtype']}")
# In practice use a frozendict
Metadata = Mapping[str, str | int | float | bool]
@dataclass(frozen=True, eq=True, order=True)
class NodeData:
    """
    The payload of a single node: a key, the values it takes and its metadata.

    Metadata is declared with compare=False, so it takes no part in equality
    or ordering, and it defaults to an empty frozendict.
    """
    key: str
    values: Values
    metadata: dict[str, tuple[Hashable, ...]] = field(default_factory=frozendict, compare=False)

    def summary(self) -> str:
        """Return `key=<values summary>`, or the literal "root" when the key is "root"."""
        if self.key == "root":
            return "root"
        return f"{self.key}={self.values.summary()}"
@dataclass(frozen=True, eq=True, order=True)
class RootNodeData(NodeData):
    """NodeData variant used to print a root node under a custom name."""

    def summary(self) -> str:
        """Return the key on its own, without any `=values` suffix."""
        name = self.key
        return name
@dataclass(frozen=True, eq=True, order=True)
class Qube:
data: NodeData
@ -85,17 +58,17 @@ class Qube:
def from_dict(d: dict) -> tuple[Qube, ...]:
return tuple(Qube.make(
key=k.split("=")[0],
values=Enum(tuple(k.split("=")[1].split("/"))),
values=QEnum((k.split("=")[1].split("/"))),
children=from_dict(children)
) for k, children in d.items())
return Qube.make(key = "root",
values=Enum(("root",)),
values=QEnum(("root",)),
children = from_dict(d))
@classmethod
def empty(cls) -> 'Qube':
return cls.make("root", Enum(("root",)), [])
return cls.make("root", QEnum(("root",)), [])
def __str__(self, depth = None, name = None) -> str:
@ -119,7 +92,7 @@ class Qube:
key, value = args
for c in self.children:
if c.key == key and value in c.values:
data = dataclasses.replace(c.data, values = Enum((value,)))
data = dataclasses.replace(c.data, values = QEnum((value,)))
return dataclasses.replace(c, data = data)
raise KeyError(f"Key {key} not found in children of {self.key}")
@ -164,7 +137,7 @@ class Qube:
return dataclasses.replace(node, children = not_none(select(c) for c in node.children))
# If the key is specified, check if any of the values match
values = Enum(tuple(c for c in selection[node.key] if c in node.values))
values = QEnum((c for c in selection[node.key] if c in node.values))
if not values:
return None
@ -225,11 +198,11 @@ class Qube:
# values = values - values_set # At the end of this loop values will contain only the new values
# if group_1:
# group_1_node = Qube.make(c.key, Enum(tuple(group_1)), c.children)
# group_1_node = Qube.make(c.key, QEnum((group_1)), c.children)
# new_children.append(group_1_node) # Add the unaffected part of this child
# if group_2:
# new_node = Qube.make(key, Enum(tuple(affected)), [])
# new_node = Qube.make(key, QEnum((affected)), [])
# new_node = Qube._insert(new_node, identifier)
# new_children.append(new_node) # Add the affected part of this child
@ -242,7 +215,7 @@ class Qube:
# # If there are any values not in any of the existing children, add them as a new child
# if entirely_new_values:
# new_node = Qube.make(key, Enum(tuple(entirely_new_values)), [])
# new_node = Qube.make(key, QEnum((entirely_new_values)), [])
# new_children.append(Qube._insert(new_node, identifier))
return Qube.make(position.key, position.values, new_children)
@ -292,12 +265,12 @@ class Qube:
key = child_set[0].key
# Compress the children into a single node
assert all(isinstance(child.data.values, Enum) for child in child_set), "All children must have Enum values"
assert all(isinstance(child.data.values, QEnum) for child in child_set), "All children must have QEnum values"
node_data = NodeData(
key = key,
metadata = frozendict(), # Todo: Implement metadata compression
values = Enum(tuple(v for child in child_set for v in child.data.values.values)),
values = QEnum((v for child in child_set for v in child.data.values.values)),
)
new_child = Qube(data = node_data, children = child_set[0].children)
else:

View File

@ -0,0 +1,22 @@
from dataclasses import dataclass, field
from typing import Hashable
from frozendict import frozendict
from .value_types import Values
@dataclass(frozen=True, eq=True, order=True)
class NodeData:
    """
    The payload of a single node: a key, the values it takes and its metadata.

    Metadata is declared with compare=False, so it takes no part in equality
    or ordering, and it defaults to an empty frozendict.
    """
    key: str
    values: Values
    metadata: dict[str, tuple[Hashable, ...]] = field(default_factory=frozendict, compare=False)

    def summary(self) -> str:
        """Return `key=<values summary>`, or the literal "root" when the key is "root"."""
        if self.key == "root":
            return "root"
        return f"{self.key}={self.values.summary()}"
@dataclass(frozen=True, eq=True, order=True)
class RootNodeData(NodeData):
    """NodeData variant used to print a root node under a custom name."""

    def summary(self) -> str:
        """Return the key on its own, without any `=values` suffix."""
        name = self.key
        return name

View File

@ -1,5 +1,15 @@
from enum import Enum
import dataclasses
from collections import defaultdict
from enum import Enum
# Prevent circular imports while allowing the type checker to know what Qube is
from typing import TYPE_CHECKING, Iterable
from .node_types import NodeData
from .value_types import QEnum, Values
if TYPE_CHECKING:
from .qube import Qube
class SetOperation(Enum):
@ -8,14 +18,58 @@ class SetOperation(Enum):
DIFFERENCE = (1, 0, 0)
SYMMETRIC_DIFFERENCE = (1, 0, 1)
def fused_set_operations(A: "Values", B: "Values") -> tuple[list[Values], list[Values], list[Values]]:
    """
    Split two value sets into (only in A, in both, only in B) in a single pass.

    Each of the three parts is returned as a list of Values. A part with no
    members is returned as an empty list rather than a list holding an empty
    QEnum, so that callers can test a part for emptiness with plain
    truthiness and never iterate over empty value sets.

    Raises:
        NotImplementedError: if either argument is not a QEnum.
    """
    if isinstance(A, QEnum) and isinstance(B, QEnum):
        # Relies on QEnum being iterable over its members.
        set_A, set_B = set(A), set(B)
        intersection = set_A & set_B
        just_A = set_A - intersection
        just_B = set_B - intersection

        def as_parts(members: set) -> list[Values]:
            # A one-element list is always truthy, even when the QEnum inside
            # it is empty, so empty parts must be dropped here.
            return [QEnum(members)] if members else []

        return as_parts(just_A), as_parts(intersection), as_parts(just_B)

    raise NotImplementedError("Fused set operations on values types other than QEnum are not yet implemented")
def operation(A: "Qube", B : "Qube", type: SetOperation) -> "Qube":
# Sort nodes from both qubes by their keys
nodes_by_key = defaultdict(lambda : dict(A = [], B = []))
for node in A.nodes:
nodes_by_key[node.key]["A"].append(node)
for key, ndoes
def operation(A: "Qube", B: "Qube", operation_type: SetOperation) -> "Qube":
    """
    Apply a set operation to two nodes that share the same key and values.

    The children of both nodes are grouped by key and each group is handed to
    `_operation`; the result is a copy of `A` whose children are replaced by
    everything those calls produce.

    Args:
        A: left-hand node, usually a root node.
        B: right-hand node, usually a root node.
        operation_type: the set operation to apply, passed through unchanged.

    Returns:
        A copy of `A` with the combined children.

    Raises:
        AssertionError: if the two nodes differ in key or in values.
    """
    # The two message fragments are joined by implicit concatenation, so the
    # separating space has to be written explicitly.
    assert A.key == B.key, (
        "The two Qube root nodes must have the same key to perform set operations, "
        f"would usually be two root nodes. They have {A.key} and {B.key} respectively"
    )
    assert A.values == B.values, f"The two Qube root nodes must have the same values to perform set operations {A.values = }, {B.values = }"

    # Group the children of the two nodes by key: ([children of A], [children of B])
    nodes_by_key = defaultdict(lambda: ([], []))
    for node in A.children:
        nodes_by_key[node.key][0].append(node)
    for node in B.children:
        nodes_by_key[node.key][1].append(node)

    # For every group of same-keyed children, perform the set operation
    new_children = []
    for key, (A_nodes, B_nodes) in nodes_by_key.items():
        new_children.extend(_operation(key, A_nodes, B_nodes, operation_type))

    # The key and values are the same, so only the children are replaced.
    # dataclasses.replace stores the value as given, so pass an immutable
    # tuple like the tuples of children built by Qube.from_dict.
    # NOTE(review): assumes Qube.children is a tuple everywhere - confirm.
    return dataclasses.replace(A, children=tuple(new_children))
# The root node is special so we need a helper method that we can recurse on
def _operation(A: list["Qube"], B : list["Qube"], type: SetOperation) -> "Qube":
pass
def _operation(key: str, A: list["Qube"], B: list["Qube"], operation_type: SetOperation) -> Iterable["Qube"]:
    """
    Combine two groups of sibling nodes that all share `key`.

    Every pair (node_a, node_b) has its values split into "only in A",
    "in both" and "only in B" by `fused_set_operations`:

    * values only in A keep node_a's children, if the operation keeps that part;
    * values only in B keep node_b's children, if the operation keeps that part;
    * values in both are resolved by recursing into the two sets of children.

    Args:
        key: the key shared by every node in `A` and `B`.
        A: nodes with this key from the left-hand operand.
        B: nodes with this key from the right-hand operand.
        operation_type: the set operation being applied.

    Yields:
        The nodes making up the result for this key. Metadata is not carried
        over to rebuilt nodes.
    """
    # The SetOperation members visible in this module are triples of 0/1
    # flags, e.g. DIFFERENCE = (1, 0, 0) and SYMMETRIC_DIFFERENCE = (1, 0, 1):
    # (keep only-in-A, keep in-both, keep only-in-B).
    # NOTE(review): assumes every member follows this layout - confirm.
    keep_just_A, keep_intersection, keep_just_B = operation_type.value

    # A key that exists on one side only has nothing to be split against, so
    # its nodes pass through untouched when the operation keeps that side.
    if not A or not B:
        if keep_just_A:
            yield from A
        if keep_just_B:
            yield from B
        return

    # NOTE(review): pairs are processed independently, so when one side holds
    # several nodes for this key the "only in A/B" parts of different pairs
    # can overlap. That limitation is unchanged by this rewrite.
    for node_a in A:
        for node_b in B:
            just_A, intersection, just_B = fused_set_operations(
                node_a.values,
                node_b.values
            )

            if keep_just_A:
                for values in just_A:
                    # Skip empty value sets rather than emit empty nodes.
                    # This relies on Values being falsy when empty.
                    if values:
                        yield type(node_a)(NodeData(key, values, {}), node_a.children)

            for values in intersection:
                if not values:
                    continue
                # `operation` requires both nodes to carry identical key and
                # values, so recurse on copies restricted to the shared values.
                data = NodeData(key, values, {})
                merged = operation(
                    dataclasses.replace(node_a, data=data),
                    dataclasses.replace(node_b, data=data),
                    operation_type,
                )
                # Below a shared value the answer is decided by the children.
                # Only when both nodes are leaves is there nothing left to
                # recurse on, and the operation's own flag decides.
                both_leaves = not node_a.children and not node_b.children
                if merged.children or (both_leaves and keep_intersection):
                    yield merged

            if keep_just_B:
                for values in just_B:
                    if values:
                        yield type(node_a)(NodeData(key, values, {}), node_b.children)

View File

@ -1,3 +1,4 @@
import random
from dataclasses import dataclass
from typing import Iterable, Protocol, Sequence, runtime_checkable
@ -74,46 +75,56 @@ def _node_tree_to_html(node : TreeLike, prefix : str = "", depth = 1, connector
yield "</details>"
def node_tree_to_html(node : TreeLike, depth = 1, **kwargs) -> str:
css = """
css_id = f"qubed-tree-{random.randint(0, 1000000)}"
css = f"""
<style>
.qubed-tree-view {
pre#{css_id} """ \
"""{
font-family: monospace;
white-space: pre;
font-family: SFMono-Regular,Menlo,Monaco,Consolas,Liberation Mono,Courier New,Courier,monospace;
font-size: 12px;
line-height: 1.4;
}
.qubed-tree-view details {
# display: inline;
margin-left: 0;
}
.qubed-tree-view summary {
list-style: none;
cursor: pointer;
text-overflow: ellipsis;
overflow: hidden;
text-wrap: nowrap;
display: block;
}
details {
margin-left: 0;
}
.qubed-tree-view .leaf {
text-overflow: ellipsis;
overflow: hidden;
text-wrap: nowrap;
display: block;
}
summary {
list-style: none;
cursor: pointer;
text-overflow: ellipsis;
overflow: hidden;
text-wrap: nowrap;
display: block;
}
summary:hover,span.leaf:hover {
background-color: #f0f0f0;
}
details > summary::after {
content: '';
}
details:not([open]) > summary::after {
content: "";
}
.leaf {
text-overflow: ellipsis;
overflow: hidden;
text-wrap: nowrap;
display: block;
}
summary::-webkit-details-marker {
display: none;
content: "";
}
.qubed-tree-view summary:hover,span.leaf:hover {
background-color: #f0f0f0;
}
.qubed-tree-view details > summary::after {
content: '';
}
.qubed-tree-view details:not([open]) > summary::after {
content: "";
}
</style>
"""
nodes = "".join(_node_tree_to_html(node=node, depth=depth, **kwargs))
return f"{css}<pre class='qubed-tree-view'>{nodes}</pre>"
return f"{css}<pre class='qubed-tree' id='{css_id}'>{nodes}</pre>"

View File

@ -2,7 +2,7 @@ import dataclasses
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from typing import Any, Iterable, Literal
from typing import Any, FrozenSet, Iterable, Literal, TypeVar
@dataclass(frozen=True)
@ -22,13 +22,19 @@ class Values(ABC):
def from_strings(self, values: Iterable[str]) -> list['Values']:
pass
T = TypeVar("T")
EnumValuesType = FrozenSet[T]
@dataclass(frozen=True, order=True)
class Enum(Values):
class QEnum(Values):
"""
The simplest kind of key value is just a list of strings.
summary -> string1/string2/string....
"""
values: tuple[Any, ...]
values: EnumValuesType
def __init__(self, obj):
object.__setattr__(self, 'values', frozenset(obj))
def __post_init__(self):
assert isinstance(self.values, tuple)
@ -43,7 +49,7 @@ class Enum(Values):
def __contains__(self, value: Any) -> bool:
return value in self.values
def from_strings(self, values: Iterable[str]) -> list['Values']:
return [Enum(tuple(values))]
return [type(self)(tuple(values))]
@dataclass(frozen=True)
class Range(Values, ABC):
@ -115,8 +121,6 @@ class TimeRange(Range):
@classmethod
def from_strings(self, values: Iterable[str]) -> list['TimeRange']:
if len(values) == 0: return []
times = sorted([int(v) for v in values])
if len(times) < 2:
return [TimeRange(
@ -181,7 +185,6 @@ class IntRange(Range):
@classmethod
def from_strings(self, values: Iterable[str]) -> list['IntRange']:
if len(values) == 0: return []
ints = sorted([int(v) for v in values])
if len(ints) < 2:
return [IntRange(
@ -211,4 +214,14 @@ class IntRange(Range):
step=1
))
current_range = [ints.pop(0),]
return ranges
return ranges
def values_from_json(obj) -> Values:
    """
    Build a Values object from its decoded JSON form.

    A JSON list becomes a QEnum of its items. Anything else is indexed with
    "dtype" and passed, as keyword arguments, to the matching range class.

    Raises:
        ValueError: if the "dtype" entry is not "date", "time" or "int".
    """
    # A bare list is the enumeration form.
    if isinstance(obj, list):
        return QEnum(tuple(obj))

    dtype = obj["dtype"]
    if dtype == "date":
        return DateRange(**obj)
    if dtype == "time":
        return TimeRange(**obj)
    if dtype == "int":
        return IntRange(**obj)
    raise ValueError(f"Unknown dtype {obj['dtype']}")

View File

@ -24,4 +24,17 @@ def test_n_leaves():
})
# Size is 3*3*3 + 1*1*1 = 27 + 1
assert q.n_leaves == 27 + 1
assert q.n_leaves == 27 + 1
# def test_union():
# q = Qube.from_dict({"a=1/2/3" : {"b=1" : {}},})
# r = Qube.from_dict({"a=2/3/4" : {"b=2" : {}},})
# u = Qube.from_dict({
# "a=1" : {"b=1" : {}},
# "a=2/3" : {"b=1/2" : {}},
# "a=4" : {"b=2" : {}},
# })
# assert q | r == u