1mod error;
16mod stream;
17
18use std::collections::BTreeMap;
19use std::collections::btree_map::{Entry, VacantEntry};
20use std::fmt::Debug;
21use std::ops::{Deref, DerefMut};
22
23use async_trait::async_trait;
24pub use error::*;
25pub use risingwave_common::id::{ActorId, FragmentId, SubscriptionId};
26pub use stream::*;
27use uuid::Uuid;
28
/// Alias for the `u64` used as a dispatcher id.
pub type DispatcherId = u64;
31
32#[derive(Clone, Debug)]
33pub struct ClusterId(String);
34
35impl Default for ClusterId {
36 fn default() -> Self {
37 Self::new()
38 }
39}
40
41impl ClusterId {
42 pub fn new() -> Self {
43 Self(Uuid::new_v4().to_string())
44 }
45}
46
47impl From<ClusterId> for String {
48 fn from(value: ClusterId) -> Self {
49 value.0
50 }
51}
52
53impl From<String> for ClusterId {
54 fn from(value: String) -> Self {
55 Self(value)
56 }
57}
58
59impl Deref for ClusterId {
60 type Target = str;
61
62 fn deref(&self) -> &Self::Target {
63 self.0.as_str()
64 }
65}
66
/// Operations for writing a value into, or removing it from, a transaction of type `TXN`.
#[async_trait]
pub trait Transactional<TXN> {
    /// Upserts `self` within the transaction `trx`.
    async fn upsert_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
    /// Deletes `self` within the transaction `trx`.
    async fn delete_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
}
72
/// A pending change to an in-memory value that is applied by consuming the transaction object.
pub trait InMemValTransaction: Sized {
    /// Consumes the transaction and applies its pending change to the in-memory value.
    fn commit(self);
}
77
/// An in-memory value transaction whose pending change can also be written to a `TXN`.
pub trait ValTransaction<TXN>: InMemValTransaction {
    /// Writes the pending change into `txn`.
    ///
    /// Takes `&self`, so the transaction object itself is not consumed and can still be committed
    /// in memory afterwards.
    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()>;
}
84
/// Transaction over a single in-memory value.
///
/// Reads are served from the original value until the first mutable access. At that point the
/// original is cloned into a private working copy, and all further reads and writes use the copy.
/// The original value is not modified by any method in this block.
pub struct VarTransaction<'a, T> {
    /// The value the transaction was opened on.
    orig_value_ref: &'a mut T,
    /// Working copy; `None` until the first mutable access.
    new_value: Option<T>,
}

impl<'a, T> VarTransaction<'a, T> {
    /// Opens a transaction on `val_ref` with no working copy yet.
    pub fn new(val_ref: &'a mut T) -> VarTransaction<'a, T> {
        Self {
            orig_value_ref: val_ref,
            new_value: None,
        }
    }

    /// Returns `true` once a working copy exists, i.e. after any mutable access.
    pub fn has_new_value(&self) -> bool {
        self.new_value.is_some()
    }
}

impl<T> Deref for VarTransaction<'_, T> {
    type Target = T;

    /// Reads the working copy when there is one, the original value otherwise.
    fn deref(&self) -> &T {
        self.new_value.as_ref().unwrap_or(&*self.orig_value_ref)
    }
}

impl<T: Clone> DerefMut for VarTransaction<'_, T> {
    /// Returns the working copy, creating it from a clone of the original on first use.
    fn deref_mut(&mut self) -> &mut T {
        // Borrow the original separately so the closure does not need to capture `self`.
        let original = &*self.orig_value_ref;
        self.new_value.get_or_insert_with(|| original.clone())
    }
}
130
131impl<T> InMemValTransaction for VarTransaction<'_, T>
132where
133 T: PartialEq,
134{
135 fn commit(self) {
136 if let Some(new_value) = self.new_value {
137 *self.orig_value_ref = new_value;
138 }
139 }
140}
141
142impl<TXN, T> ValTransaction<TXN> for VarTransaction<'_, T>
143where
144 T: Transactional<TXN> + PartialEq,
145{
146 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
147 if let Some(new_value) = &self.new_value {
148 if *self.orig_value_ref != *new_value {
150 new_value.upsert_in_transaction(txn).await
151 } else {
152 Ok(())
153 }
154 } else {
155 Ok(())
156 }
157 }
158}
159
160enum BTreeMapTransactionStagingEntry<'a, K: Ord, V> {
162 Vacant(VacantEntry<'a, K, BTreeMapOp<V>>),
164 Occupied(&'a mut V),
167}
168
169pub struct BTreeMapTransactionValueGuard<'a, K: Ord, V: Clone> {
173 staging_entry: Option<BTreeMapTransactionStagingEntry<'a, K, V>>,
178 orig_value: Option<&'a V>,
180}
181
182impl<'a, K: Ord, V: Clone> BTreeMapTransactionValueGuard<'a, K, V> {
183 fn new(
184 staging_entry: BTreeMapTransactionStagingEntry<'a, K, V>,
185 orig_value: Option<&'a V>,
186 ) -> Self {
187 let is_entry_occupied =
188 matches!(staging_entry, BTreeMapTransactionStagingEntry::Occupied(_));
189 assert!(
190 is_entry_occupied || orig_value.is_some(),
191 "one of staging_entry and orig_value must be non-empty"
192 );
193 Self {
194 staging_entry: Some(staging_entry),
195 orig_value,
196 }
197 }
198}
199
200impl<K: Ord, V: Clone> Deref for BTreeMapTransactionValueGuard<'_, K, V> {
201 type Target = V;
202
203 fn deref(&self) -> &Self::Target {
204 match &self.staging_entry.as_ref().unwrap() {
206 BTreeMapTransactionStagingEntry::Vacant(_) => self
207 .orig_value
208 .expect("staging is vacant, so orig_value must be some"),
209 BTreeMapTransactionStagingEntry::Occupied(v) => v,
210 }
211 }
212}
213
214impl<K: Ord, V: Clone> DerefMut for BTreeMapTransactionValueGuard<'_, K, V> {
215 fn deref_mut(&mut self) -> &mut Self::Target {
216 let is_occupied = matches!(
217 self.staging_entry.as_ref().unwrap(),
218 BTreeMapTransactionStagingEntry::Occupied(_)
219 );
220
221 if !is_occupied {
224 let vacant_entry = match self.staging_entry.take().unwrap() {
225 BTreeMapTransactionStagingEntry::Vacant(entry) => entry,
226 BTreeMapTransactionStagingEntry::Occupied(_) => {
227 unreachable!("we have previously check that the entry is not occupied")
228 }
229 };
230
231 let new_value_mut_ref = match vacant_entry.insert(BTreeMapOp::Insert(
233 self.orig_value
234 .expect("self.staging_entry was vacant, so orig_value must be some")
235 .clone(),
236 )) {
237 BTreeMapOp::Insert(v) => v,
238 BTreeMapOp::Delete => {
239 unreachable!(
240 "the previous inserted op is `Inserted`, so it's not possible to reach Delete"
241 )
242 }
243 };
244 let _ = self
246 .staging_entry
247 .insert(BTreeMapTransactionStagingEntry::Occupied(new_value_mut_ref));
248 }
249
250 match self.staging_entry.as_mut().unwrap() {
251 BTreeMapTransactionStagingEntry::Vacant(_) => {
252 unreachable!("we have inserted a cloned original value in case of vacant")
253 }
254 BTreeMapTransactionStagingEntry::Occupied(v) => v,
255 }
256 }
257}
258
/// A change staged for one key of a map transaction.
enum BTreeMapOp<V> {
    /// The key is to be set to this value.
    Insert(V),
    /// The key is to be removed.
    Delete,
}
263
/// Transaction over a `BTreeMap` that is reached through the pointer type `P`.
///
/// Changes are recorded per key in `staging` rather than being written to the map directly.
pub struct BTreeMapTransactionInner<K: Ord, V, P: DerefMut<Target = BTreeMap<K, V>>> {
    /// Pointer to the map the transaction operates on.
    tree_ref: P,
    /// Pending per-key changes that have not been applied to the map yet.
    staging: BTreeMap<K, BTreeMapOp<V>>,
}
276
/// [`BTreeMapTransactionInner`] specialised to a plain mutable borrow of the map.
pub type BTreeMapTransaction<'a, K, V> = BTreeMapTransactionInner<K, V, &'a mut BTreeMap<K, V>>;
278
279impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>>
280 BTreeMapTransactionInner<K, V, P>
281{
282 pub fn new(tree_ref: P) -> BTreeMapTransactionInner<K, V, P> {
283 Self {
284 tree_ref,
285 staging: BTreeMap::default(),
286 }
287 }
288
289 #[allow(dead_code)]
291 pub fn new_entry_txn(&mut self, key: K) -> Option<BTreeMapEntryTransaction<'_, K, V>> {
292 BTreeMapEntryTransaction::new(&mut self.tree_ref, key, None)
293 }
294
295 pub fn new_entry_txn_or_default(
299 &mut self,
300 key: K,
301 default_val: V,
302 ) -> BTreeMapEntryTransaction<'_, K, V> {
303 BTreeMapEntryTransaction::new(&mut self.tree_ref, key, Some(default_val))
304 .expect("default value is provided and should return `Some`")
305 }
306
307 pub fn new_entry_insert_txn(&mut self, key: K, val: V) -> BTreeMapEntryTransaction<'_, K, V> {
309 BTreeMapEntryTransaction::new_insert(&mut self.tree_ref, key, val)
310 }
311
312 pub fn tree_ref(&self) -> &BTreeMap<K, V> {
313 &self.tree_ref
314 }
315
316 pub fn get(&self, key: &K) -> Option<&V> {
318 self.staging
319 .get(key)
320 .and_then(|op| match op {
321 BTreeMapOp::Insert(v) => Some(v),
322 BTreeMapOp::Delete => None,
323 })
324 .or_else(|| self.tree_ref.get(key))
325 }
326
327 pub fn contains_key(&self, key: &K) -> bool {
328 self.get(key).is_some()
329 }
330
331 pub fn get_mut(&mut self, key: K) -> Option<BTreeMapTransactionValueGuard<'_, K, V>> {
339 let orig_contains_key = self.tree_ref.contains_key(&key);
340 let orig_value = self.tree_ref.get(&key);
341
342 let staging_entry = match self.staging.entry(key) {
343 Entry::Occupied(entry) => match entry.into_mut() {
344 BTreeMapOp::Insert(v) => BTreeMapTransactionStagingEntry::Occupied(v),
345 BTreeMapOp::Delete => return None,
346 },
347 Entry::Vacant(vacant_entry) => {
348 if !orig_contains_key {
349 return None;
350 } else {
351 BTreeMapTransactionStagingEntry::Vacant(vacant_entry)
352 }
353 }
354 };
355 Some(BTreeMapTransactionValueGuard::new(
356 staging_entry,
357 orig_value,
358 ))
359 }
360
361 pub fn insert(&mut self, key: K, value: V) {
362 self.staging.insert(key, BTreeMapOp::Insert(value));
363 }
364
365 pub fn remove(&mut self, key: K) -> Option<V> {
366 if let Some(op) = self.staging.get(&key) {
367 return match op {
368 BTreeMapOp::Delete => None,
369 BTreeMapOp::Insert(_) => match self.staging.remove(&key).unwrap() {
370 BTreeMapOp::Insert(v) => {
371 self.staging.insert(key, BTreeMapOp::Delete);
372 Some(v)
373 }
374 BTreeMapOp::Delete => {
375 unreachable!(
376 "we have checked that the op of the key is `Insert`, so it's impossible to be Delete"
377 )
378 }
379 },
380 };
381 }
382 match self.tree_ref.get(&key) {
383 Some(orig_value) => {
384 self.staging.insert(key, BTreeMapOp::Delete);
385 Some(orig_value.clone())
386 }
387 None => None,
388 }
389 }
390
391 pub fn commit_memory(mut self) {
392 for (k, op) in self.staging {
394 match op {
395 BTreeMapOp::Insert(v) => {
396 self.tree_ref.insert(k, v);
397 }
398 BTreeMapOp::Delete => {
399 self.tree_ref.remove(&k);
400 }
401 }
402 }
403 }
404}
405
406impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>> InMemValTransaction
407 for BTreeMapTransactionInner<K, V, P>
408{
409 fn commit(self) {
410 self.commit_memory();
411 }
412}
413
414impl<K: Ord + Debug, V: Transactional<TXN> + Clone, P: DerefMut<Target = BTreeMap<K, V>>, TXN>
415 ValTransaction<TXN> for BTreeMapTransactionInner<K, V, P>
416{
417 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
418 for (k, op) in &self.staging {
420 match op {
421 BTreeMapOp::Insert(v) => v.upsert_in_transaction(txn).await?,
422 BTreeMapOp::Delete => {
423 if let Some(v) = self.tree_ref.get(k) {
424 v.delete_in_transaction(txn).await?;
425 }
426 }
427 }
428 }
429 Ok(())
430 }
431}
432
/// Transaction over a single entry of a `BTreeMap`.
///
/// The transaction owns a working value for `key` and dereferences to it. None of the methods in
/// this block write to the map.
pub struct BTreeMapEntryTransaction<'a, K, V> {
    /// The map the entry belongs to.
    tree_ref: &'a mut BTreeMap<K, V>,
    /// Key of the entry.
    pub key: K,
    /// Working value for the entry.
    pub new_value: V,
}

impl<'a, K: Ord + Debug, V: Clone> BTreeMapEntryTransaction<'a, K, V> {
    /// Creates a transaction whose working value is `value`, whatever the map currently holds for
    /// `key`.
    pub fn new_insert(
        tree_ref: &'a mut BTreeMap<K, V>,
        key: K,
        value: V,
    ) -> BTreeMapEntryTransaction<'a, K, V> {
        Self {
            tree_ref,
            key,
            new_value: value,
        }
    }

    /// Creates a transaction whose working value is a clone of the map's current value for `key`.
    ///
    /// When the map has no value for `key`, `default_val` is used instead; if that is `None` too,
    /// no transaction is created and `None` is returned.
    pub fn new(
        tree_ref: &'a mut BTreeMap<K, V>,
        key: K,
        default_val: Option<V>,
    ) -> Option<BTreeMapEntryTransaction<'a, K, V>> {
        let new_value = match tree_ref.get(&key) {
            Some(current) => current.clone(),
            None => default_val?,
        };
        Some(Self {
            tree_ref,
            key,
            new_value,
        })
    }
}

impl<K, V> Deref for BTreeMapEntryTransaction<'_, K, V> {
    type Target = V;

    /// Reads the working value.
    fn deref(&self) -> &V {
        let Self { new_value, .. } = self;
        new_value
    }
}

impl<K, V> DerefMut for BTreeMapEntryTransaction<'_, K, V> {
    /// Gives mutable access to the working value.
    fn deref_mut(&mut self) -> &mut V {
        let Self { new_value, .. } = self;
        new_value
    }
}
491
492impl<K: Ord, V: PartialEq> InMemValTransaction for BTreeMapEntryTransaction<'_, K, V> {
493 fn commit(self) {
494 self.tree_ref.insert(self.key, self.new_value);
495 }
496}
497
498impl<K: Ord, V: PartialEq + Transactional<TXN>, TXN> ValTransaction<TXN>
499 for BTreeMapEntryTransaction<'_, K, V>
500{
501 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
502 if !self.tree_ref.contains_key(&self.key)
503 || *self.tree_ref.get(&self.key).unwrap() != self.new_value
504 {
505 self.new_value.upsert_in_transaction(txn).await?
506 }
507 Ok(())
508 }
509}
510
511impl<T: InMemValTransaction> InMemValTransaction for Option<T> {
512 fn commit(self) {
513 if let Some(inner) = self {
514 inner.commit();
515 }
516 }
517}
518
519impl<T: ValTransaction<TXN>, TXN> ValTransaction<TXN> for Option<T> {
520 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
521 if let Some(inner) = &self {
522 inner.apply_to_txn(txn).await?;
523 }
524 Ok(())
525 }
526}
527
/// Pointer adapter that dereferences to a value derived from what an inner pointer points at.
///
/// `ptr` dereferences to `Inner`; `f` and `f_mut` map a shared or mutable reference to `Inner`
/// onto a reference to `Target`. The adapter then behaves like a pointer to that `Target`.
pub struct DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    /// The wrapped pointer.
    ptr: P,
    /// Projection used for shared access.
    f: F,
    /// Projection used for mutable access.
    f_mut: FMut,
}

impl<Inner, Target, P, F, FMut> DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    /// Wraps `ptr` together with the shared (`f`) and mutable (`f_mut`) projections.
    pub fn new(ptr: P, f: F, f_mut: FMut) -> Self {
        DerefMutForward { ptr, f, f_mut }
    }
}

impl<Inner, Target, P, F, FMut> Deref for DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    type Target = Target;

    /// Applies the shared projection to the value behind the wrapped pointer.
    fn deref(&self) -> &Self::Target {
        let inner: &Inner = self.ptr.deref();
        (self.f)(inner)
    }
}

impl<Inner, Target, P, F, FMut> DerefMut for DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    /// Applies the mutable projection to the value behind the wrapped pointer.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let inner: &mut Inner = self.ptr.deref_mut();
        (self.f_mut)(inner)
    }
}