Compare commits

...

11 Commits

Author SHA1 Message Date
Tom
165bf5aca2 Tests passing checkpoint 2025-06-03 14:57:27 +02:00
Tom
aaafa28dfb A bit more on the rust backend 2025-05-29 17:09:17 +02:00
Tom Hodson
3328a0375b Fix update script a bit 2025-05-23 16:45:37 +00:00
Tom
ba2c67d812 Create example ingestion script 2025-05-23 10:55:32 +01:00
Tom
04b4ee24eb Silence protobuf warning 2025-05-22 17:26:58 +01:00
Tom
7069b70dd4 remove prints 2025-05-22 14:42:49 +01:00
Tom
90ea736c43 flesh out rust implementation 2025-05-22 14:40:44 +01:00
Tom
959dac332d Start writing rust backend 2025-05-19 10:20:12 +01:00
Tom
97c5abc38b
Update image link 2025-05-14 10:33:38 +01:00
Tom
1188733034 Update re 2025-05-14 10:21:48 +01:00
Tom
35bb8f0edd Massive rewrite 2025-05-14 10:14:02 +01:00
44 changed files with 21120 additions and 3499 deletions

View File

@ -8,7 +8,9 @@ repository = "https://github.com/ecmwf/qubed"
# rsfdb = {git = "https://github.com/ecmwf/rsfdb", branch = "develop"}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
pyo3 = "0.23"
pyo3 = "0.25"
lasso = "0.7.3"
itertools = "0.14.0"
[package.metadata.maturin]
version-from-git = true

View File

@ -1,4 +1,4 @@
# <p align="center"><img src="https://github.com/ecmwf/qubed/blob/main/docs/_static/banner.svg" width="1000"></p>
# <p align="center"><img src="https://raw.githubusercontent.com/ecmwf/qubed/refs/heads/main/docs/_static/banner.svg" width="1000"></p>
[![Static Badge](https://github.com/ecmwf/codex/raw/refs/heads/main/Project%20Maturity/emerging_badge.svg)](https://github.com/ecmwf/codex/raw/refs/heads/main/Project%20Maturity#emerging)
[![Docs](https://readthedocs.org/projects/qubed/badge/?version=latest)](https://qubed.readthedocs.io/en/latest/)
[![PyPi](https://img.shields.io/pypi/v/qubed.svg)](https://pypi.org/project/qubed/)

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,6 @@
---
type: remote
host: databridge-prod-catalogue3-ope.ewctest.link
port: 10000
engine: remote
store: remote

View File

@ -0,0 +1,6 @@
---
type: remote
host: databridge-prod-catalogue1-ope.ewctest.link
port: 10000
engine: remote
store: remote

File diff suppressed because it is too large Load Diff

18996
config/language/paramids.yaml Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -19,6 +19,7 @@ dynamic = ["version"]
dependencies = [
"frozendict",
"numpy",
"protobuf",
# CLI
"rich",

View File

@ -8,19 +8,20 @@ import functools
import json
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from dataclasses import dataclass, field
from functools import cached_property
from pathlib import Path
from typing import Any, Iterable, Iterator, Literal, Sequence
from typing import Any, Iterable, Iterator, Literal, Mapping, Self, Sequence
import numpy as np
from frozendict import frozendict
from . import set_operations
from .metadata import from_nodes
from .node_types import NodeData, RootNodeData
from .protobuf.adapters import proto_to_qube, qube_to_proto
from .tree_formatters import (
HTML,
_display,
node_tree_to_html,
node_tree_to_string,
)
@ -32,58 +33,107 @@ from .value_types import (
)
@dataclass(frozen=False, eq=True, order=True, unsafe_hash=True)
class Qube:
data: NodeData
children: tuple[Qube, ...]
@dataclass
class AxisInfo:
key: str
type: Any
depths: set[int]
values: set
@property
def key(self) -> str:
return self.data.key
def combine(self, other: Self):
self.key = other.key
self.type = other.type
self.depths.update(other.depths)
self.values.update(other.values)
# print(f"combining {self} and {other} getting {result}")
@property
def values(self) -> ValueGroup:
return self.data.values
@property
def metadata(self):
return self.data.metadata
@property
def dtype(self):
return self.data.dtype
def replace(self, **kwargs) -> Qube:
data_keys = {
k: v
for k, v in kwargs.items()
if k in ["key", "values", "metadata", "dtype"]
def to_json(self):
return {
"key": self.key,
"type": self.type.__name__,
"values": list(self.values),
"depths": list(self.depths),
}
node_keys = {k: v for k, v in kwargs.items() if k == "children"}
if not data_keys and not node_keys:
return self
if not data_keys:
return dataclasses.replace(self, **node_keys)
return dataclasses.replace(
self, data=dataclasses.replace(self.data, **data_keys), **node_keys
)
@dataclass(frozen=True, eq=True, order=True, unsafe_hash=True)
class QubeNamedRoot:
"Helper class to print a custom root name"
key: str
children: tuple[Qube, ...] = ()
def summary(self) -> str:
return self.data.summary()
return self.key
@dataclass(frozen=False, eq=True, order=True, unsafe_hash=True)
class Qube:
key: str
values: ValueGroup
metadata: frozendict[str, np.ndarray] = field(
default_factory=lambda: frozendict({}), compare=False
)
children: tuple[Qube, ...] = ()
is_root: bool = False
is_leaf: bool = False
depth: int = field(default=0, compare=False)
shape: tuple[int, ...] = field(default=(), compare=False)
@classmethod
def make(cls, key: str, values: ValueGroup, children, **kwargs) -> Qube:
def make_node(
cls,
key: str,
values: Iterable | QEnum | WildcardGroup,
children: Iterable[Qube],
metadata: Mapping[str, np.ndarray] = {},
is_root: bool = False,
is_leaf: bool | None = None,
) -> Qube:
if isinstance(values, ValueGroup):
values = values
else:
values = QEnum(values)
if not isinstance(values, WildcardGroup) and not is_root:
assert len(values) > 0, "Nodes must have at least one value"
children = tuple(sorted(children, key=lambda n: ((n.key, n.values.min()))))
return cls(
data=NodeData(
key, values, metadata=frozendict(kwargs.get("metadata", frozendict()))
),
children=tuple(sorted(children, key=lambda n: ((n.key, n.values.min())))),
key,
values=values,
children=children,
metadata=frozendict(metadata),
is_root=is_root,
is_leaf=(not len(children)) if is_leaf is None else is_leaf,
)
@classmethod
def root_node(cls, children: Iterable[Qube]) -> Qube:
return cls.make("root", QEnum(("root",)), children)
def make_root(cls, children: Iterable[Qube], metadata={}) -> Qube:
def update_depth_shape(children, depth, shape):
for child in children:
child.depth = depth + 1
child.shape = shape + (len(child.values),)
update_depth_shape(child.children, child.depth, child.shape)
update_depth_shape(children, depth=0, shape=(1,))
return cls.make_node(
"root",
values=QEnum(("root",)),
children=children,
metadata=metadata,
is_root=True,
)
def replace(self, **kwargs) -> Qube:
return dataclasses.replace(self, **kwargs)
def summary(self) -> str:
if self.is_root:
return self.key
return f"{self.key}={self.values.summary()}" if self.key != "root" else "root"
@classmethod
def load(cls, path: str | Path) -> Qube:
@ -91,7 +141,7 @@ class Qube:
return Qube.from_json(json.load(f))
@classmethod
def from_datacube(cls, datacube: dict[str, str | Sequence[str]]) -> Qube:
def from_datacube(cls, datacube: Mapping[str, str | Sequence[str]]) -> Qube:
key_vals = list(datacube.items())[::-1]
children: list[Qube] = []
@ -104,18 +154,19 @@ class Qube:
else:
values_group = QEnum([values])
children = [cls.make(key, values_group, children)]
children = [cls.make_node(key, values_group, children)]
return cls.root_node(children)
return cls.make_root(children)
@classmethod
def from_json(cls, json: dict) -> Qube:
def from_json(json: dict) -> Qube:
return Qube.make(
def from_json(json: dict, depth=0) -> Qube:
return Qube.make_node(
key=json["key"],
values=values_from_json(json["values"]),
metadata=frozendict(json["metadata"]) if "metadata" in json else {},
children=(from_json(c) for c in json["children"]),
children=(from_json(c, depth + 1) for c in json["children"]),
is_root=(depth == 0),
)
return from_json(json)
@ -141,18 +192,29 @@ class Qube:
for k, children in d.items():
key, values = k.split("=")
values = values.split("/")
# children == {"..." : {}}
# is a special case to represent trees with leaves we don't know about
if frozendict(children) == frozendict({"...": {}}):
yield Qube.make_node(
key=key,
values=values,
children={},
is_leaf=False,
)
# Special case for Wildcard values
if values == ["*"]:
values = WildcardGroup()
else:
values = QEnum(values)
yield Qube.make(
yield Qube.make_node(
key=key,
values=values,
children=from_dict(children),
)
return Qube.root_node(list(from_dict(d)))
return Qube.make_root(list(from_dict(d)))
def to_dict(self) -> dict:
def to_dict(q: Qube) -> tuple[str, dict]:
@ -161,6 +223,13 @@ class Qube:
return to_dict(self)[1]
@classmethod
def from_protobuf(cls, msg: bytes) -> Qube:
return proto_to_qube(cls, msg)
def to_protobuf(self) -> bytes:
return qube_to_proto(self)
@classmethod
def from_tree(cls, tree_str):
lines = tree_str.splitlines()
@ -214,17 +283,12 @@ class Qube:
@classmethod
def empty(cls) -> Qube:
return Qube.root_node([])
return Qube.make_root([])
def __str_helper__(self, depth=None, name=None) -> str:
node = (
dataclasses.replace(
self,
data=RootNodeData(key=name, values=self.values, metadata=self.metadata),
)
if name is not None
else self
)
node = self
if name is not None:
node = node.replace(key=name)
out = "".join(node_tree_to_string(node=node, depth=depth))
if out[-1] == "\n":
out = out[:-1]
@ -239,16 +303,19 @@ class Qube:
def print(self, depth=None, name: str | None = None):
print(self.__str_helper__(depth=depth, name=name))
def html(self, depth=2, collapse=True, name: str | None = None) -> HTML:
node = (
dataclasses.replace(
def html(
self,
data=RootNodeData(key=name, values=self.values, metadata=self.metadata),
depth=2,
collapse=True,
name: str | None = None,
info: Callable[[Qube], str] | None = None,
) -> HTML:
node = self
if name is not None:
node = node.replace(key=name)
return HTML(
node_tree_to_html(node=node, depth=depth, collapse=collapse, info=info)
)
if name is not None
else self
)
return HTML(node_tree_to_html(node=node, depth=depth, collapse=collapse))
def _repr_html_(self) -> str:
return node_tree_to_html(self, depth=2, collapse=True)
@ -257,7 +324,7 @@ class Qube:
def __rtruediv__(self, other: str) -> Qube:
key, values = other.split("=")
values_enum = QEnum((values.split("/")))
return Qube.root_node([Qube.make(key, values_enum, self.children)])
return Qube.make_root([Qube.make_node(key, values_enum, self.children)])
def __or__(self, other: Qube) -> Qube:
return set_operations.operation(
@ -334,6 +401,7 @@ class Qube:
for c in node.children:
yield from to_list_of_cubes(c)
else:
if not node.children:
yield {node.key: list(node.values)}
@ -358,16 +426,16 @@ class Qube:
raise KeyError(
f"Key '{key}' not found in children of '{current.key}', available keys are {[c.key for c in current.children]}"
)
return Qube.root_node(current.children)
return Qube.make_root(current.children)
elif isinstance(args, tuple) and len(args) == 2:
key, value = args
for c in self.children:
if c.key == key and value in c.values:
return Qube.root_node(c.children)
raise KeyError(f"Key {key} not found in children of {self.key}")
return Qube.make_root(c.children)
raise KeyError(f"Key '{key}' not found in children of {self.key}")
else:
raise ValueError("Unknown key type")
raise ValueError(f"Unknown key type {args}")
@cached_property
def n_leaves(self) -> int:
@ -410,7 +478,7 @@ class Qube:
for c in node.children:
if c.key in _keys:
grandchildren = tuple(sorted(remove_key(cc) for cc in c.children))
grandchildren = remove_key(Qube.root_node(grandchildren)).children
grandchildren = remove_key(Qube.make_root(grandchildren)).children
children.extend(grandchildren)
else:
children.append(remove_key(c))
@ -424,7 +492,7 @@ class Qube:
if node.key in converters:
converter = converters[node.key]
values = [converter(v) for v in node.values]
new_node = node.replace(values=QEnum(values), dtype=type(values[0]))
new_node = node.replace(values=QEnum(values))
return new_node
return node
@ -435,7 +503,6 @@ class Qube:
selection: dict[str, str | list[str] | Callable[[Any], bool]],
mode: Literal["strict", "relaxed"] = "relaxed",
consume=False,
require_match=False,
) -> Qube:
# Find any bare str values and replace them with [str]
_selection: dict[str, list[str] | Callable[[Any], bool]] = {}
@ -468,11 +535,11 @@ class Qube:
if mode == "strict":
return None
# If this node doesn't exist in the
elif mode == "next_level":
return node.replace(
children=(),
metadata=self.metadata | {"is_leaf": not bool(self.children)},
metadata=self.metadata
| {"is_leaf": np.array([not bool(node.children)])},
)
elif mode == "relaxed":
@ -501,22 +568,22 @@ class Qube:
if consume:
selection = {k: v for k, v in selection.items() if k != node.key}
# prune branches with no matches
if require_match and not node.children and not matched:
return None
# Prune nodes that had had all their children pruned
new_children = not_none(
select(c, selection, matched) for c in node.children
)
# if node.key == "dataset": print(prune, [(c.key, c.values.values) for c in node.children], [c.key for c in new_children])
if node.children and not new_children:
return None
metadata = dict(node.metadata)
if mode == "next_level":
metadata["is_leaf"] = np.array([not bool(node.children)])
return node.replace(
children=new_children,
metadata=dict(self.metadata) | {"is_leaf": not bool(new_children)},
metadata=metadata,
)
return self.replace(
@ -544,6 +611,26 @@ class Qube:
axes[self.key].update(self.values)
return dict(axes)
def axes_info(self, depth=0) -> dict[str, AxisInfo]:
    """Collect one AxisInfo per key over the whole subtree.

    Recursively merges the depths and value sets seen for each axis key in
    this node and all descendants. The root node contributes no axis itself.
    """
    collected: dict[str, AxisInfo] = defaultdict(
        lambda: AxisInfo(key="", type=str, depths=set(), values=set())
    )
    # Fold in every child's axes first, one level deeper than us.
    for child in self.children:
        for key, info in child.axes_info(depth=depth + 1).items():
            collected[key].combine(info)
    if self.key != "root":
        own_info = AxisInfo(
            key=self.key,
            # Use the type of an arbitrary value as the axis type.
            type=type(next(iter(self.values))),
            depths={depth},
            values=set(self.values),
        )
        collected[self.key].combine(own_info)
    return dict(collected)
@cached_property
def structural_hash(self) -> int:
"""
@ -570,7 +657,7 @@ class Qube:
"""
def union(a: Qube, b: Qube) -> Qube:
b = type(self).root_node(children=(b,))
b = type(self).make_root(children=(b,))
out = set_operations.operation(
a, b, set_operations.SetOperation.UNION, type(self)
)
@ -583,3 +670,23 @@ class Qube:
)
return self.replace(children=tuple(sorted(new_children)))
def add_metadata(self, **kwargs: Any) -> Qube:
    """Return a copy of this node with each keyword argument attached as metadata.

    Each value is wrapped in a length-1 numpy array so that metadata entries
    are uniformly ``np.ndarray``, matching the rest of the class.

    NOTE(review): this replaces the node's metadata mapping wholesale rather
    than merging with any existing entries — confirm callers expect that.
    """
    # Fixed annotation: each kwarg value is an arbitrary scalar (Any), not a
    # dict; the previous `**kwargs: dict[str, Any]` claimed every value was
    # itself a dict.
    metadata = {k: np.array([v]) for k, v in kwargs.items()}
    return self.replace(metadata=metadata)
def strip_metadata(self) -> Qube:
    """Return a copy of the tree with every node's metadata emptied."""

    def remove_metadata(node):
        # Swap each node's metadata for an empty immutable mapping.
        return node.replace(metadata=frozendict({}))

    return self.transform(remove_metadata)
def display(self):
    """Render this qube via the shared tree-formatter display helper."""
    _display(self)

View File

@ -1,3 +1,4 @@
from . import protobuf
from .Qube import Qube
__all__ = ["Qube"]
__all__ = ["Qube", "protobuf"]

View File

@ -18,7 +18,7 @@ def make_node(
children: tuple[Qube, ...],
metadata: dict[str, np.ndarray] | None = None,
):
return cls.make(
return cls.make_node(
key=key,
values=QEnum(values),
metadata={k: np.array(v).reshape(shape) for k, v in metadata.items()}
@ -39,5 +39,5 @@ def from_nodes(cls, nodes, add_root=True):
root = make_node(cls, shape=shape, children=(root,), key=key, **info)
if add_root:
return cls.root_node(children=(root,))
return cls.make_root(children=(root,))
return root

View File

@ -1,27 +0,0 @@
from dataclasses import dataclass, field
import numpy as np
from frozendict import frozendict
from .value_types import ValueGroup
@dataclass(frozen=False, eq=True, order=True, unsafe_hash=True)
class NodeData:
    """Payload of a single tree node: a key, its values, and optional metadata."""

    key: str
    values: ValueGroup
    # Excluded from comparison/hashing; defaults to an empty immutable mapping.
    metadata: frozendict[str, np.ndarray] = field(
        default_factory=lambda: frozendict({}), compare=False
    )
    dtype: type = str

    def summary(self) -> str:
        """One-line rendering such as ``key=v1/v2``; the root prints bare."""
        if self.key == "root":
            return "root"
        return f"{self.key}={self.values.summary()}"
@dataclass(frozen=False, eq=True, order=True)
class RootNodeData(NodeData):
    "Helper class to print a custom root name"

    def summary(self) -> str:
        # Unlike NodeData.summary, never append the value list.
        return self.key

View File

View File

@ -0,0 +1,109 @@
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING
import numpy as np
from frozendict import frozendict
from ..value_types import QEnum
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
"Protobuf gencode version",
UserWarning,
"google.protobuf.runtime_version",
)
from . import qube_pb2
if TYPE_CHECKING:
from ..Qube import Qube
def _ndarray_to_proto(arr: np.ndarray) -> qube_pb2.NdArray:
    """Encode a numpy array as an NdArray message (shape, dtype and C-order bytes)."""
    shape = list(arr.shape)
    dtype = str(arr.dtype)
    raw = arr.tobytes(order="C")
    return qube_pb2.NdArray(shape=shape, dtype=dtype, raw=raw)
def _ndarray_from_proto(msg: qube_pb2.NdArray) -> np.ndarray:
"""NdArray message → np.ndarray (immutable view)"""
return np.frombuffer(msg.raw, dtype=msg.dtype).reshape(tuple(msg.shape))
def _py_to_valuegroup(value: list[str] | np.ndarray) -> qube_pb2.ValueGroup:
    """Wrap a string sequence or an ndarray in a ValueGroup message."""
    group = qube_pb2.ValueGroup()
    if not isinstance(value, np.ndarray):
        # Plain sequences populate the string arm of the oneof.
        group.s.items.extend(value)
    else:
        # Arrays populate the tensor arm.
        group.tensor.CopyFrom(_ndarray_to_proto(value))
    return group
def _valuegroup_to_py(vg: qube_pb2.ValueGroup) -> QEnum | np.ndarray:
    """ValueGroup message → QEnum (string arm) *or* np.ndarray (tensor arm).

    The string arm is wrapped in a QEnum before being returned, so the old
    ``list[str]`` return annotation was incorrect.
    """
    arm = vg.WhichOneof("payload")
    if arm == "tensor":
        return _ndarray_from_proto(vg.tensor)
    # Fall through (including the "s" arm): wrap the repeated strings.
    return QEnum(vg.s.items)
def _py_to_metadatagroup(value: np.ndarray) -> qube_pb2.MetadataGroup:
    """Wrap an ndarray in a MetadataGroup message.

    Scalars are promoted to a 1-element array so the tensor arm always holds
    an ndarray. (The old docstring was copy-pasted from the ValueGroup helper
    and wrongly claimed to return a ValueGroup.)
    """
    vg = qube_pb2.MetadataGroup()
    if not isinstance(value, np.ndarray):
        value = np.array([value])
    vg.tensor.CopyFrom(_ndarray_to_proto(value))
    return vg
def _metadatagroup_to_py(vg: qube_pb2.MetadataGroup) -> np.ndarray:
    """MetadataGroup message → np.ndarray.

    Raises:
        ValueError: if the oneof payload holds anything other than a tensor.
    """
    arm = vg.WhichOneof("payload")
    if arm == "tensor":
        return _ndarray_from_proto(vg.tensor)
    raise ValueError(f"Unknown arm {arm}")
def _qube_to_proto(q: Qube) -> qube_pb2.Qube:
    """Frozen Qube dataclass → protobuf Qube message (new object), recursing over children."""
    metadata = {key: _py_to_metadatagroup(arr) for key, arr in q.metadata.items()}
    children = [_qube_to_proto(child) for child in q.children]
    return qube_pb2.Qube(
        key=q.key,
        values=_py_to_valuegroup(q.values),
        metadata=metadata,
        children=children,
        is_root=q.is_root,
    )
def qube_to_proto(q: Qube) -> bytes:
    """Serialize a Qube tree to protobuf wire-format bytes."""
    msg = _qube_to_proto(q)
    return msg.SerializeToString()
def _proto_to_qube(cls: type, msg: qube_pb2.Qube) -> Qube:
    """protobuf Qube message → frozen Qube dataclass (new object), recursing over children."""
    metadata = {key: _metadatagroup_to_py(grp) for key, grp in msg.metadata.items()}
    children = tuple(_proto_to_qube(cls, child) for child in msg.children)
    return cls.make_node(
        key=msg.key,
        values=_valuegroup_to_py(msg.values),
        metadata=frozendict(metadata),
        children=children,
        is_root=msg.is_root,
    )
def proto_to_qube(cls: type, wire: bytes) -> Qube:
    """Parse protobuf wire bytes and rebuild a Qube tree of type *cls*."""
    parsed = qube_pb2.Qube()
    parsed.ParseFromString(wire)
    return _proto_to_qube(cls, parsed)

View File

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# NO CHECKED-IN PROTOBUF GENCODE
# source: qube.proto
# Protobuf Python Version: 5.29.0
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import runtime_version as _runtime_version
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
_runtime_version.ValidateProtobufRuntimeVersion(
_runtime_version.Domain.PUBLIC, 5, 29, 0, "", "qube.proto"
)
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\nqube.proto"4\n\x07NdArray\x12\r\n\x05shape\x18\x01 \x03(\x03\x12\r\n\x05\x64type\x18\x02 \x01(\t\x12\x0b\n\x03raw\x18\x03 \x01(\x0c"\x1c\n\x0bStringGroup\x12\r\n\x05items\x18\x01 \x03(\t"N\n\nValueGroup\x12\x19\n\x01s\x18\x01 \x01(\x0b\x32\x0c.StringGroupH\x00\x12\x1a\n\x06tensor\x18\x02 \x01(\x0b\x32\x08.NdArrayH\x00\x42\t\n\x07payload"6\n\rMetadataGroup\x12\x1a\n\x06tensor\x18\x01 \x01(\x0b\x32\x08.NdArrayH\x00\x42\t\n\x07payload"\xd1\x01\n\x04Qube\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x1b\n\x06values\x18\x02 \x01(\x0b\x32\x0b.ValueGroup\x12%\n\x08metadata\x18\x03 \x03(\x0b\x32\x13.Qube.MetadataEntry\x12\r\n\x05\x64type\x18\x04 \x01(\t\x12\x17\n\x08\x63hildren\x18\x05 \x03(\x0b\x32\x05.Qube\x12\x0f\n\x07is_root\x18\x06 \x01(\x08\x1a?\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x1d\n\x05value\x18\x02 \x01(\x0b\x32\x0e.MetadataGroup:\x02\x38\x01\x62\x06proto3'
)
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "qube_pb2", _globals)
if not _descriptor._USE_C_DESCRIPTORS:
DESCRIPTOR._loaded_options = None
_globals["_QUBE_METADATAENTRY"]._loaded_options = None
_globals["_QUBE_METADATAENTRY"]._serialized_options = b"8\001"
_globals["_NDARRAY"]._serialized_start = 14
_globals["_NDARRAY"]._serialized_end = 66
_globals["_STRINGGROUP"]._serialized_start = 68
_globals["_STRINGGROUP"]._serialized_end = 96
_globals["_VALUEGROUP"]._serialized_start = 98
_globals["_VALUEGROUP"]._serialized_end = 176
_globals["_METADATAGROUP"]._serialized_start = 178
_globals["_METADATAGROUP"]._serialized_end = 232
_globals["_QUBE"]._serialized_start = 235
_globals["_QUBE"]._serialized_end = 444
_globals["_QUBE_METADATAENTRY"]._serialized_start = 381
_globals["_QUBE_METADATAENTRY"]._serialized_end = 444
# @@protoc_insertion_point(module_scope)

View File

@ -1,3 +1,25 @@
"""
# Set Operations
The core of this is the observation that for two sets A and B, if we compute (A - B), (A ∩ B) and (B - A)
then we can get the other operations by taking unions of the above three objects.
Union: All of them
Intersection: Just take A ∩ B
Difference: Take either A - B or B - A
Symmetric Difference (XOR): Take A - B and B - A
We start with a shallow implementation of this algorithm that only deals with a pair of nodes, not the whole tree:
shallow_set_operation(A: Qube, B: Qube) -> SetOpsResult
This takes two qubes and (morally) returns (A - B), (A ∩ B) and (B - A) but only for the values and metadata at the top level.
For technical reasons that will become clear we actually return a struct with two copies of (A B). One has the metadata from A and the children of A call it A', and the other has them from B call it B'. This is relevant when we extend the shallow algorithm to work with a whole tree because we will recurse and compute the set operation for each pair of the children of A' and B'.
NB: Currently there are two kinds of values, QEnums, that store a list of values and Wildcards that 'match with everything'. shallow_set_operation checks the type of values and dispatches to different methods depending on the combination of types it finds.
"""
from __future__ import annotations
from collections import defaultdict
@ -10,7 +32,6 @@ from typing import TYPE_CHECKING, Any, Iterable
import numpy as np
from frozendict import frozendict
from .node_types import NodeData
from .value_types import QEnum, ValueGroup, WildcardGroup
if TYPE_CHECKING:
@ -18,6 +39,8 @@ if TYPE_CHECKING:
class SetOperation(Enum):
"Map from set operations to which combination of (A - B), (A ∩ B), (B - A) we need."
UNION = (1, 1, 1)
INTERSECTION = (0, 1, 0)
DIFFERENCE = (1, 0, 0)
@ -25,112 +48,226 @@ class SetOperation(Enum):
@dataclass(eq=True, frozen=True)
class ValuesMetadata:
class ValuesIndices:
"Helper class to hold the values and indices from a node."
values: ValueGroup
metadata: dict[str, np.ndarray]
indices: tuple[int, ...]
@classmethod
def from_values(cls, values: ValueGroup):
return cls(values=values, indices=tuple(range(len(values))))
@classmethod
def empty(cls):
return cls(values=QEnum([]), indices=())
def enumerate(self) -> Iterable[tuple[Any, int]]:
return zip(self.indices, self.values)
def QEnum_intersection(
A: ValuesMetadata,
B: ValuesMetadata,
) -> tuple[ValuesMetadata, ValuesMetadata, ValuesMetadata]:
intersection: dict[Any, int] = {}
just_A: dict[Any, int] = {}
just_B: dict[Any, int] = {val: i for i, val in enumerate(B.values)}
for index_a, val_A in enumerate(A.values):
if val_A in B.values:
just_B.pop(val_A)
intersection[val_A] = (
index_a # We throw away any overlapping metadata from B
)
else:
just_A[val_A] = index_a
intersection_out = ValuesMetadata(
values=QEnum(list(intersection.keys())),
metadata={
k: v[..., tuple(intersection.values())] for k, v in A.metadata.items()
},
def get_indices(
    metadata: frozendict[str, np.ndarray], indices: tuple[int, ...]
) -> frozendict[str, np.ndarray]:
    """Select, for every ndarray entry of *metadata*, only the positions given
    by *indices* along the last axis (the axis corresponding to node values).
    Non-ndarray entries are dropped."""
    selected = {}
    for key, array in metadata.items():
        if isinstance(array, np.ndarray):
            selected[key] = array[..., indices]
    return frozendict(selected)
just_A_out = ValuesMetadata(
values=QEnum(list(just_A.keys())),
metadata={k: v[..., tuple(just_A.values())] for k, v in A.metadata.items()},
@dataclass(eq=True, frozen=True)
class SetOpResult:
"""
Given two sets A and B, all possible set operations can be constructed from A - B, A ∩ B, B - A
That is, what's only in A, the intersection and what's only in B
However because we need to recurse on children we actually return two intersection node:
only_A is a qube with:
The values in A but not in B
The metadata corresponding to this values
All the children A had
intersection_A is a qube with:
The values that intersected with B
The metadata from that intersection
All the children A had
And vice versa for only_B and intersection_B
"""
only_A: ValuesIndices
intersection_A: ValuesIndices
intersection_B: ValuesIndices
only_B: ValuesIndices
def shallow_qenum_set_operation(A: ValuesIndices, B: ValuesIndices) -> SetOpResult:
"""
For two sets of values, partition the overlap into four groups:
only_A: values and indices of values that are in A but not B
intersection_A: values and indices of values that are in both A and B
And vice versa for only_B and intersection_B.
Note that intersection_A and intersection_B contain the same values but the indices are different.
"""
# create four groups that map value -> index
only_A: dict[Any, int] = {val: i for i, val in A.enumerate()}
only_B: dict[Any, int] = {val: i for i, val in B.enumerate()}
intersection_A: dict[Any, int] = {}
intersection_B: dict[Any, int] = {}
# Go through all the values and move any that are in the intersection
# to the corresponding group, keeping the indices
for val in A.values:
if val in B.values:
intersection_A[val] = only_A.pop(val)
intersection_B[val] = only_B.pop(val)
def package(values_indices: dict[Any, int]) -> ValuesIndices:
return ValuesIndices(
values=QEnum(list(values_indices.keys())),
indices=tuple(values_indices.values()),
)
just_B_out = ValuesMetadata(
values=QEnum(list(just_B.keys())),
metadata={k: v[..., tuple(just_B.values())] for k, v in B.metadata.items()},
return SetOpResult(
only_A=package(only_A),
only_B=package(only_B),
intersection_A=package(intersection_A),
intersection_B=package(intersection_B),
)
return just_A_out, intersection_out, just_B_out
def shallow_wildcard_set_operation(A: ValuesIndices, B: ValuesIndices) -> SetOpResult:
"""
WildcardGroups behave as if they contain all the values of whatever they match against.
For two wildcards we just return both.
For A == wildcard and B == enum we have to be more careful:
1. All of B is in the intersection so only_B is None too.
2. The wildcard may need to match against other things so only_A is A
3. We return B in the intersection_B and intersection_A slot.
def node_intersection(
A: ValuesMetadata,
B: ValuesMetadata,
) -> tuple[ValuesMetadata, ValuesMetadata, ValuesMetadata]:
if isinstance(A.values, QEnum) and isinstance(B.values, QEnum):
return QEnum_intersection(A, B)
This last bit happens because the wildcard basically adopts the values of whatever it sees.
"""
# Two wildcard groups have full overlap.
if isinstance(A.values, WildcardGroup) and isinstance(B.values, WildcardGroup):
return (
ValuesMetadata(QEnum([]), {}),
ValuesMetadata(WildcardGroup(), {}),
ValuesMetadata(QEnum([]), {}),
)
return SetOpResult(ValuesIndices.empty(), A, B, ValuesIndices.empty())
# If A is a wildcard matcher then the intersection is everything
# just_A is still *
# just_B is empty
# If A is a wildcard matcher and B is not
# then the intersection is everything from B
if isinstance(A.values, WildcardGroup):
return A, B, ValuesMetadata(QEnum([]), {})
return SetOpResult(A, B, B, ValuesIndices.empty())
# The reverse if B is a wildcard
# If B is a wildcard matcher and A is not
# then the intersection is everything from A
if isinstance(B.values, WildcardGroup):
return ValuesMetadata(QEnum([]), {}), A, B
return SetOpResult(ValuesIndices.empty(), A, A, B)
raise NotImplementedError(
f"Fused set operations on values types {type(A.values)} and {type(B.values)} not yet implemented"
f"One of {type(A.values)} and {type(B.values)} should be WildCardGroup"
)
def operation(A: Qube, B: Qube, operation_type: SetOperation, node_type) -> Qube | None:
def shallow_set_operation(
    A: ValuesIndices,
    B: ValuesIndices,
) -> SetOpResult:
    """Dispatch a one-level set operation based on the value-group types of A and B."""
    a_vals, b_vals = A.values, B.values
    # Two plain enumerations: exact value-by-value partitioning.
    if isinstance(a_vals, QEnum) and isinstance(b_vals, QEnum):
        return shallow_qenum_set_operation(A, B)
    # WildcardGroups behave as if they contain all possible values.
    if isinstance(a_vals, WildcardGroup) or isinstance(b_vals, WildcardGroup):
        return shallow_wildcard_set_operation(A, B)
    raise NotImplementedError(
        f"Set operations on values types {type(A.values)} and {type(B.values)} not yet implemented"
    )
def operation(
A: Qube, B: Qube, operation_type: SetOperation, node_type, depth=0
) -> Qube | None:
# print(f"operation({A}, {B})")
assert A.key == B.key, (
"The two Qube root nodes must have the same key to perform set operations,"
f"would usually be two root nodes. They have {A.key} and {B.key} respectively"
)
node_key = A.key
assert A.is_root == B.is_root
is_root = A.is_root
assert A.values == B.values, (
f"The two Qube root nodes must have the same values to perform set operations {A.values = }, {B.values = }"
)
node_values = A.values
# Group the children of the two nodes by key
nodes_by_key: defaultdict[str, tuple[list[Qube], list[Qube]]] = defaultdict(
lambda: ([], [])
)
new_children: list[Qube] = []
# Sort out metadata into what can stay at this level and what must move down
stayput_metadata: dict[str, np.ndarray] = {}
pushdown_metadata_A: dict[str, np.ndarray] = {}
pushdown_metadata_B: dict[str, np.ndarray] = {}
for key in set(A.metadata.keys()) | set(B.metadata.keys()):
if key not in A.metadata:
pushdown_metadata_B[key] = B.metadata[key]
continue
if key not in B.metadata:
pushdown_metadata_A[key] = A.metadata[key]
continue
A_val = A.metadata[key]
B_val = B.metadata[key]
if np.allclose(A_val, B_val):
# print(f"{' ' * depth}Keeping metadata key '{key}' at this level")
stayput_metadata[key] = A.metadata[key]
else:
# print(f"{' ' * depth}Pushing down metadata key '{key}' {A_val} {B_val}")
pushdown_metadata_A[key] = A_val
pushdown_metadata_B[key] = B_val
# Add all the metadata that needs to be pushed down to the child nodes
# When pushing down the metadata we need to account for the fact it now affects more values
# So expand the metadata entries from shape (a, b, ..., c) to (a, b, ..., c, d)
# where d is the length of the node values
for node in A.children:
N = len(node.values)
meta = {
k: np.broadcast_to(v[..., np.newaxis], v.shape + (N,))
for k, v in pushdown_metadata_A.items()
}
node = node.replace(metadata=node.metadata | meta)
nodes_by_key[node.key][0].append(node)
for node in B.children:
N = len(node.values)
meta = {
k: np.broadcast_to(v[..., np.newaxis], v.shape + (N,))
for k, v in pushdown_metadata_B.items()
}
node = node.replace(metadata=node.metadata | meta)
nodes_by_key[node.key][1].append(node)
new_children: list[Qube] = []
# print(f"{nodes_by_key = }")
# For every node group, perform the set operation
for key, (A_nodes, B_nodes) in nodes_by_key.items():
output = list(_operation(key, A_nodes, B_nodes, operation_type, node_type))
output = list(
_operation(A_nodes, B_nodes, operation_type, node_type, depth + 1)
)
# print(f"{' '*depth}_operation {operation_type.name} {A_nodes} {B_nodes} out = [{output}]")
new_children.extend(output)
# print(f"operation {operation_type}: {A}, {B} {new_children = }")
# print(f"{A.children = }")
# print(f"{B.children = }")
# print(f"{new_children = }")
# print(f"{' '*depth}operation {operation_type.name} [{A}] [{B}] new_children = [{new_children}]")
# If there are now no children as a result of the operation, return nothing.
if (A.children or B.children) and not new_children:
if A.key == "root":
return A.replace(children=())
return node_type.make_root(children=())
else:
return None
@ -140,74 +277,100 @@ def operation(A: Qube, B: Qube, operation_type: SetOperation, node_type) -> Qube
new_children = list(compress_children(new_children))
# The values and key are the same so we just replace the children
return A.replace(children=tuple(sorted(new_children)))
if A.key == "root":
return node_type.make_root(
children=new_children,
metadata=stayput_metadata,
)
return node_type.make_node(
key=node_key,
values=node_values,
children=new_children,
metadata=stayput_metadata,
is_root=is_root,
)
# The root node is special so we need a helper method that we can recurse on
def _operation(
key: str, A: list[Qube], B: list[Qube], operation_type: SetOperation, node_type
A: list[Qube],
B: list[Qube],
operation_type: SetOperation,
node_type,
depth: int,
) -> Iterable[Qube]:
keep_just_A, keep_intersection, keep_just_B = operation_type.value
"""
This operation assumes that we've found two nodes that match and now want to do a set operation on their children. Hence we take in two lists of child nodes all of which have the same key but different values.
We then loop over all pairs of children from each list and compute the intersection.
"""
# print(f"_operation({A}, {B})")
keep_only_A, keep_intersection, keep_only_B = operation_type.value
# Iterate over all pairs (node_A, node_B)
values = {}
for node in A + B:
values[node] = ValuesMetadata(node.values, node.metadata)
# We're going to progressively remove values from the starting nodes as we do intersections
# So we make a node -> ValuesIndices mapping here for both a and b
only_a: dict[Qube, ValuesIndices] = {
n: ValuesIndices.from_values(n.values) for n in A
}
only_b: dict[Qube, ValuesIndices] = {
n: ValuesIndices.from_values(n.values) for n in B
}
def make_new_node(source: Qube, values_indices: ValuesIndices):
return source.replace(
values=values_indices.values,
metadata=get_indices(source.metadata, values_indices.indices),
)
# Iterate over all pairs (node_A, node_B) and perform the shallow set operation
# Update our copy of the original node to remove anything that appears in an intersection
for node_a in A:
for node_b in B:
# Compute A - B, A & B, B - A
# Update the values for the two source nodes to remove the intersection
just_a, intersection, just_b = node_intersection(
values[node_a],
values[node_b],
)
set_ops_result = shallow_set_operation(only_a[node_a], only_b[node_b])
# Remove the intersection from the source nodes
values[node_a] = just_a
values[node_b] = just_b
# Save reduced values back to nodes
only_a[node_a] = set_ops_result.only_A
only_b[node_b] = set_ops_result.only_B
if keep_intersection:
if intersection.values:
new_node_a = node_a.replace(
values=intersection.values,
metadata=intersection.metadata,
)
new_node_b = node_b.replace(
values=intersection.values,
metadata=intersection.metadata,
)
# print(f"{node_a = }")
# print(f"{node_b = }")
# print(f"{intersection.values =}")
if (
set_ops_result.intersection_A.values
and set_ops_result.intersection_B.values
):
result = operation(
new_node_a, new_node_b, operation_type, node_type
make_new_node(node_a, set_ops_result.intersection_A),
make_new_node(node_b, set_ops_result.intersection_B),
operation_type,
node_type,
depth=depth + 1,
)
if result is not None:
# If we're doing a difference or xor we might want to throw away the intersection
# However we can only do this once we get to the leaf nodes, otherwise we'll
# throw away nodes too early!
# Consider Qube(root, a=1, b=1/2) - Qube(root, a=1, b=1)
# We can easily throw away the whole a node by accident here!
if keep_intersection or result.children:
yield result
# Now we've removed all the intersections we can yield the just_A and just_B parts if needed
if keep_just_A:
for node in A:
if values[node].values:
yield node_type.make(
key,
children=node.children,
values=values[node].values,
metadata=values[node].metadata,
)
if keep_just_B:
for node in B:
if values[node].values:
yield node_type.make(
key,
children=node.children,
values=values[node].values,
metadata=values[node].metadata,
elif (
not set_ops_result.intersection_A.values
and not set_ops_result.intersection_B.values
):
continue
else:
raise ValueError(
f"Only one of set_ops_result.intersection_A and set_ops_result.intersection_B is None, I didn't think that could happen! {set_ops_result = }"
)
if keep_only_A:
for node, vi in only_a.items():
if vi.values:
yield make_new_node(node, vi)
def compress_children(children: Iterable[Qube]) -> tuple[Qube, ...]:
if keep_only_B:
for node, vi in only_b.items():
if vi.values:
yield make_new_node(node, vi)
def compress_children(children: Iterable[Qube], depth=0) -> tuple[Qube, ...]:
"""
Helper method that only compresses a set of nodes, and doesn't do it recursively.
Used in Qubed.compress but also to maintain compression in the set operations above.
@ -224,49 +387,78 @@ def compress_children(children: Iterable[Qube]) -> tuple[Qube, ...]:
# Now go through and create new compressed nodes for any groups that need collapsing
new_children = []
for child_list in identical_children.values():
if len(child_list) > 1:
# If the group is size one just keep it
if len(child_list) == 1:
new_child = child_list.pop()
else:
example = child_list[0]
node_type = type(example)
key = child_list[0].key
value_type = type(example.values)
# Compress the children into a single node
assert all(isinstance(child.data.values, QEnum) for child in child_list), (
"All children must have QEnum values"
assert all(isinstance(child.values, value_type) for child in child_list), (
f"All nodes to be grouped must have the same value type, expected {value_type}"
)
metadata_groups = {
k: [child.metadata[k] for child in child_list]
for k in example.metadata.keys()
}
# We know the children of this group of nodes all have the same structure
# but we still need to merge the metadata across them
# children = example.children
children = merge_metadata(child_list, example.depth)
metadata: frozendict[str, np.ndarray] = frozendict(
{
k: np.concatenate(metadata_group, axis=0)
for k, metadata_group in metadata_groups.items()
}
)
# Do we need to recusively compress here?
# children = compress_children(children, depth=depth+1)
node_data = NodeData(
key=key,
metadata=metadata,
values=QEnum(set(v for child in child_list for v in child.data.values)),
)
children = [cc for c in child_list for cc in c.children]
compressed_children = compress_children(children)
new_child = node_type(data=node_data, children=compressed_children)
if value_type is QEnum:
values = QEnum(set(v for child in child_list for v in child.values))
elif value_type is WildcardGroup:
values = example.values
else:
# If the group is size one just keep it
new_child = child_list.pop()
raise ValueError(f"Unknown value type: {value_type}")
new_child = node_type.make_node(
key=example.key,
metadata=example.metadata,
values=values,
children=children,
)
new_children.append(new_child)
return tuple(sorted(new_children, key=lambda n: ((n.key, n.values.min()))))
def union(a: Qube, b: Qube) -> Qube:
return operation(
a,
b,
SetOperation.UNION,
type(a),
def merge_metadata(qubes: list[Qube], axis) -> Iterable[Qube]:
    """
    Given a list of qubes with identical structure,
    match up the children of each node and merge the metadata.

    Yields one merged child per child position of the first qube. The
    structural hashes are asserted equal, so all qubes must have the same
    tree shape; metadata arrays are concatenated along `axis`.
    """
    # Group the children of each qube and merge them
    # Exploit the fact that they have the same shape and ordering
    example = qubes[0]
    node_type = type(example)
    for i in range(len(example.children)):
        # The i-th child of every input qube forms one merge group.
        group = [q.children[i] for q in qubes]
        group_example = group[0]
        # All members of the group must be structurally identical.
        assert len(set((c.structural_hash for c in group))) == 1

        # Collect metadata by key
        metadata_groups = {
            k: [q.metadata[k] for q in group] for k in group_example.metadata.keys()
        }

        # Concatenate the metadata together
        metadata: frozendict[str, np.ndarray] = frozendict(
            {
                k: np.concatenate(metadata_group, axis=axis)
                for k, metadata_group in metadata_groups.items()
            }
        )

        # Recurse so the whole subtree below this group is merged too.
        group_children = merge_metadata(group, axis)
        yield node_type.make_node(
            key=group_example.key,
            metadata=metadata,
            values=group_example.values,
            children=group_children,
        )

View File

@ -2,9 +2,12 @@ from __future__ import annotations
import random
from dataclasses import dataclass
from typing import TYPE_CHECKING, Iterable
from typing import TYPE_CHECKING, Callable, Iterable
import numpy as np
try:
from IPython.display import display
except ImportError:
display = None
if TYPE_CHECKING:
from .Qube import Qube
@ -30,8 +33,7 @@ def summarize_node(
while True:
summary = node.summary(**kwargs)
if "is_leaf" in node.metadata and node.metadata["is_leaf"]:
summary += " 🌿"
paths.append(summary)
if len(summary) > max_summary_length:
summary = summary[:max_summary_length] + "..."
@ -44,6 +46,10 @@ def summarize_node(
break
node = node.children[0]
# Add a "..." to represent nodes that we don't know about
if (not node.children) and (not node.is_leaf):
summaries.append("...")
return ", ".join(summaries), ",".join(paths), node
@ -71,27 +77,38 @@ def node_tree_to_string(node: Qube, prefix: str = "", depth=None) -> Iterable[st
def summarize_node_html(
node: Qube, collapse=False, max_summary_length=50, **kwargs
node: Qube,
collapse=False,
max_summary_length=50,
info: Callable[[Qube], str] | None = None,
**kwargs,
) -> tuple[str, Qube]:
"""
Extracts a summarized representation of the node while collapsing single-child paths.
Returns the summary string and the last node in the chain that has multiple children.
"""
if info is None:
def info_func(node: Qube, /):
return (
# f"dtype: {node.dtype}\n"
f"metadata: {dict(node.metadata)}\n"
)
else:
info_func = info
summaries = []
while True:
path = node.summary(**kwargs)
summary = path
if "is_leaf" in node.metadata and node.metadata["is_leaf"]:
summary += " 🌿"
if len(summary) > max_summary_length:
summary = summary[:max_summary_length] + "..."
info = (
f"dtype: {node.dtype.__name__}\n"
f"metadata: {dict((k, np.shape(v)) for k, v in node.metadata.items())}\n"
)
summary = f'<span class="qubed-node" data-path="{path}" title="{info}">{summary}</span>'
info_string = info_func(node)
summary = f'<span class="qubed-node" data-path="{path}" title="{info_string}">{summary}</span>'
summaries.append(summary)
if not collapse:
break
@ -101,13 +118,24 @@ def summarize_node_html(
break
node = node.children[0]
if (not node.children) and (not node.is_leaf):
summary = (
'<span class="qubed-node" data-path="" title="Truncated Nodes">...</span>'
)
summaries.append(summary)
return ", ".join(summaries), node
def _node_tree_to_html(
node: Qube, prefix: str = "", depth=1, connector="", **kwargs
node: Qube,
prefix: str = "",
depth=1,
connector="",
info: Callable[[Qube], str] | None = None,
**kwargs,
) -> Iterable[str]:
summary, node = summarize_node_html(node, **kwargs)
summary, node = summarize_node_html(node, info=info, **kwargs)
if len(node.children) == 0:
yield f'<span class="qubed-level">{connector}{summary}</span>'
@ -124,13 +152,20 @@ def _node_tree_to_html(
prefix + extension,
depth=depth - 1,
connector=prefix + connector,
info=info,
**kwargs,
)
yield "</details>"
def node_tree_to_html(
node: Qube, depth=1, include_css=True, include_js=True, css_id=None, **kwargs
node: Qube,
depth=1,
include_css=True,
include_js=True,
css_id=None,
info: Callable[[Qube], str] | None = None,
**kwargs,
) -> str:
if css_id is None:
css_id = f"qubed-tree-{random.randint(0, 1000000)}"
@ -215,5 +250,22 @@ def node_tree_to_html(
nodes.forEach(n => n.addEventListener("click", nodeOnClick));
</script>
""".replace("CSS_ID", css_id)
nodes = "".join(_node_tree_to_html(node=node, depth=depth, **kwargs))
nodes = "".join(_node_tree_to_html(node=node, depth=depth, info=info, **kwargs))
return f"{js if include_js else ''}{css if include_css else ''}<pre class='qubed-tree' id='{css_id}'>{nodes}</pre>"
def _display(qube: Qube, **kwargs):
    """Render a qube for interactive use.

    Falls back to plain ``print`` when IPython's ``display`` is unavailable
    (see the guarded import at the top of this module); otherwise renders the
    HTML tree with a per-node debug tooltip.
    """
    if display is None:
        print(qube)
    else:

        def info(node: Qube):
            # Tooltip text shown for each node in the HTML tree.
            # NOTE: the template is deliberately unindented — its content is
            # emitted verbatim into the title attribute.
            return f"""\
structural_hash = {node.structural_hash}
metadata = {dict(node.metadata)}
is_root = {node.is_root}
is_leaf = {node.is_leaf}
"""

        # Caller-supplied kwargs win over the default info renderer.
        kwargs = {"info": info} | kwargs
        display(qube.html(**kwargs))

View File

@ -2,7 +2,7 @@ from __future__ import annotations
import dataclasses
from abc import ABC, abstractmethod
from dataclasses import dataclass, replace
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from typing import (
TYPE_CHECKING,
@ -21,6 +21,11 @@ if TYPE_CHECKING:
@dataclass(frozen=True)
class ValueGroup(ABC):
@abstractmethod
def dtype(self) -> str:
"Provide a string rep of the datatype of these values"
pass
@abstractmethod
def summary(self) -> str:
"Provide a string summary of the value group."
@ -60,6 +65,20 @@ class ValueGroup(ABC):
T = TypeVar("T")
EnumValuesType = FrozenSet[T]
_dtype_map: dict[str, type] = {
"str": str,
"int64": int,
"float64": float,
"date": datetime,
}
_dtype_map_inv: dict[type, str] = {v: k for k, v in _dtype_map.items()}
_dtype_formatters = {
"str": str,
"int64": int,
"float64": float,
"date": datetime.fromisoformat,
}
@dataclass(frozen=True, order=True)
class QEnum(ValueGroup):
@ -69,9 +88,15 @@ class QEnum(ValueGroup):
"""
values: EnumValuesType
_dtype: str = "str"
def __init__(self, obj):
def __init__(self, obj, dtype="str"):
object.__setattr__(self, "values", tuple(sorted(obj)))
object.__setattr__(
self,
"_dtype",
dtype,
)
def __post_init__(self):
assert isinstance(self.values, tuple)
@ -88,6 +113,9 @@ class QEnum(ValueGroup):
def __contains__(self, value: Any) -> bool:
return value in self.values
def dtype(self):
return self._dtype
@classmethod
def from_strings(cls, values: Iterable[str]) -> Sequence[ValueGroup]:
return [cls(tuple(values))]
@ -96,7 +124,18 @@ class QEnum(ValueGroup):
return min(self.values)
def to_json(self):
return list(self.values)
return {"type": "enum", "dtype": self.dtype(), "values": self.values}
# @classmethod
# def from_json(cls, type: Literal["enum"], dtype: str, values: list):
# dtype_formatter = _dtype_formatters[dtype]
@classmethod
def from_list(cls, obj):
    """Build a QEnum from a non-empty list of homogeneously typed values.

    The dtype is inferred from the first element; every element must have
    exactly that type. Raises IndexError on an empty list and AssertionError
    on mixed types.
    """
    example = obj[0]
    dtype = type(example)
    # BUG FIX: the original `assert [type(v) is dtype for v in obj]` asserted
    # the truthiness of a (non-empty) list, so it could never fail.
    assert all(type(v) is dtype for v in obj), (
        f"All values must have the same type {dtype}"
    )
    return cls(obj, dtype=_dtype_map_inv[dtype])
@dataclass(frozen=True, order=True)
@ -114,7 +153,7 @@ class WildcardGroup(ValueGroup):
return "*"
def __len__(self):
return None
return 1
def __iter__(self):
return ["*"]
@ -122,6 +161,9 @@ class WildcardGroup(ValueGroup):
def __bool__(self):
return True
def dtype(self):
return "*"
@classmethod
def from_strings(cls, values: Iterable[str]) -> Sequence[ValueGroup]:
return [WildcardGroup()]
@ -374,17 +416,13 @@ class IntRange(Range):
return ranges
def values_from_json(obj) -> ValueGroup:
def values_from_json(obj: dict | list) -> ValueGroup:
if isinstance(obj, list):
return QEnum(tuple(obj))
return QEnum.from_list(obj)
match obj["dtype"]:
case "date":
return DateRange(**obj)
case "time":
return TimeRange(**obj)
case "int":
return IntRange(**obj)
match obj["type"]:
case "enum":
QEnum.from_json(**obj)
case _:
raise ValueError(f"Unknown dtype {obj['dtype']}")
@ -398,7 +436,7 @@ def convert_datatypes(q: "Qube", conversions: dict[str, ValueGroup]) -> "Qube":
)
for values_group in data_type.from_strings(q.values):
# print(values_group)
yield replace(q, data=replace(q.data, values=values_group))
yield q.replace(values=values_group)
else:
yield q

32
src/qube.proto Normal file
View File

@ -0,0 +1,32 @@
syntax = "proto3";
// Serialized n-dimensional array: shape, a dtype name, and the raw bytes.
message NdArray {
    repeated int64 shape = 1;
    string dtype = 2;  // e.g. a numpy dtype name — TODO confirm against encoder
    bytes raw = 3;     // flat buffer; layout/endianness set by the producer
}

// A plain list of strings.
message StringGroup {repeated string items = 1; }

// Stores values i.e class=1/2/3 the 1/2/3 part
message ValueGroup {
    oneof payload {
        StringGroup s = 1;      // string-typed values
        NdArray tensor = 2;     // numeric values as an array
    }
}

// Per-key metadata attached to a node.
message MetadataGroup {
    oneof payload {
        NdArray tensor = 1;
    }
}

// One node of the tree; children recurse into the same message type.
message Qube {
    string key = 1;                          // axis name, e.g. "class"
    ValueGroup values = 2;                   // the values along that axis
    map<string, MetadataGroup> metadata = 3; // arbitrary named metadata arrays
    string dtype = 4;                        // dtype of the values
    repeated Qube children = 5;
    bool is_root = 6;                        // true only for the tree root
}

View File

@ -1,334 +0,0 @@
#![allow(dead_code)]
use std::rc::Rc;
use smallstr::SmallString;
use slotmap::{new_key_type, SlotMap};
// Arena key for nodes stored in the SlotMap below.
new_key_type! {
    struct NodeId;
}

// Small-string optimisation: strings up to 16 bytes are stored inline.
type CompactString = SmallString<[u8; 16]>;

/// The scalar types a node value can take.
#[derive(Clone)]
enum NodeValueTypes {
    String(CompactString),
    Int(i32),
}

impl From<&str> for NodeValueTypes {
    fn from(s: &str) -> Self {
        NodeValueTypes::String(CompactString::from(s))
    }
}

impl From<i32> for NodeValueTypes {
    fn from(i: i32) -> Self {
        NodeValueTypes::Int(i)
    }
}

/// Either one value or a list of values; nodes start as Single and are
/// upgraded to Multiple by add_value/add_values.
enum NodeValue {
    Single(NodeValueTypes),
    Multiple(Vec<NodeValueTypes>),
}

/// A tree node. Siblings form a doubly linked list; the parent stores only
/// (first_child, last_child) so child insertion is O(1).
struct Node<Payload> {
    key: Rc<String>, // shared axis name
    value: NodeValue,
    parent: Option<NodeId>,
    prev_sibling: Option<NodeId>,
    next_sibling: Option<NodeId>,
    // vector may be faster for traversal, but linkedlist should be faster for insertion
    children: Option<(NodeId, NodeId)>, // (first_child, last_child)
    data: Option<Payload>, // optional per-node payload, set via add_payload
}

/// Arena-backed tree: all nodes live in one SlotMap keyed by NodeId.
struct QueryTree<Payload> {
    nodes: SlotMap<NodeId, Node<Payload>>,
}
impl<Payload> QueryTree<Payload> {
    /// Create an empty tree.
    fn new() -> Self {
        QueryTree {
            nodes: SlotMap::with_key(),
        }
    }

    /// Adds a node with a key and single value, appending it to `parent`'s
    /// child list (O(1) thanks to the stored last-child id).
    fn add_node<S>(&mut self, key: &Rc<String>, value: S, parent: Option<NodeId>) -> NodeId
    where
        S: Into<NodeValueTypes>,
    {
        let node_id = self.nodes.insert_with_key(|_| Node {
            key: Rc::clone(key),
            value: NodeValue::Single(value.into()),
            parent,
            prev_sibling: None,
            next_sibling: None,
            children: None,
            data: None,
        });

        if let Some(parent_id) = parent {
            // Determine if parent has existing children
            if let Some((first_child_id, last_child_id)) = self.nodes[parent_id].children {
                // Update the last child's `next_sibling`
                {
                    let last_child = &mut self.nodes[last_child_id];
                    last_child.next_sibling = Some(node_id);
                }

                // Update the new node's `prev_sibling`
                {
                    let new_node = &mut self.nodes[node_id];
                    new_node.prev_sibling = Some(last_child_id);
                }

                // Update parent's last child
                let parent_node = &mut self.nodes[parent_id];
                parent_node.children = Some((first_child_id, node_id));
            } else {
                // No existing children
                let parent_node = &mut self.nodes[parent_id];
                parent_node.children = Some((node_id, node_id));
            }
        }

        node_id
    }

    /// Add a single value to a node, upgrading Single -> Multiple if needed.
    fn add_value<S>(&mut self, node_id: NodeId, value: S)
    where
        S: Into<NodeValueTypes>,
    {
        if let Some(node) = self.nodes.get_mut(node_id) {
            match &mut node.value {
                NodeValue::Single(v) => {
                    let values = vec![v.clone(), value.into()];
                    node.value = NodeValue::Multiple(values);
                }
                NodeValue::Multiple(values) => {
                    values.push(value.into());
                }
            }
        }
    }

    /// Add multiple values to a node, upgrading Single -> Multiple if needed.
    fn add_values<S>(&mut self, node_id: NodeId, values: Vec<S>)
    where
        S: Into<NodeValueTypes>,
    {
        if let Some(node) = self.nodes.get_mut(node_id) {
            match &mut node.value {
                NodeValue::Single(v) => {
                    let mut new_values = vec![v.clone()];
                    new_values.extend(values.into_iter().map(|v| v.into()));
                    node.value = NodeValue::Multiple(new_values);
                }
                NodeValue::Multiple(existing_values) => {
                    existing_values.extend(values.into_iter().map(|v| v.into()));
                }
            }
        }
    }

    fn get_node(&self, node_id: NodeId) -> Option<&Node<Payload>> {
        self.nodes.get(node_id)
    }

    /// Collect the ids of all direct children by walking the sibling list.
    // TODO: better if this returns an iterator?
    fn get_children(&self, node_id: NodeId) -> Vec<NodeId> {
        let mut children = Vec::new();
        if let Some(node) = self.get_node(node_id) {
            if let Some((first_child_id, _)) = node.children {
                let mut current_id = Some(first_child_id);
                while let Some(cid) = current_id {
                    children.push(cid);
                    current_id = self.nodes[cid].next_sibling;
                }
            }
        }
        children
    }

    /// Remove a node and its whole subtree, unlinking it from parent and siblings.
    fn remove_node(&mut self, node_id: NodeId) {
        // BUG FIX: children must be collected *before* the node is removed
        // from the SlotMap. The previous code called get_children() after
        // removal, which always returned an empty Vec, so the subtree was
        // never removed (leaked in the arena).
        let children_ids = self.get_children(node_id);

        // Remove the node and update parent and siblings
        if let Some(node) = self.nodes.remove(node_id) {
            // Update parent's children.
            if let Some(parent_id) = node.parent {
                // The parent may itself already be gone (when tearing down a
                // whole subtree recursively), so don't unwrap here.
                if let Some(parent_node) = self.nodes.get_mut(parent_id) {
                    if let Some((first_child_id, last_child_id)) = parent_node.children {
                        if first_child_id == node_id && last_child_id == node_id {
                            // Node was the only child
                            parent_node.children = None;
                        } else if first_child_id == node_id {
                            // Node was the first child
                            parent_node.children = Some((node.next_sibling.unwrap(), last_child_id));
                        } else if last_child_id == node_id {
                            // Node was the last child
                            parent_node.children = Some((first_child_id, node.prev_sibling.unwrap()));
                        }
                    }
                }
            }

            // Update siblings
            if let Some(prev_id) = node.prev_sibling {
                self.nodes[prev_id].next_sibling = node.next_sibling;
            }
            if let Some(next_id) = node.next_sibling {
                self.nodes[next_id].prev_sibling = node.prev_sibling;
            }

            // Recursively remove children
            for child_id in children_ids {
                self.remove_node(child_id);
            }
        }
    }

    fn is_root(&self, node_id: NodeId) -> bool {
        self.nodes[node_id].parent.is_none()
    }

    fn is_leaf(&self, node_id: NodeId) -> bool {
        self.nodes[node_id].children.is_none()
    }

    /// Attach (or replace) the payload stored on a node.
    fn add_payload(&mut self, node_id: NodeId, payload: Payload) {
        if let Some(node) = self.nodes.get_mut(node_id) {
            node.data = Some(payload);
        }
    }

    /// Print every tree in the arena (a forest may exist) to stdout.
    fn print_tree(&self) {
        // Find all root nodes (nodes without a parent)
        let roots: Vec<NodeId> = self
            .nodes
            .iter()
            .filter_map(|(id, node)| {
                if node.parent.is_none() {
                    Some(id)
                } else {
                    None
                }
            })
            .collect();

        // Iterate through each root node and print its subtree
        for (i, root_id) in roots.iter().enumerate() {
            let is_last = i == roots.len() - 1;
            self.print_node(*root_id, String::new(), is_last);
        }
    }

    /// Recursively prints a node and its children.
    ///
    /// - `node_id`: The current node's ID.
    /// - `prefix`: The string prefix for indentation and branch lines.
    /// - `is_last`: Boolean indicating if the node is the last child of its parent.
    fn print_node(&self, node_id: NodeId, prefix: String, is_last: bool) {
        // Retrieve the current node
        let node = match self.nodes.get(node_id) {
            Some(n) => n,
            None => return, // Node not found; skip
        };

        // Determine the branch character
        let branch = if prefix.is_empty() {
            "" // Root node doesn't have a branch
        } else if is_last {
            "└── " // Last child
        } else {
            "├── " // Middle child
        };

        // Print the current node's key and values
        print!("{}{}{}", prefix, branch, node.key);
        match &node.value {
            NodeValue::Single(v) => match v {
                NodeValueTypes::String(s) => println!(": ({})", s),
                NodeValueTypes::Int(i) => println!(": ({})", i),
            },
            NodeValue::Multiple(vs) => {
                let values: Vec<String> = vs
                    .iter()
                    .map(|v| match v {
                        NodeValueTypes::String(s) => s.to_string(),
                        NodeValueTypes::Int(i) => i.to_string(),
                    })
                    .collect();
                println!(": ({})", values.join(", "));
            }
        }

        // Prepare the prefix for child nodes
        let new_prefix = if prefix.is_empty() {
            if is_last {
                " ".to_string()
            } else {
                "".to_string()
            }
        } else {
            if is_last {
                format!("{} ", prefix)
            } else {
                format!("{}", prefix)
            }
        };

        // Retrieve and iterate through child nodes
        if let Some((_first_child_id, _last_child_id)) = node.children {
            let children = self.get_children(node_id);
            let total = children.len();
            for (i, child_id) in children.iter().enumerate() {
                let child_is_last = i == total - 1;
                self.print_node(*child_id, new_prefix.clone(), child_is_last);
            }
        }
    }
}
/// Micro-benchmark: build a 100 x 100 x 1000 tree of identical nodes and
/// report how long construction took.
fn main() {
    use std::time::Instant;

    let mut tree: QueryTree<i16> = QueryTree::new();

    let label = "hello";
    let axis_key = Rc::new("foo".to_string());
    let root = tree.add_node(&axis_key, label, None);

    let started = Instant::now();

    for _ in 0..100 {
        let child = tree.add_node(&axis_key, label, Some(root));
        for _ in 0..100 {
            let grandchild = tree.add_node(&axis_key, label, Some(child));
            for _ in 0..1000 {
                tree.add_node(&axis_key, label, Some(grandchild));
            }
        }
    }

    // 1 root + 100 + 100*100 + 100*100*1000 nodes in total.
    assert_eq!(tree.nodes.len(), 10_010_101);

    println!("Elapsed: {:.2?}", started.elapsed());
}

View File

@ -0,0 +1,76 @@
use rsfdb::listiterator::KeyValueLevel;
use rsfdb::request::Request;
use rsfdb::FDB;
use serde_json::{json, Value};
use std::time::Instant;
use std::collections::HashMap;
pub mod tree;
use std::sync::Arc;
use std::sync::Mutex;
use tree::TreeNode;
/// Python-visible wrapper around an FDB handle.
/// `unsendable` because the wrapped FDB handle is not Send — the object is
/// pinned to the thread that created it.
#[pyclass(unsendable)]
pub struct PyFDB {
    pub fdb: FDB,
}
#[pymethods]
impl PyFDB {
    /// Create a new FDB handle. `fdb_config` is an optional config string
    /// passed straight to FDB::new (None presumably selects the default
    /// configuration — confirm against rsfdb).
    #[new]
    #[pyo3(signature = (fdb_config=None))]
    pub fn new(fdb_config: Option<&str>) -> PyResult<Self> {
        let fdb = FDB::new(fdb_config)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
        Ok(PyFDB { fdb })
    }

    /// Traverse the FDB with the given request.
    /// Lists everything matching `request` and folds each listed item's
    /// request into a tree rooted at a synthetic "root" node, returned to
    /// Python as a nested dict.
    pub fn traverse_fdb(
        &self,
        py: Python<'_>,
        request: HashMap<String, Vec<String>>,
    ) -> PyResult<PyObject> {
        let start_time = Instant::now();

        let list_request = Request::from_json(json!(request))
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;

        // Kick off the FDB list iteration for this request.
        let list = self
            .fdb
            .list(&list_request, true, true)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        let mut root = TreeNode::new(KeyValueLevel {
            key: "root".to_string(),
            value: "root".to_string(),
            level: 0,
        });

        for item in list {
            // Let Python interrupt (Ctrl-C) long traversals.
            py.check_signals()?;
            if let Some(request) = &item.request {
                root.insert(&request);
            }
        }

        let duration = start_time.elapsed();
        println!("Total runtime: {:?}", duration);
        let py_dict = root.to_py_dict(py)?;
        Ok(py_dict)
    }
}
use pyo3::prelude::*;
/// Module init: registers the PyFDB class with the Python `rust` module.
#[pymodule]
fn rust(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<PyFDB>()?;
    Ok(())
}

147
src/rust/formatters/mod.rs Normal file
View File

@ -0,0 +1,147 @@
use crate::{Node, NodeId, Qube};
use itertools::Itertools;
use itertools::Position;
impl Node {
    /// Generate a human readable summary of the node
    /// Examples include: key=value1/value2/.../valueN, key=value1/to/value1, key=*, root etc
    pub fn summary(&self, qube: &Qube) -> String {
        if self.is_root() {
            return "root".to_string();
        }
        // Resolve interned key/value ids back to strings via the Qube.
        let key = &qube[self.key];
        let values: String =
            Itertools::intersperse(self.values.iter().map(|id| &qube[*id]), "/").collect();

        format!("{}={}", key, values)
    }

    /// Same content as `summary` but wrapped in a span carrying the node's
    /// path and a tooltip, for the HTML tree rendering.
    pub fn html_summary(&self, qube: &Qube) -> String {
        if self.is_root() {
            return r#"<span class="qubed-node">root</span>"#.to_string();
        }
        let key = &qube[self.key];
        let values: String =
            Itertools::intersperse(self.values.iter().map(|id| &qube[*id]), "/").collect();
        let summary = format!("{}={}", key, values);
        let path = summary.clone();
        let info = format!("is_root: {}", self.is_root());
        format!(r#"<span class="qubed-node" data-path="{path}" title="{info}">{summary}</span>"#)
    }
}
/// Result of collapsing a run of single-child nodes into one line:
/// the joined summary text and the id of the last node in the run.
struct NodeSummary {
    summary: String,
    end: NodeId,
}

/// Which renderer `summarise_nodes` should use for each node.
enum SummaryType {
    PlainText,
    HTML,
}
/// Given a Node, traverse the tree until a node has more than one child.
/// Returns a summary of the form "key1=v1/v2, key2=v1/v2/v3, key3=v1"
/// and the id of the last node in the summary
fn summarise_nodes(qube: &Qube, node_id: &NodeId, summary_type: SummaryType) -> NodeSummary {
    let mut node_id = *node_id;
    let mut summary_vec = vec![];

    loop {
        let node = &qube[node_id];
        let summary = match summary_type {
            SummaryType::PlainText => node.summary(&qube),
            SummaryType::HTML => node.html_summary(&qube),
        };
        summary_vec.push(summary);

        // Bail out if the node has anything other than exactly 1 child.
        match node.has_exactly_one_child() {
            Some(n) => node_id = n,
            None => break,
        };
    }
    NodeSummary {
        summary: summary_vec.join(", "),
        end: node_id,
    }
}
/// Render the subtree at `node_id` as an indented plain-text tree,
/// collapsing single-child runs into one "k=v, k=v" line.
/// `depth` limits recursion; deeper levels are elided as "...".
fn qube_to_tree(qube: &Qube, node_id: &NodeId, prefix: &str, depth: usize) -> String {
    let NodeSummary {
        summary,
        end: node_id,
    } = summarise_nodes(qube, node_id, SummaryType::PlainText);

    let mut output: Vec<String> = Vec::new();

    if depth <= 0 {
        // Depth budget exhausted: elide the rest of this branch.
        return format!("{} - ...\n", summary);
    } else {
        output.push(format!("{}\n", summary));
    }

    let node = &qube[node_id];
    for (position, child_id) in node.children().with_position() {
        // Box-drawing connectors; the extension pads the prefix for grandchildren.
        let (connector, extension) = match position {
            Position::Last | Position::Only => ("└── ", " "),
            _ => ("├── ", ""),
        };
        output.extend([
            prefix.to_string(),
            connector.to_string(),
            // depth - 1 cannot underflow: the depth <= 0 branch returned above.
            qube_to_tree(qube, child_id, &format!("{prefix}{extension}"), depth - 1),
        ]);
    }

    output.join("")
}
/// Render the subtree at `node_id` as nested HTML <details>/<summary>
/// elements, mirroring the layout of `qube_to_tree`. Levels within `depth`
/// start expanded (the `open` attribute).
fn qube_to_html(qube: &Qube, node_id: &NodeId, prefix: &str, depth: usize) -> String {
    // BUG FIX: use the HTML summariser. The old code passed
    // SummaryType::PlainText here, so the per-node spans/tooltips produced
    // by Node::html_summary were never emitted.
    let NodeSummary {
        summary,
        end: node_id,
    } = summarise_nodes(qube, node_id, SummaryType::HTML);

    let node = &qube[node_id];

    let mut output: Vec<String> = Vec::new();

    let open = if depth > 0 { "open" } else { "" };
    output.push(format!(
        r#"<details {open}><summary class="qubed-level">{summary}</summary>"#
    ));

    for (position, child_id) in node.children().with_position() {
        let (connector, extension) = match position {
            Position::Last | Position::Only => ("└── ", " "),
            _ => ("├── ", ""),
        };
        output.extend([
            prefix.to_string(),
            connector.to_string(),
            // BUG FIX: recurse into the HTML renderer (the old code called
            // qube_to_tree, rendering all descendants as plain text).
            // saturating_sub avoids a usize underflow panic at depth == 0 —
            // unlike qube_to_tree there is no early return guarding it here.
            qube_to_html(
                qube,
                child_id,
                &format!("{prefix}{extension}"),
                depth.saturating_sub(1),
            ),
        ]);
    }

    // BUG FIX: close the <details> element opened above; the old code never
    // emitted the closing tag, producing unbalanced HTML.
    output.push("</details>".to_string());

    output.join("")
}
impl Qube {
    /// Return a string version of the Qube in the format
    /// root
    /// ├── class=od, expver=0001/0002, param=1/2
    /// └── class=rd, param=1/2/3
    pub fn string_tree(&self) -> String {
        // Depth limit of 5 levels; deeper branches are elided as "...".
        qube_to_tree(&self, &self.root, "", 5)
    }

    /// Return an HTML version of the Qube which renders like this
    /// root
    /// ├── class=od, expver=0001/0002, param=1/2
    /// └── class=rd, param=1/2/3
    /// But under the hood children are represented with a details/summary tag and each key=value is a span
    /// CSS and JS functionality is bundled inside.
    pub fn html_tree(&self) -> String {
        qube_to_html(&self, &self.root, "", 5)
    }
}

View File

@ -1,140 +1,235 @@
#![allow(unused_imports)]
// #![allow(dead_code)]
// #![allow(unused_variables)]
use std::collections::HashMap;
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;
use pyo3::types::{PyDict, PyInt, PyList, PyString};
use python_interface::QubeError;
use std::collections::HashMap;
use std::iter;
use pyo3::prelude::*;
use std::hash::Hash;
use std::rc::Rc;
#[pyfunction]
fn hello(_py: Python, name: &str) -> PyResult<String> {
Ok(format!("Hello, {}!", name))
}
#[pymodule]
fn rust(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(hello, m)?).unwrap();
Ok(())
}
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Ord, Eq, Hash)]
struct NodeId(usize);
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Ord, Eq, Hash)]
struct StringId(usize);
struct Node {
key: StringId,
metadata: HashMap<StringId, Vec<String>>,
parent: NodeId,
values: Vec<String>,
children: HashMap<StringId, Vec<NodeId>>,
}
struct Qube {
root: NodeId,
nodes: Vec<Node>,
strings: Vec<String>,
}
use lasso::{Rodeo, Spur};
use std::num::NonZero;
use std::ops;
mod serialisation;
mod python_interface;
mod formatters;
mod set_operations;
// This data structure uses the Newtype Index Pattern
// See https://matklad.github.io/2018/06/04/newtype-index-pattern.html
// See also https://github.com/nrc/r4cppp/blob/master/graphs/README.md#rcrefcellnode for a discussion of other approaches to trees and graphs in rust.
// https://smallcultfollowing.com/babysteps/blog/2015/04/06/modeling-graphs-in-rust-using-vector-indices/
// Index types use struct Id(NonZero<usize>)
// This reserves 0 as a special value which allows Option<Id(NonZero<usize>)> to be the same size as usize.
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Ord, Eq, Hash)]
pub(crate) struct NodeId(NonZero<usize>);
// Allow node indices to index directly into Qubes:
impl ops::Index<NodeId> for Qube {
    type Output = Node;

    fn index(&self, index: NodeId) -> &Node {
        // NodeId is 1-based (NonZero reserves 0), the backing Vec is 0-based.
        &self.nodes[index.0.get() - 1]
    }
}

impl ops::IndexMut<NodeId> for Qube {
    fn index_mut(&mut self, index: NodeId) -> &mut Node {
        &mut self.nodes[index.0.get() - 1]
    }
}
impl ops::Index<StringId> for Qube {
type Output = str;
fn index(&self, index: StringId) -> &str {
&self.strings[index.0]
&self.strings[index]
}
}
impl ops::Index<NodeId> for Qube {
type Output = Node;
fn index(&self, index: NodeId) -> &Node {
&self.nodes[index.0]
impl NodeId {
pub fn new(value: usize) -> Option<NodeId> {
NonZero::new(value).map(NodeId)
}
}
// use rsfdb::listiterator::KeyValueLevel;
// use rsfdb::request::Request;
// use rsfdb::FDB;
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Ord, Eq, Hash)]
struct StringId(lasso::Spur);
// use serde_json::{json, Value};
// use std::time::Instant;
impl ops::Index<StringId> for lasso::Rodeo {
type Output = str;
fn index(&self, index: StringId) -> &str {
&self[index.0]
}
}
#[derive(Debug, Clone)]
pub(crate) struct Node {
pub key: StringId,
pub metadata: HashMap<StringId, Vec<String>>,
pub parent: Option<NodeId>, // If not present, it's the root node
pub values: Vec<StringId>,
pub children: HashMap<StringId, Vec<NodeId>>,
}
impl Node {
    /// Construct the canonical root node, interning "root" as its key in `q`.
    fn new_root(q: &mut Qube) -> Node {
        Node {
            key: q.get_or_intern("root"),
            metadata: HashMap::new(),
            parent: None,
            values: vec![],
            children: HashMap::new(),
        }
    }

    /// All child ids, flattened across the per-key groups.
    fn children(&self) -> impl Iterator<Item = &NodeId> {
        self.children.values().flatten()
    }

    /// True only for the root node, which has no parent.
    fn is_root(&self) -> bool {
        self.parent.is_none()
    }

    /// Because children are stored grouped by key
    /// determining the number of children quickly takes a little effort.
    /// This is a fast method for the special case of checking if a Node has exactly one child.
    /// Returns Some(NodeId) if there is exactly one child else None
    fn has_exactly_one_child(&self) -> Option<NodeId> {
        // Exactly one key group, and that group holds exactly one node.
        if self.children.len() != 1 {return None}
        let Some(value_group) = self.children.values().next() else {return None};
        let [node_id] = &value_group.as_slice() else {return None};
        Some(*node_id)
    }

    /// Total number of children, summed over all key groups.
    fn n_children(&self) -> usize {
        self.children
            .values()
            .map(|v| v.len())
            .sum()
    }

    /// The distinct child keys of this node, resolved to strings via `q`'s interner.
    fn keys<'a>(&'a self, q: &'a Qube) -> impl Iterator<Item = &'a str> {
        self.children.keys()
            .map(|s| {&q[*s]})
    }
}
#[derive(Debug, Clone)]
#[pyclass(subclass, dict)]
pub struct Qube {
    // Id of the root node (node 1 immediately after `Qube::new()`).
    pub root: NodeId,
    // Arena of all nodes; NodeIds are 1-based indices into this Vec.
    nodes: Vec<Node>,
    // String interner shared by all keys and values.
    strings: Rodeo,
}
impl Qube {
    /// Create an empty qube containing only the root node.
    pub fn new() -> Self {
        let mut q = Self {
            root: NodeId::new(1).unwrap(),
            nodes: Vec::new(),
            strings: Rodeo::default(),
        };
        let root = Node::new_root(&mut q);
        q.nodes.push(root);
        q
    }

    /// Intern `val`, returning its (possibly pre-existing) id.
    fn get_or_intern(&mut self, val: &str) -> StringId {
        StringId(self.strings.get_or_intern(val))
    }

    /// Append a new node with `key` and `values` under `parent`; returns its id.
    pub(crate) fn add_node(&mut self, parent: NodeId, key: &str, values: impl IntoIterator<Item = impl AsRef<str>>) -> NodeId {
        let key_id = self.get_or_intern(key);
        let values = values.into_iter().map(|val| self.get_or_intern(val.as_ref())).collect();

        // Create the node object
        let node = Node {
            key: key_id,
            metadata: HashMap::new(),
            values: values,
            parent: Some(parent),
            children: HashMap::new(),
        };

        // Insert it into the Qube arena and determine its id
        self.nodes.push(node);
        let node_id = NodeId::new(self.nodes.len()).unwrap();

        // Add a reference to this node's id to the parents list of children.
        let parent_node = &mut self[parent];
        let key_group = parent_node.children.entry(key_id).or_insert(Vec::new());
        key_group.push(node_id);

        node_id
    }

    /// Render the subtree rooted at `node_id` (default: the qube root) as text.
    fn print(&self, node_id: Option<NodeId>) -> String {
        let node_id: NodeId = node_id.unwrap_or(self.root);
        let node = &self[node_id];
        node.summary(&self)
    }

    /// Borrowed view pairing a node with the qube that owns it.
    fn get_node_ref(&self, id: NodeId) -> NodeRef {
        let node = &self[id];
        NodeRef { id: id, node: &node, qube: &self }
    }

    /// Id of `s` if it has already been interned; `None` otherwise.
    pub fn get_string_id(&self, s: &str) -> Option<StringId> {
        self.strings.get(s)
            .map(|id| StringId(id))
    }
}
// use std::collections::HashMap;
/// Python extension-module entry point: exposes `Qube` and the `QubeError` exception.
#[pymodule]
fn rust(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<Qube>()?;
    m.add("QubeError", py.get_type::<python_interface::QubeError>())?;
    Ok(())
}
// pub mod tree;
// use std::sync::Arc;
// use std::sync::Mutex;
// use tree::TreeNode;
// #[pyclass(unsendable)]
// pub struct PyFDB {
// pub fdb: FDB,
// }
/// A borrowed view of one node together with the qube that owns it,
/// so callers can resolve interned ids without carrying the qube separately.
pub struct NodeRef<'a> {
    pub id: NodeId,
    pub node: &'a Node,
    pub qube: &'a Qube,
}
// #[pymethods]
// impl PyFDB {
// #[new]
// #[pyo3(signature = (fdb_config=None))]
// pub fn new(fdb_config: Option<&str>) -> PyResult<Self> {
// let fdb = FDB::new(fdb_config)
// .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
// Ok(PyFDB { fdb })
// }
impl<'a> NodeRef<'a> {
    /// The child keys of this node, resolved to strings.
    pub fn keys(&self) -> impl Iterator<Item = &str> {
        self.node.keys(self.qube)
    }

    // /// Traverse the FDB with the given request.
    // pub fn traverse_fdb(
    //     &self,
    //     py: Python<'_>,
    //     request: HashMap<String, Vec<String>>,
    // ) -> PyResult<PyObject> {
    //     let start_time = Instant::now();

    /// Every child of this node, flattened across all key groups.
    fn flat_children(&'a self) -> impl Iterator<Item = Self> {
        self.node.children
            .values()
            .flatten()
            .map(|id| {
                NodeRef { id: *id, node: &self.qube[*id], qube: self.qube }
            })
    }

    // let list_request = Request::from_json(json!(request))
    //     .map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;

    /// Children under one specific key; yields nothing if `key` was never interned
    /// or this node has no group for it.
    fn children_by_key(&'a self, key: &str) -> impl Iterator<Item = Self> {
        let id = self.qube.get_string_id(key);
        let children = id
            .map(|i| self.node.children.get(&i))
            .flatten();

        // // Use `fdb_guard` instead of `self.fdb`
        // let list = self
        //     .fdb
        //     .list(&list_request, true, true)
        //     .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        // Option<&Vec<NodeId>> -> iterator of NodeRefs (empty when None).
        children.map(
            |ids| ids.into_iter().map(
            |id| {
                NodeRef { id: *id, node: &self.qube[*id], qube: self.qube }
            })).into_iter().flatten()
    }

    // let mut root = TreeNode::new(KeyValueLevel {
    //     key: "root".to_string(),
    //     value: "root".to_string(),
    //     level: 0,
    // });
    // for item in list {
    //     py.check_signals()?;
    //     if let Some(request) = &item.request {
    //         root.insert(&request);
    //     }
    // }
    // let duration = start_time.elapsed();
    // println!("Total runtime: {:?}", duration);
    // let py_dict = root.to_py_dict(py)?;
    // Ok(py_dict)
    // }
    // }

    // use pyo3::prelude::*;
    // #[pymodule]
    // fn rust(m: &Bound<'_, PyModule>) -> PyResult<()> {
    //     m.add_class::<PyFDB>()?;
    //     Ok(())
    // }
}

View File

@ -0,0 +1,179 @@
use crate::{Node, NodeId, Qube, NodeRef};
use pyo3::prelude::*;
use pyo3::types::{PyList, PyType};
use core::borrow;
use std::ops::Deref;
use std::cell::Ref;
use crate::set_operations;
use crate::serialisation;
use itertools::Itertools;
use pyo3::create_exception;
create_exception!(qubed, QubeError, pyo3::exceptions::PyException);
/// A reference to a particular node in a Qube
#[pyclass]
pub struct PyNodeRef {
    // Which node within the qube this handle points at.
    id: NodeId,
    qube: Py<Qube>, // see https://pyo3.rs/v0.23.1/types for a discussion of Py<T> and Bound<'py, T>
}
fn into_py_node_ref(node_ref: NodeRef, qube: Py<Qube>) -> PyNodeRef {
PyNodeRef {
id: node_ref.id,
qube: qube,
}
}
#[pymethods]
impl PyNodeRef {
    /// Debug representation: recursively prints this node and all descendants.
    fn __repr__(&self, py: Python) -> PyResult<String> {
        // Get the Py<Qube> reference, bind it to the GIL.
        let qube = self.qube.bind(py);

        fn repr_helper<'py>(node_id: NodeId, qube: &Bound<'py, Qube>) -> String {
            let node = &qube.borrow()[node_id];
            let key = &qube.borrow()[node.key];
            let children = node
                .children
                .values()
                .flatten()
                .map(|child_id| repr_helper(child_id.clone(), qube))
                .collect::<Vec<String>>()
                .join(", ");
            format!("Node({}, {})", key, children)
        }

        Ok(repr_helper(self.id, qube))
    }

    /// Short display form: just this node's key.
    fn __str__(&self, py: Python) -> String {
        let qube = self.qube.bind(py).borrow();
        let node = &qube[self.id];
        let key = &qube.strings[node.key];
        format!("Node({})", key)
    }

    /// `.children` property: fresh PyNodeRefs sharing the same qube handle.
    #[getter]
    pub fn get_children(&self, py: Python) -> Vec<Self> {
        let qube = self.qube.bind(py).borrow();
        let node = &qube[self.id];
        node.children
            .values()
            .flatten()
            .map(|child_id| Self {
                id: *child_id,
                qube: self.qube.clone_ref(py),
            })
            .collect()
    }
}
/// Accepts either a bare `T` or a Python list of `T` from the caller.
#[derive(FromPyObject)]
pub enum OneOrMany<T> {
    One(T),
    Many(Vec<T>),
}
// Todo: Is there a way to rewrite this so that is doesn't allocate?
// Perhaps by returning an iterator?
// Implement `From` rather than `Into` (clippy::from_over_into): the blanket
// impl still gives callers `.into()` for free.
impl<T> From<OneOrMany<T>> for Vec<T> {
    /// Normalise a scalar-or-list argument into a Vec.
    fn from(value: OneOrMany<T>) -> Vec<T> {
        match value {
            OneOrMany::One(v) => vec![v],
            OneOrMany::Many(vs) => vs,
        }
    }
}
#[pymethods]
impl Qube {
    /// `Qube()` constructor: an empty qube with just a root node.
    #[new]
    pub fn py_new() -> Self {
        Qube::new()
    }

    /// Add a node under `parent` with the given key and value(s).
    /// `values` may be a single string or a list of strings.
    /// Raises `QubeError` if `parent` belongs to a different qube.
    #[pyo3(name = "add_node")]
    pub fn py_add_node(
        slf: Bound<'_, Self>,
        parent: PyRef<'_, PyNodeRef>,
        key: &str,
        values: OneOrMany<String>,
    ) -> PyResult<PyNodeRef> {
        // Check that the given parent is actually in this qube and not another one
        if !parent.qube.bind(slf.py()).is(&slf) {
            return Err(QubeError::new_err("Supplied parent node is not in the target qube."))
        }

        // massage values from T | Vec<T> into Vec<T>
        let values: Vec<String> = values.into();
        let mut q = slf.borrow_mut();
        let node_id = q.add_node(parent.id, key, &values);
        Ok(PyNodeRef { id: node_id, qube: slf.into()})
    }

    /// Re-point this qube's root at an existing node.
    pub fn set_root(
        slf: Bound<'_, Self>,
        node: PyRef<'_, PyNodeRef>,
    ) -> () {
        let mut q = slf.borrow_mut();
        q.root = node.id;
    }

    /// `.root` property: a PyNodeRef for the root node.
    #[getter]
    fn get_root(slf: Bound<'_, Self>) -> PyResult<PyNodeRef> {
        Ok(PyNodeRef {
            id: slf.borrow().root,
            qube: slf.unbind(),
        })
    }

    /// Debug dump of the whole arena: each node's id, key, values and children.
    fn __repr__(&self) -> String {
        // format!("{:?}", self)
        let nodes_str: String = self.nodes.iter()
            .enumerate()
            .map(|(id, node)| {
                // ids printed 1-based to match NodeId's indexing convention.
                format!("{{id: {}, key: {}, values: [{}], children: [{}]}}",
                    id+1,
                    &self[node.key],
                    node.values.iter().map(|s| &self[*s]).join(", "),
                    node.children().map(|n| n.0).join(", "),
                )
            }).join(", ");
        format!("Qube {{root: {}, nodes: {}}}", self.root.0, nodes_str)
    }

    /// Plain-text tree rendering (implemented in the formatters module).
    fn __str__<'py>(&self) -> String {
        self.string_tree()
    }

    /// Rich HTML rendering for Jupyter.
    fn _repr_html_(&self) -> String {
        self.html_tree()
    }

    /// `.print()`: text rendering starting from the root.
    #[pyo3(name = "print")]
    fn py_print(&self) -> String {
        self.print(Option::None)
    }

    /// `.children` property of the qube itself: the root node's children.
    #[getter]
    pub fn get_children(slf: Bound<'_, Self>, py: Python) -> PyResult<Vec<PyNodeRef>> {
        let root = PyNodeRef {
            id: slf.borrow().root,
            qube: slf.unbind(),
        };
        Ok(root.get_children(py))
    }

    /// Parse a qube from its JSON serialisation.
    #[staticmethod]
    pub fn from_json(data: &str) -> Result<Self, serialisation::JSONError> {
        serialisation::from_json(data)
    }

    /// `q1 | q2`: set union of two qubes.
    pub fn __or__(slf: Bound<'_, Self>, other: Bound<'_, Qube>) -> Qube {
        set_operations::set_operation(&slf.borrow(), &other.borrow(), set_operations::Op::Union)
    }
}

View File

@ -0,0 +1,80 @@
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use crate::{Node, NodeId, Qube};
// Use a newtype wrapper to allow us to implement auto conversion from serde_json::Error to PyErr
// via a wrapper intermediate
// see https://pyo3.rs/main/function/error-handling.html#foreign-rust-error-types
/// Newtype over `serde_json::Error` so parse failures can be converted
/// into Python exceptions.
pub struct JSONError(serde_json::Error);

// JSON parse failures surface in Python as ValueError.
impl From<JSONError> for PyErr {
    fn from(error: JSONError) -> Self {
        PyValueError::new_err(format!("{}", error.0))
    }
}

// Allows `?` on serde_json results inside functions returning JSONError.
impl From<serde_json::Error> for JSONError {
    fn from(other: serde_json::Error) -> Self {
        Self(other)
    }
}
// Numeric range values, discriminated by a "dtype" tag in the JSON.
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "dtype")]
enum Ranges {
    Int64{values: Vec<(i64, i64)>}
}

// Enumerated (explicit list) values, discriminated by a lowercase "dtype" tag.
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "dtype", rename_all = "lowercase")]
enum Enum {
    Str{values: Vec<String>}
}

// The "values" field of a node: wildcard, enumeration or range,
// discriminated by a lowercase "type" tag.
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type", rename_all = "lowercase")]
enum Values {
    Wildcard{},
    Enum(Enum),
    Range(Ranges)
}

/// One node of the JSON wire format for a qube.
#[derive(Serialize, Deserialize, Debug)]
struct JSONQube {
    key: String,
    values: Values,
    metadata: HashMap<String, String>,
    children: Vec<JSONQube>,
}
/// Recursively insert `nodes` beneath `parent`, returning the ids of the
/// newly created direct children.
fn add_nodes(qube: &mut Qube, parent: NodeId, nodes: &[JSONQube]) -> Vec<NodeId> {
    nodes
        .iter()
        .map(|json_node| {
            let values = match &json_node.values {
                // A wildcard is stored as the literal "*" value.
                Values::Wildcard{} => &vec!["*"],
                Values::Enum(Enum::Str{values}) => &values.iter().map(|s| s.as_str()).collect(),
                // Range values are not supported yet.
                Values::Range(_) => todo!(),
            };
            let node_id = qube.add_node(parent, &json_node.key, values);

            // Recurse into this node's children.
            add_nodes(qube, node_id, &json_node.children);
            node_id
        })
        .collect()
}
/// Parse a JSON document into a `Qube`.
///
/// Returns `JSONError` on malformed input (which converts to a Python
/// `ValueError`) instead of panicking — the previous `.expect(...)` made the
/// `Result` return type unreachable on the error path.
pub fn from_json(data: &str) -> Result<Qube, JSONError> {
    // `?` goes through `From<serde_json::Error> for JSONError`.
    let json_qube: JSONQube = serde_json::from_str(data)?;

    let mut qube = Qube::new();
    let root = qube.root;
    add_nodes(&mut qube, root, &json_qube.children);
    Ok(qube)
}

View File

@ -0,0 +1,2 @@
mod json;
pub use json::{from_json, JSONError};

View File

@ -0,0 +1,40 @@
use crate::NodeRef;
use crate::{Node, NodeId, Qube};
use itertools::chain;
use std::collections::HashSet;
/// The four binary set operations supported on qubes.
pub enum Op {
    Union,
    Intersection,
    Difference,
    SymmetricDifference,
}
/// Map a set operation to which Venn regions survive:
/// (only-in-a, in-both, only-in-b).
fn op_to_venn_diagram(op: Op) -> (bool, bool, bool) {
    use Op::*;
    match op {
        Union => (true, true, true),
        Intersection => (false, true, false),
        Difference => (true, false, false),
        SymmetricDifference => (true, false, true),
    }
}
/// Compute `a <op> b` as a new Qube.
/// TODO: not yet implemented — currently panics via `todo!()`.
pub fn set_operation<'a>(a: &'a Qube, b: &'a Qube, op: Op) -> Qube {
    todo!()
    // _set_operation(a.root_ref(), a.root_ref(), op)
}

// fn _set_operation<'a>(a: NodeRef, b: NodeRef, op: Op) -> Qube {
//     let keys: HashSet<&str> = HashSet::from_iter(chain(a.keys(), b.keys()));
//     for key in keys {
//         let a = a.children_by_key(key)
//     }
//     todo!()
// }

/// In-place variant of `set_operation`.
/// TODO: currently a stub that returns `a` unchanged.
pub fn set_operation_inplace<'a>(a: &'a mut Qube, b: &'a Qube, op: Op) -> &'a Qube {
    a
}

View File

@ -1,82 +0,0 @@
// use pyo3::prelude::*;
// use pyo3::types::PyDict;
// use rsfdb::listiterator::KeyValueLevel;
// use serde_json::Value;
// #[derive(Debug)]
// pub struct TreeNode {
// pub key: KeyValueLevel,
// pub children: Vec<TreeNode>,
// }
// impl TreeNode {
// pub fn new(key: KeyValueLevel) -> Self {
// TreeNode {
// key,
// children: Vec::new(),
// }
// }
// pub fn insert(&mut self, path: &[KeyValueLevel]) {
// if path.is_empty() {
// return;
// }
// let kvl = &path[0];
// // Check if a child with the same key and value exists
// if let Some(child) = self.children.iter_mut().find(|child| child.key == *kvl) {
// // Insert the remaining path into the existing child
// child.insert(&path[1..]);
// } else {
// // Create a new child node
// let mut new_child = TreeNode::new(kvl.clone());
// new_child.insert(&path[1..]);
// self.children.push(new_child);
// }
// }
// pub fn traverse<F>(&self, level: usize, callback: &F)
// where
// F: Fn(&TreeNode, usize),
// {
// callback(self, level);
// for child in &self.children {
// child.traverse(level + 1, callback);
// }
// }
// pub fn to_json(&self) -> Value {
// let formatted_key = format!("{}={}", self.key.key, self.key.value);
// let children_json: Value = if self.children.is_empty() {
// Value::Object(serde_json::Map::new())
// } else {
// Value::Object(
// self.children
// .iter()
// .map(|child| {
// (
// format!("{}={}", child.key.key, child.key.value),
// child.to_json(),
// )
// })
// .collect(),
// )
// };
// // Combine the formatted key with children
// serde_json::json!({ formatted_key: children_json })
// }
// pub fn to_py_dict(&self, py: Python) -> PyResult<PyObject> {
// let py_dict = PyDict::new(py);
// for child in &self.children {
// let child_key = format!("{}={}", child.key.key, child.key.value);
// py_dict.set_item(child_key, child.to_py_dict(py)?)?;
// }
// Ok(py_dict.to_object(py))
// }
// }

View File

@ -58,8 +58,14 @@ if "LOCAL_CACHE" in os.environ:
with open("../tests/example_qubes/extremes_dt.json") as f:
qubes["climate-dt"] = qubes["climate-dt"] | Qube.from_json(json.load(f))
with open("../config/climate-dt/language.yaml", "r") as f:
with open("../tests/example_qubes/od.json") as f:
qubes["climate-dt"] = qubes["climate-dt"] | Qube.from_json(json.load(f))
with open("../config/language/language.yaml", "r") as f:
mars_language = yaml.safe_load(f)["_field"]
with open("../config/language/paramids.yaml", "r") as f:
params = yaml.safe_load(f)
else:
print("Getting climate and extremes dt data from github")
qubes["climate-dt"] = Qube.from_json(
@ -172,11 +178,11 @@ async def union(
def follow_query(request: dict[str, str | list[str]], qube: Qube):
s = qube.select(request, mode="next_level", prune=True, consume=False)
s = qube.select(request, mode="next_level", consume=False)
by_path = defaultdict(lambda: {"paths": set(), "values": set()})
for request, node in s.leaf_nodes():
if not node.data.metadata["is_leaf"]:
if not node.metadata.get("is_leaf", True):
by_path[node.key]["values"].update(node.values.values)
by_path[node.key]["paths"].add(frozendict(request))
@ -282,7 +288,19 @@ async def get_STAC(
def make_link(key_name, paths, values):
"""Take a MARS Key and information about which paths matched up to this point and use it to make a STAC Link"""
href_template = f"/stac?{request_params}{'&' if request_params else ''}{key_name}={{{key_name}}}"
values_from_mars_language = mars_language.get(key_name, {}).get("values", [])
print(f"{key_name = }")
if key_name == "param":
print(params)
values_from_mars_language = params
value_descriptions = [
max(params.get(int(v), [""]), key=len) for v in values
]
print(value_descriptions)
else:
values_from_mars_language = mars_language.get(key_name, {}).get(
"values", []
)
if all(isinstance(v, list) for v in values_from_mars_language):
value_descriptions_dict = {
@ -291,7 +309,9 @@ async def get_STAC(
if len(v) > 1
for k in v[:-1]
}
value_descriptions = [value_descriptions_dict.get(v, "") for v in values]
value_descriptions = [
value_descriptions_dict.get(v, "") for v in values
]
if not any(value_descriptions):
value_descriptions = None

View File

@ -9,7 +9,7 @@ pre#qube {
margin-left: 0;
}
.qubed-node a {
.qubed-level a {
margin-left: 10px;
text-decoration: none;
}
@ -23,7 +23,7 @@ pre#qube {
display: block;
}
summary:hover,span.leaf:hover {
span.qubed-node:hover {
background-color: #f0f0f0;
}
@ -35,7 +35,7 @@ pre#qube {
content: " ▼";
}
.leaf {
.qubed-level {
text-overflow: ellipsis;
overflow: hidden;
text-wrap: nowrap;

81
test_scripts/rust.py Normal file
View File

@ -0,0 +1,81 @@
from __future__ import annotations
from datetime import datetime
from typing import Sequence
from qubed.rust import Qube as rsQube
# q = pyQube.from_tree("""
# root, class=d1
# ├── dataset=another-value, generation=1/2/3
# └── dataset=climate-dt/weather-dt, generation=1/2/3/4
# """)
# json_str = json.dumps(q.to_json())
# rust_qube = Qube.from_json(json_str)
# # print(repr(rust_qube))
# # print(json_str)
# expected = """root, class=d1
# ├── dataset=another-value, generation=1/2/3
# └── dataset=climate-dt/weather-dt, generation=1/2/3/4
# """
# assert repr(rust_qube) == expected
# # print(rs_qube._repr_html_())
# print(q | q)
# A single leaf value as accepted by the constructors below.
value = str | int | float | datetime


class Qube(rsQube):
    """Thin Python wrapper over the Rust Qube, adding convenience constructors."""

    @classmethod
    def empty(cls) -> Qube:
        """Return a qube containing only the root node."""
        # Leftover debug print removed here.
        return cls()

    @classmethod
    def from_datacube(cls, datacube: dict[str, value | Sequence[value]]) -> Qube:
        """Build a linear chain of nodes, one level per key of `datacube`.

        Raises ValueError (from tuple unpacking) if `datacube` is empty.
        """
        qube = cls.empty()
        # Peel off the first key to seed the chain under the root.
        (key, values), *key_vals = list(datacube.items())
        node = qube.add_node(qube.root, key, values)
        for key, values in key_vals:
            node = qube.add_node(parent=node, key=key, values=values)
        return qube

    @classmethod
    def from_dict(cls, d: dict) -> Qube:
        """Build a qube from nested dicts whose keys look like "key=v1/v2"."""
        q = cls.empty()

        def from_dict(parent, d: dict):
            for k, children in d.items():
                # "key=v1/v2" -> key, [v1, v2]
                key, values = k.split("=")
                values = values.split("/")
                node = q.add_node(
                    parent=parent,
                    key=key,
                    values=values,
                )
                from_dict(parent=node, d=children)

        from_dict(q.root, d)
        return q
# Demo: a single datacube produces one linear chain of nodes.
q = Qube.from_datacube({"a": ["4"], "b": "test", "c": ["1", "2", "3"]})
print(q)
print(repr(q))
# Demo: the nested-dict form can express branching trees.
q = Qube.from_dict(
    {
        "a=2/3": {"b=1": {}},
        "a2=a/b": {"b2=1/2": {}},
    }
)
print(q)
print(repr(q))

View File

@ -0,0 +1,99 @@
# Example script for ingesting data from an fdb into a qube
# Notes
# Uses fdb --compact
# Splits by data in order to avoid out of memory problems with fdb --compact
# Does a bit of processing like removing "year" and "month" keys
# Might want to add datatypes and reordering of keys there too
import json
import subprocess
from datetime import datetime, timedelta
from time import time
import psutil
from qubed import Qube
from tqdm import tqdm
import requests
process = psutil.Process()  # used below to report this run's memory usage

# Ingest in 60-day windows to bound fdb --compact memory use.
CHUNK_SIZE = timedelta(days=60)
FILEPATH = "tests/example_qubes/full_dt.json"
API = "https://qubed.lumi.apps.dte.destination-earth.eu/api/v1"

# Bearer token for the remote qubed API.
with open("config/api.secret", "r") as f:
    secret = f.read()


def ecmwf_date(d):
    """Format a datetime in the YYYYMMDD form used by fdb/MARS requests."""
    return d.strftime("%Y%m%d")


start_date = datetime.now() - timedelta(days=120)
# start_date = datetime(1990, 1, 1)
# end_date = datetime.now()
end_date = datetime(2026, 1, 1)

# [start, end] window currently being ingested; walks backwards from end_date.
current_span = [end_date - CHUNK_SIZE, end_date]
# Resume from the on-disk checkpoint if one exists; otherwise start fresh.
try:
    qube = Qube.load(FILEPATH)
except Exception:  # was a bare `except:`, which also swallowed KeyboardInterrupt/SystemExit
    qube = Qube.empty()
# Walk backwards through time in CHUNK_SIZE windows until start_date is reached.
while current_span[0] > start_date:
    for config in ["config/config-climate-dt.yaml", "config/config-extremes-dt.yaml"]:
        t0 = time()
        start, end = map(ecmwf_date, current_span)
        print(f"Doing {config} {current_span[0].date()} - {current_span[1].date()}")
        print(f"Current memory usage: {process.memory_info().rss / 1e9:.2g}GB")
        print(f"{qube.n_nodes = }, {qube.n_leaves = },")

        # Accumulate this window in a fresh qube so one failure only loses a chunk.
        subqube = Qube.empty()
        command = [
            f"fdb list --compact --config {config} --minimum-keys=date class=d1,date={start}/{end}"
        ]
        try:
            p = subprocess.run(
                command,
                text=True,
                shell=True,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
                check=True,
            )
        except Exception as e:
            print(f"Failed for {current_span} {e}")
            continue
        print("Got compact list")

        # Each interesting line is "retrieve,class=...,key=v1/v2,..."
        for i, line in tqdm(enumerate(list(p.stdout.split("\n")))):
            if not line.startswith("retrieve,class="):
                continue

            def split(t):
                # ("key", "v1/v2") -> ("key", ["v1", "v2"])
                return t[0], t[1].split("/")

            # Could do datatypes here
            request = dict(split(v.split("=")) for v in line.strip().split(",")[1:])
            request.pop("year", None)
            request.pop("month", None)
            # Could do things like date = year + month + day
            q = Qube.from_datacube(request)
            subqube = subqube | q
        print("added to qube")

        qube = qube | subqube
        subqube.print(depth=2)
        print(f"{subqube.n_nodes = }, {subqube.n_leaves = },")

        # Push this chunk to the remote qubed API as well as accumulating locally.
        requests.post(
            API + "/union/climate-dt/",
            headers = {"Authorization" : f"Bearer {secret}"},
            json = subqube.to_json())

    # Advance (backwards) to the previous window.
    # NOTE(review): reconstructed indentation places this at while-loop level so
    # both configs see the same span — confirm against the original file.
    current_span = [current_span[0] - CHUNK_SIZE, current_span[0]]
    print(
        f"Did that taking {(time() - t0) / CHUNK_SIZE.days:2g} seconds per day ingested, total {(time() - t0):2g}s"
    )

    # Checkpoint the accumulated qube to disk after every window.
    with open(FILEPATH, "w") as f:
        json.dump(qube.to_json(), f)

View File

@ -1,36 +1,40 @@
from qubed import Qube
d = {
"class=od": {
"expver=0001": {"param=1": {}, "param=2": {}},
"expver=0002": {"param=1": {}, "param=2": {}},
},
"class=rd": {
"expver=0001": {"param=1": {}, "param=2": {}, "param=3": {}},
"expver=0002": {"param=1": {}, "param=2": {}},
},
}
q = Qube.from_dict(d)
def test_eq():
r = Qube.from_dict(d)
assert q == r
q = Qube.from_tree("""
root
class=od
expver=0001
param=1
param=2
expver=0002
param=1
param=2
class=rd
expver=0001
param=1
param=2
param=3
expver=0002
param=1
param=2
""")
def test_getitem():
assert q["class", "od"] == Qube.from_dict(
{
"expver=0001": {"param=1": {}, "param=2": {}},
"expver=0002": {"param=1": {}, "param=2": {}},
}
)
assert q["class", "od"]["expver", "0001"] == Qube.from_dict(
{
"param=1": {},
"param=2": {},
}
)
assert q["class", "od"] == Qube.from_tree("""
root
expver=0001
param=1
param=2
expver=0002
param=1
param=2
""")
assert q["class", "od"]["expver", "0001"] == Qube.from_tree("""
root
param=1
param=2""")
def test_n_leaves():

View File

@ -2,7 +2,7 @@ from qubed import Qube
def test_json_round_trip():
u = Qube.from_dict(
from_dict = Qube.from_dict(
{
"class=d1": {
"dataset=climate-dt/weather-dt": {
@ -14,5 +14,54 @@ def test_json_round_trip():
}
}
)
json = u.to_json()
assert Qube.from_json(json) == u
from_tree = Qube.from_tree("""
root, class=d1
dataset=another-value, generation=1/2/3
dataset=climate-dt/weather-dt, generation=1/2/3/4
""")
from_json = Qube.from_json(
{
"key": "root",
"values": ["root"],
"metadata": {},
"children": [
{
"key": "class",
"values": ["d1"],
"metadata": {},
"children": [
{
"key": "dataset",
"values": ["another-value"],
"metadata": {},
"children": [
{
"key": "generation",
"values": ["1", "2", "3"],
"metadata": {},
"children": [],
}
],
},
{
"key": "dataset",
"values": ["climate-dt", "weather-dt"],
"metadata": {},
"children": [
{
"key": "generation",
"values": ["1", "2", "3", "4"],
"metadata": {},
"children": [],
}
],
},
],
}
],
}
)
assert from_tree == from_json
assert from_tree == from_dict

View File

@ -20,14 +20,6 @@ root
expver=0002, param=1/2
""".strip()
as_html = """
<details open><summary class="qubed-level"><span class="qubed-node" data-path="root" title="dtype: str\nmetadata: {}\n">root</span></summary><span class="qubed-level"> <span class="qubed-node" data-path="class=od" title="dtype: str\nmetadata: {}\n">class=od</span>, <span class="qubed-node" data-path="expver=0001/0002" title="dtype: str\nmetadata: {}\n">expver=0001/0002</span>, <span class="qubed-node" data-path="param=1/2" title="dtype: str\nmetadata: {}\n">param=1/2</span></span><details open><summary class="qubed-level"> <span class="qubed-node" data-path="class=rd" title="dtype: str\nmetadata: {}\n">class=rd</span></summary><span class="qubed-level"> <span class="qubed-node" data-path="expver=0001" title="dtype: str\nmetadata: {}\n">expver=0001</span>, <span class="qubed-node" data-path="param=1/2/3" title="dtype: str\nmetadata: {}\n">param=1/2/3</span></span><span class="qubed-level"> <span class="qubed-node" data-path="expver=0002" title="dtype: str\nmetadata: {}\n">expver=0002</span>, <span class="qubed-node" data-path="param=1/2" title="dtype: str\nmetadata: {}\n">param=1/2</span></span></details></details>
""".strip()
def test_string():
assert str(q).strip() == as_string
def test_html():
assert as_html in q._repr_html_()

View File

@ -16,3 +16,29 @@ def test_iter_leaves_simple():
]
assert set(make_hashable(q.leaves())) == set(make_hashable(entries))
def test_datacubes():
    # A qube is a compressed union of dense datacubes; .datacubes() should
    # decompose it back into exactly those cubes.
    q = Qube.from_tree("""
    root, class=d1
        date=19920101/19930101/19940101, params=1/2/3
        date=19950101
            level=1/2/3, params=1/2/3/4
            params=1/2/3/4
    """)

    assert len(list(q.datacubes())) == 3

    assert list(q.datacubes()) == [
        {
            "class": ["d1"],
            "date": ["19920101", "19930101", "19940101"],
            "params": ["1", "2", "3"],
        },
        {
            "class": ["d1"],
            "date": ["19950101"],
            "level": ["1", "2", "3"],
            "params": ["1", "2", "3", "4"],
        },
        {"class": ["d1"], "date": ["19950101"], "params": ["1", "2", "3", "4"]},
    ]

View File

@ -1,45 +1,98 @@
from frozendict import frozendict
from qubed import Qube
# from frozendict import frozendict
# from qubed import Qube
def make_set(entries):
return set((frozendict(a), frozendict(b)) for a, b in entries)
# def make_set(entries):
# return set((frozendict(a), frozendict(b)) for a, b in entries)
# def construction():
# q = Qube.from_nodes(
# {
# "class": dict(values=["od", "rd"]),
# "expver": dict(values=[1, 2]),
# "stream": dict(
# values=["a", "b", "c"], metadata=dict(number=list(range(12)))
# ),
# }
# )
# assert make_set(q.leaves_with_metadata()) == make_set([
# ({'class': 'od', 'expver': 1, 'stream': 'a'}, {'number': 0}),
# ({'class': 'od', 'expver': 1, 'stream': 'b'}, {'number': 1}),
# ({'class': 'od', 'expver': 1, 'stream': 'c'}, {'number': 2}),
# ({'class': 'od', 'expver': 2, 'stream': 'a'}, {'number': 3}),
# ({'class': 'od', 'expver': 2, 'stream': 'b'}, {'number': 4}),
# ({'class': 'od', 'expver': 2, 'stream': 'c'}, {'number': 5}),
# ({'class': 'rd', 'expver': 1, 'stream': 'a'}, {'number': 6}),
# ({'class': 'rd', 'expver': 1, 'stream': 'b'}, {'number': 7}),
# ({'class': 'rd', 'expver': 1, 'stream': 'c'}, {'number': 8}),
# ({'class': 'rd', 'expver': 2, 'stream': 'a'}, {'number': 9}),
# ({'class': 'rd', 'expver': 2, 'stream': 'b'}, {'number': 10}),
# ({'class': 'rd', 'expver': 2, 'stream': 'c'}, {'number': 11})])
def test_simple_union():
q = Qube.from_nodes(
{
"class": dict(values=["od", "rd"]),
"expver": dict(values=[1, 2]),
"stream": dict(
values=["a", "b", "c"], metadata=dict(number=list(range(12)))
),
}
)
# def test_simple_union():
# q = Qube.from_nodes(
# {
# "class": dict(values=["od", "rd"]),
# "expver": dict(values=[1, 2]),
# "stream": dict(
# values=["a", "b", "c"], metadata=dict(number=list(range(12)))
# ),
# }
# )
r = Qube.from_nodes(
{
"class": dict(values=["xd"]),
"expver": dict(values=[1, 2]),
"stream": dict(
values=["a", "b", "c"], metadata=dict(number=list(range(12, 18)))
),
}
)
# r = Qube.from_nodes(
# {
# "class": dict(values=["xd"]),
# "expver": dict(values=[1, 2]),
# "stream": dict(
# values=["a", "b", "c"], metadata=dict(number=list(range(12, 18)))
# ),
# }
# )
expected_union = Qube.from_nodes(
{
"class": dict(values=["od", "rd", "xd"]),
"expver": dict(values=[1, 2]),
"stream": dict(
values=["a", "b", "c"], metadata=dict(number=list(range(18)))
),
}
)
# expected_union = Qube.from_nodes(
# {
# "class": dict(values=["od", "rd", "xd"]),
# "expver": dict(values=[1, 2]),
# "stream": dict(
# values=["a", "b", "c"], metadata=dict(number=list(range(18)))
# ),
# }
# )
union = q | r
# union = q | r
assert union == expected_union
assert make_set(expected_union.leaves_with_metadata()) == make_set(
union.leaves_with_metadata()
)
# assert union == expected_union
# assert make_set(expected_union.leaves_with_metadata()) == make_set(
# union.leaves_with_metadata()
# )
# def test_construction_from_fdb():
# import json
# paths = {}
# current_path = None
# i = 0
# qube = Qube.empty()
# with open("tests/data/climate_dt_paths.json") as f:
# for l in f.readlines():
# i += 1
# j = json.loads(l)
# if "type" in j and j["type"] == "path":
# paths[j["i"]] = j["path"]
# else:
# request = j.pop("keys")
# metadata = j
# # print(request, metadata)
# q = Qube.from_nodes({
# key : dict(values = [value])
# for key, value in request.items()
# }).add_metadata(**metadata)
# qube = qube | q
# if i > 100: break

12
tests/test_protobuf.py Normal file
View File

@ -0,0 +1,12 @@
from qubed import Qube
def test_protobuf_simple():
    # Serialise to the protobuf wire format and parse back: must be lossless.
    q = Qube.from_tree("""
    root, class=d1
        dataset=another-value, generation=1/2/3
        dataset=climate-dt/weather-dt, generation=1/2/3/4
    """)

    wire = q.to_protobuf()
    round_trip = Qube.from_protobuf(wire)
    assert round_trip == q

View File

@ -1,5 +1,21 @@
from qubed.rust import hello
from __future__ import annotations
def test_hello():
assert hello("World") == "Hello, World!"
from qubed.rust import Qube as Qube
# def test_from_json():
# q = pyQube.from_tree("""
# root, class=d1
# ├── dataset=another-value, generation=1/2/3
# └── dataset=climate-dt/weather-dt, generation=1/2/3/4
# """)
# json_str = json.dumps(q.to_json())
# rust_qube = Qube.from_json(json_str)
# print(repr(rust_qube))
# expected = """root, class=d1
# ├── dataset=another-value, generation=1/2/3
# └── dataset=climate-dt/weather-dt, generation=1/2/3/4
# """
# assert repr(rust_qube) == expected

View File

@ -1,41 +1,27 @@
from qubed import Qube
q = Qube.from_dict(
{
"class=od": {
"expver=0001": {"param=1": {}, "param=2": {}},
"expver=0002": {"param=1": {}, "param=2": {}},
},
"class=rd": {"param=1": {}, "param=2": {}, "param=3": {}},
}
)
q = Qube.from_tree("""
root
class=od, expver=0001/0002, param=1/2
class=rd, param=1/2/3
""")
def test_consumption():
assert q.select({"expver": "0001"}, consume=True) == Qube.from_dict(
{"class=od": {"expver=0001": {"param=1": {}, "param=2": {}}}}
assert q.select({"expver": "0001"}, consume=True) == Qube.from_tree(
"root, class=od, expver=0001, param=1/2"
)
def test_consumption_off():
expected = Qube.from_dict(
{
"class=od": {"expver=0001": {"param=1": {}, "param=2": {}}},
"class=rd": {"param=1": {}, "param=2": {}, "param=3": {}},
}
)
expected = Qube.from_tree("""
root
class=od, expver=0001, param=1/2
class=rd, param=1/2/3
""")
assert q.select({"expver": "0001"}, consume=False) == expected
def test_require_match():
expected = Qube.from_dict(
{
"class=od": {"expver=0001": {"param=1": {}, "param=2": {}}},
}
)
assert q.select({"expver": "0001"}, require_match=True) == expected
def test_function_input_to_select():
q = Qube.from_tree("""
root, frequency=6:00:00

View File

@ -1,12 +1,18 @@
from qubed import Qube
def set_operation_testcase(testcase):
def set_operation_testcase(name, testcase):
    """Check union/intersection/difference for one named testcase.

    `testcase` maps "q1", "q2", "union", "intersection" and "difference"
    to tree-format strings.
    """
    q1 = Qube.from_tree(testcase["q1"])
    q2 = Qube.from_tree(testcase["q2"])
    assert q1 | q2 == Qube.from_tree(testcase["union"]), (
        f"Case: {name} Op: Union\n {q1 = }\n {q2 = }\n {q1 | q2 = }\n expected = {testcase['union']}\n"
    )
    # Fix: the failure message previously printed `q1 - q2` here instead of `q1 & q2`.
    assert q1 & q2 == Qube.from_tree(testcase["intersection"]), (
        f"Case: {name} Op: Intersection\n {q1 = }\n {q2 = }\n {q1 & q2 = }\n expected = {testcase['intersection']}\n"
    )
    assert q1 - q2 == Qube.from_tree(testcase["difference"]), (
        f"Case: {name} Op: Difference\n {q1 = }\n {q2 = }\n {q1 - q2 = }\n expected = {testcase['difference']}\n"
    )
# These are a bunch of testcases where q1 and q2 are specified and then their union/intersection/difference are checked
@ -19,30 +25,27 @@ def set_operation_testcase(testcase):
# "q2": str(q2),
# "union": str(q1 | q2),
# "intersection": str(q1 & q2),
# "q1 - q2": str(q1 - q2),
# "difference": str(q1 - q2),
# }
# BUT MANUALLY CHECK THE OUTPUT BEFORE ADDING IT AS A TEST CASE!
testcases = [
# Simplest case, only leaves differ
{
testcases = {
"Simplest case, only leaves differ": {
"q1": "root, a=1, b=1, c=1",
"q2": "root, a=1, b=1, c=2",
"union": "root, a=1, b=1, c=1/2",
"intersection": "root",
"q1 - q2": "root",
"difference": "root, a=1, b=1, c=1",
},
# Some overlap but also each tree has unique items
{
"Some overlap but also each tree has unique items": {
"q1": "root, a=1, b=1, c=1/2/3",
"q2": "root, a=1, b=1, c=2/3/4",
"union": "root, a=1, b=1, c=1/2/3/4",
"intersection": "root, a=1, b=1, c=2/3",
"q1 - q2": "root",
"difference": "root, a=1, b=1, c=1",
},
# Overlap at two levels
{
"Overlap at two levels": {
"q1": "root, a=1, b=1/2, c=1/2/3",
"q2": "root, a=1, b=2/3, c=2/3/4",
"union": """
@ -52,26 +55,48 @@ testcases = [
b=3, c=2/3/4
""",
"intersection": "root, a=1, b=2, c=2/3",
"q1 - q2": "root",
"difference": """
root, a=1
b=1, c=1/2/3
b=2, c=1""",
},
# Check that we can merge even if the divergence point is higher
{
"Simple difference": {
"q1": "root, a=1, b=1, c=1/2/3",
"q2": "root, a=1, b=1, c=2",
"union": "root, a=1, b=1, c=1/2/3",
"intersection": "root, a=1, b=1, c=2",
"difference": "root, a=1, b=1, c=1/3",
},
"Check that we can merge even if the divergence point is higher": {
"q1": "root, a=1, b=1, c=1",
"q2": "root, a=2, b=1, c=1",
"union": "root, a=1/2, b=1, c=1",
"intersection": "root",
"q1 - q2": "root, a=1, b=1, c=1",
"difference": "root, a=1, b=1, c=1",
},
# Two equal qubes
{
"Two equal qubes": {
"q1": "root, a=1, b=1, c=1",
"q2": "root, a=1, b=1, c=1",
"union": "root, a=1, b=1, c=1",
"intersection": "root, a=1, b=1, c=1",
"q1 - q2": "root",
"difference": "root",
},
# With wildcards
{
"Two qubes that don't compress on their own but the union does": {
"q1": """
root
a=1/3, b=1
a=2, b=1/2
""",
"q2": "root, a=1/3, b=2",
"union": "root, a=1/2/3, b=1/2",
"intersection": "root",
"difference": """
root
a=1/3, b=1
a=2, b=1/2
""",
},
"With wildcards": {
"q1": "root, frequency=*, levtype=*, param=*, levelist=*, domain=a/b/c/d",
"q2": "root, frequency=*, levtype=*, param=*, domain=a/b/c/d",
"union": """
@ -80,14 +105,21 @@ testcases = [
levelist=*, domain=a/b/c/d
""",
"intersection": "root",
"q1 - q2": "root",
"difference": "root, frequency=*, levtype=*, param=*, levelist=*, domain=a/b/c/d",
},
]
"Merging wildcard groups": {
"q1": "root, levtype=pl, param=q, levelist=100/1000, quantile=*",
"q2": "root, levtype=pl, param=t, levelist=100/1000, quantile=*",
"union": "root, levtype=pl, param=q/t, levelist=100/1000, quantile=*",
"intersection": "root",
"difference": "root, levtype=pl, param=q, levelist=100/1000, quantile=*",
},
}
def test_cases():
for case in testcases:
set_operation_testcase(case)
for name, case in testcases.items():
set_operation_testcase(name, case)
def test_leaf_conservation():

View File

@ -1,17 +1,12 @@
from qubed import Qube
q = Qube.from_dict(
{
"class=od": {
"expver=0001": {"param=1": {}, "param=2": {}},
"expver=0002": {"param=1": {}, "param=2": {}},
},
"class=rd": {
"expver=0001": {"param=1": {}, "param=2": {}, "param=3": {}},
"expver=0002": {"param=1": {}, "param=2": {}},
},
}
)
q = Qube.from_tree("""
root
class=od, expver=0001/0002, param=1/2
class=rd
expver=0001, param=1/2/3
expver=0002, param=1/2
""")
wild_datacube = {
"class": "*",