Update tree compressor

This commit is contained in:
Tom Hodson 2024-12-02 09:49:45 +00:00
parent 25ff51c71e
commit 08b5d10a26
6 changed files with 149 additions and 77 deletions

View File

@ -7,7 +7,7 @@ edition = "2021"
rsfdb = {git = "https://github.com/ecmwf/rsfdb", branch = "develop"} rsfdb = {git = "https://github.com/ecmwf/rsfdb", branch = "develop"}
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
pyo3 = "0.23.1" pyo3 = "0.23"
[lib] [lib]

View File

@ -1 +1,2 @@
from . import rust as backend from . import rust as backend
from .CompressedTree import CompressedTree

View File

@ -4,85 +4,79 @@
use rsfdb::listiterator::KeyValueLevel; use rsfdb::listiterator::KeyValueLevel;
use rsfdb::request::Request; use rsfdb::request::Request;
use rsfdb::FDB; // Make sure the `fdb` crate is correctly specified in the dependencies use rsfdb::FDB;
use serde_json::{json, Value}; use serde_json::{json, Value};
use std::time::Instant; use std::time::Instant;
use pyo3::prelude::*; use pyo3::prelude::*;
use pyo3::types::{PyDict, PyList, PyString}; use pyo3::types::{PyDict, PyInt, PyList, PyString};
use crate::tree::TreeNode;
use std::collections::HashMap; use std::collections::HashMap;
/// Formats the sum of two numbers as string. pub mod tree;
#[pyfunction] use std::sync::Arc;
#[pyo3(signature = (request, fdb_config = None))] use std::sync::Mutex;
fn traverse_fdb( use tree::TreeNode;
request: HashMap<String, Vec<String>>,
fdb_config: Option<&str>,
) -> PyResult<String> {
let start_time = Instant::now();
let fdb = FDB::new(fdb_config).unwrap();
let list_request = #[pyclass(unsendable)]
Request::from_json(json!(request)).expect("Failed to create request from python dict"); pub struct PyFDB {
pub fdb: FDB,
}
let list = fdb.list(&list_request, true, true).unwrap(); #[pymethods]
impl PyFDB {
// for item in list { #[new]
// for kvl in item.request { #[pyo3(signature = (fdb_config=None))]
// println!("{:?}", kvl); pub fn new(fdb_config: Option<&str>) -> PyResult<Self> {
// } let fdb = FDB::new(fdb_config)
// } .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok(PyFDB { fdb })
let mut root = TreeNode::new(KeyValueLevel {
key: "root".to_string(),
value: "root".to_string(),
level: 0,
});
for item in list {
if let Some(request) = &item.request {
root.insert(&request);
}
} }
// Traverse and print the tree /// Traverse the FDB with the given request.
root.traverse(0, &|node, level| { pub fn traverse_fdb(
let indent = " ".repeat(level); &self,
println!("{}{}={}", indent, node.key.key, node.key.value); py: Python<'_>,
}); request: HashMap<String, Vec<String>>,
) -> PyResult<PyObject> {
let start_time = Instant::now();
// Convert the tree to JSON let list_request = Request::from_json(json!(request))
// let json_output = root.to_json(); .map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;
// // Print the JSON output // Use `fdb_guard` instead of `self.fdb`
// // println!("{}", serde_json::to_string_pretty(&json_output).unwrap()); let list = self
// std::fs::write( .fdb
// "output.json", .list(&list_request, true, true)
// serde_json::to_string_pretty(&json_output).unwrap(), .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
// )
// .expect("Unable to write file");
// let duration = start_time.elapsed(); let mut root = TreeNode::new(KeyValueLevel {
// println!("Total runtime: {:?}", duration); key: "root".to_string(),
value: "root".to_string(),
level: 0,
});
Ok(("test").to_string()) for item in list {
py.check_signals()?;
if let Some(request) = &item.request {
root.insert(&request);
}
}
let duration = start_time.elapsed();
println!("Total runtime: {:?}", duration);
let py_dict = root.to_py_dict(py)?;
Ok(py_dict)
}
} }
use pyo3::prelude::*; use pyo3::prelude::*;
/// Formats the sum of two numbers as string.
#[pyfunction]
fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
Ok((a + b + 2).to_string())
}
/// A Python module implemented in Rust. The name of this function must match
/// the `lib.name` setting in the `Cargo.toml`, else Python will not be able to
/// import the module.
#[pymodule] #[pymodule]
fn rust(m: &Bound<'_, PyModule>) -> PyResult<()> { fn rust(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(traverse_fdb, m)?) m.add_class::<PyFDB>()?;
Ok(())
} }

View File

@ -1,7 +1,12 @@
use pyo3::prelude::*;
use pyo3::types::PyDict;
use rsfdb::listiterator::KeyValueLevel;
use serde_json::Value;
#[derive(Debug)] #[derive(Debug)]
pub struct TreeNode { pub struct TreeNode {
key: KeyValueLevel, pub key: KeyValueLevel,
children: Vec<TreeNode>, pub children: Vec<TreeNode>,
} }
impl TreeNode { impl TreeNode {
@ -63,4 +68,23 @@ impl TreeNode {
// Combine the formatted key with children // Combine the formatted key with children
serde_json::json!({ formatted_key: children_json }) serde_json::json!({ formatted_key: children_json })
} }
/// Converts this node and all of its descendants into nested Python dicts.
///
/// The returned dict has exactly one entry: the key is this node's
/// `"key=value"` string and the value is a dict of its children. Each child
/// is stored under its own `"key=value"` string, with the result of calling
/// `to_py_dict` on that child as the stored value. A node without children
/// maps its key to an empty dict.
///
/// # Errors
///
/// Propagates any `PyErr` raised while inserting into a dict or while
/// converting a descendant.
pub fn to_py_dict(&self, py: Python) -> PyResult<PyObject> {
    // Looping over zero children leaves this dict empty, so a leaf needs no
    // separate branch.
    let children_dict = PyDict::new(py);
    for child in &self.children {
        let child_key = format!("{}={}", child.key.key, child.key.value);
        // NOTE(review): `child.to_py_dict` already wraps its result under the
        // child's own key, so every child key appears at two consecutive
        // levels of the output. Confirm the Python consumer expects this shape.
        children_dict.set_item(child_key, child.to_py_dict(py)?)?;
    }

    let py_dict = PyDict::new(py);
    py_dict.set_item(
        format!("{}={}", self.key.key, self.key.value),
        children_dict,
    )?;
    // `to_object` is deprecated as of PyO3 0.23; erase the type and detach
    // from the GIL lifetime explicitly instead.
    Ok(py_dict.into_any().unbind())
}
} }

View File

@ -0,0 +1,12 @@
from tree_traverser import backend, CompressedTree
from pathlib import Path
# Location of the compressed tree JSON file to inspect.
data_path = Path("data/compressed_tree_climate_dt.json")
# Print size of file in MB. True division keeps the fractional part that the
# ".1f" format displays; floor division would always print ".0".
print(f"climate dt compressed tree: {data_path.stat().st_size / 1e6:.1f} MB")

print("Opening json file")
compressed_tree = CompressedTree.load(data_path)

print("Printing compressed tree")
print(compressed_tree.reconstruct_compressed_ecmwf_style())

View File

@ -1,26 +1,67 @@
from tree_traverser import backend from tree_traverser import backend, CompressedTree
import datetime
import psutil
from tqdm import tqdm
from pathlib import Path
import json
from more_itertools import chunked
process = psutil.Process()
def massage_request(r):
    """Return a new dict in which every value of ``r`` is a list.

    Values that are already lists are kept as they are; any other value is
    wrapped in a single-element list.
    """
    massaged = {}
    for key, value in r.items():
        massaged[key] = value if isinstance(value, list) else [value]
    return massaged
if __name__ == "__main__":
config = """ config = """
--- ---
type: remote type: remote
host: databridge-prod-catalogue1-ope.ewctest.link host: databridge-prod-catalogue1-ope.ewctest.link
port: 10000 port: 10000
engine: remote engine: remote
store: remote store: remote
""" """
def massage_request(r): request = {
return {k : v if isinstance(v, list) else [v] "class": "d1",
for k, v in r.items()} "dataset": "climate-dt",
# "date": "19920420",
}
data_path = Path("data/compressed_tree_climate_dt.json")
if not data_path.exists():
compressed_tree = CompressedTree({})
else:
compressed_tree = CompressedTree.load(data_path)
request = { fdb = backend.PyFDB(fdb_config = config)
"class": "d1",
"dataset": "extremes-dt",
"expver": "0001",
"stream": "oper",
"date": ["20241117", "20241116"],
}
backend.traverse_fdb(massage_request(request), fdb_config = config) visited_path = Path("data/visited_dates.json")
if not visited_path.exists():
visited_dates = set()
else:
with open(visited_path, "r") as f:
visited_dates = set(json.load(f))
today = datetime.datetime.today()
start = datetime.datetime.strptime("19920420", "%Y%m%d")
# One "YYYYMMDD" string per day from `start` up to, but not including, today.
date_list = [
    (start + datetime.timedelta(days=x)).strftime("%Y%m%d")
    for x in range((today - start).days)
]
# visited_dates holds date strings, so the membership test must use the
# formatted string; comparing datetime objects against it never matches and
# would re-scan every date that was already visited.
date_list = [d for d in date_list if d not in visited_dates]
for dates in chunked(tqdm(date_list), 5):
print(dates[0])
print(f"Memory usage: {(process.memory_info().rss)/1e6:.1f} MB")
r = request | dict(date = dates)
tree = fdb.traverse_fdb(massage_request(r))
compressed_tree.insert_tree(tree)
compressed_tree.save(data_path)
for date in dates:
visited_dates.add(date)
with open(visited_path, "w") as f:
json.dump(list(visited_dates), f)
# print(compressed_tree.reconstruct_compressed_ecmwf_style())