risingwave_meta/model/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod error;
16mod stream;
17
18use std::collections::BTreeMap;
19use std::collections::btree_map::{Entry, VacantEntry};
20use std::fmt::Debug;
21use std::ops::{Deref, DerefMut};
22
23use async_trait::async_trait;
24pub use error::*;
25pub use risingwave_common::id::{ActorId, FragmentId, SubscriptionId};
26pub use stream::*;
27use uuid::Uuid;
28
/// Identifier of a dispatcher, represented as a `u64`.
///
/// Should be used together with `ActorId` to uniquely identify a dispatcher.
pub type DispatcherId = u64;
31
32#[derive(Clone, Debug)]
33pub struct ClusterId(String);
34
35impl Default for ClusterId {
36    fn default() -> Self {
37        Self::new()
38    }
39}
40
41impl ClusterId {
42    pub fn new() -> Self {
43        Self(Uuid::new_v4().to_string())
44    }
45}
46
47impl From<ClusterId> for String {
48    fn from(value: ClusterId) -> Self {
49        value.0
50    }
51}
52
53impl From<String> for ClusterId {
54    fn from(value: String) -> Self {
55        Self(value)
56    }
57}
58
59impl Deref for ClusterId {
60    type Target = str;
61
62    fn deref(&self) -> &Self::Target {
63        self.0.as_str()
64    }
65}
66
/// A value that can write itself into a transaction of type `TXN`.
///
/// NOTE(review): no implementation is visible in this file; the descriptions below follow the
/// method names and the way the transaction wrappers in this module call them.
#[async_trait]
pub trait Transactional<TXN> {
    /// Add an upsert of `self` to `trx`. Called by the wrappers in this module for new or
    /// modified values.
    async fn upsert_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
    /// Add a deletion of `self` to `trx`. Called by the wrappers in this module for removed
    /// values.
    async fn delete_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
}
72
/// A pending change to a local memory value.
///
/// `commit` takes `self` by value, so a transaction can be committed at most once.
pub trait InMemValTransaction: Sized {
    /// Commit the change to local memory value
    fn commit(self);
}
77
/// Trait that wraps a local memory value and applies the change to the local memory value on
/// `commit` or leaves the local memory value untouched on `abort`.
///
/// The trait itself declares no `abort` method; `commit` comes from the `InMemValTransaction`
/// supertrait. `TXN` is the type of the transaction that the pending change is written to.
pub trait ValTransaction<TXN>: InMemValTransaction {
    /// Apply the change (upsert or delete) to `txn`
    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()>;
}
84
/// Transaction wrapper for a variable.
///
/// The wrapper holds a mutable reference to the original value and a lazily created copy:
/// - On the first `deref_mut` call, a clone of the original value is stored in `new_value`, and
///   all subsequent modifications are applied to that copy.
/// - When `commit` is called, the staged `new_value` (if any) is written to `orig_value_ref`.
/// - To abort, drop the `VarTransaction`; the local memory value is untouched.
pub struct VarTransaction<'a, T> {
    /// The original value. It is only read while the transaction is open.
    orig_value_ref: &'a mut T,
    /// The staged copy; `None` until the first mutable access.
    new_value: Option<T>,
}

impl<'a, T> VarTransaction<'a, T> {
    /// Create a `VarTransaction` that wraps a raw variable
    pub fn new(val_ref: &'a mut T) -> VarTransaction<'a, T> {
        VarTransaction {
            // lazy initialization
            new_value: None,
            orig_value_ref: val_ref,
        }
    }

    /// Return `true` once the wrapper has been mutably dereferenced, i.e. a staged copy exists.
    ///
    /// This does not tell whether the staged copy differs from the original value.
    pub fn has_new_value(&self) -> bool {
        self.new_value.is_some()
    }
}

impl<T> Deref for VarTransaction<'_, T> {
    type Target = T;

    /// Read the staged copy if there is one, otherwise the original value.
    fn deref(&self) -> &Self::Target {
        self.new_value
            .as_ref()
            .unwrap_or(&*self.orig_value_ref)
    }
}

impl<T: Clone> DerefMut for VarTransaction<'_, T> {
    /// Return the staged copy for mutation, cloning the original value on first use.
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Borrow the original separately so the closure does not need to capture `self`.
        let original: &T = &*self.orig_value_ref;
        self.new_value.get_or_insert_with(|| original.clone())
    }
}
130
131impl<T> InMemValTransaction for VarTransaction<'_, T>
132where
133    T: PartialEq,
134{
135    fn commit(self) {
136        if let Some(new_value) = self.new_value {
137            *self.orig_value_ref = new_value;
138        }
139    }
140}
141
142impl<TXN, T> ValTransaction<TXN> for VarTransaction<'_, T>
143where
144    T: Transactional<TXN> + PartialEq,
145{
146    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
147        if let Some(new_value) = &self.new_value {
148            // Apply the change to `txn` only when the value is modified
149            if *self.orig_value_ref != *new_value {
150                new_value.upsert_in_transaction(txn).await
151            } else {
152                Ok(())
153            }
154        } else {
155            Ok(())
156        }
157    }
158}
159
/// The state of one key's slot in the `staging` map of a `BTreeMapTransaction`.
enum BTreeMapTransactionStagingEntry<'a, K: Ord, V> {
    /// Nothing is staged for the key yet; the vacant entry allows inserting a staged value later.
    Vacant(VacantEntry<'a, K, BTreeMapOp<V>>),
    /// A value is already staged for the key; this is a mutable reference to that staged value.
    Occupied(&'a mut V),
}

/// A mutable guard to the value of the corresponding key of a `BTreeMapTransaction`.
///
/// The staging value is created lazily: the original value is only cloned into the staging map
/// when the guard is mutably dereferenced for the first time.
pub struct BTreeMapTransactionValueGuard<'a, K: Ord, V: Clone> {
    // Always `Some` outside of `deref_mut`, so unwrapping is safe. It is an `Option` only so that
    // `deref_mut` can move a `Vacant` entry out, insert through it, and store an `Occupied` back.
    // When this holds a `Vacant` entry, `orig_value` must be `Some`.
    staging_entry: Option<BTreeMapTransactionStagingEntry<'a, K, V>>,
    // When this is `None`, `staging_entry` must hold an `Occupied` entry.
    orig_value: Option<&'a V>,
}

impl<'a, K: Ord, V: Clone> BTreeMapTransactionValueGuard<'a, K, V> {
    /// Build a guard from the staging slot and the (optional) original value.
    ///
    /// # Panics
    ///
    /// Panics when the staging slot is vacant and no original value is given, because the guard
    /// would then have nothing to read from.
    fn new(
        staging_entry: BTreeMapTransactionStagingEntry<'a, K, V>,
        orig_value: Option<&'a V>,
    ) -> Self {
        let has_readable_value = match &staging_entry {
            BTreeMapTransactionStagingEntry::Occupied(_) => true,
            BTreeMapTransactionStagingEntry::Vacant(_) => orig_value.is_some(),
        };
        assert!(
            has_readable_value,
            "one of staging_entry and orig_value must be non-empty"
        );
        Self {
            staging_entry: Some(staging_entry),
            orig_value,
        }
    }
}

impl<K: Ord, V: Clone> Deref for BTreeMapTransactionValueGuard<'_, K, V> {
    type Target = V;

    /// Read the staged value when there is one, otherwise the original value.
    fn deref(&self) -> &Self::Target {
        let slot = self.staging_entry.as_ref().unwrap();
        if let BTreeMapTransactionStagingEntry::Occupied(staged) = slot {
            return &**staged;
        }
        self.orig_value
            .expect("staging is vacant, so orig_value must be some")
    }
}

impl<K: Ord, V: Clone> DerefMut for BTreeMapTransactionValueGuard<'_, K, V> {
    /// Return the staged value for mutation.
    ///
    /// If nothing is staged yet, a clone of the original value is inserted into the staging map
    /// first and the guard switches to the `Occupied` state.
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Move the slot out so that a vacant entry can be consumed by `insert`.
        let occupied = match self.staging_entry.take().unwrap() {
            already @ BTreeMapTransactionStagingEntry::Occupied(_) => already,
            BTreeMapTransactionStagingEntry::Vacant(vacant) => {
                let snapshot = self
                    .orig_value
                    .expect("self.staging_entry was vacant, so orig_value must be some")
                    .clone();
                // Stage the clone and keep a mutable reference to it.
                match vacant.insert(BTreeMapOp::Insert(snapshot)) {
                    BTreeMapOp::Insert(staged) => {
                        BTreeMapTransactionStagingEntry::Occupied(staged)
                    }
                    BTreeMapOp::Delete => {
                        unreachable!(
                            "the previous inserted op is `Inserted`, so it's not possible to reach Delete"
                        )
                    }
                }
            }
        };

        // Put the (now always occupied) slot back and hand out the staged value.
        match self.staging_entry.insert(occupied) {
            BTreeMapTransactionStagingEntry::Occupied(staged) => &mut **staged,
            BTreeMapTransactionStagingEntry::Vacant(_) => {
                unreachable!("we have inserted a cloned original value in case of vacant")
            }
        }
    }
}

/// A staged operation for one key of a `BTreeMapTransaction`.
enum BTreeMapOp<V> {
    /// The key is staged to hold this value.
    Insert(V),
    /// The key is staged to be removed.
    Delete,
}
263
264/// A `ValTransaction` that wraps a `BTreeMap`. It supports basic `BTreeMap` operations like `get`,
265/// `get_mut`, `insert` and `remove`. Incremental modification of `insert`, `remove` and `get_mut`
266/// are stored in `staging`. On `commit`, it will apply the changes stored in `staging` to the in
267/// memory btree map. When serve `get` and `get_mut`, it merges the value stored in `staging` and
268/// `tree_ref`.
269pub struct BTreeMapTransactionInner<K: Ord, V, P: DerefMut<Target = BTreeMap<K, V>>> {
270    /// A reference to the original `BTreeMap`. All access to this field should be immutable,
271    /// except when we commit the staging changes to the original map.
272    tree_ref: P,
273    /// Store all the staging changes that will be applied to the original map on commit
274    staging: BTreeMap<K, BTreeMapOp<V>>,
275}
276
277pub type BTreeMapTransaction<'a, K, V> = BTreeMapTransactionInner<K, V, &'a mut BTreeMap<K, V>>;
278
279impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>>
280    BTreeMapTransactionInner<K, V, P>
281{
282    pub fn new(tree_ref: P) -> BTreeMapTransactionInner<K, V, P> {
283        Self {
284            tree_ref,
285            staging: BTreeMap::default(),
286        }
287    }
288
289    /// Start a `BTreeMapEntryTransaction` when the `key` exists
290    #[allow(dead_code)]
291    pub fn new_entry_txn(&mut self, key: K) -> Option<BTreeMapEntryTransaction<'_, K, V>> {
292        BTreeMapEntryTransaction::new(&mut self.tree_ref, key, None)
293    }
294
295    /// Start a `BTreeMapEntryTransaction`. If the `key` does not exist, the the `default_val` will
296    /// be taken as the initial value of the transaction and will be applied to the original
297    /// `BTreeMap` on commit.
298    pub fn new_entry_txn_or_default(
299        &mut self,
300        key: K,
301        default_val: V,
302    ) -> BTreeMapEntryTransaction<'_, K, V> {
303        BTreeMapEntryTransaction::new(&mut self.tree_ref, key, Some(default_val))
304            .expect("default value is provided and should return `Some`")
305    }
306
307    /// Start a `BTreeMapEntryTransaction` that inserts the `val` into `key`.
308    pub fn new_entry_insert_txn(&mut self, key: K, val: V) -> BTreeMapEntryTransaction<'_, K, V> {
309        BTreeMapEntryTransaction::new_insert(&mut self.tree_ref, key, val)
310    }
311
312    pub fn tree_ref(&self) -> &BTreeMap<K, V> {
313        &self.tree_ref
314    }
315
316    /// Get the value of the provided key by merging the staging value and the original value
317    pub fn get(&self, key: &K) -> Option<&V> {
318        self.staging
319            .get(key)
320            .and_then(|op| match op {
321                BTreeMapOp::Insert(v) => Some(v),
322                BTreeMapOp::Delete => None,
323            })
324            .or_else(|| self.tree_ref.get(key))
325    }
326
327    pub fn contains_key(&self, key: &K) -> bool {
328        self.get(key).is_some()
329    }
330
331    /// This method serves the same semantic to the `get_mut` of `BTreeMap`.
332    ///
333    /// It return a `BTreeMapTransactionValueGuard` of the corresponding key for mutable access to
334    /// guarded staging value.
335    ///
336    /// When the value does not exist in the staging (either key not exist or with a Delete record)
337    /// and the value does not exist in the original `BTreeMap`, return None.
338    pub fn get_mut(&mut self, key: K) -> Option<BTreeMapTransactionValueGuard<'_, K, V>> {
339        let orig_contains_key = self.tree_ref.contains_key(&key);
340        let orig_value = self.tree_ref.get(&key);
341
342        let staging_entry = match self.staging.entry(key) {
343            Entry::Occupied(entry) => match entry.into_mut() {
344                BTreeMapOp::Insert(v) => BTreeMapTransactionStagingEntry::Occupied(v),
345                BTreeMapOp::Delete => return None,
346            },
347            Entry::Vacant(vacant_entry) => {
348                if !orig_contains_key {
349                    return None;
350                } else {
351                    BTreeMapTransactionStagingEntry::Vacant(vacant_entry)
352                }
353            }
354        };
355        Some(BTreeMapTransactionValueGuard::new(
356            staging_entry,
357            orig_value,
358        ))
359    }
360
361    pub fn insert(&mut self, key: K, value: V) {
362        self.staging.insert(key, BTreeMapOp::Insert(value));
363    }
364
365    pub fn remove(&mut self, key: K) -> Option<V> {
366        if let Some(op) = self.staging.get(&key) {
367            return match op {
368                BTreeMapOp::Delete => None,
369                BTreeMapOp::Insert(_) => match self.staging.remove(&key).unwrap() {
370                    BTreeMapOp::Insert(v) => {
371                        self.staging.insert(key, BTreeMapOp::Delete);
372                        Some(v)
373                    }
374                    BTreeMapOp::Delete => {
375                        unreachable!(
376                            "we have checked that the op of the key is `Insert`, so it's impossible to be Delete"
377                        )
378                    }
379                },
380            };
381        }
382        match self.tree_ref.get(&key) {
383            Some(orig_value) => {
384                self.staging.insert(key, BTreeMapOp::Delete);
385                Some(orig_value.clone())
386            }
387            None => None,
388        }
389    }
390
391    pub fn commit_memory(mut self) {
392        // Apply each op stored in the staging to original tree.
393        for (k, op) in self.staging {
394            match op {
395                BTreeMapOp::Insert(v) => {
396                    self.tree_ref.insert(k, v);
397                }
398                BTreeMapOp::Delete => {
399                    self.tree_ref.remove(&k);
400                }
401            }
402        }
403    }
404}
405
406impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>> InMemValTransaction
407    for BTreeMapTransactionInner<K, V, P>
408{
409    fn commit(self) {
410        self.commit_memory();
411    }
412}
413
414impl<K: Ord + Debug, V: Transactional<TXN> + Clone, P: DerefMut<Target = BTreeMap<K, V>>, TXN>
415    ValTransaction<TXN> for BTreeMapTransactionInner<K, V, P>
416{
417    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
418        // Add the staging operation to txn
419        for (k, op) in &self.staging {
420            match op {
421                BTreeMapOp::Insert(v) => v.upsert_in_transaction(txn).await?,
422                BTreeMapOp::Delete => {
423                    if let Some(v) = self.tree_ref.get(k) {
424                        v.delete_in_transaction(txn).await?;
425                    }
426                }
427            }
428        }
429        Ok(())
430    }
431}
432
/// Transaction wrapper for a `BTreeMap` entry value of given `key`
///
/// The wrapper owns the pending value in `new_value`; the map itself is only written when the
/// transaction is committed.
pub struct BTreeMapEntryTransaction<'a, K, V> {
    /// The map that the entry belongs to.
    tree_ref: &'a mut BTreeMap<K, V>,
    /// The key of the wrapped entry.
    pub key: K,
    /// The pending value for `key`.
    pub new_value: V,
}

impl<'a, K: Ord + Debug, V: Clone> BTreeMapEntryTransaction<'a, K, V> {
    /// Create a `BTreeMapEntryTransaction` whose pending value is `value`, regardless of whether
    /// the tree already contains `key`.
    pub fn new_insert(
        tree_ref: &'a mut BTreeMap<K, V>,
        key: K,
        value: V,
    ) -> BTreeMapEntryTransaction<'a, K, V> {
        BTreeMapEntryTransaction {
            tree_ref,
            key,
            new_value: value,
        }
    }

    /// Create a `BTreeMapEntryTransaction` that wraps a `BTreeMap` entry of the given `key`.
    ///
    /// - If the `key` exists in the tree, the pending value starts as a clone of the current
    ///   value and `Some` is returned.
    /// - If the `key` does not exist but `default_val` is `Some`, the default becomes the pending
    ///   value and `Some` is returned.
    /// - Otherwise `None` is returned.
    pub fn new(
        tree_ref: &'a mut BTreeMap<K, V>,
        key: K,
        default_val: Option<V>,
    ) -> Option<BTreeMapEntryTransaction<'a, K, V>> {
        let initial = match tree_ref.get(&key) {
            Some(existing) => existing.clone(),
            None => default_val?,
        };
        Some(BTreeMapEntryTransaction {
            tree_ref,
            key,
            new_value: initial,
        })
    }
}

impl<K, V> Deref for BTreeMapEntryTransaction<'_, K, V> {
    type Target = V;

    /// Borrow the pending value.
    fn deref(&self) -> &Self::Target {
        let BTreeMapEntryTransaction { new_value, .. } = self;
        new_value
    }
}

impl<K, V> DerefMut for BTreeMapEntryTransaction<'_, K, V> {
    /// Mutably borrow the pending value; the map is not touched.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let BTreeMapEntryTransaction { new_value, .. } = self;
        new_value
    }
}
491
492impl<K: Ord, V: PartialEq> InMemValTransaction for BTreeMapEntryTransaction<'_, K, V> {
493    fn commit(self) {
494        self.tree_ref.insert(self.key, self.new_value);
495    }
496}
497
498impl<K: Ord, V: PartialEq + Transactional<TXN>, TXN> ValTransaction<TXN>
499    for BTreeMapEntryTransaction<'_, K, V>
500{
501    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
502        if !self.tree_ref.contains_key(&self.key)
503            || *self.tree_ref.get(&self.key).unwrap() != self.new_value
504        {
505            self.new_value.upsert_in_transaction(txn).await?
506        }
507        Ok(())
508    }
509}
510
511impl<T: InMemValTransaction> InMemValTransaction for Option<T> {
512    fn commit(self) {
513        if let Some(inner) = self {
514            inner.commit();
515        }
516    }
517}
518
519impl<T: ValTransaction<TXN>, TXN> ValTransaction<TXN> for Option<T> {
520    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
521        if let Some(inner) = &self {
522            inner.apply_to_txn(txn).await?;
523        }
524        Ok(())
525    }
526}
527
/// A wrapper that dereferences to a value reached *through* another pointer.
///
/// `ptr` dereferences to an `Inner`; `f` and `f_mut` project a shared or mutable reference to
/// `Inner` onto a reference to `Target`. Dereferencing the wrapper applies the matching
/// projection to the dereferenced `ptr`.
pub struct DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    /// The pointer that owns or borrows the `Inner` value.
    ptr: P,
    /// Projection used for shared access.
    f: F,
    /// Projection used for mutable access.
    f_mut: FMut,
}

impl<Inner, Target, P, F, FMut> DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    /// Bundle a pointer with its shared (`f`) and mutable (`f_mut`) projections.
    pub fn new(ptr: P, f: F, f_mut: FMut) -> Self {
        DerefMutForward { ptr, f, f_mut }
    }
}

impl<Inner, Target, P, F, FMut> Deref for DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    type Target = Target;

    /// Dereference `ptr` and apply the shared projection `f`.
    fn deref(&self) -> &Self::Target {
        let project = &self.f;
        let inner: &Inner = Deref::deref(&self.ptr);
        project(inner)
    }
}

impl<Inner, Target, P, F, FMut> DerefMut for DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    /// Mutably dereference `ptr` and apply the mutable projection `f_mut`.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let project = &self.f_mut;
        let inner: &mut Inner = DerefMut::deref_mut(&mut self.ptr);
        project(inner)
    }
}