1mod error;
16mod stream;
17
18use std::collections::BTreeMap;
19use std::collections::btree_map::{Entry, VacantEntry};
20use std::fmt::Debug;
21use std::ops::{Deref, DerefMut};
22
23use async_trait::async_trait;
24pub use error::*;
25pub use risingwave_common::id::{ActorId, FragmentId, SubscriptionId};
26use sea_orm::ConnectionTrait;
27pub use stream::*;
28use uuid::Uuid;
29
30pub type DispatcherId = FragmentId;
32
33#[derive(Clone, Debug)]
34pub struct ClusterId(String);
35
36impl Default for ClusterId {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl ClusterId {
43 pub fn new() -> Self {
44 Self(Uuid::new_v4().to_string())
45 }
46}
47
48impl From<ClusterId> for String {
49 fn from(value: ClusterId) -> Self {
50 value.0
51 }
52}
53
54impl From<String> for ClusterId {
55 fn from(value: String) -> Self {
56 Self(value)
57 }
58}
59
60impl Deref for ClusterId {
61 type Target = str;
62
63 fn deref(&self) -> &Self::Target {
64 self.0.as_str()
65 }
66}
67
68#[async_trait]
69pub trait Transactional<TXN: ConnectionTrait> {
70 async fn upsert_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
71 async fn delete_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
72}
73
/// The in-memory half of a value transaction.
///
/// Implementors hold pending changes to some in-memory value.
pub trait InMemValTransaction: Sized {
    /// Consumes the transaction and applies its pending changes to the
    /// in-memory value it was created from.
    fn commit(self);
}
78
79pub trait ValTransaction<TXN>: InMemValTransaction {
82 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()>;
84}
85
/// Transaction over a single value of type `T`.
///
/// Reads go to the original value until the first mutable access. At that
/// point the original is cloned into `new_value`, and all further reads and
/// writes use the clone. The original is only overwritten on commit.
pub struct VarTransaction<'a, T> {
    /// The value the transaction was opened on.
    orig_value_ref: &'a mut T,
    /// The staged replacement; `None` until the first mutable access.
    new_value: Option<T>,
}

impl<'a, T> VarTransaction<'a, T> {
    /// Opens a transaction on `val_ref` with nothing staged.
    pub fn new(val_ref: &'a mut T) -> VarTransaction<'a, T> {
        VarTransaction {
            new_value: None,
            orig_value_ref: val_ref,
        }
    }

    /// Returns `true` once a replacement value has been staged, i.e. after the
    /// first mutable access. The staged value may still equal the original.
    pub fn has_new_value(&self) -> bool {
        self.new_value.is_some()
    }
}

impl<T> Deref for VarTransaction<'_, T> {
    type Target = T;

    /// Reads the staged value if there is one, otherwise the original.
    fn deref(&self) -> &Self::Target {
        match &self.new_value {
            Some(new_value) => new_value,
            None => self.orig_value_ref,
        }
    }
}

impl<T: Clone> DerefMut for VarTransaction<'_, T> {
    /// Returns the staged value, cloning the original into it on first use.
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Split the borrow so the closure reads the original while the staged
        // slot is borrowed mutably.
        let Self {
            orig_value_ref,
            new_value,
        } = self;
        new_value.get_or_insert_with(|| (**orig_value_ref).clone())
    }
}
131
132impl<T> InMemValTransaction for VarTransaction<'_, T>
133where
134 T: PartialEq,
135{
136 fn commit(self) {
137 if let Some(new_value) = self.new_value {
138 *self.orig_value_ref = new_value;
139 }
140 }
141}
142
143impl<TXN, T> ValTransaction<TXN> for VarTransaction<'_, T>
144where
145 T: Transactional<TXN> + PartialEq,
146 TXN: ConnectionTrait,
147{
148 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
149 if let Some(new_value) = &self.new_value {
150 if *self.orig_value_ref != *new_value {
152 new_value.upsert_in_transaction(txn).await
153 } else {
154 Ok(())
155 }
156 } else {
157 Ok(())
158 }
159 }
160}
161
162enum BTreeMapTransactionStagingEntry<'a, K: Ord, V> {
164 Vacant(VacantEntry<'a, K, BTreeMapOp<V>>),
166 Occupied(&'a mut V),
169}
170
171pub struct BTreeMapTransactionValueGuard<'a, K: Ord, V: Clone> {
175 staging_entry: Option<BTreeMapTransactionStagingEntry<'a, K, V>>,
180 orig_value: Option<&'a V>,
182}
183
184impl<'a, K: Ord, V: Clone> BTreeMapTransactionValueGuard<'a, K, V> {
185 fn new(
186 staging_entry: BTreeMapTransactionStagingEntry<'a, K, V>,
187 orig_value: Option<&'a V>,
188 ) -> Self {
189 let is_entry_occupied =
190 matches!(staging_entry, BTreeMapTransactionStagingEntry::Occupied(_));
191 assert!(
192 is_entry_occupied || orig_value.is_some(),
193 "one of staging_entry and orig_value must be non-empty"
194 );
195 Self {
196 staging_entry: Some(staging_entry),
197 orig_value,
198 }
199 }
200}
201
202impl<K: Ord, V: Clone> Deref for BTreeMapTransactionValueGuard<'_, K, V> {
203 type Target = V;
204
205 fn deref(&self) -> &Self::Target {
206 match &self.staging_entry.as_ref().unwrap() {
208 BTreeMapTransactionStagingEntry::Vacant(_) => self
209 .orig_value
210 .expect("staging is vacant, so orig_value must be some"),
211 BTreeMapTransactionStagingEntry::Occupied(v) => v,
212 }
213 }
214}
215
216impl<K: Ord, V: Clone> DerefMut for BTreeMapTransactionValueGuard<'_, K, V> {
217 fn deref_mut(&mut self) -> &mut Self::Target {
218 let is_occupied = matches!(
219 self.staging_entry.as_ref().unwrap(),
220 BTreeMapTransactionStagingEntry::Occupied(_)
221 );
222
223 if !is_occupied {
226 let vacant_entry = match self.staging_entry.take().unwrap() {
227 BTreeMapTransactionStagingEntry::Vacant(entry) => entry,
228 BTreeMapTransactionStagingEntry::Occupied(_) => {
229 unreachable!("we have previously check that the entry is not occupied")
230 }
231 };
232
233 let new_value_mut_ref = match vacant_entry.insert(BTreeMapOp::Insert(
235 self.orig_value
236 .expect("self.staging_entry was vacant, so orig_value must be some")
237 .clone(),
238 )) {
239 BTreeMapOp::Insert(v) => v,
240 BTreeMapOp::Delete => {
241 unreachable!(
242 "the previous inserted op is `Inserted`, so it's not possible to reach Delete"
243 )
244 }
245 };
246 let _ = self
248 .staging_entry
249 .insert(BTreeMapTransactionStagingEntry::Occupied(new_value_mut_ref));
250 }
251
252 match self.staging_entry.as_mut().unwrap() {
253 BTreeMapTransactionStagingEntry::Vacant(_) => {
254 unreachable!("we have inserted a cloned original value in case of vacant")
255 }
256 BTreeMapTransactionStagingEntry::Occupied(v) => v,
257 }
258 }
259}
260
/// A pending operation recorded for one key of a map transaction.
enum BTreeMapOp<V> {
    /// The key is set to this value when the transaction is committed.
    Insert(V),
    /// The key is removed when the transaction is committed.
    Delete,
}

/// Transaction over a `BTreeMap`.
///
/// Changes are recorded in `staging` and leave the underlying map untouched
/// until the transaction is committed. `P` is any pointer-like type that
/// dereferences mutably to the map.
pub struct BTreeMapTransactionInner<K, V, P>
where
    K: Ord,
    P: DerefMut<Target = BTreeMap<K, V>>,
{
    /// The map the transaction was opened on.
    tree_ref: P,
    /// Pending operations, keyed like the underlying map.
    staging: BTreeMap<K, BTreeMapOp<V>>,
}

/// A [`BTreeMapTransactionInner`] opened on a plain mutable reference to the map.
pub type BTreeMapTransaction<'a, K, V> = BTreeMapTransactionInner<K, V, &'a mut BTreeMap<K, V>>;
280
281impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>>
282 BTreeMapTransactionInner<K, V, P>
283{
284 pub fn new(tree_ref: P) -> BTreeMapTransactionInner<K, V, P> {
285 Self {
286 tree_ref,
287 staging: BTreeMap::default(),
288 }
289 }
290
291 #[allow(dead_code)]
293 pub fn new_entry_txn(&mut self, key: K) -> Option<BTreeMapEntryTransaction<'_, K, V>> {
294 BTreeMapEntryTransaction::new(&mut self.tree_ref, key, None)
295 }
296
297 pub fn new_entry_txn_or_default(
301 &mut self,
302 key: K,
303 default_val: V,
304 ) -> BTreeMapEntryTransaction<'_, K, V> {
305 BTreeMapEntryTransaction::new(&mut self.tree_ref, key, Some(default_val))
306 .expect("default value is provided and should return `Some`")
307 }
308
309 pub fn new_entry_insert_txn(&mut self, key: K, val: V) -> BTreeMapEntryTransaction<'_, K, V> {
311 BTreeMapEntryTransaction::new_insert(&mut self.tree_ref, key, val)
312 }
313
314 pub fn tree_ref(&self) -> &BTreeMap<K, V> {
315 &self.tree_ref
316 }
317
318 pub fn get(&self, key: &K) -> Option<&V> {
320 self.staging
321 .get(key)
322 .and_then(|op| match op {
323 BTreeMapOp::Insert(v) => Some(v),
324 BTreeMapOp::Delete => None,
325 })
326 .or_else(|| self.tree_ref.get(key))
327 }
328
329 pub fn contains_key(&self, key: &K) -> bool {
330 self.get(key).is_some()
331 }
332
333 pub fn get_mut(&mut self, key: K) -> Option<BTreeMapTransactionValueGuard<'_, K, V>> {
341 let orig_contains_key = self.tree_ref.contains_key(&key);
342 let orig_value = self.tree_ref.get(&key);
343
344 let staging_entry = match self.staging.entry(key) {
345 Entry::Occupied(entry) => match entry.into_mut() {
346 BTreeMapOp::Insert(v) => BTreeMapTransactionStagingEntry::Occupied(v),
347 BTreeMapOp::Delete => return None,
348 },
349 Entry::Vacant(vacant_entry) => {
350 if !orig_contains_key {
351 return None;
352 } else {
353 BTreeMapTransactionStagingEntry::Vacant(vacant_entry)
354 }
355 }
356 };
357 Some(BTreeMapTransactionValueGuard::new(
358 staging_entry,
359 orig_value,
360 ))
361 }
362
363 pub fn insert(&mut self, key: K, value: V) {
364 self.staging.insert(key, BTreeMapOp::Insert(value));
365 }
366
367 pub fn remove(&mut self, key: K) -> Option<V> {
368 if let Some(op) = self.staging.get(&key) {
369 return match op {
370 BTreeMapOp::Delete => None,
371 BTreeMapOp::Insert(_) => match self.staging.remove(&key).unwrap() {
372 BTreeMapOp::Insert(v) => {
373 self.staging.insert(key, BTreeMapOp::Delete);
374 Some(v)
375 }
376 BTreeMapOp::Delete => {
377 unreachable!(
378 "we have checked that the op of the key is `Insert`, so it's impossible to be Delete"
379 )
380 }
381 },
382 };
383 }
384 match self.tree_ref.get(&key) {
385 Some(orig_value) => {
386 self.staging.insert(key, BTreeMapOp::Delete);
387 Some(orig_value.clone())
388 }
389 None => None,
390 }
391 }
392
393 pub fn commit_memory(mut self) {
394 for (k, op) in self.staging {
396 match op {
397 BTreeMapOp::Insert(v) => {
398 self.tree_ref.insert(k, v);
399 }
400 BTreeMapOp::Delete => {
401 self.tree_ref.remove(&k);
402 }
403 }
404 }
405 }
406}
407
408impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>> InMemValTransaction
409 for BTreeMapTransactionInner<K, V, P>
410{
411 fn commit(self) {
412 self.commit_memory();
413 }
414}
415
416impl<K: Ord + Debug, V: Transactional<TXN> + Clone, P: DerefMut<Target = BTreeMap<K, V>>, TXN>
417 ValTransaction<TXN> for BTreeMapTransactionInner<K, V, P>
418where
419 TXN: ConnectionTrait,
420{
421 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
422 for (k, op) in &self.staging {
424 match op {
425 BTreeMapOp::Insert(v) => v.upsert_in_transaction(txn).await?,
426 BTreeMapOp::Delete => {
427 if let Some(v) = self.tree_ref.get(k) {
428 v.delete_in_transaction(txn).await?;
429 }
430 }
431 }
432 }
433 Ok(())
434 }
435}
436
/// Transaction over a single entry of a `BTreeMap`.
///
/// The transaction owns a working copy of the value (`new_value`). The map is
/// not modified until the transaction is committed.
pub struct BTreeMapEntryTransaction<'a, K, V> {
    /// The map the entry belongs to.
    tree_ref: &'a mut BTreeMap<K, V>,
    /// Key of the entry.
    pub key: K,
    /// Working copy of the value; this is what dereferencing exposes.
    pub new_value: V,
}

impl<'a, K: Ord + Debug, V: Clone> BTreeMapEntryTransaction<'a, K, V> {
    /// Creates a transaction whose working value is `value`, without looking
    /// at what the map currently stores under `key`.
    pub fn new_insert(
        tree_ref: &'a mut BTreeMap<K, V>,
        key: K,
        value: V,
    ) -> BTreeMapEntryTransaction<'a, K, V> {
        Self {
            tree_ref,
            key,
            new_value: value,
        }
    }

    /// Creates a transaction whose working value is a clone of the map's
    /// current value for `key`, or `default_val` when the key is absent.
    ///
    /// Returns `None` when the key is absent and no default is given. When
    /// the key is present, `default_val` is ignored.
    pub fn new(
        tree_ref: &'a mut BTreeMap<K, V>,
        key: K,
        default_val: Option<V>,
    ) -> Option<BTreeMapEntryTransaction<'a, K, V>> {
        let new_value = match tree_ref.get(&key) {
            Some(current) => current.clone(),
            None => default_val?,
        };
        Some(Self {
            tree_ref,
            key,
            new_value,
        })
    }
}

impl<K, V> Deref for BTreeMapEntryTransaction<'_, K, V> {
    type Target = V;

    /// Borrows the working value.
    fn deref(&self) -> &V {
        &self.new_value
    }
}

impl<K, V> DerefMut for BTreeMapEntryTransaction<'_, K, V> {
    /// Mutably borrows the working value.
    fn deref_mut(&mut self) -> &mut V {
        &mut self.new_value
    }
}
495
496impl<K: Ord, V: PartialEq> InMemValTransaction for BTreeMapEntryTransaction<'_, K, V> {
497 fn commit(self) {
498 self.tree_ref.insert(self.key, self.new_value);
499 }
500}
501
502impl<K: Ord, V: PartialEq + Transactional<TXN>, TXN> ValTransaction<TXN>
503 for BTreeMapEntryTransaction<'_, K, V>
504where
505 TXN: ConnectionTrait,
506{
507 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
508 if !self.tree_ref.contains_key(&self.key)
509 || *self.tree_ref.get(&self.key).unwrap() != self.new_value
510 {
511 self.new_value.upsert_in_transaction(txn).await?
512 }
513 Ok(())
514 }
515}
516
517impl<T: InMemValTransaction> InMemValTransaction for Option<T> {
518 fn commit(self) {
519 if let Some(inner) = self {
520 inner.commit();
521 }
522 }
523}
524
525impl<T: ValTransaction<TXN>, TXN> ValTransaction<TXN> for Option<T> {
526 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
527 if let Some(inner) = &self {
528 inner.apply_to_txn(txn).await?;
529 }
530 Ok(())
531 }
532}
533
/// Pointer adapter that dereferences to a part of what the wrapped pointer
/// points at.
///
/// `ptr` dereferences to an `Inner`; `f` and `f_mut` map a shared or mutable
/// borrow of that `Inner` to a borrow of the `Target` exposed by this wrapper.
pub struct DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    /// The wrapped pointer.
    ptr: P,
    /// Projection used for shared access.
    f: F,
    /// Projection used for mutable access.
    f_mut: FMut,
}

impl<Inner, Target, P, F, FMut> DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    /// Wraps `ptr` together with the shared (`f`) and mutable (`f_mut`)
    /// projections.
    pub fn new(ptr: P, f: F, f_mut: FMut) -> Self {
        DerefMutForward { ptr, f, f_mut }
    }
}

impl<Inner, Target, P, F, FMut> Deref for DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    type Target = Target;

    /// Dereferences the wrapped pointer and applies the shared projection.
    fn deref(&self) -> &Self::Target {
        let inner: &Inner = &self.ptr;
        (self.f)(inner)
    }
}

impl<Inner, Target, P, F, FMut> DerefMut for DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    /// Dereferences the wrapped pointer mutably and applies the mutable
    /// projection.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let inner: &mut Inner = &mut self.ptr;
        (self.f_mut)(inner)
    }
}