Compare commits

...

1 Commits

Author SHA1 Message Date
Tom
4a97eea317 Initial code 2025-03-28 09:28:48 +00:00
8 changed files with 749 additions and 9 deletions

View File

@ -8,7 +8,14 @@ repository = "https://github.com/ecmwf/qubed"
# rsfdb = {git = "https://github.com/ecmwf/rsfdb", branch = "develop"}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
pyo3 = "0.23"
# For fdb binding
libc = "0.2"
libloading = "0.6"
once_cell = "1.8"
[dependencies.pyo3]
version = "0.23"
[package.metadata.maturin]
version-from-git = true
@ -18,8 +25,6 @@ name = "tree_traverser"
crate-type = ["cdylib"]
path = "./src/rust/lib.rs"
# [patch.'https://github.com/ecmwf/rsfdb']
# rsfdb = { path = "../rsfdb" }
# [patch.'https://github.com/ecmwf-projects/rsfindlibs']
# rsfindlibs = { path = "../rsfindlibs" }
[features]
extension-module = ["pyo3/extension-module"]
default = ["extension-module"]

View File

@ -0,0 +1,139 @@
use std::io;
use std::io::ErrorKind;
use std::io::SeekFrom;
use super::request::Request;
use super::FDB;
use super::FDBLIB;
#[repr(C)]
pub struct FdbDataReader {
_empty: [u8; 0],
}
pub struct DataRetriever {
datareader: *mut FdbDataReader,
opened: bool,
}
impl DataRetriever {
pub fn new(fdb: &FDB, request: &Request) -> Result<Self, String> {
// Create a new data reader
let mut datareader: *mut FdbDataReader = std::ptr::null_mut();
let result = unsafe { (FDBLIB.fdb_new_datareader)(&mut datareader) };
if result != 0 {
return Err("Failed to create data reader".into());
}
// Retrieve data
let result = unsafe { (FDBLIB.fdb_retrieve)(fdb.handle, request.as_ptr(), datareader) };
if result != 0 {
unsafe { (FDBLIB.fdb_delete_datareader)(datareader) };
return Err("Failed to initiate data retrieval".into());
}
Ok(Self {
datareader,
opened: false,
})
}
pub fn open(&mut self) -> Result<(), io::Error> {
if !self.opened {
let result =
unsafe { (FDBLIB.fdb_datareader_open)(self.datareader, std::ptr::null_mut()) };
if result != 0 {
return Err(io::Error::new(
ErrorKind::Other,
"Failed to open data reader",
));
}
self.opened = true;
}
Ok(())
}
pub fn close(&mut self) {
if self.opened {
unsafe { (FDBLIB.fdb_datareader_close)(self.datareader) };
self.opened = false;
}
}
pub fn tell(&mut self) -> Result<libc::c_long, io::Error> {
self.open()?;
let mut pos = 0;
let result = unsafe { (FDBLIB.fdb_datareader_tell)(self.datareader, &mut pos) };
if result != 0 {
return Err(io::Error::new(
ErrorKind::Other,
"Failed to tell in data reader",
));
}
Ok(pos)
}
}
impl std::io::Seek for DataRetriever {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
let new_pos = match pos {
SeekFrom::Start(offset) => offset as libc::c_long,
SeekFrom::End(_offset) => {
// Don't know size of stream, so can't seek from end
return Err(io::Error::new(
io::ErrorKind::Unsupported,
"Seek from end is not supported for this stream",
));
}
SeekFrom::Current(offset) => {
let current_pos = self.tell()? as i64;
(current_pos + offset) as libc::c_long
}
};
let result = unsafe { (FDBLIB.fdb_datareader_seek)(self.datareader, new_pos) };
if result != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
"Failed to seek in data reader",
))
} else {
Ok(new_pos as u64)
}
}
}
impl std::io::Read for DataRetriever {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.open()?;
let mut read = 0;
let result = unsafe {
(FDBLIB.fdb_datareader_read)(
self.datareader,
buf.as_mut_ptr() as *mut libc::c_void,
buf.len() as libc::c_long,
&mut read,
)
};
if result != 0 {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to read from data reader",
))
} else {
Ok(read as usize)
}
}
}
impl Drop for DataRetriever {
fn drop(&mut self) {
self.close();
unsafe {
(FDBLIB.fdb_delete_datareader)(self.datareader);
}
}
}

55
src/rust/fdb/key.rs Normal file
View File

@ -0,0 +1,55 @@
use std::ffi::CString;
use super::CKey;
use super::FDBLIB;
pub struct Key {
key: *mut CKey,
}
#[macro_export]
macro_rules! create_key {
($($key:expr => $values:expr),* $(,)?) => {{
let mut key = Key::new().unwrap();
$(
let _ = key.set($key, &$values);
)*
key
}};
}
impl Key {
pub fn new() -> Result<Self, String> {
let mut key_ptr: *mut CKey = std::ptr::null_mut();
let result = unsafe { (FDBLIB.fdb_new_key)(&mut key_ptr) };
if result != 0 {
return Err("Failed to create new key".into());
}
let key = Self { key: key_ptr };
Ok(key)
}
pub fn set(&mut self, key: &str, value: &str) -> Result<(), String> {
let param_c_str = CString::new(key).map_err(|e| e.to_string())?;
let value_c_str = CString::new(value).map_err(|e| e.to_string())?;
let result =
unsafe { (FDBLIB.fdb_key_add)(self.key, param_c_str.as_ptr(), value_c_str.as_ptr()) };
if result != 0 {
return Err("Failed to add key/value".into());
}
Ok(())
}
}
impl Drop for Key {
fn drop(&mut self) {
unsafe {
(FDBLIB.fdb_delete_key)(self.key);
}
}
}

View File

@ -0,0 +1,180 @@
use libc::size_t;
use std::ffi::CStr;
use std::os::raw::c_char;
use super::{FdbListIterator, FdbSplitKey};
use super::{Request, FDB, FDBLIB};
// Represents an individual key-value pair like {class : rd}, level = 0
#[derive(Debug, PartialEq, Clone)]
pub struct KeyValueLevel {
pub key: String,
pub value: String,
pub level: usize,
}
#[derive(Debug, PartialEq)]
pub struct ListItem {
pub uri: String,
pub offset: usize,
pub length: usize,
pub request: Option<Vec<KeyValueLevel>>,
}
pub struct ListIterator {
handle: *mut FdbListIterator,
key: bool, // Whether we're extracting keys or just path, len, offset for each list item.
}
impl ListIterator {
pub fn new(fdb: &FDB, request: &Request, key: bool, duplicates: bool) -> Result<Self, String> {
let mut it: *mut FdbListIterator = std::ptr::null_mut();
let result =
unsafe { (FDBLIB.fdb_list)(fdb.handle, request.as_ptr(), &mut it, duplicates) };
if result != 0 {
return Err(format!("fdb_list failed with error code {}", result));
}
if it.is_null() {
return Err("fdb_list returned a null iterator".into());
}
Ok(ListIterator {
handle: it,
key: key,
})
}
// Extracts the keys and values from the list item
pub fn get_request_for_key(&self) -> Result<Vec<KeyValueLevel>, String> {
if !self.key {
return Err("Getting keys is not enabled for this iterator.".into());
}
let mut key_ptr: *mut FdbSplitKey = std::ptr::null_mut();
let result = unsafe { (FDBLIB.fdb_new_splitkey)(&mut key_ptr) };
if result != 0 {
return Err(format!(
"fdb_new_splitkey failed with error code {}",
result
));
}
let result = unsafe { (FDBLIB.fdb_listiterator_splitkey)(self.handle, key_ptr) };
if result != 0 {
return Err(format!(
"fdb_listiterator_splitkey failed with error code {}",
result
));
}
if key_ptr.is_null() {
return Err("fdb_listiterator_splitkey returned a null key".into());
}
let mut metadata = Vec::new();
loop {
let mut k: *const c_char = std::ptr::null();
let mut v: *const c_char = std::ptr::null();
let mut level: size_t = 0;
let meta_result =
unsafe { (FDBLIB.fdb_splitkey_next_metadata)(key_ptr, &mut k, &mut v, &mut level) };
if meta_result != 0 || k.is_null() || v.is_null() {
break; // No more metadata
}
let key = unsafe {
CStr::from_ptr(k)
.to_str()
.map_err(|_| "Invalid UTF-8 in splitkey key".to_string())?
.to_owned()
};
let value = unsafe {
CStr::from_ptr(v)
.to_str()
.map_err(|_| "Invalid UTF-8 in splitkey value".to_string())?
.to_owned()
};
metadata.push(KeyValueLevel {
key,
value,
level: level as usize,
});
}
// Clean up the splitkey instance
unsafe {
(FDBLIB.fdb_delete_splitkey)(key_ptr);
}
Ok(metadata)
}
}
impl Iterator for ListIterator {
type Item = ListItem;
fn next(&mut self) -> Option<Self::Item> {
// Advance the iterator
let result = unsafe { (FDBLIB.fdb_listiterator_next)(self.handle) };
if result != 0 {
// Assuming non-zero indicates no more items or an error
return None;
}
// Retrieve attributes
let mut uri_ptr: *const c_char = std::ptr::null();
let mut off: size_t = 0;
let mut len: size_t = 0;
let attrs_result = unsafe {
(FDBLIB.fdb_listiterator_attrs)(self.handle, &mut uri_ptr, &mut off, &mut len)
};
if attrs_result != 0 || uri_ptr.is_null() {
// Handle error or end of iteration
return None;
}
// Convert C string to Rust String
let uri = unsafe {
CStr::from_ptr(uri_ptr)
.to_str()
.unwrap_or("Invalid UTF-8")
.to_owned()
};
// If we're extracting keys, do it.
let request = if self.key {
match self.get_request_for_key() {
Ok(data) => Some(data),
Err(e) => {
eprintln!("Error retrieving splitkey metadata: {}", e);
None
}
}
} else {
None
};
Some(ListItem {
uri,
offset: off as usize,
length: len as usize,
request,
})
}
}
impl Drop for ListIterator {
fn drop(&mut self) {
unsafe {
if !self.handle.is_null() {
(FDBLIB.fdb_delete_listiterator)(self.handle);
}
}
}
}

32
src/rust/fdb/macros.rs Normal file
View File

@ -0,0 +1,32 @@
#[macro_export]
macro_rules! generate_library_wrapper {
(
$lib_name:ident {
$(
fn $func_name:ident($($arg_name:ident : $arg_type:ty),* $(,)?) $(-> $ret_type:ty)?;
)*
}
) => {
pub struct $lib_name {
lib: Arc<libloading::Library>,
$(
pub $func_name: unsafe extern "C" fn($($arg_type),*) $(-> $ret_type)?,
)*
}
impl $lib_name {
pub fn load(lib: libloading::Library) -> Result<Self, Box<dyn std::error::Error>> {
let arc_lib = Arc::new(lib);
Ok(Self {
$(
$func_name: unsafe {
*arc_lib.get::<unsafe extern "C" fn($($arg_type),*) $(-> $ret_type)?>(concat!(stringify!($func_name), "\0").as_bytes())?
},
)*
lib: arc_lib,
})
}
}
};
}

209
src/rust/fdb/mod.rs Normal file
View File

@ -0,0 +1,209 @@
extern crate libc;
use libc::{c_char, c_int, c_long, size_t};
use std::ffi::CString;
pub mod dataretriever;
pub mod key;
pub mod listiterator;
pub mod request;
use dataretriever::DataRetriever;
use dataretriever::FdbDataReader;
use listiterator::ListIterator;
use request::CRequest;
use request::Request;
use libloading::Library;
use once_cell::sync::Lazy;
mod macros;
use crate::generate_library_wrapper;
use std::sync::Arc;
// FDB C API functions
generate_library_wrapper! {
FdbApiWrapper {
fn fdb_new_handle(fdb: *mut *mut FdbHandle) -> c_int;
fn fdb_initialise() -> c_int;
fn fdb_new_handle_from_yaml(fdb: *mut *mut FdbHandle, system_config: *const c_char, user_config: *const c_char) -> c_int;
fn fdb_retrieve(fdb: *mut FdbHandle, req: *mut CRequest, dr: *mut FdbDataReader) -> c_int;
fn fdb_archive_multiple(fdb: *mut FdbHandle, req: *mut CRequest, data: *const c_char, length: size_t) -> c_int;
fn fdb_flush(fdb: *mut FdbHandle) -> c_int;
fn fdb_delete_handle(fdb: *mut FdbHandle);
// Data reader functions
fn fdb_new_datareader(dr: *mut *mut FdbDataReader) -> c_int;
fn fdb_datareader_open(dr: *mut FdbDataReader, size: *mut c_long) -> c_int;
fn fdb_datareader_close(dr: *mut FdbDataReader) -> c_int;
fn fdb_datareader_tell(dr: *mut FdbDataReader, pos: *mut c_long) -> c_int;
fn fdb_datareader_seek(dr: *mut FdbDataReader, pos: c_long) -> c_int;
// fn fdb_datareader_skip(dr: *mut FdbDataReader, count: c_long) -> c_int;
fn fdb_datareader_read(dr: *mut FdbDataReader, buf: *mut libc::c_void, count: c_long, read: *mut c_long) -> c_int;
fn fdb_delete_datareader(dr: *mut FdbDataReader);
// Key functions
fn fdb_new_key(key: *mut *mut CKey) -> c_int;
fn fdb_key_add(key: *mut CKey, param: *const c_char, value: *const c_char) -> c_int;
fn fdb_delete_key(key: *mut CKey);
// Request functions
fn fdb_new_request(request: *mut *mut CRequest) -> c_int;
fn fdb_request_add(request: *mut CRequest, name: *const c_char, values: *const *const c_char, n_values: libc::size_t) -> c_int;
fn fdb_delete_request(request: *mut CRequest);
fn fdb_list(fdb: *mut FdbHandle, req: *mut CRequest, it: *mut *mut FdbListIterator, duplicates : bool) -> c_int;
// ListIterator functions
fn fdb_listiterator_next(it: *mut FdbListIterator) -> c_int;
fn fdb_listiterator_attrs(
it: *mut FdbListIterator,
uri: *mut *const c_char,
off: *mut size_t,
len: *mut size_t,
) -> c_int;
fn fdb_listiterator_splitkey(it: *mut FdbListIterator, key: *mut FdbSplitKey) -> c_int;
fn fdb_delete_listiterator(it: *mut FdbListIterator);
// SplitKey functions, extracts path, len, offset, and request = {key : value} from each key
fn fdb_new_splitkey(key : *mut *mut FdbSplitKey) -> c_int;
fn fdb_splitkey_next_metadata(it : *mut FdbSplitKey, key: *mut *const c_char, value: *mut *const c_char, level: *mut size_t) -> c_int;
fn fdb_delete_splitkey(key : *mut FdbSplitKey);
}
}
// Define the fdb library as a global, lazily-initialized library
pub static FDBLIB: Lazy<Arc<FdbApiWrapper>> = Lazy::new(|| {
let libpath = "/Users/math/micromamba/envs/qubed/lib/libfdb5.dylib";
let raw_lib = Library::new(&libpath).expect("Failed to load library");
let fdblib_wrapper = FdbApiWrapper::load(raw_lib)
.map_err(|e| e.to_string())
.expect("Failed to wrap FDB5 library");
Arc::new(fdblib_wrapper)
});
#[repr(C)]
pub struct FdbSplitKey {
_private: [u8; 0],
}
#[repr(C)]
pub struct FdbListIterator {
_private: [u8; 0],
}
#[repr(C)]
pub struct FdbSplitKeyMetadata {
_private: [u8; 0],
}
#[repr(C)]
pub struct CKey {
_empty: [u8; 0],
}
#[repr(C)]
pub struct FdbHandle {
_empty: [u8; 0],
}
pub struct FDB {
handle: *mut FdbHandle,
}
impl FDB {
pub fn new(config: Option<&str>) -> Result<Self, String> {
let mut handle: *mut FdbHandle = std::ptr::null_mut();
unsafe {
let result = (FDBLIB.fdb_initialise)();
if result != 0 {
return Err("Failed to initialise FDB".into());
}
}
let result: i32 = match config {
Some(cfg) => {
let sys_cfg = CString::new(cfg)
.map_err(|_| "System Config contains null byte".to_string())?;
let usr_cfg =
CString::new("").map_err(|_| "User Config contains null byte".to_string())?;
unsafe {
(FDBLIB.fdb_new_handle_from_yaml)(
&mut handle,
sys_cfg.as_ptr(),
usr_cfg.as_ptr(),
)
}
}
None => unsafe { (FDBLIB.fdb_new_handle)(&mut handle) },
};
if result != 0 {
return Err("Failed to create FDB handle".into());
}
Ok(Self { handle })
}
pub fn archive_multiple(&self, request: Option<&Request>, data: &[u8]) -> Result<(), String> {
let req_ptr = match request {
Some(req) => req.as_ptr(),
None => std::ptr::null_mut(),
};
let result = unsafe {
(FDBLIB.fdb_archive_multiple)(
self.handle,
req_ptr,
data.as_ptr() as *const c_char,
data.len(),
)
};
if result != 0 {
return Err("Failed to archive data".into());
}
Ok(())
}
pub fn flush(&self) -> Result<(), String> {
let result = unsafe { (FDBLIB.fdb_flush)(self.handle) };
if result != 0 {
return Err("Failed to flush FDB".into());
}
Ok(())
}
pub fn retrieve(&self, request: &Request) -> Result<DataRetriever, String> {
DataRetriever::new(self, request)
}
pub fn list(
&self,
request: &Request,
key: bool,
duplicates: bool,
) -> Result<ListIterator, String> {
ListIterator::new(self, request, key, duplicates)
}
}
impl Drop for FDB {
fn drop(&mut self) {
unsafe {
(FDBLIB.fdb_delete_handle)(self.handle);
}
}
}
// // make a small test
// #[cfg(test)]
// mod tests {
// use super::*;
// #[test]
// fn test_fdb_new() {
// let fdb = FDB::new(None);
// assert!(fdb.is_ok());
// }
// }

108
src/rust/fdb/request.rs Normal file
View File

@ -0,0 +1,108 @@
use super::FDBLIB;
use libc::c_char;
use serde_json::Value;
use std::ffi::CString;
#[repr(C)]
pub struct CRequest {
_empty: [u8; 0],
}
pub struct Request {
request: *mut CRequest,
}
impl Request {
pub fn new() -> Result<Self, String> {
let mut request_ptr: *mut CRequest = std::ptr::null_mut();
let result = unsafe { (FDBLIB.fdb_new_request)(&mut request_ptr) };
if result != 0 {
return Err("Failed to create new request".into());
}
let request = Self {
request: request_ptr,
};
Ok(request)
}
pub fn set<'a, C, T>(&mut self, key: &'a str, values: C) -> Result<(), String>
where
C: AsRef<[T]>,
T: AsRef<str> + 'a,
{
let values_slice = values.as_ref();
let key_cstr = CString::new(key).map_err(|_| "Failed to create CString for key")?;
let cvals: Vec<CString> = values_slice
.iter()
.map(|val| CString::new(val.as_ref()).map_err(|_| "Failed to create CString for value"))
.collect::<Result<Vec<_>, _>>()?;
let cvals_ptrs: Vec<*const c_char> = cvals.iter().map(|cstr| cstr.as_ptr()).collect();
let result = unsafe {
(FDBLIB.fdb_request_add)(
self.request,
key_cstr.as_ptr(),
cvals_ptrs.as_ptr(),
values_slice.len(),
)
};
if result != 0 {
return Err(format!("Failed to add values for key '{}'", key));
}
Ok(())
}
pub fn as_ptr(&self) -> *mut CRequest {
self.request
}
pub fn from_json(v: serde_json::Value) -> Result<Self, String> {
let mut request = Self::new()?;
// Iterate over the JSON object and populate the Request
if let Value::Object(map) = v {
for (key, value) in map {
match value {
Value::String(s) => {
// Treat single strings as a slice of length 1
request.set(&key, &[s])?;
}
Value::Array(arr) => {
// Collect string values from the array
let values: Vec<String> = arr
.into_iter()
.filter_map(|val| {
if let Value::String(s) = val {
Some(s)
} else {
None // You can handle non-string items here if needed
}
})
.collect();
request.set(&key, &values)?;
}
_ => {
// Handle other types if necessary
return Err(format!("Unsupported value type for key '{}'", key).into());
}
}
}
} else {
return Err("Expected a JSON object at the root".into());
}
Ok(request)
}
}
impl Drop for Request {
fn drop(&mut self) {
unsafe {
(FDBLIB.fdb_delete_request)(self.request);
}
}
}

View File

@ -12,6 +12,17 @@ fn hello(_py: Python, name: &str) -> PyResult<String> {
Ok(format!("Hello, {}!", name))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_hello() {
let out = Python::with_gil(|py| hello(py, "world"));
assert_eq!(out.unwrap(), "Hello, world!");
}
}
#[pymodule]
fn rust(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(hello, m)?).unwrap();
@ -19,9 +30,10 @@ fn rust(m: &Bound<'_, PyModule>) -> PyResult<()> {
}
// use rsfdb::listiterator::KeyValueLevel;
// use rsfdb::request::Request;
// use rsfdb::FDB;
mod fdb;
use fdb::listiterator::KeyValueLevel;
use fdb::request::Request;
use fdb::FDB;
// use serde_json::{json, Value};
// use std::time::Instant;