risingwave_meta/model/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod error;
16mod stream;
17
18use std::collections::BTreeMap;
19use std::collections::btree_map::{Entry, VacantEntry};
20use std::fmt::Debug;
21use std::ops::{Deref, DerefMut};
22
23use async_trait::async_trait;
24pub use error::*;
25pub use risingwave_common::id::{ActorId, FragmentId, SubscriptionId};
26use sea_orm::ConnectionTrait;
27pub use stream::*;
28use uuid::Uuid;
29
30/// Should be used together with `ActorId` to uniquely identify a dispatcher
31pub type DispatcherId = FragmentId;
32
33#[derive(Clone, Debug)]
34pub struct ClusterId(String);
35
36impl Default for ClusterId {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42impl ClusterId {
43    pub fn new() -> Self {
44        Self(Uuid::new_v4().to_string())
45    }
46}
47
48impl From<ClusterId> for String {
49    fn from(value: ClusterId) -> Self {
50        value.0
51    }
52}
53
54impl From<String> for ClusterId {
55    fn from(value: String) -> Self {
56        Self(value)
57    }
58}
59
60impl Deref for ClusterId {
61    type Target = str;
62
63    fn deref(&self) -> &Self::Target {
64        self.0.as_str()
65    }
66}
67
68#[async_trait]
69pub trait Transactional<TXN: ConnectionTrait> {
70    async fn upsert_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
71    async fn delete_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
72}
73
74pub trait InMemValTransaction: Sized {
75    /// Commit the change to local memory value
76    fn commit(self);
77}
78
79/// Trait that wraps a local memory value and applies the change to the local memory value on
80/// `commit` or leaves the local memory value untouched on `abort`.
81pub trait ValTransaction<TXN>: InMemValTransaction {
82    /// Apply the change (upsert or delete) to `txn`
83    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()>;
84}
85
86/// Transaction wrapper for a variable.
87/// In first `deref_mut` call, a copy of the original value will be assigned to `new_value`
88/// and all subsequent modifications will be applied to the `new_value`.
89/// When `commit` is called, the change to `new_value` will be applied to the `orig_value_ref`
90/// When `abort` is called, the `VarTransaction` is dropped and the local memory value is
91/// untouched.
92pub struct VarTransaction<'a, T> {
93    orig_value_ref: &'a mut T,
94    new_value: Option<T>,
95}
96
97impl<'a, T> VarTransaction<'a, T> {
98    /// Create a `VarTransaction` that wraps a raw variable
99    pub fn new(val_ref: &'a mut T) -> VarTransaction<'a, T> {
100        VarTransaction {
101            // lazy initialization
102            new_value: None,
103            orig_value_ref: val_ref,
104        }
105    }
106
107    pub fn has_new_value(&self) -> bool {
108        self.new_value.is_some()
109    }
110}
111
112impl<T> Deref for VarTransaction<'_, T> {
113    type Target = T;
114
115    fn deref(&self) -> &Self::Target {
116        match &self.new_value {
117            Some(new_value) => new_value,
118            None => self.orig_value_ref,
119        }
120    }
121}
122
123impl<T: Clone> DerefMut for VarTransaction<'_, T> {
124    fn deref_mut(&mut self) -> &mut Self::Target {
125        if self.new_value.is_none() {
126            self.new_value.replace(self.orig_value_ref.clone());
127        }
128        self.new_value.as_mut().unwrap()
129    }
130}
131
132impl<T> InMemValTransaction for VarTransaction<'_, T>
133where
134    T: PartialEq,
135{
136    fn commit(self) {
137        if let Some(new_value) = self.new_value {
138            *self.orig_value_ref = new_value;
139        }
140    }
141}
142
143impl<TXN, T> ValTransaction<TXN> for VarTransaction<'_, T>
144where
145    T: Transactional<TXN> + PartialEq,
146    TXN: ConnectionTrait,
147{
148    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
149        if let Some(new_value) = &self.new_value {
150            // Apply the change to `txn` only when the value is modified
151            if *self.orig_value_ref != *new_value {
152                new_value.upsert_in_transaction(txn).await
153            } else {
154                Ok(())
155            }
156        } else {
157            Ok(())
158        }
159    }
160}
161
162/// Represent the entry of the `staging` field of a `BTreeMapTransaction`
163enum BTreeMapTransactionStagingEntry<'a, K: Ord, V> {
164    /// The entry of a key does not exist in the `staging` field yet.
165    Vacant(VacantEntry<'a, K, BTreeMapOp<V>>),
166    /// The entry of a key exists in the `staging` field. A mutable reference to the value of the
167    /// staging entry is provided for mutable access.
168    Occupied(&'a mut V),
169}
170
171/// A mutable guard to the value of the corresponding key of a `BTreeMapTransaction`.
172/// The staging value is initialized in a lazy manner, that is, the staging value is only cloned
173/// from the original value only when it's being mutably deref.
174pub struct BTreeMapTransactionValueGuard<'a, K: Ord, V: Clone> {
175    // `staging_entry` is always `Some` so it's always safe to unwrap it. We make it `Option` so
176    // that we can take a `Vacant` out, take its ownership, insert value into `VacantEntry` and
177    // insert an `Occupied` back to the `Option`.
178    // If `staging_entry` is `Vacant`, `orig_value` must be Some
179    staging_entry: Option<BTreeMapTransactionStagingEntry<'a, K, V>>,
180    // If the `orig_value` is None, the `staging_entry` must be `Occupied`
181    orig_value: Option<&'a V>,
182}
183
184impl<'a, K: Ord, V: Clone> BTreeMapTransactionValueGuard<'a, K, V> {
185    fn new(
186        staging_entry: BTreeMapTransactionStagingEntry<'a, K, V>,
187        orig_value: Option<&'a V>,
188    ) -> Self {
189        let is_entry_occupied =
190            matches!(staging_entry, BTreeMapTransactionStagingEntry::Occupied(_));
191        assert!(
192            is_entry_occupied || orig_value.is_some(),
193            "one of staging_entry and orig_value must be non-empty"
194        );
195        Self {
196            staging_entry: Some(staging_entry),
197            orig_value,
198        }
199    }
200}
201
202impl<K: Ord, V: Clone> Deref for BTreeMapTransactionValueGuard<'_, K, V> {
203    type Target = V;
204
205    fn deref(&self) -> &Self::Target {
206        // Read the staging entry first. If the staging entry is vacant, read the original value
207        match &self.staging_entry.as_ref().unwrap() {
208            BTreeMapTransactionStagingEntry::Vacant(_) => self
209                .orig_value
210                .expect("staging is vacant, so orig_value must be some"),
211            BTreeMapTransactionStagingEntry::Occupied(v) => v,
212        }
213    }
214}
215
216impl<K: Ord, V: Clone> DerefMut for BTreeMapTransactionValueGuard<'_, K, V> {
217    fn deref_mut(&mut self) -> &mut Self::Target {
218        let is_occupied = matches!(
219            self.staging_entry.as_ref().unwrap(),
220            BTreeMapTransactionStagingEntry::Occupied(_)
221        );
222
223        // When the staging entry is vacant, take a copy of the original value and insert an entry
224        // into the staging.
225        if !is_occupied {
226            let vacant_entry = match self.staging_entry.take().unwrap() {
227                BTreeMapTransactionStagingEntry::Vacant(entry) => entry,
228                BTreeMapTransactionStagingEntry::Occupied(_) => {
229                    unreachable!("we have previously check that the entry is not occupied")
230                }
231            };
232
233            // Insert a cloned original value to staging through `vacant_entry`
234            let new_value_mut_ref = match vacant_entry.insert(BTreeMapOp::Insert(
235                self.orig_value
236                    .expect("self.staging_entry was vacant, so orig_value must be some")
237                    .clone(),
238            )) {
239                BTreeMapOp::Insert(v) => v,
240                BTreeMapOp::Delete => {
241                    unreachable!(
242                        "the previous inserted op is `Inserted`, so it's not possible to reach Delete"
243                    )
244                }
245            };
246            // Set the staging entry to `Occupied`.
247            let _ = self
248                .staging_entry
249                .insert(BTreeMapTransactionStagingEntry::Occupied(new_value_mut_ref));
250        }
251
252        match self.staging_entry.as_mut().unwrap() {
253            BTreeMapTransactionStagingEntry::Vacant(_) => {
254                unreachable!("we have inserted a cloned original value in case of vacant")
255            }
256            BTreeMapTransactionStagingEntry::Occupied(v) => v,
257        }
258    }
259}
260
261enum BTreeMapOp<V> {
262    Insert(V),
263    Delete,
264}
265
266/// A `ValTransaction` that wraps a `BTreeMap`. It supports basic `BTreeMap` operations like `get`,
267/// `get_mut`, `insert` and `remove`. Incremental modification of `insert`, `remove` and `get_mut`
268/// are stored in `staging`. On `commit`, it will apply the changes stored in `staging` to the in
269/// memory btree map. When serve `get` and `get_mut`, it merges the value stored in `staging` and
270/// `tree_ref`.
271pub struct BTreeMapTransactionInner<K: Ord, V, P: DerefMut<Target = BTreeMap<K, V>>> {
272    /// A reference to the original `BTreeMap`. All access to this field should be immutable,
273    /// except when we commit the staging changes to the original map.
274    tree_ref: P,
275    /// Store all the staging changes that will be applied to the original map on commit
276    staging: BTreeMap<K, BTreeMapOp<V>>,
277}
278
279pub type BTreeMapTransaction<'a, K, V> = BTreeMapTransactionInner<K, V, &'a mut BTreeMap<K, V>>;
280
281impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>>
282    BTreeMapTransactionInner<K, V, P>
283{
284    pub fn new(tree_ref: P) -> BTreeMapTransactionInner<K, V, P> {
285        Self {
286            tree_ref,
287            staging: BTreeMap::default(),
288        }
289    }
290
291    /// Start a `BTreeMapEntryTransaction` when the `key` exists
292    #[allow(dead_code)]
293    pub fn new_entry_txn(&mut self, key: K) -> Option<BTreeMapEntryTransaction<'_, K, V>> {
294        BTreeMapEntryTransaction::new(&mut self.tree_ref, key, None)
295    }
296
297    /// Start a `BTreeMapEntryTransaction`. If the `key` does not exist, the the `default_val` will
298    /// be taken as the initial value of the transaction and will be applied to the original
299    /// `BTreeMap` on commit.
300    pub fn new_entry_txn_or_default(
301        &mut self,
302        key: K,
303        default_val: V,
304    ) -> BTreeMapEntryTransaction<'_, K, V> {
305        BTreeMapEntryTransaction::new(&mut self.tree_ref, key, Some(default_val))
306            .expect("default value is provided and should return `Some`")
307    }
308
309    /// Start a `BTreeMapEntryTransaction` that inserts the `val` into `key`.
310    pub fn new_entry_insert_txn(&mut self, key: K, val: V) -> BTreeMapEntryTransaction<'_, K, V> {
311        BTreeMapEntryTransaction::new_insert(&mut self.tree_ref, key, val)
312    }
313
314    pub fn tree_ref(&self) -> &BTreeMap<K, V> {
315        &self.tree_ref
316    }
317
318    /// Get the value of the provided key by merging the staging value and the original value
319    pub fn get(&self, key: &K) -> Option<&V> {
320        self.staging
321            .get(key)
322            .and_then(|op| match op {
323                BTreeMapOp::Insert(v) => Some(v),
324                BTreeMapOp::Delete => None,
325            })
326            .or_else(|| self.tree_ref.get(key))
327    }
328
329    pub fn contains_key(&self, key: &K) -> bool {
330        self.get(key).is_some()
331    }
332
333    /// This method serves the same semantic to the `get_mut` of `BTreeMap`.
334    ///
335    /// It return a `BTreeMapTransactionValueGuard` of the corresponding key for mutable access to
336    /// guarded staging value.
337    ///
338    /// When the value does not exist in the staging (either key not exist or with a Delete record)
339    /// and the value does not exist in the original `BTreeMap`, return None.
340    pub fn get_mut(&mut self, key: K) -> Option<BTreeMapTransactionValueGuard<'_, K, V>> {
341        let orig_contains_key = self.tree_ref.contains_key(&key);
342        let orig_value = self.tree_ref.get(&key);
343
344        let staging_entry = match self.staging.entry(key) {
345            Entry::Occupied(entry) => match entry.into_mut() {
346                BTreeMapOp::Insert(v) => BTreeMapTransactionStagingEntry::Occupied(v),
347                BTreeMapOp::Delete => return None,
348            },
349            Entry::Vacant(vacant_entry) => {
350                if !orig_contains_key {
351                    return None;
352                } else {
353                    BTreeMapTransactionStagingEntry::Vacant(vacant_entry)
354                }
355            }
356        };
357        Some(BTreeMapTransactionValueGuard::new(
358            staging_entry,
359            orig_value,
360        ))
361    }
362
363    pub fn insert(&mut self, key: K, value: V) {
364        self.staging.insert(key, BTreeMapOp::Insert(value));
365    }
366
367    pub fn remove(&mut self, key: K) -> Option<V> {
368        if let Some(op) = self.staging.get(&key) {
369            return match op {
370                BTreeMapOp::Delete => None,
371                BTreeMapOp::Insert(_) => match self.staging.remove(&key).unwrap() {
372                    BTreeMapOp::Insert(v) => {
373                        self.staging.insert(key, BTreeMapOp::Delete);
374                        Some(v)
375                    }
376                    BTreeMapOp::Delete => {
377                        unreachable!(
378                            "we have checked that the op of the key is `Insert`, so it's impossible to be Delete"
379                        )
380                    }
381                },
382            };
383        }
384        match self.tree_ref.get(&key) {
385            Some(orig_value) => {
386                self.staging.insert(key, BTreeMapOp::Delete);
387                Some(orig_value.clone())
388            }
389            None => None,
390        }
391    }
392
393    pub fn commit_memory(mut self) {
394        // Apply each op stored in the staging to original tree.
395        for (k, op) in self.staging {
396            match op {
397                BTreeMapOp::Insert(v) => {
398                    self.tree_ref.insert(k, v);
399                }
400                BTreeMapOp::Delete => {
401                    self.tree_ref.remove(&k);
402                }
403            }
404        }
405    }
406}
407
408impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>> InMemValTransaction
409    for BTreeMapTransactionInner<K, V, P>
410{
411    fn commit(self) {
412        self.commit_memory();
413    }
414}
415
416impl<K: Ord + Debug, V: Transactional<TXN> + Clone, P: DerefMut<Target = BTreeMap<K, V>>, TXN>
417    ValTransaction<TXN> for BTreeMapTransactionInner<K, V, P>
418where
419    TXN: ConnectionTrait,
420{
421    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
422        // Add the staging operation to txn
423        for (k, op) in &self.staging {
424            match op {
425                BTreeMapOp::Insert(v) => v.upsert_in_transaction(txn).await?,
426                BTreeMapOp::Delete => {
427                    if let Some(v) = self.tree_ref.get(k) {
428                        v.delete_in_transaction(txn).await?;
429                    }
430                }
431            }
432        }
433        Ok(())
434    }
435}
436
437/// Transaction wrapper for a `BTreeMap` entry value of given `key`
438pub struct BTreeMapEntryTransaction<'a, K, V> {
439    tree_ref: &'a mut BTreeMap<K, V>,
440    pub key: K,
441    pub new_value: V,
442}
443
444impl<'a, K: Ord + Debug, V: Clone> BTreeMapEntryTransaction<'a, K, V> {
445    /// Create a `ValTransaction` that wraps a `BTreeMap` entry of the given `key`.
446    /// If the tree does not contain `key`, the `default_val` will be used as the initial value
447    pub fn new_insert(
448        tree_ref: &'a mut BTreeMap<K, V>,
449        key: K,
450        value: V,
451    ) -> BTreeMapEntryTransaction<'a, K, V> {
452        BTreeMapEntryTransaction {
453            new_value: value,
454            tree_ref,
455            key,
456        }
457    }
458
459    /// Create a `BTreeMapEntryTransaction` that wraps a `BTreeMap` entry of the given `key`.
460    /// If the `key` exists in the tree, return `Some` of a `BTreeMapEntryTransaction` wrapped for
461    /// the of the given `key`.
462    /// If the `key` does not exist in the tree but `default_val` is provided as `Some`, a
463    /// `BTreeMapEntryTransaction` that wraps the given `key` and default value is returned
464    /// Otherwise return `None`.
465    pub fn new(
466        tree_ref: &'a mut BTreeMap<K, V>,
467        key: K,
468        default_val: Option<V>,
469    ) -> Option<BTreeMapEntryTransaction<'a, K, V>> {
470        tree_ref
471            .get(&key)
472            .cloned()
473            .or(default_val)
474            .map(|orig_value| BTreeMapEntryTransaction {
475                new_value: orig_value,
476                tree_ref,
477                key,
478            })
479    }
480}
481
482impl<K, V> Deref for BTreeMapEntryTransaction<'_, K, V> {
483    type Target = V;
484
485    fn deref(&self) -> &Self::Target {
486        &self.new_value
487    }
488}
489
490impl<K, V> DerefMut for BTreeMapEntryTransaction<'_, K, V> {
491    fn deref_mut(&mut self) -> &mut Self::Target {
492        &mut self.new_value
493    }
494}
495
496impl<K: Ord, V: PartialEq> InMemValTransaction for BTreeMapEntryTransaction<'_, K, V> {
497    fn commit(self) {
498        self.tree_ref.insert(self.key, self.new_value);
499    }
500}
501
502impl<K: Ord, V: PartialEq + Transactional<TXN>, TXN> ValTransaction<TXN>
503    for BTreeMapEntryTransaction<'_, K, V>
504where
505    TXN: ConnectionTrait,
506{
507    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
508        if !self.tree_ref.contains_key(&self.key)
509            || *self.tree_ref.get(&self.key).unwrap() != self.new_value
510        {
511            self.new_value.upsert_in_transaction(txn).await?
512        }
513        Ok(())
514    }
515}
516
517impl<T: InMemValTransaction> InMemValTransaction for Option<T> {
518    fn commit(self) {
519        if let Some(inner) = self {
520            inner.commit();
521        }
522    }
523}
524
525impl<T: ValTransaction<TXN>, TXN> ValTransaction<TXN> for Option<T> {
526    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
527        if let Some(inner) = &self {
528            inner.apply_to_txn(txn).await?;
529        }
530        Ok(())
531    }
532}
533
534pub struct DerefMutForward<
535    Inner,
536    Target,
537    P: DerefMut<Target = Inner>,
538    F: Fn(&Inner) -> &Target,
539    FMut: Fn(&mut Inner) -> &mut Target,
540> {
541    ptr: P,
542    f: F,
543    f_mut: FMut,
544}
545
546impl<
547    Inner,
548    Target,
549    P: DerefMut<Target = Inner>,
550    F: Fn(&Inner) -> &Target,
551    FMut: Fn(&mut Inner) -> &mut Target,
552> DerefMutForward<Inner, Target, P, F, FMut>
553{
554    pub fn new(ptr: P, f: F, f_mut: FMut) -> Self {
555        Self { ptr, f, f_mut }
556    }
557}
558
559impl<
560    Inner,
561    Target,
562    P: DerefMut<Target = Inner>,
563    F: Fn(&Inner) -> &Target,
564    FMut: Fn(&mut Inner) -> &mut Target,
565> Deref for DerefMutForward<Inner, Target, P, F, FMut>
566{
567    type Target = Target;
568
569    fn deref(&self) -> &Self::Target {
570        (self.f)(&self.ptr)
571    }
572}
573
574impl<
575    Inner,
576    Target,
577    P: DerefMut<Target = Inner>,
578    F: Fn(&Inner) -> &Target,
579    FMut: Fn(&mut Inner) -> &mut Target,
580> DerefMut for DerefMutForward<Inner, Target, P, F, FMut>
581{
582    fn deref_mut(&mut self) -> &mut Self::Target {
583        (self.f_mut)(&mut self.ptr)
584    }
585}