1mod error;
16mod stream;
17
18use std::collections::BTreeMap;
19use std::collections::btree_map::{Entry, VacantEntry};
20use std::fmt::Debug;
21use std::ops::{Deref, DerefMut};
22
23use async_trait::async_trait;
24pub use error::*;
25pub use stream::*;
26use uuid::Uuid;
27
/// Identifier type used for actors; an alias of `u32`.
pub type ActorId = u32;

/// Identifier type used for dispatchers; an alias of `u64`.
pub type DispatcherId = u64;

/// Identifier type used for fragments; an alias of `u32`.
pub type FragmentId = u32;

/// Identifier type used for subscriptions; an alias of `u32`.
pub type SubscriptionId = u32;
38
39#[derive(Clone, Debug)]
40pub struct ClusterId(String);
41
42impl Default for ClusterId {
43 fn default() -> Self {
44 Self::new()
45 }
46}
47
48impl ClusterId {
49 pub fn new() -> Self {
50 Self(Uuid::new_v4().to_string())
51 }
52}
53
54impl From<ClusterId> for String {
55 fn from(value: ClusterId) -> Self {
56 value.0
57 }
58}
59
60impl From<String> for ClusterId {
61 fn from(value: String) -> Self {
62 Self(value)
63 }
64}
65
66impl Deref for ClusterId {
67 type Target = str;
68
69 fn deref(&self) -> &Self::Target {
70 self.0.as_str()
71 }
72}
73
74#[async_trait]
75pub trait Transactional<TXN> {
76 async fn upsert_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
77 async fn delete_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
78}
79
/// An in-memory transaction whose pending change is applied by consuming it with `commit`.
pub trait InMemValTransaction: Sized {
    /// Consumes the transaction and applies its pending change to the in-memory value.
    fn commit(self);
}
84
85pub trait ValTransaction<TXN>: InMemValTransaction {
88 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()>;
90}
91
/// Transaction over a single value of type `T`.
///
/// Reads are served from the original value until the first mutable access. That access clones
/// the original into `new_value`, and every later read and write targets the clone.
pub struct VarTransaction<'a, T> {
    /// The value the transaction was opened on.
    orig_value_ref: &'a mut T,
    /// The modified copy; `None` until the first mutable access.
    new_value: Option<T>,
}

impl<'a, T> VarTransaction<'a, T> {
    /// Opens a transaction on `val_ref` with no pending change.
    pub fn new(val_ref: &'a mut T) -> VarTransaction<'a, T> {
        Self {
            orig_value_ref: val_ref,
            new_value: None,
        }
    }

    /// Returns `true` once a modified copy exists, i.e. after the first mutable access.
    pub fn has_new_value(&self) -> bool {
        matches!(self.new_value, Some(_))
    }
}

impl<T> Deref for VarTransaction<'_, T> {
    type Target = T;

    /// Borrows the modified copy when there is one, otherwise the original value.
    fn deref(&self) -> &Self::Target {
        self.new_value.as_ref().unwrap_or(&*self.orig_value_ref)
    }
}

impl<T: Clone> DerefMut for VarTransaction<'_, T> {
    /// Borrows the modified copy mutably, cloning the original into it on first use.
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Borrow the original up front so the closure does not need to capture `self`.
        let original: &T = &*self.orig_value_ref;
        self.new_value.get_or_insert_with(|| original.clone())
    }
}
137
138impl<T> InMemValTransaction for VarTransaction<'_, T>
139where
140 T: PartialEq,
141{
142 fn commit(self) {
143 if let Some(new_value) = self.new_value {
144 *self.orig_value_ref = new_value;
145 }
146 }
147}
148
149impl<TXN, T> ValTransaction<TXN> for VarTransaction<'_, T>
150where
151 T: Transactional<TXN> + PartialEq,
152{
153 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
154 if let Some(new_value) = &self.new_value {
155 if *self.orig_value_ref != *new_value {
157 new_value.upsert_in_transaction(txn).await
158 } else {
159 Ok(())
160 }
161 } else {
162 Ok(())
163 }
164 }
165}
166
/// Where a [`BTreeMapTransactionValueGuard`] currently points inside the staging map.
enum BTreeMapTransactionStagingEntry<'a, K: Ord, V> {
    /// The key has no staged op yet; holds the slot through which one can be inserted.
    Vacant(VacantEntry<'a, K, BTreeMapOp<V>>),
    /// The key has a staged value; holds a mutable reference to it.
    Occupied(&'a mut V),
}

/// Guard giving mutable access to one value of a map transaction.
///
/// Reads are served from the staged value when one exists, otherwise from the original value.
/// The first mutable access clones the original value into the staging map; later accesses use
/// that staged clone.
pub struct BTreeMapTransactionValueGuard<'a, K: Ord, V: Clone> {
    /// Position in the staging map. It is `Some` except inside `deref_mut`, which takes it
    /// briefly while turning a vacant entry into an occupied one.
    staging_entry: Option<BTreeMapTransactionStagingEntry<'a, K, V>>,
    /// The original value, if any. Must be `Some` while the staging entry is vacant.
    orig_value: Option<&'a V>,
}

impl<'a, K: Ord, V: Clone> BTreeMapTransactionValueGuard<'a, K, V> {
    /// Builds a guard from a staging position and an optional original value.
    ///
    /// # Panics
    ///
    /// Panics when `staging_entry` is vacant and `orig_value` is `None`, because the guard
    /// would then have nothing to read.
    fn new(
        staging_entry: BTreeMapTransactionStagingEntry<'a, K, V>,
        orig_value: Option<&'a V>,
    ) -> Self {
        let has_readable_value = match &staging_entry {
            BTreeMapTransactionStagingEntry::Occupied(_) => true,
            BTreeMapTransactionStagingEntry::Vacant(_) => orig_value.is_some(),
        };
        assert!(
            has_readable_value,
            "one of staging_entry and orig_value must be non-empty"
        );
        Self {
            staging_entry: Some(staging_entry),
            orig_value,
        }
    }
}

impl<K: Ord, V: Clone> Deref for BTreeMapTransactionValueGuard<'_, K, V> {
    type Target = V;

    /// Borrows the staged value if there is one, otherwise the original value.
    fn deref(&self) -> &Self::Target {
        let entry = self.staging_entry.as_ref().unwrap();
        match entry {
            BTreeMapTransactionStagingEntry::Occupied(staged) => &**staged,
            BTreeMapTransactionStagingEntry::Vacant(_) => self
                .orig_value
                .expect("staging is vacant, so orig_value must be some"),
        }
    }
}

impl<K: Ord, V: Clone> DerefMut for BTreeMapTransactionValueGuard<'_, K, V> {
    /// Borrows the staged value mutably, first staging a clone of the original value when the
    /// key has no staged op yet.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let already_staged = matches!(
            self.staging_entry.as_ref().unwrap(),
            BTreeMapTransactionStagingEntry::Occupied(_)
        );

        if !already_staged {
            // The vacant slot has to be moved out of `self` because inserting consumes it.
            let BTreeMapTransactionStagingEntry::Vacant(slot) = self.staging_entry.take().unwrap()
            else {
                unreachable!("we have previously check that the entry is not occupied")
            };

            let snapshot = self
                .orig_value
                .expect("self.staging_entry was vacant, so orig_value must be some")
                .clone();

            // Stage the clone and keep a reference to the value now living in the staging map.
            let BTreeMapOp::Insert(staged) = slot.insert(BTreeMapOp::Insert(snapshot)) else {
                unreachable!(
                    "the previous inserted op is `Inserted`, so it's not possible to reach Delete"
                )
            };
            self.staging_entry = Some(BTreeMapTransactionStagingEntry::Occupied(staged));
        }

        match self.staging_entry.as_mut().unwrap() {
            BTreeMapTransactionStagingEntry::Occupied(staged) => &mut **staged,
            BTreeMapTransactionStagingEntry::Vacant(_) => {
                unreachable!("we have inserted a cloned original value in case of vacant")
            }
        }
    }
}

/// A pending change staged for one key of a map transaction.
enum BTreeMapOp<V> {
    /// Set the key to the carried value.
    Insert(V),
    /// Remove the key.
    Delete,
}
270
271pub struct BTreeMapTransactionInner<K: Ord, V, P: DerefMut<Target = BTreeMap<K, V>>> {
277 tree_ref: P,
280 staging: BTreeMap<K, BTreeMapOp<V>>,
282}
283
284pub type BTreeMapTransaction<'a, K, V> = BTreeMapTransactionInner<K, V, &'a mut BTreeMap<K, V>>;
285
286impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>>
287 BTreeMapTransactionInner<K, V, P>
288{
289 pub fn new(tree_ref: P) -> BTreeMapTransactionInner<K, V, P> {
290 Self {
291 tree_ref,
292 staging: BTreeMap::default(),
293 }
294 }
295
296 #[allow(dead_code)]
298 pub fn new_entry_txn(&mut self, key: K) -> Option<BTreeMapEntryTransaction<'_, K, V>> {
299 BTreeMapEntryTransaction::new(&mut self.tree_ref, key, None)
300 }
301
302 pub fn new_entry_txn_or_default(
306 &mut self,
307 key: K,
308 default_val: V,
309 ) -> BTreeMapEntryTransaction<'_, K, V> {
310 BTreeMapEntryTransaction::new(&mut self.tree_ref, key, Some(default_val))
311 .expect("default value is provided and should return `Some`")
312 }
313
314 pub fn new_entry_insert_txn(&mut self, key: K, val: V) -> BTreeMapEntryTransaction<'_, K, V> {
316 BTreeMapEntryTransaction::new_insert(&mut self.tree_ref, key, val)
317 }
318
319 pub fn tree_ref(&self) -> &BTreeMap<K, V> {
320 &self.tree_ref
321 }
322
323 pub fn get(&self, key: &K) -> Option<&V> {
325 self.staging
326 .get(key)
327 .and_then(|op| match op {
328 BTreeMapOp::Insert(v) => Some(v),
329 BTreeMapOp::Delete => None,
330 })
331 .or_else(|| self.tree_ref.get(key))
332 }
333
334 pub fn contains_key(&self, key: &K) -> bool {
335 self.get(key).is_some()
336 }
337
338 pub fn get_mut(&mut self, key: K) -> Option<BTreeMapTransactionValueGuard<'_, K, V>> {
346 let orig_contains_key = self.tree_ref.contains_key(&key);
347 let orig_value = self.tree_ref.get(&key);
348
349 let staging_entry = match self.staging.entry(key) {
350 Entry::Occupied(entry) => match entry.into_mut() {
351 BTreeMapOp::Insert(v) => BTreeMapTransactionStagingEntry::Occupied(v),
352 BTreeMapOp::Delete => return None,
353 },
354 Entry::Vacant(vacant_entry) => {
355 if !orig_contains_key {
356 return None;
357 } else {
358 BTreeMapTransactionStagingEntry::Vacant(vacant_entry)
359 }
360 }
361 };
362 Some(BTreeMapTransactionValueGuard::new(
363 staging_entry,
364 orig_value,
365 ))
366 }
367
368 pub fn insert(&mut self, key: K, value: V) {
369 self.staging.insert(key, BTreeMapOp::Insert(value));
370 }
371
372 pub fn remove(&mut self, key: K) -> Option<V> {
373 if let Some(op) = self.staging.get(&key) {
374 return match op {
375 BTreeMapOp::Delete => None,
376 BTreeMapOp::Insert(_) => match self.staging.remove(&key).unwrap() {
377 BTreeMapOp::Insert(v) => {
378 self.staging.insert(key, BTreeMapOp::Delete);
379 Some(v)
380 }
381 BTreeMapOp::Delete => {
382 unreachable!(
383 "we have checked that the op of the key is `Insert`, so it's impossible to be Delete"
384 )
385 }
386 },
387 };
388 }
389 match self.tree_ref.get(&key) {
390 Some(orig_value) => {
391 self.staging.insert(key, BTreeMapOp::Delete);
392 Some(orig_value.clone())
393 }
394 None => None,
395 }
396 }
397
398 pub fn commit_memory(mut self) {
399 for (k, op) in self.staging {
401 match op {
402 BTreeMapOp::Insert(v) => {
403 self.tree_ref.insert(k, v);
404 }
405 BTreeMapOp::Delete => {
406 self.tree_ref.remove(&k);
407 }
408 }
409 }
410 }
411}
412
413impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>> InMemValTransaction
414 for BTreeMapTransactionInner<K, V, P>
415{
416 fn commit(self) {
417 self.commit_memory();
418 }
419}
420
421impl<K: Ord + Debug, V: Transactional<TXN> + Clone, P: DerefMut<Target = BTreeMap<K, V>>, TXN>
422 ValTransaction<TXN> for BTreeMapTransactionInner<K, V, P>
423{
424 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
425 for (k, op) in &self.staging {
427 match op {
428 BTreeMapOp::Insert(v) => v.upsert_in_transaction(txn).await?,
429 BTreeMapOp::Delete => {
430 if let Some(v) = self.tree_ref.get(k) {
431 v.delete_in_transaction(txn).await?;
432 }
433 }
434 }
435 }
436 Ok(())
437 }
438}
439
/// Transaction over a single entry of a `BTreeMap`.
///
/// The working value lives in `new_value` and is what the guard dereferences to.
pub struct BTreeMapEntryTransaction<'a, K, V> {
    /// The map the entry belongs to.
    tree_ref: &'a mut BTreeMap<K, V>,
    /// Key of the entry this transaction is about.
    pub key: K,
    /// Working value of the entry.
    pub new_value: V,
}

impl<'a, K: Ord + Debug, V: Clone> BTreeMapEntryTransaction<'a, K, V> {
    /// Creates a transaction whose working value is `value`, without looking at the map.
    pub fn new_insert(
        tree_ref: &'a mut BTreeMap<K, V>,
        key: K,
        value: V,
    ) -> BTreeMapEntryTransaction<'a, K, V> {
        Self {
            tree_ref,
            key,
            new_value: value,
        }
    }

    /// Creates a transaction whose working value is a clone of the value stored under `key`,
    /// falling back to `default_val` when the map has no such key.
    ///
    /// Returns `None` when the key is absent and no default is supplied.
    pub fn new(
        tree_ref: &'a mut BTreeMap<K, V>,
        key: K,
        default_val: Option<V>,
    ) -> Option<BTreeMapEntryTransaction<'a, K, V>> {
        let new_value = match tree_ref.get(&key) {
            Some(current) => current.clone(),
            None => default_val?,
        };
        Some(Self {
            tree_ref,
            key,
            new_value,
        })
    }
}

impl<K, V> Deref for BTreeMapEntryTransaction<'_, K, V> {
    type Target = V;

    /// Borrows the working value.
    fn deref(&self) -> &Self::Target {
        let Self { new_value, .. } = self;
        new_value
    }
}

impl<K, V> DerefMut for BTreeMapEntryTransaction<'_, K, V> {
    /// Borrows the working value mutably.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { new_value, .. } = self;
        new_value
    }
}
498
499impl<K: Ord, V: PartialEq> InMemValTransaction for BTreeMapEntryTransaction<'_, K, V> {
500 fn commit(self) {
501 self.tree_ref.insert(self.key, self.new_value);
502 }
503}
504
505impl<K: Ord, V: PartialEq + Transactional<TXN>, TXN> ValTransaction<TXN>
506 for BTreeMapEntryTransaction<'_, K, V>
507{
508 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
509 if !self.tree_ref.contains_key(&self.key)
510 || *self.tree_ref.get(&self.key).unwrap() != self.new_value
511 {
512 self.new_value.upsert_in_transaction(txn).await?
513 }
514 Ok(())
515 }
516}
517
518impl<T: InMemValTransaction> InMemValTransaction for Option<T> {
519 fn commit(self) {
520 if let Some(inner) = self {
521 inner.commit();
522 }
523 }
524}
525
526impl<T: ValTransaction<TXN>, TXN> ValTransaction<TXN> for Option<T> {
527 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
528 if let Some(inner) = &self {
529 inner.apply_to_txn(txn).await?;
530 }
531 Ok(())
532 }
533}
534
/// Wraps a pointer `P` to an `Inner` value and dereferences to a `Target` obtained from that
/// `Inner` through the projection functions `f` (shared) and `f_mut` (mutable).
///
/// The bounds are part of the struct definition because `Inner` and `Target` appear in no
/// field and are only tied to the type through them.
pub struct DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    /// Pointer to the wrapped value.
    ptr: P,
    /// Projection used for shared access.
    f: F,
    /// Projection used for mutable access.
    f_mut: FMut,
}

impl<Inner, Target, P, F, FMut> DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    /// Bundles the pointer with its two projections.
    pub fn new(ptr: P, f: F, f_mut: FMut) -> Self {
        Self { ptr, f, f_mut }
    }
}

impl<Inner, Target, P, F, FMut> Deref for DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    type Target = Target;

    /// Applies `f` to the value behind the pointer.
    fn deref(&self) -> &Self::Target {
        let inner: &Inner = &self.ptr;
        (self.f)(inner)
    }
}

impl<Inner, Target, P, F, FMut> DerefMut for DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    /// Applies `f_mut` to the value behind the pointer.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let inner: &mut Inner = &mut self.ptr;
        (self.f_mut)(inner)
    }
}
587
588#[cfg(test)]
589mod tests {
590 use super::*;
591 use crate::storage::{Operation, Transaction};
592
593 #[derive(PartialEq, Clone, Debug)]
594 struct TestTransactional {
595 key: &'static str,
596 value: &'static str,
597 }
598
599 const TEST_CF: &str = "test-cf";
600
601 #[async_trait]
602 impl Transactional<Transaction> for TestTransactional {
603 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
604 trx.put(
605 TEST_CF.to_owned(),
606 self.key.as_bytes().into(),
607 self.value.as_bytes().into(),
608 );
609 Ok(())
610 }
611
612 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
613 trx.delete(TEST_CF.to_owned(), self.key.as_bytes().into());
614 Ok(())
615 }
616 }
617
618 #[tokio::test]
619 async fn test_simple_var_transaction_commit() {
620 let mut kv = TestTransactional {
621 key: "key",
622 value: "original",
623 };
624 let mut num_txn = VarTransaction::new(&mut kv);
625 num_txn.value = "modified";
626 assert_eq!(num_txn.value, "modified");
627 let mut txn = Transaction::default();
628 num_txn.apply_to_txn(&mut txn).await.unwrap();
629 let txn_op = txn.get_operations();
630 assert_eq!(1, txn_op.len());
631 assert!(matches!(
632 &txn_op[0],
633 Operation::Put {
634 cf: _,
635 key: _,
636 value: _
637 }
638 ));
639 assert!(
640 matches!(&txn_op[0], Operation::Put { cf, key, value } if *cf == TEST_CF && key == "key".as_bytes() && value == "modified".as_bytes())
641 );
642 num_txn.commit();
643 assert_eq!("modified", kv.value);
644 }
645
/// Dropping a modified `VarTransaction` without committing leaves the value untouched.
#[test]
fn test_simple_var_transaction_abort() {
    let mut kv = TestTransactional {
        key: "key",
        value: "original",
    };
    let mut var_txn = VarTransaction::new(&mut kv);
    var_txn.value = "modified";
    // No commit: the write above must not be visible through `kv`.
    assert_eq!("original", kv.value);
}
656
657 #[tokio::test]
658 async fn test_tree_map_transaction_commit() {
659 let mut map: BTreeMap<String, TestTransactional> = BTreeMap::new();
660 map.insert(
661 "to-remove".to_owned(),
662 TestTransactional {
663 key: "to-remove",
664 value: "to-remove-value",
665 },
666 );
667 map.insert(
668 "to-remove-after-modify".to_owned(),
669 TestTransactional {
670 key: "to-remove-after-modify",
671 value: "to-remove-after-modify-value",
672 },
673 );
674 map.insert(
675 "first".to_owned(),
676 TestTransactional {
677 key: "first",
678 value: "first-orig-value",
679 },
680 );
681
682 let mut map_copy = map.clone();
683 let mut map_txn = BTreeMapTransaction::new(&mut map);
684 map_txn.remove("to-remove".to_owned());
685 map_txn.insert(
686 "to-remove-after-modify".to_owned(),
687 TestTransactional {
688 key: "to-remove-after-modify",
689 value: "to-remove-after-modify-value-modifying",
690 },
691 );
692 map_txn.remove("to-remove-after-modify".to_owned());
693 map_txn.insert(
694 "first".to_owned(),
695 TestTransactional {
696 key: "first",
697 value: "first-value",
698 },
699 );
700 map_txn.insert(
701 "second".to_owned(),
702 TestTransactional {
703 key: "second",
704 value: "second-value",
705 },
706 );
707 assert_eq!(
708 &TestTransactional {
709 key: "second",
710 value: "second-value",
711 },
712 map_txn.get(&"second".to_owned()).unwrap()
713 );
714 map_txn.insert(
715 "third".to_owned(),
716 TestTransactional {
717 key: "third",
718 value: "third-value",
719 },
720 );
721 assert_eq!(
722 &TestTransactional {
723 key: "third",
724 value: "third-value",
725 },
726 map_txn.get(&"third".to_owned()).unwrap()
727 );
728
729 let mut third_entry = map_txn.get_mut("third".to_owned()).unwrap();
730 third_entry.value = "third-value-updated";
731 assert_eq!(
732 &TestTransactional {
733 key: "third",
734 value: "third-value-updated",
735 },
736 map_txn.get(&"third".to_owned()).unwrap()
737 );
738
739 let mut txn = Transaction::default();
740 map_txn.apply_to_txn(&mut txn).await.unwrap();
741 let txn_ops = txn.get_operations();
742 assert_eq!(5, txn_ops.len());
743 for op in txn_ops {
744 match op {
745 Operation::Put { cf, key, value }
746 if cf == TEST_CF
747 && key == "first".as_bytes()
748 && value == "first-value".as_bytes() => {}
749 Operation::Put { cf, key, value }
750 if cf == TEST_CF
751 && key == "second".as_bytes()
752 && value == "second-value".as_bytes() => {}
753 Operation::Put { cf, key, value }
754 if cf == TEST_CF
755 && key == "third".as_bytes()
756 && value == "third-value-updated".as_bytes() => {}
757 Operation::Delete { cf, key } if cf == TEST_CF && key == "to-remove".as_bytes() => {
758 }
759 Operation::Delete { cf, key }
760 if cf == TEST_CF && key == "to-remove-after-modify".as_bytes() => {}
761 _ => unreachable!("invalid operation"),
762 }
763 }
764 map_txn.commit();
765
766 map_copy.remove("to-remove").unwrap();
768 map_copy.insert(
769 "to-remove-after-modify".to_owned(),
770 TestTransactional {
771 key: "to-remove-after-modify",
772 value: "to-remove-after-modify-value-modifying",
773 },
774 );
775 map_copy.remove("to-remove-after-modify").unwrap();
776 map_copy.insert(
777 "first".to_owned(),
778 TestTransactional {
779 key: "first",
780 value: "first-value",
781 },
782 );
783 map_copy.insert(
784 "second".to_owned(),
785 TestTransactional {
786 key: "second",
787 value: "second-value",
788 },
789 );
790 map_copy.insert(
791 "third".to_owned(),
792 TestTransactional {
793 key: "third",
794 value: "third-value-updated",
795 },
796 );
797 assert_eq!(map_copy, map);
798 }
799
800 #[tokio::test]
801 async fn test_tree_map_entry_update_transaction_commit() {
802 let mut map: BTreeMap<String, TestTransactional> = BTreeMap::new();
803 map.insert(
804 "first".to_owned(),
805 TestTransactional {
806 key: "first",
807 value: "first-orig-value",
808 },
809 );
810
811 let mut map_txn = BTreeMapTransaction::new(&mut map);
812 let mut first_entry_txn = map_txn.new_entry_txn("first".to_owned()).unwrap();
813 first_entry_txn.value = "first-value";
814 let mut txn = Transaction::default();
815 first_entry_txn.apply_to_txn(&mut txn).await.unwrap();
816 let txn_ops = txn.get_operations();
817 assert_eq!(1, txn_ops.len());
818 assert!(
819 matches!(&txn_ops[0], Operation::Put {cf, key, value} if *cf == TEST_CF && key == "first".as_bytes() && value == "first-value".as_bytes())
820 );
821 first_entry_txn.commit();
822 assert_eq!("first-value", map.get("first").unwrap().value);
823 }
824
825 #[tokio::test]
826 async fn test_tree_map_entry_insert_transaction_commit() {
827 let mut map: BTreeMap<String, TestTransactional> = BTreeMap::new();
828
829 let mut map_txn = BTreeMapTransaction::new(&mut map);
830 let first_entry_txn = map_txn.new_entry_insert_txn(
831 "first".to_owned(),
832 TestTransactional {
833 key: "first",
834 value: "first-value",
835 },
836 );
837 let mut txn = Transaction::default();
838 first_entry_txn.apply_to_txn(&mut txn).await.unwrap();
839 let txn_ops = txn.get_operations();
840 assert_eq!(1, txn_ops.len());
841 assert!(
842 matches!(&txn_ops[0], Operation::Put {cf, key, value} if *cf == TEST_CF && key == "first".as_bytes() && value == "first-value".as_bytes())
843 );
844 first_entry_txn.commit();
845 assert_eq!("first-value", map.get("first").unwrap().value);
846 }
847}