Set operations done
This commit is contained in:
parent
fe00bb1c7f
commit
9d4fcbe624
@ -38,7 +38,9 @@ class Qube:
|
||||
return cls(
|
||||
data = NodeData(key, values, metadata = kwargs.get("metadata", frozendict())
|
||||
),
|
||||
children = tuple(sorted(children)),
|
||||
children = tuple(sorted(children,
|
||||
key = lambda n : ((n.key, n.values.min()))
|
||||
)),
|
||||
)
|
||||
|
||||
|
||||
@ -49,18 +51,19 @@ class Qube:
|
||||
key=json["key"],
|
||||
values=values_from_json(json["values"]),
|
||||
metadata=json["metadata"] if "metadata" in json else {},
|
||||
children=tuple(from_json(c) for c in json["children"])
|
||||
children=(from_json(c) for c in json["children"]),
|
||||
)
|
||||
return from_json(json)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, d: dict) -> 'Qube':
|
||||
def from_dict(d: dict) -> tuple[Qube, ...]:
|
||||
return tuple(Qube.make(
|
||||
key=k.split("=")[0],
|
||||
values=QEnum((k.split("=")[1].split("/"))),
|
||||
children=from_dict(children)
|
||||
) for k, children in d.items())
|
||||
def from_dict(d: dict) -> list[Qube]:
|
||||
return [
|
||||
Qube.make(
|
||||
key=k.split("=")[0],
|
||||
values=QEnum((k.split("=")[1].split("/"))),
|
||||
children=from_dict(children)
|
||||
) for k, children in d.items()]
|
||||
|
||||
return Qube.make(key = "root",
|
||||
values=QEnum(("root",)),
|
||||
@ -86,6 +89,15 @@ class Qube:
|
||||
|
||||
def __or__(self, other: "Qube") -> "Qube":
    """Implement `self | other` by delegating to the UNION set operation."""
    union = set_operations.SetOperation.UNION
    return set_operations.operation(self, other, union)
|
||||
|
||||
def __and__(self, other: "Qube") -> "Qube":
    """Implement `self & other` by delegating to the INTERSECTION set operation."""
    intersection = set_operations.SetOperation.INTERSECTION
    return set_operations.operation(self, other, intersection)
|
||||
|
||||
def __sub__(self, other: "Qube") -> "Qube":
    """Implement `self - other` by delegating to the DIFFERENCE set operation."""
    difference = set_operations.SetOperation.DIFFERENCE
    return set_operations.operation(self, other, difference)
|
||||
|
||||
def __xor__(self, other: "Qube") -> "Qube":
    """Implement `self ^ other` by delegating to the SYMMETRIC_DIFFERENCE set operation."""
    symmetric_difference = set_operations.SetOperation.SYMMETRIC_DIFFERENCE
    return set_operations.operation(self, other, symmetric_difference)
|
||||
|
||||
|
||||
def __getitem__(self, args) -> 'Qube':
|
||||
@ -264,39 +276,13 @@ class Qube:
|
||||
return hash_node(self)
|
||||
|
||||
def compress(self) -> "Qube":
|
||||
# First compress the children
|
||||
# First compress the children (this recursively compresses all the way to the leaves)
|
||||
new_children = [child.compress() for child in self.children]
|
||||
|
||||
# Now take the set of new children and see if any have identical key, metadata and children
|
||||
# the values may different and will be collapsed into a single node
|
||||
identical_children = defaultdict(set)
|
||||
for child in new_children:
|
||||
# only care about the key and children of each node, ignore values
|
||||
key = hash((child.key, tuple((cc.structural_hash for cc in child.children))))
|
||||
identical_children[key].add(child)
|
||||
|
||||
# Now go through and create new compressed nodes for any groups that need collapsing
|
||||
new_children = []
|
||||
for child_set in identical_children.values():
|
||||
if len(child_set) > 1:
|
||||
child_set = list(child_set)
|
||||
key = child_set[0].key
|
||||
|
||||
# Compress the children into a single node
|
||||
assert all(isinstance(child.data.values, QEnum) for child in child_set), "All children must have QEnum values"
|
||||
|
||||
node_data = NodeData(
|
||||
key = key,
|
||||
metadata = frozendict(), # Todo: Implement metadata compression
|
||||
values = QEnum((v for child in child_set for v in child.data.values.values)),
|
||||
)
|
||||
new_child = Qube(data = node_data, children = child_set[0].children)
|
||||
else:
|
||||
# If the group is size one just keep it
|
||||
new_child = child_set.pop()
|
||||
|
||||
new_children.append(new_child)
|
||||
# Now compress the set of children at this level
|
||||
new_children = set_operations.compress_children(new_children)
|
||||
|
||||
# Return the now compressed node
|
||||
return Qube(
|
||||
data = self.data,
|
||||
children = tuple(sorted(new_children))
|
||||
|
@ -1,10 +1,12 @@
|
||||
import dataclasses
|
||||
from collections import defaultdict
|
||||
from dataclasses import replace
|
||||
from enum import Enum
|
||||
|
||||
# Prevent circular imports while allowing the type checker to know what Qube is
|
||||
from typing import TYPE_CHECKING, Iterable
|
||||
|
||||
from frozendict import frozendict
|
||||
|
||||
from .node_types import NodeData
|
||||
from .value_types import QEnum, Values
|
||||
|
||||
@ -48,28 +50,82 @@ def operation(A: "Qube", B : "Qube", operation_type: SetOperation) -> "Qube":
|
||||
for key, (A_nodes, B_nodes) in nodes_by_key.items():
|
||||
new_children.extend(_operation(key, A_nodes, B_nodes, operation_type))
|
||||
|
||||
# Whenever we modify children we should recompress them
|
||||
# But since `operation` is already recursive, we only need to compress this level not all levels
|
||||
# Hence we use the non-recursive compress_children helper
|
||||
new_children = compress_children(new_children)
|
||||
|
||||
# The values and key are the same so we just replace the children
|
||||
return dataclasses.replace(A, children=new_children)
|
||||
return replace(A, children=new_children)
|
||||
|
||||
|
||||
# The root node is special so we need a helper method that we can recurse on
|
||||
def _operation(key: str, A: list["Qube"], B : list["Qube"], operation_type: SetOperation) -> Iterable["Qube"]:
|
||||
# Iterate over all pairs (node_A, node_B)
|
||||
for node_a in A:
|
||||
for node_b in B:
|
||||
|
||||
# Compute A - B, A & B, B - A
|
||||
just_A, intersection, just_B = fused_set_operations(
|
||||
node_a.values,
|
||||
node_b.values
|
||||
)
|
||||
for values in just_A:
|
||||
data = NodeData(key, values, {})
|
||||
yield type(node_a)(data, node_a.children)
|
||||
keep_just_A, keep_intersection, keep_just_B = operation_type.value
|
||||
|
||||
if intersection:
|
||||
intersected_children = operation(node_a, node_b, operation_type)
|
||||
for values in intersection:
|
||||
data = NodeData(key, values, {})
|
||||
yield type(node_a)(data, intersected_children)
|
||||
# Values in just_A and just_B are simple because
|
||||
# we can just make new nodes that copy the children of node_A or node_B
|
||||
if keep_just_A:
|
||||
for group in just_A:
|
||||
data = NodeData(key, group, {})
|
||||
yield type(node_a)(data, node_a.children)
|
||||
|
||||
for values in just_B:
|
||||
data = NodeData(key, values, {})
|
||||
yield type(node_a)(data, node_b.children)
|
||||
if keep_just_B:
|
||||
for group in just_B:
|
||||
data = NodeData(key, group, {})
|
||||
yield type(node_a)(data, node_b.children)
|
||||
|
||||
if keep_intersection:
|
||||
for group in intersection:
|
||||
if group:
|
||||
new_node_a = replace(node_a, data = replace(node_a.data, values = group))
|
||||
new_node_b = replace(node_b, data= replace(node_b.data, values = group))
|
||||
yield operation(new_node_a, new_node_b, operation_type)
|
||||
|
||||
def compress_children(children: Iterable["Qube"]) -> tuple["Qube", ...]:
    """
    Compress a single level of sibling nodes without recursing into them.

    Siblings that share the same key and whose children have the same
    structural hashes are merged into one node whose values are the union of
    the group's values. Used by Qube.compress and by the set operations to
    keep their results compressed.

    Args:
        children: The sibling nodes to compress.

    Returns:
        The compressed siblings as a tuple sorted by (key, sorted values).

    Note:
        Metadata is not part of the grouping signature, and merged nodes are
        created with empty metadata (metadata compression is not implemented).
    """
    # Group siblings by (key, structural hashes of their children); the
    # values are deliberately ignored because they are what gets merged.
    # The tuple itself is the dict key rather than hash(tuple): two different
    # signatures whose hashes happen to collide must never be merged.
    groups = defaultdict(set)
    for child in children:
        signature = (
            child.key,
            tuple(cc.structural_hash for cc in child.children),
        )
        groups[signature].add(child)

    # Build the new sibling list, collapsing every group with more than one member.
    new_children = []
    for group in groups.values():
        if len(group) == 1:
            # A group of size one is kept unchanged.
            new_children.extend(group)
            continue

        members = list(group)
        first = members[0]

        # Only enumerated values can be unioned element by element.
        assert all(isinstance(m.data.values, QEnum) for m in members), "All children must have QEnum values"

        node_data = NodeData(
            key=first.key,
            metadata=frozendict(),  # Todo: Implement metadata compression
            values=QEnum((v for m in members for v in m.data.values.values)),
        )
        # Every member has structurally identical children, so any member's
        # children can stand in for the merged node's children.
        new_children.append(type(first)(data=node_data, children=first.children))

    # NOTE(review): the sort key reads `n.values.values`, so it assumes every
    # node (including unmerged ones) carries QEnum-like values — confirm for
    # other Values subclasses.
    return tuple(sorted(
        new_children,
        key=lambda n: (n.key, tuple(sorted(n.values.values))),
    ))
|
@ -21,6 +21,10 @@ class Values(ABC):
|
||||
@abstractmethod
def from_strings(self, values: Iterable[str]) -> list['Values']:
    """Abstract hook: subclasses turn an iterable of strings into a list of Values."""
|
||||
|
||||
@abstractmethod
def min(self):
    """Abstract hook: subclasses return their minimum value (used when sorting nodes)."""
|
||||
|
||||
T = TypeVar("T")
|
||||
EnumValuesType = FrozenSet[T]
|
||||
@ -50,6 +54,8 @@ class QEnum(Values):
|
||||
return value in self.values
|
||||
def from_strings(self, values: Iterable[str]) -> list['Values']:
    """Build one instance of this object's own class from `values` and return it in a list."""
    own_type = type(self)
    instance = own_type(tuple(values))
    return [instance]
|
||||
def min(self):
    """Return the smallest element of `self.values`."""
    lowest = min(self.values)
    return lowest
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Range(Values, ABC):
|
||||
|
@ -27,14 +27,42 @@ def test_n_leaves():
|
||||
assert q.n_leaves == 27 + 1
|
||||
|
||||
|
||||
# def test_union():
|
||||
# q = Qube.from_dict({"a=1/2/3" : {"b=1" : {}},})
|
||||
# r = Qube.from_dict({"a=2/3/4" : {"b=2" : {}},})
|
||||
def test_union():
    """The union of two overlapping qubes splits `a` into 1, 2/3 and 4."""
    left = Qube.from_dict({"a=1/2/3": {"b=1": {}}})
    right = Qube.from_dict({"a=2/3/4": {"b=2": {}}})

    # Entries are intentionally listed out of sorted order.
    expected_tree = {
        "a=4": {"b=2": {}},
        "a=1": {"b=1": {}},
        "a=2/3": {"b=1/2": {}},
    }
    expected = Qube.from_dict(expected_tree)

    assert left | right == expected
|
||||
|
||||
def test_difference():
    """Subtracting a qube removes the shared `a` values and keeps the rest."""
    minuend = Qube.from_dict({"a=1/2/3/5": {"b=1": {}}})
    subtrahend = Qube.from_dict({"a=2/3/4": {"b=1": {}}})

    expected_tree = {"a=1/5": {"b=1": {}}}
    expected = Qube.from_dict(expected_tree)

    assert minuend - subtrahend == expected
|
||||
|
||||
def test_order_independence():
    """Two qubes built from the same entries in different orders compare equal."""
    first_ordering = {
        "a=4": {"b=2": {}},
        "a=1": {"b=2": {}, "b=1": {}},
        "a=2/3": {"b=1/2": {}},
    }
    second_ordering = {
        "a=2/3": {"b=1/2": {}},
        "a=4": {"b=2": {}},
        "a=1": {"b=1": {}, "b=2": {}},
    }

    u = Qube.from_dict(first_ordering)
    v = Qube.from_dict(second_ordering)

    assert u == v
|
29
tests/test_compression.py
Normal file
29
tests/test_compression.py
Normal file
@ -0,0 +1,29 @@
|
||||
from qubed import Qube
|
||||
|
||||
|
||||
def test_smoke():
    """Compressing an uncompressed qube gives the hand-written compressed form."""
    uncompressed_tree = {
        "class=od": {
            "expver=0001": {"param=1": {}, "param=2": {}},
            "expver=0002": {"param=1": {}, "param=2": {}},
        },
        "class=rd": {
            "expver=0001": {"param=1": {}, "param=2": {}, "param=3": {}},
            "expver=0002": {"param=1": {}, "param=2": {}},
        },
    }

    # Expected shape after compression:
    # root
    # ├── class=od, expver=0001/0002, param=1/2
    # └── class=rd
    #     ├── expver=0001, param=1/2/3
    #     └── expver=0002, param=1/2
    compressed_tree = {
        "class=od": {"expver=0001/0002": {"param=1/2": {}}},
        "class=rd": {
            "expver=0001": {"param=1/2/3": {}},
            "expver=0002": {"param=1/2": {}},
        },
    }

    original = Qube.from_dict(uncompressed_tree)
    expected = Qube.from_dict(compressed_tree)

    assert original.compress() == expected
|
@ -11,4 +11,19 @@ def test_smoke():
|
||||
"expver=0001": {"param=1":{}, "param=2":{}, "param=3":{}},
|
||||
"expver=0002": {"param=1":{}, "param=2":{}},
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
# root
|
||||
# ├── class=od, expver=0001/0002, param=1/2
|
||||
# └── class=rd
|
||||
# ├── expver=0001, param=1/2/3
|
||||
# └── expver=0002, param=1/2
|
||||
ct = Qube.from_dict({
|
||||
"class=od" : {"expver=0001/0002": {"param=1/2":{}}},
|
||||
"class=rd" : {
|
||||
"expver=0001": {"param=1/2/3":{}},
|
||||
"expver=0002": {"param=1/2":{}},
|
||||
},
|
||||
})
|
||||
|
||||
assert q.compress() == ct
|
Loading…
x
Reference in New Issue
Block a user