1mod error;
16mod stream;
17
18use std::collections::BTreeMap;
19use std::collections::btree_map::{Entry, VacantEntry};
20use std::fmt::Debug;
21use std::ops::{Deref, DerefMut};
22
23use async_trait::async_trait;
24pub use error::*;
25pub use stream::*;
26use uuid::Uuid;
27
28pub type ActorId = u32;
30
31pub type DispatcherId = u64;
33
34pub type FragmentId = u32;
36
37pub type SubscriptionId = u32;
38
39#[derive(Clone, Debug)]
40pub struct ClusterId(String);
41
42impl Default for ClusterId {
43 fn default() -> Self {
44 Self::new()
45 }
46}
47
48impl ClusterId {
49 pub fn new() -> Self {
50 Self(Uuid::new_v4().to_string())
51 }
52}
53
54impl From<ClusterId> for String {
55 fn from(value: ClusterId) -> Self {
56 value.0
57 }
58}
59
60impl From<String> for ClusterId {
61 fn from(value: String) -> Self {
62 Self(value)
63 }
64}
65
66impl Deref for ClusterId {
67 type Target = str;
68
69 fn deref(&self) -> &Self::Target {
70 self.0.as_str()
71 }
72}
73
74#[async_trait]
75pub trait Transactional<TXN> {
76 async fn upsert_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
77 async fn delete_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
78}
79
80pub trait InMemValTransaction: Sized {
81 fn commit(self);
83}
84
85pub trait ValTransaction<TXN>: InMemValTransaction {
88 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()>;
90}
91
92pub struct VarTransaction<'a, T> {
99 orig_value_ref: &'a mut T,
100 new_value: Option<T>,
101}
102
103impl<'a, T> VarTransaction<'a, T> {
104 pub fn new(val_ref: &'a mut T) -> VarTransaction<'a, T> {
106 VarTransaction {
107 new_value: None,
109 orig_value_ref: val_ref,
110 }
111 }
112
113 pub fn has_new_value(&self) -> bool {
114 self.new_value.is_some()
115 }
116}
117
118impl<T> Deref for VarTransaction<'_, T> {
119 type Target = T;
120
121 fn deref(&self) -> &Self::Target {
122 match &self.new_value {
123 Some(new_value) => new_value,
124 None => self.orig_value_ref,
125 }
126 }
127}
128
129impl<T: Clone> DerefMut for VarTransaction<'_, T> {
130 fn deref_mut(&mut self) -> &mut Self::Target {
131 if self.new_value.is_none() {
132 self.new_value.replace(self.orig_value_ref.clone());
133 }
134 self.new_value.as_mut().unwrap()
135 }
136}
137
138impl<T> InMemValTransaction for VarTransaction<'_, T>
139where
140 T: PartialEq,
141{
142 fn commit(self) {
143 if let Some(new_value) = self.new_value {
144 *self.orig_value_ref = new_value;
145 }
146 }
147}
148
149impl<TXN, T> ValTransaction<TXN> for VarTransaction<'_, T>
150where
151 T: Transactional<TXN> + PartialEq,
152{
153 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
154 if let Some(new_value) = &self.new_value {
155 if *self.orig_value_ref != *new_value {
157 new_value.upsert_in_transaction(txn).await
158 } else {
159 Ok(())
160 }
161 } else {
162 Ok(())
163 }
164 }
165}
166
167enum BTreeMapTransactionStagingEntry<'a, K: Ord, V> {
169 Vacant(VacantEntry<'a, K, BTreeMapOp<V>>),
171 Occupied(&'a mut V),
174}
175
176pub struct BTreeMapTransactionValueGuard<'a, K: Ord, V: Clone> {
180 staging_entry: Option<BTreeMapTransactionStagingEntry<'a, K, V>>,
185 orig_value: Option<&'a V>,
187}
188
189impl<'a, K: Ord, V: Clone> BTreeMapTransactionValueGuard<'a, K, V> {
190 fn new(
191 staging_entry: BTreeMapTransactionStagingEntry<'a, K, V>,
192 orig_value: Option<&'a V>,
193 ) -> Self {
194 let is_entry_occupied =
195 matches!(staging_entry, BTreeMapTransactionStagingEntry::Occupied(_));
196 assert!(
197 is_entry_occupied || orig_value.is_some(),
198 "one of staging_entry and orig_value must be non-empty"
199 );
200 Self {
201 staging_entry: Some(staging_entry),
202 orig_value,
203 }
204 }
205}
206
207impl<K: Ord, V: Clone> Deref for BTreeMapTransactionValueGuard<'_, K, V> {
208 type Target = V;
209
210 fn deref(&self) -> &Self::Target {
211 match &self.staging_entry.as_ref().unwrap() {
213 BTreeMapTransactionStagingEntry::Vacant(_) => self
214 .orig_value
215 .expect("staging is vacant, so orig_value must be some"),
216 BTreeMapTransactionStagingEntry::Occupied(v) => v,
217 }
218 }
219}
220
221impl<K: Ord, V: Clone> DerefMut for BTreeMapTransactionValueGuard<'_, K, V> {
222 fn deref_mut(&mut self) -> &mut Self::Target {
223 let is_occupied = matches!(
224 self.staging_entry.as_ref().unwrap(),
225 BTreeMapTransactionStagingEntry::Occupied(_)
226 );
227
228 if !is_occupied {
231 let vacant_entry = match self.staging_entry.take().unwrap() {
232 BTreeMapTransactionStagingEntry::Vacant(entry) => entry,
233 BTreeMapTransactionStagingEntry::Occupied(_) => {
234 unreachable!("we have previously check that the entry is not occupied")
235 }
236 };
237
238 let new_value_mut_ref = match vacant_entry.insert(BTreeMapOp::Insert(
240 self.orig_value
241 .expect("self.staging_entry was vacant, so orig_value must be some")
242 .clone(),
243 )) {
244 BTreeMapOp::Insert(v) => v,
245 BTreeMapOp::Delete => {
246 unreachable!(
247 "the previous inserted op is `Inserted`, so it's not possible to reach Delete"
248 )
249 }
250 };
251 let _ = self
253 .staging_entry
254 .insert(BTreeMapTransactionStagingEntry::Occupied(new_value_mut_ref));
255 }
256
257 match self.staging_entry.as_mut().unwrap() {
258 BTreeMapTransactionStagingEntry::Vacant(_) => {
259 unreachable!("we have inserted a cloned original value in case of vacant")
260 }
261 BTreeMapTransactionStagingEntry::Occupied(v) => v,
262 }
263 }
264}
265
266enum BTreeMapOp<V> {
267 Insert(V),
268 Delete,
269}
270
271pub struct BTreeMapTransactionInner<K: Ord, V, P: DerefMut<Target = BTreeMap<K, V>>> {
277 tree_ref: P,
280 staging: BTreeMap<K, BTreeMapOp<V>>,
282}
283
284pub type BTreeMapTransaction<'a, K, V> = BTreeMapTransactionInner<K, V, &'a mut BTreeMap<K, V>>;
285
286impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>>
287 BTreeMapTransactionInner<K, V, P>
288{
289 pub fn new(tree_ref: P) -> BTreeMapTransactionInner<K, V, P> {
290 Self {
291 tree_ref,
292 staging: BTreeMap::default(),
293 }
294 }
295
296 #[allow(dead_code)]
298 pub fn new_entry_txn(&mut self, key: K) -> Option<BTreeMapEntryTransaction<'_, K, V>> {
299 BTreeMapEntryTransaction::new(&mut self.tree_ref, key, None)
300 }
301
302 pub fn new_entry_txn_or_default(
306 &mut self,
307 key: K,
308 default_val: V,
309 ) -> BTreeMapEntryTransaction<'_, K, V> {
310 BTreeMapEntryTransaction::new(&mut self.tree_ref, key, Some(default_val))
311 .expect("default value is provided and should return `Some`")
312 }
313
314 pub fn new_entry_insert_txn(&mut self, key: K, val: V) -> BTreeMapEntryTransaction<'_, K, V> {
316 BTreeMapEntryTransaction::new_insert(&mut self.tree_ref, key, val)
317 }
318
319 pub fn tree_ref(&self) -> &BTreeMap<K, V> {
320 &self.tree_ref
321 }
322
323 pub fn get(&self, key: &K) -> Option<&V> {
325 self.staging
326 .get(key)
327 .and_then(|op| match op {
328 BTreeMapOp::Insert(v) => Some(v),
329 BTreeMapOp::Delete => None,
330 })
331 .or_else(|| self.tree_ref.get(key))
332 }
333
334 pub fn contains_key(&self, key: &K) -> bool {
335 self.get(key).is_some()
336 }
337
338 pub fn get_mut(&mut self, key: K) -> Option<BTreeMapTransactionValueGuard<'_, K, V>> {
346 let orig_contains_key = self.tree_ref.contains_key(&key);
347 let orig_value = self.tree_ref.get(&key);
348
349 let staging_entry = match self.staging.entry(key) {
350 Entry::Occupied(entry) => match entry.into_mut() {
351 BTreeMapOp::Insert(v) => BTreeMapTransactionStagingEntry::Occupied(v),
352 BTreeMapOp::Delete => return None,
353 },
354 Entry::Vacant(vacant_entry) => {
355 if !orig_contains_key {
356 return None;
357 } else {
358 BTreeMapTransactionStagingEntry::Vacant(vacant_entry)
359 }
360 }
361 };
362 Some(BTreeMapTransactionValueGuard::new(
363 staging_entry,
364 orig_value,
365 ))
366 }
367
368 pub fn insert(&mut self, key: K, value: V) {
369 self.staging.insert(key, BTreeMapOp::Insert(value));
370 }
371
372 pub fn remove(&mut self, key: K) -> Option<V> {
373 if let Some(op) = self.staging.get(&key) {
374 return match op {
375 BTreeMapOp::Delete => None,
376 BTreeMapOp::Insert(_) => match self.staging.remove(&key).unwrap() {
377 BTreeMapOp::Insert(v) => {
378 self.staging.insert(key, BTreeMapOp::Delete);
379 Some(v)
380 }
381 BTreeMapOp::Delete => {
382 unreachable!(
383 "we have checked that the op of the key is `Insert`, so it's impossible to be Delete"
384 )
385 }
386 },
387 };
388 }
389 match self.tree_ref.get(&key) {
390 Some(orig_value) => {
391 self.staging.insert(key, BTreeMapOp::Delete);
392 Some(orig_value.clone())
393 }
394 None => None,
395 }
396 }
397
398 pub fn commit_memory(mut self) {
399 for (k, op) in self.staging {
401 match op {
402 BTreeMapOp::Insert(v) => {
403 self.tree_ref.insert(k, v);
404 }
405 BTreeMapOp::Delete => {
406 self.tree_ref.remove(&k);
407 }
408 }
409 }
410 }
411}
412
413impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>> InMemValTransaction
414 for BTreeMapTransactionInner<K, V, P>
415{
416 fn commit(self) {
417 self.commit_memory();
418 }
419}
420
421impl<K: Ord + Debug, V: Transactional<TXN> + Clone, P: DerefMut<Target = BTreeMap<K, V>>, TXN>
422 ValTransaction<TXN> for BTreeMapTransactionInner<K, V, P>
423{
424 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
425 for (k, op) in &self.staging {
427 match op {
428 BTreeMapOp::Insert(v) => v.upsert_in_transaction(txn).await?,
429 BTreeMapOp::Delete => {
430 if let Some(v) = self.tree_ref.get(k) {
431 v.delete_in_transaction(txn).await?;
432 }
433 }
434 }
435 }
436 Ok(())
437 }
438}
439
440pub struct BTreeMapEntryTransaction<'a, K, V> {
442 tree_ref: &'a mut BTreeMap<K, V>,
443 pub key: K,
444 pub new_value: V,
445}
446
447impl<'a, K: Ord + Debug, V: Clone> BTreeMapEntryTransaction<'a, K, V> {
448 pub fn new_insert(
451 tree_ref: &'a mut BTreeMap<K, V>,
452 key: K,
453 value: V,
454 ) -> BTreeMapEntryTransaction<'a, K, V> {
455 BTreeMapEntryTransaction {
456 new_value: value,
457 tree_ref,
458 key,
459 }
460 }
461
462 pub fn new(
469 tree_ref: &'a mut BTreeMap<K, V>,
470 key: K,
471 default_val: Option<V>,
472 ) -> Option<BTreeMapEntryTransaction<'a, K, V>> {
473 tree_ref
474 .get(&key)
475 .cloned()
476 .or(default_val)
477 .map(|orig_value| BTreeMapEntryTransaction {
478 new_value: orig_value,
479 tree_ref,
480 key,
481 })
482 }
483}
484
485impl<K, V> Deref for BTreeMapEntryTransaction<'_, K, V> {
486 type Target = V;
487
488 fn deref(&self) -> &Self::Target {
489 &self.new_value
490 }
491}
492
493impl<K, V> DerefMut for BTreeMapEntryTransaction<'_, K, V> {
494 fn deref_mut(&mut self) -> &mut Self::Target {
495 &mut self.new_value
496 }
497}
498
499impl<K: Ord, V: PartialEq> InMemValTransaction for BTreeMapEntryTransaction<'_, K, V> {
500 fn commit(self) {
501 self.tree_ref.insert(self.key, self.new_value);
502 }
503}
504
505impl<K: Ord, V: PartialEq + Transactional<TXN>, TXN> ValTransaction<TXN>
506 for BTreeMapEntryTransaction<'_, K, V>
507{
508 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
509 if !self.tree_ref.contains_key(&self.key)
510 || *self.tree_ref.get(&self.key).unwrap() != self.new_value
511 {
512 self.new_value.upsert_in_transaction(txn).await?
513 }
514 Ok(())
515 }
516}
517
518impl<T: InMemValTransaction> InMemValTransaction for Option<T> {
519 fn commit(self) {
520 if let Some(inner) = self {
521 inner.commit();
522 }
523 }
524}
525
526impl<T: ValTransaction<TXN>, TXN> ValTransaction<TXN> for Option<T> {
527 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
528 if let Some(inner) = &self {
529 inner.apply_to_txn(txn).await?;
530 }
531 Ok(())
532 }
533}
534
535pub struct DerefMutForward<
536 Inner,
537 Target,
538 P: DerefMut<Target = Inner>,
539 F: Fn(&Inner) -> &Target,
540 FMut: Fn(&mut Inner) -> &mut Target,
541> {
542 ptr: P,
543 f: F,
544 f_mut: FMut,
545}
546
547impl<
548 Inner,
549 Target,
550 P: DerefMut<Target = Inner>,
551 F: Fn(&Inner) -> &Target,
552 FMut: Fn(&mut Inner) -> &mut Target,
553> DerefMutForward<Inner, Target, P, F, FMut>
554{
555 pub fn new(ptr: P, f: F, f_mut: FMut) -> Self {
556 Self { ptr, f, f_mut }
557 }
558}
559
560impl<
561 Inner,
562 Target,
563 P: DerefMut<Target = Inner>,
564 F: Fn(&Inner) -> &Target,
565 FMut: Fn(&mut Inner) -> &mut Target,
566> Deref for DerefMutForward<Inner, Target, P, F, FMut>
567{
568 type Target = Target;
569
570 fn deref(&self) -> &Self::Target {
571 (self.f)(&self.ptr)
572 }
573}
574
575impl<
576 Inner,
577 Target,
578 P: DerefMut<Target = Inner>,
579 F: Fn(&Inner) -> &Target,
580 FMut: Fn(&mut Inner) -> &mut Target,
581> DerefMut for DerefMutForward<Inner, Target, P, F, FMut>
582{
583 fn deref_mut(&mut self) -> &mut Self::Target {
584 (self.f_mut)(&mut self.ptr)
585 }
586}