diff --git a/Cargo.toml b/Cargo.toml index 826afd2..7ca0036 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,14 @@ repository = "https://github.com/ecmwf/qubed" # rsfdb = {git = "https://github.com/ecmwf/rsfdb", branch = "develop"} serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -pyo3 = "0.23" + +# For fdb binding +libc = "0.2" +libloading = "0.6" +once_cell = "1.8" + +[dependencies.pyo3] +version = "0.23" [package.metadata.maturin] version-from-git = true @@ -18,8 +25,6 @@ name = "tree_traverser" crate-type = ["cdylib"] path = "./src/rust/lib.rs" -# [patch.'https://github.com/ecmwf/rsfdb'] -# rsfdb = { path = "../rsfdb" } - -# [patch.'https://github.com/ecmwf-projects/rsfindlibs'] -# rsfindlibs = { path = "../rsfindlibs" } +[features] +extension-module = ["pyo3/extension-module"] +default = ["extension-module"] diff --git a/src/rust/fdb/dataretriever.rs b/src/rust/fdb/dataretriever.rs new file mode 100644 index 0000000..94615d7 --- /dev/null +++ b/src/rust/fdb/dataretriever.rs @@ -0,0 +1,139 @@ +use std::io; +use std::io::ErrorKind; +use std::io::SeekFrom; + +use super::request::Request; +use super::FDB; +use super::FDBLIB; + +#[repr(C)] +pub struct FdbDataReader { + _empty: [u8; 0], +} + +pub struct DataRetriever { + datareader: *mut FdbDataReader, + opened: bool, +} + +impl DataRetriever { + pub fn new(fdb: &FDB, request: &Request) -> Result { + // Create a new data reader + let mut datareader: *mut FdbDataReader = std::ptr::null_mut(); + let result = unsafe { (FDBLIB.fdb_new_datareader)(&mut datareader) }; + if result != 0 { + return Err("Failed to create data reader".into()); + } + + // Retrieve data + let result = unsafe { (FDBLIB.fdb_retrieve)(fdb.handle, request.as_ptr(), datareader) }; + if result != 0 { + unsafe { (FDBLIB.fdb_delete_datareader)(datareader) }; + return Err("Failed to initiate data retrieval".into()); + } + + Ok(Self { + datareader, + opened: false, + }) + } + + pub fn open(&mut self) -> Result<(), io::Error> { + if !self.opened { + let result = + unsafe { (FDBLIB.fdb_datareader_open)(self.datareader, std::ptr::null_mut()) }; + if result != 0 { + return Err(io::Error::new( + ErrorKind::Other, + "Failed to open data reader", + )); + } + self.opened = true; + } + Ok(()) + } + + pub fn close(&mut self) { + if self.opened { + unsafe { (FDBLIB.fdb_datareader_close)(self.datareader) }; + self.opened = false; + } + } + + pub fn tell(&mut self) -> Result { + self.open()?; + let mut pos = 0; + let result = unsafe { (FDBLIB.fdb_datareader_tell)(self.datareader, &mut pos) }; + if result != 0 { + return Err(io::Error::new( + ErrorKind::Other, + "Failed to tell in data reader", + )); + } + Ok(pos) + } +} + +impl std::io::Seek for DataRetriever { + fn seek(&mut self, pos: SeekFrom) -> io::Result { + let new_pos = match pos { + SeekFrom::Start(offset) => offset as libc::c_long, + + SeekFrom::End(_offset) => { + // Don't know size of stream, so can't seek from end + return Err(io::Error::new( + io::ErrorKind::Unsupported, + "Seek from end is not supported for this stream", + )); + } + + SeekFrom::Current(offset) => { + let current_pos = self.tell()? as i64; + (current_pos + offset) as libc::c_long + } + }; + + let result = unsafe { (FDBLIB.fdb_datareader_seek)(self.datareader, new_pos) }; + if result != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + "Failed to seek in data reader", + )) + } else { + Ok(new_pos as u64) + } + } +} +impl std::io::Read for DataRetriever { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.open()?; + + let mut read = 0; + let result = unsafe { + (FDBLIB.fdb_datareader_read)( + self.datareader, + buf.as_mut_ptr() as *mut libc::c_void, + buf.len() as libc::c_long, + &mut read, + ) + }; + + if result != 0 { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to read from data reader", + )) + } else { + Ok(read as usize) + } + } +} + +impl Drop for DataRetriever { + fn drop(&mut self) { + self.close(); + unsafe { + (FDBLIB.fdb_delete_datareader)(self.datareader); + } + } +} diff --git a/src/rust/fdb/key.rs b/src/rust/fdb/key.rs new file mode 100644 index 0000000..0b908a9 --- /dev/null +++ b/src/rust/fdb/key.rs @@ -0,0 +1,55 @@ +use std::ffi::CString; + +use super::CKey; +use super::FDBLIB; + +pub struct Key { + key: *mut CKey, +} + +#[macro_export] +macro_rules! create_key { + ($($key:expr => $values:expr),* $(,)?) => {{ + let mut key = Key::new().unwrap(); + $( + let _ = key.set($key, &$values); + )* + key + }}; +} + +impl Key { + pub fn new() -> Result { + let mut key_ptr: *mut CKey = std::ptr::null_mut(); + let result = unsafe { (FDBLIB.fdb_new_key)(&mut key_ptr) }; + + if result != 0 { + return Err("Failed to create new key".into()); + } + + let key = Self { key: key_ptr }; + + Ok(key) + } + + pub fn set(&mut self, key: &str, value: &str) -> Result<(), String> { + let param_c_str = CString::new(key).map_err(|e| e.to_string())?; + let value_c_str = CString::new(value).map_err(|e| e.to_string())?; + + let result = + unsafe { (FDBLIB.fdb_key_add)(self.key, param_c_str.as_ptr(), value_c_str.as_ptr()) }; + + if result != 0 { + return Err("Failed to add key/value".into()); + } + Ok(()) + } +} + +impl Drop for Key { + fn drop(&mut self) { + unsafe { + (FDBLIB.fdb_delete_key)(self.key); + } + } +} diff --git a/src/rust/fdb/listiterator.rs b/src/rust/fdb/listiterator.rs new file mode 100644 index 0000000..41f90ff --- /dev/null +++ b/src/rust/fdb/listiterator.rs @@ -0,0 +1,180 @@ +use libc::size_t; +use std::ffi::CStr; +use std::os::raw::c_char; + +use super::{FdbListIterator, FdbSplitKey}; +use super::{Request, FDB, FDBLIB}; + +// Represents an individual key-value pair like {class : rd}, level = 0 +#[derive(Debug, PartialEq, Clone)] +pub struct KeyValueLevel { + pub key: String, + pub value: String, + pub level: usize, +} +#[derive(Debug, PartialEq)] +pub struct ListItem { + pub uri: String, + pub offset: usize, + pub length: usize, + pub request: Option>, +} + +pub struct ListIterator { + handle: *mut FdbListIterator, + key: bool, // Whether we're extracting keys or just path, len, offset for each list item. +} + +impl ListIterator { + pub fn new(fdb: &FDB, request: &Request, key: bool, duplicates: bool) -> Result { + let mut it: *mut FdbListIterator = std::ptr::null_mut(); + + let result = + unsafe { (FDBLIB.fdb_list)(fdb.handle, request.as_ptr(), &mut it, duplicates) }; + if result != 0 { + return Err(format!("fdb_list failed with error code {}", result)); + } + + if it.is_null() { + return Err("fdb_list returned a null iterator".into()); + } + + Ok(ListIterator { + handle: it, + key: key, + }) + } + + // Extracts the keys and values from the list item + pub fn get_request_for_key(&self) -> Result, String> { + if !self.key { + return Err("Getting keys is not enabled for this iterator.".into()); + } + + let mut key_ptr: *mut FdbSplitKey = std::ptr::null_mut(); + let result = unsafe { (FDBLIB.fdb_new_splitkey)(&mut key_ptr) }; + if result != 0 { + return Err(format!( + "fdb_new_splitkey failed with error code {}", + result + )); + } + + let result = unsafe { (FDBLIB.fdb_listiterator_splitkey)(self.handle, key_ptr) }; + if result != 0 { + return Err(format!( + "fdb_listiterator_splitkey failed with error code {}", + result + )); + } + + if key_ptr.is_null() { + return Err("fdb_listiterator_splitkey returned a null key".into()); + } + + let mut metadata = Vec::new(); + + loop { + let mut k: *const c_char = std::ptr::null(); + let mut v: *const c_char = std::ptr::null(); + let mut level: size_t = 0; + + let meta_result = + unsafe { (FDBLIB.fdb_splitkey_next_metadata)(key_ptr, &mut k, &mut v, &mut level) }; + if meta_result != 0 || k.is_null() || v.is_null() { + break; // No more metadata + } + + let key = unsafe { + CStr::from_ptr(k) + .to_str() + .map_err(|_| "Invalid UTF-8 in splitkey key".to_string())? + .to_owned() + }; + + let value = unsafe { + CStr::from_ptr(v) + .to_str() + .map_err(|_| "Invalid UTF-8 in splitkey value".to_string())? + .to_owned() + }; + + metadata.push(KeyValueLevel { + key, + value, + level: level as usize, + }); + } + + // Clean up the splitkey instance + unsafe { + (FDBLIB.fdb_delete_splitkey)(key_ptr); + } + + Ok(metadata) + } +} + +impl Iterator for ListIterator { + type Item = ListItem; + + fn next(&mut self) -> Option { + // Advance the iterator + let result = unsafe { (FDBLIB.fdb_listiterator_next)(self.handle) }; + if result != 0 { + // Assuming non-zero indicates no more items or an error + return None; + } + + // Retrieve attributes + let mut uri_ptr: *const c_char = std::ptr::null(); + let mut off: size_t = 0; + let mut len: size_t = 0; + + let attrs_result = unsafe { + (FDBLIB.fdb_listiterator_attrs)(self.handle, &mut uri_ptr, &mut off, &mut len) + }; + if attrs_result != 0 || uri_ptr.is_null() { + // Handle error or end of iteration + return None; + } + + // Convert C string to Rust String + let uri = unsafe { + CStr::from_ptr(uri_ptr) + .to_str() + .unwrap_or("Invalid UTF-8") + .to_owned() + }; + + // If we're extracting keys, do it. + let request = if self.key { + match self.get_request_for_key() { + Ok(data) => Some(data), + Err(e) => { + eprintln!("Error retrieving splitkey metadata: {}", e); + None + } + } + } else { + None + }; + + Some(ListItem { + uri, + offset: off as usize, + length: len as usize, + request, + }) + } +} + +impl Drop for ListIterator { + fn drop(&mut self) { + unsafe { + if !self.handle.is_null() { + (FDBLIB.fdb_delete_listiterator)(self.handle); + } + } + } +} diff --git a/src/rust/fdb/macros.rs b/src/rust/fdb/macros.rs new file mode 100644 index 0000000..21e7dd4 --- /dev/null +++ b/src/rust/fdb/macros.rs @@ -0,0 +1,32 @@ +#[macro_export] +macro_rules! generate_library_wrapper { + ( + $lib_name:ident { + $( + fn $func_name:ident($($arg_name:ident : $arg_type:ty),* $(,)?) $(-> $ret_type:ty)?; + )* + } + ) => { + + pub struct $lib_name { + lib: Arc, + $( + pub $func_name: unsafe extern "C" fn($($arg_type),*) $(-> $ret_type)?, + )* + } + + impl $lib_name { + pub fn load(lib: libloading::Library) -> Result> { + let arc_lib = Arc::new(lib); + Ok(Self { + $( + $func_name: unsafe { + *arc_lib.get:: $ret_type)?>(concat!(stringify!($func_name), "\0").as_bytes())? + }, + )* + lib: arc_lib, + }) + } + } + }; +} diff --git a/src/rust/fdb/mod.rs b/src/rust/fdb/mod.rs new file mode 100644 index 0000000..33fc920 --- /dev/null +++ b/src/rust/fdb/mod.rs @@ -0,0 +1,209 @@ +extern crate libc; +use libc::{c_char, c_int, c_long, size_t}; + +use std::ffi::CString; + +pub mod dataretriever; +pub mod key; +pub mod listiterator; +pub mod request; + +use dataretriever::DataRetriever; +use dataretriever::FdbDataReader; +use listiterator::ListIterator; +use request::CRequest; +use request::Request; + +use libloading::Library; +use once_cell::sync::Lazy; + +mod macros; +use crate::generate_library_wrapper; +use std::sync::Arc; + +// FDB C API functions +generate_library_wrapper! { + FdbApiWrapper { + fn fdb_new_handle(fdb: *mut *mut FdbHandle) -> c_int; + fn fdb_initialise() -> c_int; + fn fdb_new_handle_from_yaml(fdb: *mut *mut FdbHandle, system_config: *const c_char, user_config: *const c_char) -> c_int; + fn fdb_retrieve(fdb: *mut FdbHandle, req: *mut CRequest, dr: *mut FdbDataReader) -> c_int; + fn fdb_archive_multiple(fdb: *mut FdbHandle, req: *mut CRequest, data: *const c_char, length: size_t) -> c_int; + fn fdb_flush(fdb: *mut FdbHandle) -> c_int; + fn fdb_delete_handle(fdb: *mut FdbHandle); + + // Data reader functions + fn fdb_new_datareader(dr: *mut *mut FdbDataReader) -> c_int; + fn fdb_datareader_open(dr: *mut FdbDataReader, size: *mut c_long) -> c_int; + fn fdb_datareader_close(dr: *mut FdbDataReader) -> c_int; + fn fdb_datareader_tell(dr: *mut FdbDataReader, pos: *mut c_long) -> c_int; + fn fdb_datareader_seek(dr: *mut FdbDataReader, pos: c_long) -> c_int; + // fn fdb_datareader_skip(dr: *mut FdbDataReader, count: c_long) -> c_int; + fn fdb_datareader_read(dr: *mut FdbDataReader, buf: *mut libc::c_void, count: c_long, read: *mut c_long) -> c_int; + fn fdb_delete_datareader(dr: *mut FdbDataReader); + + // Key functions + fn fdb_new_key(key: *mut *mut CKey) -> c_int; + fn fdb_key_add(key: *mut CKey, param: *const c_char, value: *const c_char) -> c_int; + fn fdb_delete_key(key: *mut CKey); + + // Request functions + fn fdb_new_request(request: *mut *mut CRequest) -> c_int; + fn fdb_request_add(request: *mut CRequest, name: *const c_char, values: *const *const c_char, n_values: libc::size_t) -> c_int; + fn fdb_delete_request(request: *mut CRequest); + + fn fdb_list(fdb: *mut FdbHandle, req: *mut CRequest, it: *mut *mut FdbListIterator, duplicates : bool) -> c_int; + + // ListIterator functions + fn fdb_listiterator_next(it: *mut FdbListIterator) -> c_int; + fn fdb_listiterator_attrs( + it: *mut FdbListIterator, + uri: *mut *const c_char, + off: *mut size_t, + len: *mut size_t, + ) -> c_int; + fn fdb_listiterator_splitkey(it: *mut FdbListIterator, key: *mut FdbSplitKey) -> c_int; + fn fdb_delete_listiterator(it: *mut FdbListIterator); + + // SplitKey functions, extracts path, len, offset, and request = {key : value} from each key + fn fdb_new_splitkey(key : *mut *mut FdbSplitKey) -> c_int; + fn fdb_splitkey_next_metadata(it : *mut FdbSplitKey, key: *mut *const c_char, value: *mut *const c_char, level: *mut size_t) -> c_int; + fn fdb_delete_splitkey(key : *mut FdbSplitKey); + } +} + +// Define the fdb library as a global, lazily-initialized library +pub static FDBLIB: Lazy> = Lazy::new(|| { + let libpath = "/Users/math/micromamba/envs/qubed/lib/libfdb5.dylib"; + let raw_lib = Library::new(&libpath).expect("Failed to load library"); + let fdblib_wrapper = FdbApiWrapper::load(raw_lib) + .map_err(|e| e.to_string()) + .expect("Failed to wrap FDB5 library"); + Arc::new(fdblib_wrapper) +}); + +#[repr(C)] +pub struct FdbSplitKey { + _private: [u8; 0], +} + +#[repr(C)] +pub struct FdbListIterator { + _private: [u8; 0], +} + +#[repr(C)] +pub struct FdbSplitKeyMetadata { + _private: [u8; 0], +} + +#[repr(C)] +pub struct CKey { + _empty: [u8; 0], +} + +#[repr(C)] +pub struct FdbHandle { + _empty: [u8; 0], +} + +pub struct FDB { + handle: *mut FdbHandle, +} + +impl FDB { + pub fn new(config: Option<&str>) -> Result { + let mut handle: *mut FdbHandle = std::ptr::null_mut(); + + unsafe { + let result = (FDBLIB.fdb_initialise)(); + if result != 0 { + return Err("Failed to initialise FDB".into()); + } + } + + let result: i32 = match config { + Some(cfg) => { + let sys_cfg = CString::new(cfg) + .map_err(|_| "System Config contains null byte".to_string())?; + let usr_cfg = + CString::new("").map_err(|_| "User Config contains null byte".to_string())?; + unsafe { + (FDBLIB.fdb_new_handle_from_yaml)( + &mut handle, + sys_cfg.as_ptr(), + usr_cfg.as_ptr(), + ) + } + } + None => unsafe { (FDBLIB.fdb_new_handle)(&mut handle) }, + }; + + if result != 0 { + return Err("Failed to create FDB handle".into()); + } + + Ok(Self { handle }) + } + + pub fn archive_multiple(&self, request: Option<&Request>, data: &[u8]) -> Result<(), String> { + let req_ptr = match request { + Some(req) => req.as_ptr(), + None => std::ptr::null_mut(), + }; + + let result = unsafe { + (FDBLIB.fdb_archive_multiple)( + self.handle, + req_ptr, + data.as_ptr() as *const c_char, + data.len(), + ) + }; + if result != 0 { + return Err("Failed to archive data".into()); + } + Ok(()) + } + + pub fn flush(&self) -> Result<(), String> { + let result = unsafe { (FDBLIB.fdb_flush)(self.handle) }; + if result != 0 { + return Err("Failed to flush FDB".into()); + } + Ok(()) + } + + pub fn retrieve(&self, request: &Request) -> Result { + DataRetriever::new(self, request) + } + + pub fn list( + &self, + request: &Request, + key: bool, + duplicates: bool, + ) -> Result { + ListIterator::new(self, request, key, duplicates) + } +} + +impl Drop for FDB { + fn drop(&mut self) { + unsafe { + (FDBLIB.fdb_delete_handle)(self.handle); + } + } +} + +// // make a small test +// #[cfg(test)] +// mod tests { +// use super::*; + +// #[test] +// fn test_fdb_new() { +// let fdb = FDB::new(None); +// assert!(fdb.is_ok()); +// } +// } diff --git a/src/rust/fdb/request.rs b/src/rust/fdb/request.rs new file mode 100644 index 0000000..737e59b --- /dev/null +++ b/src/rust/fdb/request.rs @@ -0,0 +1,108 @@ +use super::FDBLIB; +use libc::c_char; +use serde_json::Value; +use std::ffi::CString; + +#[repr(C)] +pub struct CRequest { + _empty: [u8; 0], +} + +pub struct Request { + request: *mut CRequest, +} + +impl Request { + pub fn new() -> Result { + let mut request_ptr: *mut CRequest = std::ptr::null_mut(); + let result = unsafe { (FDBLIB.fdb_new_request)(&mut request_ptr) }; + if result != 0 { + return Err("Failed to create new request".into()); + } + let request = Self { + request: request_ptr, + }; + + Ok(request) + } + + pub fn set<'a, C, T>(&mut self, key: &'a str, values: C) -> Result<(), String> + where + C: AsRef<[T]>, + T: AsRef + 'a, + { + let values_slice = values.as_ref(); + let key_cstr = CString::new(key).map_err(|_| "Failed to create CString for key")?; + + let cvals: Vec = values_slice + .iter() + .map(|val| CString::new(val.as_ref()).map_err(|_| "Failed to create CString for value")) + .collect::, _>>()?; + + let cvals_ptrs: Vec<*const c_char> = cvals.iter().map(|cstr| cstr.as_ptr()).collect(); + + let result = unsafe { + (FDBLIB.fdb_request_add)( + self.request, + key_cstr.as_ptr(), + cvals_ptrs.as_ptr(), + values_slice.len(), + ) + }; + + if result != 0 { + return Err(format!("Failed to add values for key '{}'", key)); + } + + Ok(()) + } + + pub fn as_ptr(&self) -> *mut CRequest { + self.request + } + + pub fn from_json(v: serde_json::Value) -> Result { + let mut request = Self::new()?; + + // Iterate over the JSON object and populate the Request + if let Value::Object(map) = v { + for (key, value) in map { + match value { + Value::String(s) => { + // Treat single strings as a slice of length 1 + request.set(&key, &[s])?; + } + Value::Array(arr) => { + // Collect string values from the array + let values: Vec = arr + .into_iter() + .filter_map(|val| { + if let Value::String(s) = val { + Some(s) + } else { + None // You can handle non-string items here if needed + } + }) + .collect(); + request.set(&key, &values)?; + } + _ => { + // Handle other types if necessary + return Err(format!("Unsupported value type for key '{}'", key).into()); + } + } + } + } else { + return Err("Expected a JSON object at the root".into()); + } + Ok(request) + } +} + +impl Drop for Request { + fn drop(&mut self) { + unsafe { + (FDBLIB.fdb_delete_request)(self.request); + } + } +} diff --git a/src/rust/lib.rs b/src/rust/lib.rs index 865bacd..920527f 100644 --- a/src/rust/lib.rs +++ b/src/rust/lib.rs @@ -12,6 +12,17 @@ fn hello(_py: Python, name: &str) -> PyResult { Ok(format!("Hello, {}!", name)) } +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_hello() { + let out = Python::with_gil(|py| hello(py, "world")); + assert_eq!(out.unwrap(), "Hello, world!"); + } +} + #[pymodule] fn rust(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(hello, m)?).unwrap(); @@ -19,9 +30,10 @@ fn rust(m: &Bound<'_, PyModule>) -> PyResult<()> { } -// use rsfdb::listiterator::KeyValueLevel; -// use rsfdb::request::Request; -// use rsfdb::FDB; +mod fdb; +use fdb::listiterator::KeyValueLevel; +use fdb::request::Request; +use fdb::FDB; // use serde_json::{json, Value}; // use std::time::Instant;