mod error;
mod stream;
use std::collections::btree_map::{Entry, VacantEntry};
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
use async_trait::async_trait;
pub use error::*;
pub use stream::*;
use uuid::Uuid;
pub type ActorId = u32;
pub type DispatcherId = u64;
pub type FragmentId = u32;
pub type SubscriptionId = u32;
#[derive(Clone, Debug)]
pub struct ClusterId(String);
impl Default for ClusterId {
fn default() -> Self {
Self::new()
}
}
impl ClusterId {
pub fn new() -> Self {
Self(Uuid::new_v4().to_string())
}
}
impl From<ClusterId> for String {
fn from(value: ClusterId) -> Self {
value.0
}
}
impl From<String> for ClusterId {
fn from(value: String) -> Self {
Self(value)
}
}
impl Deref for ClusterId {
type Target = str;
fn deref(&self) -> &Self::Target {
self.0.as_str()
}
}
#[async_trait]
pub trait Transactional<TXN> {
async fn upsert_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
async fn delete_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
}
pub trait InMemValTransaction: Sized {
fn commit(self);
}
pub trait ValTransaction<TXN>: InMemValTransaction {
async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()>;
}
pub struct VarTransaction<'a, T> {
orig_value_ref: &'a mut T,
new_value: Option<T>,
}
impl<'a, T> VarTransaction<'a, T> {
pub fn new(val_ref: &'a mut T) -> VarTransaction<'a, T> {
VarTransaction {
new_value: None,
orig_value_ref: val_ref,
}
}
pub fn has_new_value(&self) -> bool {
self.new_value.is_some()
}
}
impl<T> Deref for VarTransaction<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
match &self.new_value {
Some(new_value) => new_value,
None => self.orig_value_ref,
}
}
}
impl<T: Clone> DerefMut for VarTransaction<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
if self.new_value.is_none() {
self.new_value.replace(self.orig_value_ref.clone());
}
self.new_value.as_mut().unwrap()
}
}
impl<T> InMemValTransaction for VarTransaction<'_, T>
where
T: PartialEq,
{
fn commit(self) {
if let Some(new_value) = self.new_value {
*self.orig_value_ref = new_value;
}
}
}
impl<'a, TXN, T> ValTransaction<TXN> for VarTransaction<'a, T>
where
T: Transactional<TXN> + PartialEq,
{
async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
if let Some(new_value) = &self.new_value {
if *self.orig_value_ref != *new_value {
new_value.upsert_in_transaction(txn).await
} else {
Ok(())
}
} else {
Ok(())
}
}
}
enum BTreeMapTransactionStagingEntry<'a, K: Ord, V> {
Vacant(VacantEntry<'a, K, BTreeMapOp<V>>),
Occupied(&'a mut V),
}
pub struct BTreeMapTransactionValueGuard<'a, K: Ord, V: Clone> {
staging_entry: Option<BTreeMapTransactionStagingEntry<'a, K, V>>,
orig_value: Option<&'a V>,
}
impl<'a, K: Ord, V: Clone> BTreeMapTransactionValueGuard<'a, K, V> {
fn new(
staging_entry: BTreeMapTransactionStagingEntry<'a, K, V>,
orig_value: Option<&'a V>,
) -> Self {
let is_entry_occupied =
matches!(staging_entry, BTreeMapTransactionStagingEntry::Occupied(_));
assert!(
is_entry_occupied || orig_value.is_some(),
"one of staging_entry and orig_value must be non-empty"
);
Self {
staging_entry: Some(staging_entry),
orig_value,
}
}
}
impl<K: Ord, V: Clone> Deref for BTreeMapTransactionValueGuard<'_, K, V> {
type Target = V;
fn deref(&self) -> &Self::Target {
match &self.staging_entry.as_ref().unwrap() {
BTreeMapTransactionStagingEntry::Vacant(_) => self
.orig_value
.expect("staging is vacant, so orig_value must be some"),
BTreeMapTransactionStagingEntry::Occupied(v) => v,
}
}
}
impl<K: Ord, V: Clone> DerefMut for BTreeMapTransactionValueGuard<'_, K, V> {
fn deref_mut(&mut self) -> &mut Self::Target {
let is_occupied = matches!(
self.staging_entry.as_ref().unwrap(),
BTreeMapTransactionStagingEntry::Occupied(_)
);
if !is_occupied {
let vacant_entry = match self.staging_entry.take().unwrap() {
BTreeMapTransactionStagingEntry::Vacant(entry) => entry,
BTreeMapTransactionStagingEntry::Occupied(_) => {
unreachable!("we have previously check that the entry is not occupied")
}
};
let new_value_mut_ref = match vacant_entry.insert(BTreeMapOp::Insert(
self.orig_value
.expect("self.staging_entry was vacant, so orig_value must be some")
.clone(),
)) {
BTreeMapOp::Insert(v) => v,
BTreeMapOp::Delete => {
unreachable!(
"the previous inserted op is `Inserted`, so it's not possible to reach Delete"
)
}
};
let _ = self
.staging_entry
.insert(BTreeMapTransactionStagingEntry::Occupied(new_value_mut_ref));
}
match self.staging_entry.as_mut().unwrap() {
BTreeMapTransactionStagingEntry::Vacant(_) => {
unreachable!("we have inserted a cloned original value in case of vacant")
}
BTreeMapTransactionStagingEntry::Occupied(v) => v,
}
}
}
enum BTreeMapOp<V> {
Insert(V),
Delete,
}
pub struct BTreeMapTransactionInner<K: Ord, V, P: DerefMut<Target = BTreeMap<K, V>>> {
tree_ref: P,
staging: BTreeMap<K, BTreeMapOp<V>>,
}
pub type BTreeMapTransaction<'a, K, V> = BTreeMapTransactionInner<K, V, &'a mut BTreeMap<K, V>>;
impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>>
BTreeMapTransactionInner<K, V, P>
{
pub fn new(tree_ref: P) -> BTreeMapTransactionInner<K, V, P> {
Self {
tree_ref,
staging: BTreeMap::default(),
}
}
#[allow(dead_code)]
pub fn new_entry_txn(&mut self, key: K) -> Option<BTreeMapEntryTransaction<'_, K, V>> {
BTreeMapEntryTransaction::new(&mut self.tree_ref, key, None)
}
pub fn new_entry_txn_or_default(
&mut self,
key: K,
default_val: V,
) -> BTreeMapEntryTransaction<'_, K, V> {
BTreeMapEntryTransaction::new(&mut self.tree_ref, key, Some(default_val))
.expect("default value is provided and should return `Some`")
}
pub fn new_entry_insert_txn(&mut self, key: K, val: V) -> BTreeMapEntryTransaction<'_, K, V> {
BTreeMapEntryTransaction::new_insert(&mut self.tree_ref, key, val)
}
pub fn tree_ref(&self) -> &BTreeMap<K, V> {
&self.tree_ref
}
pub fn get(&self, key: &K) -> Option<&V> {
self.staging
.get(key)
.and_then(|op| match op {
BTreeMapOp::Insert(v) => Some(v),
BTreeMapOp::Delete => None,
})
.or_else(|| self.tree_ref.get(key))
}
pub fn contains_key(&self, key: &K) -> bool {
self.get(key).is_some()
}
pub fn get_mut(&mut self, key: K) -> Option<BTreeMapTransactionValueGuard<'_, K, V>> {
let orig_contains_key = self.tree_ref.contains_key(&key);
let orig_value = self.tree_ref.get(&key);
let staging_entry = match self.staging.entry(key) {
Entry::Occupied(entry) => match entry.into_mut() {
BTreeMapOp::Insert(v) => BTreeMapTransactionStagingEntry::Occupied(v),
BTreeMapOp::Delete => return None,
},
Entry::Vacant(vacant_entry) => {
if !orig_contains_key {
return None;
} else {
BTreeMapTransactionStagingEntry::Vacant(vacant_entry)
}
}
};
Some(BTreeMapTransactionValueGuard::new(
staging_entry,
orig_value,
))
}
pub fn insert(&mut self, key: K, value: V) {
self.staging.insert(key, BTreeMapOp::Insert(value));
}
pub fn remove(&mut self, key: K) -> Option<V> {
if let Some(op) = self.staging.get(&key) {
return match op {
BTreeMapOp::Delete => None,
BTreeMapOp::Insert(_) => match self.staging.remove(&key).unwrap() {
BTreeMapOp::Insert(v) => {
self.staging.insert(key, BTreeMapOp::Delete);
Some(v)
}
BTreeMapOp::Delete => {
unreachable!("we have checked that the op of the key is `Insert`, so it's impossible to be Delete")
}
},
};
}
match self.tree_ref.get(&key) {
Some(orig_value) => {
self.staging.insert(key, BTreeMapOp::Delete);
Some(orig_value.clone())
}
None => None,
}
}
pub fn commit_memory(mut self) {
for (k, op) in self.staging {
match op {
BTreeMapOp::Insert(v) => {
self.tree_ref.insert(k, v);
}
BTreeMapOp::Delete => {
self.tree_ref.remove(&k);
}
}
}
}
}
impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>> InMemValTransaction
for BTreeMapTransactionInner<K, V, P>
{
fn commit(self) {
self.commit_memory();
}
}
impl<K: Ord + Debug, V: Transactional<TXN> + Clone, P: DerefMut<Target = BTreeMap<K, V>>, TXN>
ValTransaction<TXN> for BTreeMapTransactionInner<K, V, P>
{
async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
for (k, op) in &self.staging {
match op {
BTreeMapOp::Insert(v) => v.upsert_in_transaction(txn).await?,
BTreeMapOp::Delete => {
if let Some(v) = self.tree_ref.get(k) {
v.delete_in_transaction(txn).await?;
}
}
}
}
Ok(())
}
}
pub struct BTreeMapEntryTransaction<'a, K, V> {
tree_ref: &'a mut BTreeMap<K, V>,
pub key: K,
pub new_value: V,
}
impl<'a, K: Ord + Debug, V: Clone> BTreeMapEntryTransaction<'a, K, V> {
pub fn new_insert(
tree_ref: &'a mut BTreeMap<K, V>,
key: K,
value: V,
) -> BTreeMapEntryTransaction<'a, K, V> {
BTreeMapEntryTransaction {
new_value: value,
tree_ref,
key,
}
}
pub fn new(
tree_ref: &'a mut BTreeMap<K, V>,
key: K,
default_val: Option<V>,
) -> Option<BTreeMapEntryTransaction<'a, K, V>> {
tree_ref
.get(&key)
.cloned()
.or(default_val)
.map(|orig_value| BTreeMapEntryTransaction {
new_value: orig_value,
tree_ref,
key,
})
}
}
impl<K, V> Deref for BTreeMapEntryTransaction<'_, K, V> {
type Target = V;
fn deref(&self) -> &Self::Target {
&self.new_value
}
}
impl<K, V> DerefMut for BTreeMapEntryTransaction<'_, K, V> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.new_value
}
}
impl<K: Ord, V: PartialEq> InMemValTransaction for BTreeMapEntryTransaction<'_, K, V> {
fn commit(self) {
self.tree_ref.insert(self.key, self.new_value);
}
}
impl<'a, K: Ord, V: PartialEq + Transactional<TXN>, TXN> ValTransaction<TXN>
for BTreeMapEntryTransaction<'a, K, V>
{
async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
if !self.tree_ref.contains_key(&self.key)
|| *self.tree_ref.get(&self.key).unwrap() != self.new_value
{
self.new_value.upsert_in_transaction(txn).await?
}
Ok(())
}
}
impl<T: InMemValTransaction> InMemValTransaction for Option<T> {
fn commit(self) {
if let Some(inner) = self {
inner.commit();
}
}
}
impl<T: ValTransaction<TXN>, TXN> ValTransaction<TXN> for Option<T> {
async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
if let Some(inner) = &self {
inner.apply_to_txn(txn).await?;
}
Ok(())
}
}
pub struct DerefMutForward<
Inner,
Target,
P: DerefMut<Target = Inner>,
F: Fn(&Inner) -> &Target,
FMut: Fn(&mut Inner) -> &mut Target,
> {
ptr: P,
f: F,
f_mut: FMut,
}
impl<
Inner,
Target,
P: DerefMut<Target = Inner>,
F: Fn(&Inner) -> &Target,
FMut: Fn(&mut Inner) -> &mut Target,
> DerefMutForward<Inner, Target, P, F, FMut>
{
pub fn new(ptr: P, f: F, f_mut: FMut) -> Self {
Self { ptr, f, f_mut }
}
}
impl<
Inner,
Target,
P: DerefMut<Target = Inner>,
F: Fn(&Inner) -> &Target,
FMut: Fn(&mut Inner) -> &mut Target,
> Deref for DerefMutForward<Inner, Target, P, F, FMut>
{
type Target = Target;
fn deref(&self) -> &Self::Target {
(self.f)(&self.ptr)
}
}
impl<
Inner,
Target,
P: DerefMut<Target = Inner>,
F: Fn(&Inner) -> &Target,
FMut: Fn(&mut Inner) -> &mut Target,
> DerefMut for DerefMutForward<Inner, Target, P, F, FMut>
{
fn deref_mut(&mut self) -> &mut Self::Target {
(self.f_mut)(&mut self.ptr)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::{Operation, Transaction};
#[derive(PartialEq, Clone, Debug)]
struct TestTransactional {
key: &'static str,
value: &'static str,
}
const TEST_CF: &str = "test-cf";
#[async_trait]
impl Transactional<Transaction> for TestTransactional {
async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
trx.put(
TEST_CF.to_string(),
self.key.as_bytes().into(),
self.value.as_bytes().into(),
);
Ok(())
}
async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
trx.delete(TEST_CF.to_string(), self.key.as_bytes().into());
Ok(())
}
}
#[tokio::test]
async fn test_simple_var_transaction_commit() {
let mut kv = TestTransactional {
key: "key",
value: "original",
};
let mut num_txn = VarTransaction::new(&mut kv);
num_txn.value = "modified";
assert_eq!(num_txn.value, "modified");
let mut txn = Transaction::default();
num_txn.apply_to_txn(&mut txn).await.unwrap();
let txn_op = txn.get_operations();
assert_eq!(1, txn_op.len());
assert!(matches!(
&txn_op[0],
Operation::Put {
cf: _,
key: _,
value: _
}
));
assert!(
matches!(&txn_op[0], Operation::Put { cf, key, value } if *cf == TEST_CF && key == "key".as_bytes() && value == "modified".as_bytes())
);
num_txn.commit();
assert_eq!("modified", kv.value);
}
#[test]
fn test_simple_var_transaction_abort() {
let mut kv = TestTransactional {
key: "key",
value: "original",
};
let mut num_txn = VarTransaction::new(&mut kv);
num_txn.value = "modified";
assert_eq!("original", kv.value);
}
#[tokio::test]
async fn test_tree_map_transaction_commit() {
let mut map: BTreeMap<String, TestTransactional> = BTreeMap::new();
map.insert(
"to-remove".to_string(),
TestTransactional {
key: "to-remove",
value: "to-remove-value",
},
);
map.insert(
"to-remove-after-modify".to_string(),
TestTransactional {
key: "to-remove-after-modify",
value: "to-remove-after-modify-value",
},
);
map.insert(
"first".to_string(),
TestTransactional {
key: "first",
value: "first-orig-value",
},
);
let mut map_copy = map.clone();
let mut map_txn = BTreeMapTransaction::new(&mut map);
map_txn.remove("to-remove".to_string());
map_txn.insert(
"to-remove-after-modify".to_string(),
TestTransactional {
key: "to-remove-after-modify",
value: "to-remove-after-modify-value-modifying",
},
);
map_txn.remove("to-remove-after-modify".to_string());
map_txn.insert(
"first".to_string(),
TestTransactional {
key: "first",
value: "first-value",
},
);
map_txn.insert(
"second".to_string(),
TestTransactional {
key: "second",
value: "second-value",
},
);
assert_eq!(
&TestTransactional {
key: "second",
value: "second-value",
},
map_txn.get(&"second".to_string()).unwrap()
);
map_txn.insert(
"third".to_string(),
TestTransactional {
key: "third",
value: "third-value",
},
);
assert_eq!(
&TestTransactional {
key: "third",
value: "third-value",
},
map_txn.get(&"third".to_string()).unwrap()
);
let mut third_entry = map_txn.get_mut("third".to_string()).unwrap();
third_entry.value = "third-value-updated";
assert_eq!(
&TestTransactional {
key: "third",
value: "third-value-updated",
},
map_txn.get(&"third".to_string()).unwrap()
);
let mut txn = Transaction::default();
map_txn.apply_to_txn(&mut txn).await.unwrap();
let txn_ops = txn.get_operations();
assert_eq!(5, txn_ops.len());
for op in txn_ops {
match op {
Operation::Put { cf, key, value }
if cf == TEST_CF
&& key == "first".as_bytes()
&& value == "first-value".as_bytes() => {}
Operation::Put { cf, key, value }
if cf == TEST_CF
&& key == "second".as_bytes()
&& value == "second-value".as_bytes() => {}
Operation::Put { cf, key, value }
if cf == TEST_CF
&& key == "third".as_bytes()
&& value == "third-value-updated".as_bytes() => {}
Operation::Delete { cf, key } if cf == TEST_CF && key == "to-remove".as_bytes() => {
}
Operation::Delete { cf, key }
if cf == TEST_CF && key == "to-remove-after-modify".as_bytes() => {}
_ => unreachable!("invalid operation"),
}
}
map_txn.commit();
map_copy.remove("to-remove").unwrap();
map_copy.insert(
"to-remove-after-modify".to_string(),
TestTransactional {
key: "to-remove-after-modify",
value: "to-remove-after-modify-value-modifying",
},
);
map_copy.remove("to-remove-after-modify").unwrap();
map_copy.insert(
"first".to_string(),
TestTransactional {
key: "first",
value: "first-value",
},
);
map_copy.insert(
"second".to_string(),
TestTransactional {
key: "second",
value: "second-value",
},
);
map_copy.insert(
"third".to_string(),
TestTransactional {
key: "third",
value: "third-value-updated",
},
);
assert_eq!(map_copy, map);
}
#[tokio::test]
async fn test_tree_map_entry_update_transaction_commit() {
let mut map: BTreeMap<String, TestTransactional> = BTreeMap::new();
map.insert(
"first".to_string(),
TestTransactional {
key: "first",
value: "first-orig-value",
},
);
let mut map_txn = BTreeMapTransaction::new(&mut map);
let mut first_entry_txn = map_txn.new_entry_txn("first".to_string()).unwrap();
first_entry_txn.value = "first-value";
let mut txn = Transaction::default();
first_entry_txn.apply_to_txn(&mut txn).await.unwrap();
let txn_ops = txn.get_operations();
assert_eq!(1, txn_ops.len());
assert!(
matches!(&txn_ops[0], Operation::Put {cf, key, value} if *cf == TEST_CF && key == "first".as_bytes() && value == "first-value".as_bytes())
);
first_entry_txn.commit();
assert_eq!("first-value", map.get("first").unwrap().value);
}
#[tokio::test]
async fn test_tree_map_entry_insert_transaction_commit() {
let mut map: BTreeMap<String, TestTransactional> = BTreeMap::new();
let mut map_txn = BTreeMapTransaction::new(&mut map);
let first_entry_txn = map_txn.new_entry_insert_txn(
"first".to_string(),
TestTransactional {
key: "first",
value: "first-value",
},
);
let mut txn = Transaction::default();
first_entry_txn.apply_to_txn(&mut txn).await.unwrap();
let txn_ops = txn.get_operations();
assert_eq!(1, txn_ops.len());
assert!(
matches!(&txn_ops[0], Operation::Put {cf, key, value} if *cf == TEST_CF && key == "first".as_bytes() && value == "first-value".as_bytes())
);
first_entry_txn.commit();
assert_eq!("first-value", map.get("first").unwrap().value);
}
}