risingwave_meta/model/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod error;
16mod stream;
17
18use std::collections::BTreeMap;
19use std::collections::btree_map::{Entry, VacantEntry};
20use std::fmt::Debug;
21use std::ops::{Deref, DerefMut};
22
23use async_trait::async_trait;
24pub use error::*;
25pub use stream::*;
26use uuid::Uuid;
27
/// A global, unique identifier of an actor.
pub type ActorId = u32;

/// Identifier of a dispatcher. It is not unique on its own: it should be used together with
/// an `ActorId` to uniquely identify a dispatcher.
pub type DispatcherId = u64;

/// A global, unique identifier of a fragment.
pub type FragmentId = u32;

/// Identifier of a subscription.
// NOTE(review): the uniqueness scope of this id is not stated in this file -- confirm.
pub type SubscriptionId = u32;
38
39#[derive(Clone, Debug)]
40pub struct ClusterId(String);
41
42impl Default for ClusterId {
43    fn default() -> Self {
44        Self::new()
45    }
46}
47
48impl ClusterId {
49    pub fn new() -> Self {
50        Self(Uuid::new_v4().to_string())
51    }
52}
53
54impl From<ClusterId> for String {
55    fn from(value: ClusterId) -> Self {
56        value.0
57    }
58}
59
60impl From<String> for ClusterId {
61    fn from(value: String) -> Self {
62        Self(value)
63    }
64}
65
66impl Deref for ClusterId {
67    type Target = str;
68
69    fn deref(&self) -> &Self::Target {
70        self.0.as_str()
71    }
72}
73
/// A value that can add its own persistence operations to a transaction of type `TXN`.
///
/// Within this file, the methods are invoked by the `ValTransaction::apply_to_txn`
/// implementations to stage the pending in-memory changes into `trx`.
#[async_trait]
pub trait Transactional<TXN> {
    /// Add an insert-or-update of `self` to `trx`.
    async fn upsert_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
    /// Add a deletion of `self` to `trx`.
    async fn delete_in_transaction(&self, trx: &mut TXN) -> MetadataModelResult<()>;
}
79
/// A pending change to a local in-memory value that only takes effect once committed.
pub trait InMemValTransaction: Sized {
    /// Consume the transaction and apply its change to the local in-memory value.
    fn commit(self);
}
84
/// Trait that wraps a local memory value and applies the change to the local memory value on
/// `commit`. There is no explicit `abort` method: dropping the transaction without calling
/// `commit` leaves the local memory value untouched.
pub trait ValTransaction<TXN>: InMemValTransaction {
    /// Apply the pending change (upsert or delete) to `txn`. This does not modify the local
    /// memory value; that only happens on `commit`.
    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()>;
}
91
/// Transaction wrapper around a single variable.
///
/// Reads go to the original value until the first mutable access. On the first `deref_mut`
/// call the original is cloned into `new_value`, and every later read or write targets that
/// clone. `commit` writes `new_value` back through `orig_value_ref`; dropping the
/// `VarTransaction` instead ("abort") leaves the original untouched.
pub struct VarTransaction<'a, T> {
    orig_value_ref: &'a mut T,
    new_value: Option<T>,
}

impl<'a, T> VarTransaction<'a, T> {
    /// Create a `VarTransaction` that wraps a raw variable.
    pub fn new(val_ref: &'a mut T) -> VarTransaction<'a, T> {
        Self {
            orig_value_ref: val_ref,
            // Populated lazily by the first mutable access.
            new_value: None,
        }
    }

    /// Whether a staged copy exists, i.e. the value has been mutably dereferenced at least once.
    pub fn has_new_value(&self) -> bool {
        self.new_value.is_some()
    }
}

impl<T> Deref for VarTransaction<'_, T> {
    type Target = T;

    /// Read the staged copy if there is one, otherwise the original value.
    fn deref(&self) -> &Self::Target {
        self.new_value.as_ref().unwrap_or(&*self.orig_value_ref)
    }
}

impl<T: Clone> DerefMut for VarTransaction<'_, T> {
    /// Return the staged copy for mutation, cloning it from the original on first use.
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Borrow the original separately so the closure does not need to capture `self`.
        let original = &*self.orig_value_ref;
        self.new_value.get_or_insert_with(|| original.clone())
    }
}
137
138impl<T> InMemValTransaction for VarTransaction<'_, T>
139where
140    T: PartialEq,
141{
142    fn commit(self) {
143        if let Some(new_value) = self.new_value {
144            *self.orig_value_ref = new_value;
145        }
146    }
147}
148
149impl<TXN, T> ValTransaction<TXN> for VarTransaction<'_, T>
150where
151    T: Transactional<TXN> + PartialEq,
152{
153    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
154        if let Some(new_value) = &self.new_value {
155            // Apply the change to `txn` only when the value is modified
156            if *self.orig_value_ref != *new_value {
157                new_value.upsert_in_transaction(txn).await
158            } else {
159                Ok(())
160            }
161        } else {
162            Ok(())
163        }
164    }
165}
166
167/// Represent the entry of the `staging` field of a `BTreeMapTransaction`
168enum BTreeMapTransactionStagingEntry<'a, K: Ord, V> {
169    /// The entry of a key does not exist in the `staging` field yet.
170    Vacant(VacantEntry<'a, K, BTreeMapOp<V>>),
171    /// The entry of a key exists in the `staging` field. A mutable reference to the value of the
172    /// staging entry is provided for mutable access.
173    Occupied(&'a mut V),
174}
175
176/// A mutable guard to the value of the corresponding key of a `BTreeMapTransaction`.
177/// The staging value is initialized in a lazy manner, that is, the staging value is only cloned
178/// from the original value only when it's being mutably deref.
179pub struct BTreeMapTransactionValueGuard<'a, K: Ord, V: Clone> {
180    // `staging_entry` is always `Some` so it's always safe to unwrap it. We make it `Option` so
181    // that we can take a `Vacant` out, take its ownership, insert value into `VacantEntry` and
182    // insert an `Occupied` back to the `Option`.
183    // If `staging_entry` is `Vacant`, `orig_value` must be Some
184    staging_entry: Option<BTreeMapTransactionStagingEntry<'a, K, V>>,
185    // If the `orig_value` is None, the `staging_entry` must be `Occupied`
186    orig_value: Option<&'a V>,
187}
188
189impl<'a, K: Ord, V: Clone> BTreeMapTransactionValueGuard<'a, K, V> {
190    fn new(
191        staging_entry: BTreeMapTransactionStagingEntry<'a, K, V>,
192        orig_value: Option<&'a V>,
193    ) -> Self {
194        let is_entry_occupied =
195            matches!(staging_entry, BTreeMapTransactionStagingEntry::Occupied(_));
196        assert!(
197            is_entry_occupied || orig_value.is_some(),
198            "one of staging_entry and orig_value must be non-empty"
199        );
200        Self {
201            staging_entry: Some(staging_entry),
202            orig_value,
203        }
204    }
205}
206
207impl<K: Ord, V: Clone> Deref for BTreeMapTransactionValueGuard<'_, K, V> {
208    type Target = V;
209
210    fn deref(&self) -> &Self::Target {
211        // Read the staging entry first. If the staging entry is vacant, read the original value
212        match &self.staging_entry.as_ref().unwrap() {
213            BTreeMapTransactionStagingEntry::Vacant(_) => self
214                .orig_value
215                .expect("staging is vacant, so orig_value must be some"),
216            BTreeMapTransactionStagingEntry::Occupied(v) => v,
217        }
218    }
219}
220
221impl<K: Ord, V: Clone> DerefMut for BTreeMapTransactionValueGuard<'_, K, V> {
222    fn deref_mut(&mut self) -> &mut Self::Target {
223        let is_occupied = matches!(
224            self.staging_entry.as_ref().unwrap(),
225            BTreeMapTransactionStagingEntry::Occupied(_)
226        );
227
228        // When the staging entry is vacant, take a copy of the original value and insert an entry
229        // into the staging.
230        if !is_occupied {
231            let vacant_entry = match self.staging_entry.take().unwrap() {
232                BTreeMapTransactionStagingEntry::Vacant(entry) => entry,
233                BTreeMapTransactionStagingEntry::Occupied(_) => {
234                    unreachable!("we have previously check that the entry is not occupied")
235                }
236            };
237
238            // Insert a cloned original value to staging through `vacant_entry`
239            let new_value_mut_ref = match vacant_entry.insert(BTreeMapOp::Insert(
240                self.orig_value
241                    .expect("self.staging_entry was vacant, so orig_value must be some")
242                    .clone(),
243            )) {
244                BTreeMapOp::Insert(v) => v,
245                BTreeMapOp::Delete => {
246                    unreachable!(
247                        "the previous inserted op is `Inserted`, so it's not possible to reach Delete"
248                    )
249                }
250            };
251            // Set the staging entry to `Occupied`.
252            let _ = self
253                .staging_entry
254                .insert(BTreeMapTransactionStagingEntry::Occupied(new_value_mut_ref));
255        }
256
257        match self.staging_entry.as_mut().unwrap() {
258            BTreeMapTransactionStagingEntry::Vacant(_) => {
259                unreachable!("we have inserted a cloned original value in case of vacant")
260            }
261            BTreeMapTransactionStagingEntry::Occupied(v) => v,
262        }
263    }
264}
265
/// A staged operation on a single key of a `BTreeMapTransactionInner`, applied to the
/// original map when the transaction is committed.
enum BTreeMapOp<V> {
    /// The key will be set to the contained value.
    Insert(V),
    /// The key will be removed.
    Delete,
}
270
271/// A `ValTransaction` that wraps a `BTreeMap`. It supports basic `BTreeMap` operations like `get`,
272/// `get_mut`, `insert` and `remove`. Incremental modification of `insert`, `remove` and `get_mut`
273/// are stored in `staging`. On `commit`, it will apply the changes stored in `staging` to the in
274/// memory btree map. When serve `get` and `get_mut`, it merges the value stored in `staging` and
275/// `tree_ref`.
276pub struct BTreeMapTransactionInner<K: Ord, V, P: DerefMut<Target = BTreeMap<K, V>>> {
277    /// A reference to the original `BTreeMap`. All access to this field should be immutable,
278    /// except when we commit the staging changes to the original map.
279    tree_ref: P,
280    /// Store all the staging changes that will be applied to the original map on commit
281    staging: BTreeMap<K, BTreeMapOp<V>>,
282}
283
284pub type BTreeMapTransaction<'a, K, V> = BTreeMapTransactionInner<K, V, &'a mut BTreeMap<K, V>>;
285
286impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>>
287    BTreeMapTransactionInner<K, V, P>
288{
289    pub fn new(tree_ref: P) -> BTreeMapTransactionInner<K, V, P> {
290        Self {
291            tree_ref,
292            staging: BTreeMap::default(),
293        }
294    }
295
296    /// Start a `BTreeMapEntryTransaction` when the `key` exists
297    #[allow(dead_code)]
298    pub fn new_entry_txn(&mut self, key: K) -> Option<BTreeMapEntryTransaction<'_, K, V>> {
299        BTreeMapEntryTransaction::new(&mut self.tree_ref, key, None)
300    }
301
302    /// Start a `BTreeMapEntryTransaction`. If the `key` does not exist, the the `default_val` will
303    /// be taken as the initial value of the transaction and will be applied to the original
304    /// `BTreeMap` on commit.
305    pub fn new_entry_txn_or_default(
306        &mut self,
307        key: K,
308        default_val: V,
309    ) -> BTreeMapEntryTransaction<'_, K, V> {
310        BTreeMapEntryTransaction::new(&mut self.tree_ref, key, Some(default_val))
311            .expect("default value is provided and should return `Some`")
312    }
313
314    /// Start a `BTreeMapEntryTransaction` that inserts the `val` into `key`.
315    pub fn new_entry_insert_txn(&mut self, key: K, val: V) -> BTreeMapEntryTransaction<'_, K, V> {
316        BTreeMapEntryTransaction::new_insert(&mut self.tree_ref, key, val)
317    }
318
319    pub fn tree_ref(&self) -> &BTreeMap<K, V> {
320        &self.tree_ref
321    }
322
323    /// Get the value of the provided key by merging the staging value and the original value
324    pub fn get(&self, key: &K) -> Option<&V> {
325        self.staging
326            .get(key)
327            .and_then(|op| match op {
328                BTreeMapOp::Insert(v) => Some(v),
329                BTreeMapOp::Delete => None,
330            })
331            .or_else(|| self.tree_ref.get(key))
332    }
333
334    pub fn contains_key(&self, key: &K) -> bool {
335        self.get(key).is_some()
336    }
337
338    /// This method serves the same semantic to the `get_mut` of `BTreeMap`.
339    ///
340    /// It return a `BTreeMapTransactionValueGuard` of the corresponding key for mutable access to
341    /// guarded staging value.
342    ///
343    /// When the value does not exist in the staging (either key not exist or with a Delete record)
344    /// and the value does not exist in the original `BTreeMap`, return None.
345    pub fn get_mut(&mut self, key: K) -> Option<BTreeMapTransactionValueGuard<'_, K, V>> {
346        let orig_contains_key = self.tree_ref.contains_key(&key);
347        let orig_value = self.tree_ref.get(&key);
348
349        let staging_entry = match self.staging.entry(key) {
350            Entry::Occupied(entry) => match entry.into_mut() {
351                BTreeMapOp::Insert(v) => BTreeMapTransactionStagingEntry::Occupied(v),
352                BTreeMapOp::Delete => return None,
353            },
354            Entry::Vacant(vacant_entry) => {
355                if !orig_contains_key {
356                    return None;
357                } else {
358                    BTreeMapTransactionStagingEntry::Vacant(vacant_entry)
359                }
360            }
361        };
362        Some(BTreeMapTransactionValueGuard::new(
363            staging_entry,
364            orig_value,
365        ))
366    }
367
368    pub fn insert(&mut self, key: K, value: V) {
369        self.staging.insert(key, BTreeMapOp::Insert(value));
370    }
371
372    pub fn remove(&mut self, key: K) -> Option<V> {
373        if let Some(op) = self.staging.get(&key) {
374            return match op {
375                BTreeMapOp::Delete => None,
376                BTreeMapOp::Insert(_) => match self.staging.remove(&key).unwrap() {
377                    BTreeMapOp::Insert(v) => {
378                        self.staging.insert(key, BTreeMapOp::Delete);
379                        Some(v)
380                    }
381                    BTreeMapOp::Delete => {
382                        unreachable!(
383                            "we have checked that the op of the key is `Insert`, so it's impossible to be Delete"
384                        )
385                    }
386                },
387            };
388        }
389        match self.tree_ref.get(&key) {
390            Some(orig_value) => {
391                self.staging.insert(key, BTreeMapOp::Delete);
392                Some(orig_value.clone())
393            }
394            None => None,
395        }
396    }
397
398    pub fn commit_memory(mut self) {
399        // Apply each op stored in the staging to original tree.
400        for (k, op) in self.staging {
401            match op {
402                BTreeMapOp::Insert(v) => {
403                    self.tree_ref.insert(k, v);
404                }
405                BTreeMapOp::Delete => {
406                    self.tree_ref.remove(&k);
407                }
408            }
409        }
410    }
411}
412
413impl<K: Ord + Debug, V: Clone, P: DerefMut<Target = BTreeMap<K, V>>> InMemValTransaction
414    for BTreeMapTransactionInner<K, V, P>
415{
416    fn commit(self) {
417        self.commit_memory();
418    }
419}
420
421impl<K: Ord + Debug, V: Transactional<TXN> + Clone, P: DerefMut<Target = BTreeMap<K, V>>, TXN>
422    ValTransaction<TXN> for BTreeMapTransactionInner<K, V, P>
423{
424    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
425        // Add the staging operation to txn
426        for (k, op) in &self.staging {
427            match op {
428                BTreeMapOp::Insert(v) => v.upsert_in_transaction(txn).await?,
429                BTreeMapOp::Delete => {
430                    if let Some(v) = self.tree_ref.get(k) {
431                        v.delete_in_transaction(txn).await?;
432                    }
433                }
434            }
435        }
436        Ok(())
437    }
438}
439
/// Transaction wrapper for the value of a single `key` of a `BTreeMap`.
///
/// The pending value lives in `new_value` and is reachable through `Deref`/`DerefMut`; the
/// map itself is not modified by constructing or mutating the transaction.
pub struct BTreeMapEntryTransaction<'a, K, V> {
    tree_ref: &'a mut BTreeMap<K, V>,
    pub key: K,
    pub new_value: V,
}

impl<'a, K: Ord + Debug, V: Clone> BTreeMapEntryTransaction<'a, K, V> {
    /// Create a `BTreeMapEntryTransaction` whose pending value for `key` is `value`,
    /// regardless of whether the tree currently contains `key`.
    pub fn new_insert(
        tree_ref: &'a mut BTreeMap<K, V>,
        key: K,
        value: V,
    ) -> BTreeMapEntryTransaction<'a, K, V> {
        Self {
            tree_ref,
            key,
            new_value: value,
        }
    }

    /// Create a `BTreeMapEntryTransaction` that wraps the `BTreeMap` entry of the given `key`.
    ///
    /// * If the `key` exists in the tree, the transaction starts from a clone of its current
    ///   value and `default_val` is ignored.
    /// * If the `key` does not exist but `default_val` is `Some`, the transaction starts from
    ///   the default value.
    /// * Otherwise `None` is returned.
    pub fn new(
        tree_ref: &'a mut BTreeMap<K, V>,
        key: K,
        default_val: Option<V>,
    ) -> Option<BTreeMapEntryTransaction<'a, K, V>> {
        let initial = match tree_ref.get(&key) {
            Some(existing) => Some(existing.clone()),
            None => default_val,
        };
        let new_value = initial?;
        Some(Self {
            tree_ref,
            key,
            new_value,
        })
    }
}

impl<K, V> Deref for BTreeMapEntryTransaction<'_, K, V> {
    type Target = V;

    /// Read the pending value.
    fn deref(&self) -> &V {
        let pending = &self.new_value;
        pending
    }
}

impl<K, V> DerefMut for BTreeMapEntryTransaction<'_, K, V> {
    /// Mutate the pending value.
    fn deref_mut(&mut self) -> &mut V {
        let pending = &mut self.new_value;
        pending
    }
}
498
499impl<K: Ord, V: PartialEq> InMemValTransaction for BTreeMapEntryTransaction<'_, K, V> {
500    fn commit(self) {
501        self.tree_ref.insert(self.key, self.new_value);
502    }
503}
504
505impl<K: Ord, V: PartialEq + Transactional<TXN>, TXN> ValTransaction<TXN>
506    for BTreeMapEntryTransaction<'_, K, V>
507{
508    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
509        if !self.tree_ref.contains_key(&self.key)
510            || *self.tree_ref.get(&self.key).unwrap() != self.new_value
511        {
512            self.new_value.upsert_in_transaction(txn).await?
513        }
514        Ok(())
515    }
516}
517
518impl<T: InMemValTransaction> InMemValTransaction for Option<T> {
519    fn commit(self) {
520        if let Some(inner) = self {
521            inner.commit();
522        }
523    }
524}
525
526impl<T: ValTransaction<TXN>, TXN> ValTransaction<TXN> for Option<T> {
527    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
528        if let Some(inner) = &self {
529            inner.apply_to_txn(txn).await?;
530        }
531        Ok(())
532    }
533}
534
/// A pointer adapter that dereferences to a part of what the wrapped pointer points to.
///
/// `ptr` dereferences to `Inner`; `f` and `f_mut` project a shared / mutable reference to
/// `Inner` onto a reference to `Target`. `Deref` and `DerefMut` of this type apply those
/// projections to the value behind `ptr`.
pub struct DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    ptr: P,
    f: F,
    f_mut: FMut,
}

impl<Inner, Target, P, F, FMut> DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    /// Wrap `ptr` with the shared projection `f` and the mutable projection `f_mut`.
    pub fn new(ptr: P, f: F, f_mut: FMut) -> Self {
        DerefMutForward { ptr, f, f_mut }
    }
}

impl<Inner, Target, P, F, FMut> Deref for DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    type Target = Target;

    /// Apply the shared projection to the value behind `ptr`.
    fn deref(&self) -> &Self::Target {
        let inner: &Inner = &self.ptr;
        (self.f)(inner)
    }
}

impl<Inner, Target, P, F, FMut> DerefMut for DerefMutForward<Inner, Target, P, F, FMut>
where
    P: DerefMut<Target = Inner>,
    F: Fn(&Inner) -> &Target,
    FMut: Fn(&mut Inner) -> &mut Target,
{
    /// Apply the mutable projection to the value behind `ptr`.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let inner: &mut Inner = &mut self.ptr;
        (self.f_mut)(inner)
    }
}
587
/// Unit tests for the in-memory transaction wrappers, using `crate::storage::Transaction` as
/// the `TXN` type and inspecting the operations it has recorded.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::{Operation, Transaction};

    /// Minimal key/value model whose upsert/delete are recorded as `put`/`delete` on `TEST_CF`.
    #[derive(PartialEq, Clone, Debug)]
    struct TestTransactional {
        key: &'static str,
        value: &'static str,
    }

    /// Column family name passed to every `put`/`delete` issued by `TestTransactional`.
    const TEST_CF: &str = "test-cf";

    #[async_trait]
    impl Transactional<Transaction> for TestTransactional {
        async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
            trx.put(
                TEST_CF.to_owned(),
                self.key.as_bytes().into(),
                self.value.as_bytes().into(),
            );
            Ok(())
        }

        async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
            trx.delete(TEST_CF.to_owned(), self.key.as_bytes().into());
            Ok(())
        }
    }

    /// A modified `VarTransaction` produces exactly one `Put` and updates the variable on commit.
    #[tokio::test]
    async fn test_simple_var_transaction_commit() {
        let mut kv = TestTransactional {
            key: "key",
            value: "original",
        };
        let mut num_txn = VarTransaction::new(&mut kv);
        num_txn.value = "modified";
        assert_eq!(num_txn.value, "modified");
        let mut txn = Transaction::default();
        num_txn.apply_to_txn(&mut txn).await.unwrap();
        let txn_op = txn.get_operations();
        assert_eq!(1, txn_op.len());
        assert!(matches!(
            &txn_op[0],
            Operation::Put {
                cf: _,
                key: _,
                value: _
            }
        ));
        assert!(
            matches!(&txn_op[0], Operation::Put { cf, key, value } if *cf == TEST_CF && key == "key".as_bytes() && value == "modified".as_bytes())
        );
        num_txn.commit();
        assert_eq!("modified", kv.value);
    }

    /// A `VarTransaction` that is never committed leaves the wrapped variable unchanged.
    #[test]
    fn test_simple_var_transaction_abort() {
        let mut kv = TestTransactional {
            key: "key",
            value: "original",
        };
        let mut num_txn = VarTransaction::new(&mut kv);
        num_txn.value = "modified";
        assert_eq!("original", kv.value);
    }

    /// Mixes `remove`, `insert`, `get` and `get_mut` on a `BTreeMapTransaction`, checks the
    /// operations recorded by `apply_to_txn`, and checks that the committed map equals a copy
    /// on which the same changes were replayed directly.
    #[tokio::test]
    async fn test_tree_map_transaction_commit() {
        let mut map: BTreeMap<String, TestTransactional> = BTreeMap::new();
        map.insert(
            "to-remove".to_owned(),
            TestTransactional {
                key: "to-remove",
                value: "to-remove-value",
            },
        );
        map.insert(
            "to-remove-after-modify".to_owned(),
            TestTransactional {
                key: "to-remove-after-modify",
                value: "to-remove-after-modify-value",
            },
        );
        map.insert(
            "first".to_owned(),
            TestTransactional {
                key: "first",
                value: "first-orig-value",
            },
        );

        let mut map_copy = map.clone();
        let mut map_txn = BTreeMapTransaction::new(&mut map);
        map_txn.remove("to-remove".to_owned());
        map_txn.insert(
            "to-remove-after-modify".to_owned(),
            TestTransactional {
                key: "to-remove-after-modify",
                value: "to-remove-after-modify-value-modifying",
            },
        );
        map_txn.remove("to-remove-after-modify".to_owned());
        map_txn.insert(
            "first".to_owned(),
            TestTransactional {
                key: "first",
                value: "first-value",
            },
        );
        map_txn.insert(
            "second".to_owned(),
            TestTransactional {
                key: "second",
                value: "second-value",
            },
        );
        assert_eq!(
            &TestTransactional {
                key: "second",
                value: "second-value",
            },
            map_txn.get(&"second".to_owned()).unwrap()
        );
        map_txn.insert(
            "third".to_owned(),
            TestTransactional {
                key: "third",
                value: "third-value",
            },
        );
        assert_eq!(
            &TestTransactional {
                key: "third",
                value: "third-value",
            },
            map_txn.get(&"third".to_owned()).unwrap()
        );

        // Mutate a staged value in place through the guard returned by `get_mut`.
        let mut third_entry = map_txn.get_mut("third".to_owned()).unwrap();
        third_entry.value = "third-value-updated";
        assert_eq!(
            &TestTransactional {
                key: "third",
                value: "third-value-updated",
            },
            map_txn.get(&"third".to_owned()).unwrap()
        );

        let mut txn = Transaction::default();
        map_txn.apply_to_txn(&mut txn).await.unwrap();
        let txn_ops = txn.get_operations();
        // Three puts (first, second, third) and two deletes (both removed keys) are expected.
        assert_eq!(5, txn_ops.len());
        for op in txn_ops {
            match op {
                Operation::Put { cf, key, value }
                    if cf == TEST_CF
                        && key == "first".as_bytes()
                        && value == "first-value".as_bytes() => {}
                Operation::Put { cf, key, value }
                    if cf == TEST_CF
                        && key == "second".as_bytes()
                        && value == "second-value".as_bytes() => {}
                Operation::Put { cf, key, value }
                    if cf == TEST_CF
                        && key == "third".as_bytes()
                        && value == "third-value-updated".as_bytes() => {}
                Operation::Delete { cf, key } if cf == TEST_CF && key == "to-remove".as_bytes() => {
                }
                Operation::Delete { cf, key }
                    if cf == TEST_CF && key == "to-remove-after-modify".as_bytes() => {}
                _ => unreachable!("invalid operation"),
            }
        }
        map_txn.commit();

        // replay the change to local copy and compare
        map_copy.remove("to-remove").unwrap();
        map_copy.insert(
            "to-remove-after-modify".to_owned(),
            TestTransactional {
                key: "to-remove-after-modify",
                value: "to-remove-after-modify-value-modifying",
            },
        );
        map_copy.remove("to-remove-after-modify").unwrap();
        map_copy.insert(
            "first".to_owned(),
            TestTransactional {
                key: "first",
                value: "first-value",
            },
        );
        map_copy.insert(
            "second".to_owned(),
            TestTransactional {
                key: "second",
                value: "second-value",
            },
        );
        map_copy.insert(
            "third".to_owned(),
            TestTransactional {
                key: "third",
                value: "third-value-updated",
            },
        );
        assert_eq!(map_copy, map);
    }

    /// An entry transaction on an existing key records one `Put` with the updated value and
    /// updates the map on commit.
    #[tokio::test]
    async fn test_tree_map_entry_update_transaction_commit() {
        let mut map: BTreeMap<String, TestTransactional> = BTreeMap::new();
        map.insert(
            "first".to_owned(),
            TestTransactional {
                key: "first",
                value: "first-orig-value",
            },
        );

        let mut map_txn = BTreeMapTransaction::new(&mut map);
        let mut first_entry_txn = map_txn.new_entry_txn("first".to_owned()).unwrap();
        first_entry_txn.value = "first-value";
        let mut txn = Transaction::default();
        first_entry_txn.apply_to_txn(&mut txn).await.unwrap();
        let txn_ops = txn.get_operations();
        assert_eq!(1, txn_ops.len());
        assert!(
            matches!(&txn_ops[0], Operation::Put {cf, key, value} if *cf == TEST_CF && key == "first".as_bytes() && value == "first-value".as_bytes())
        );
        first_entry_txn.commit();
        assert_eq!("first-value", map.get("first").unwrap().value);
    }

    /// An insert entry transaction on an empty map records one `Put` and inserts the value on
    /// commit.
    #[tokio::test]
    async fn test_tree_map_entry_insert_transaction_commit() {
        let mut map: BTreeMap<String, TestTransactional> = BTreeMap::new();

        let mut map_txn = BTreeMapTransaction::new(&mut map);
        let first_entry_txn = map_txn.new_entry_insert_txn(
            "first".to_owned(),
            TestTransactional {
                key: "first",
                value: "first-value",
            },
        );
        let mut txn = Transaction::default();
        first_entry_txn.apply_to_txn(&mut txn).await.unwrap();
        let txn_ops = txn.get_operations();
        assert_eq!(1, txn_ops.len());
        assert!(
            matches!(&txn_ops[0], Operation::Put {cf, key, value} if *cf == TEST_CF && key == "first".as_bytes() && value == "first-value".as_bytes())
        );
        first_entry_txn.commit();
        assert_eq!("first-value", map.get("first").unwrap().value);
    }
}