risingwave_meta/storage/
transaction.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use crate::storage::{ColumnFamily, Key, Value};
16
17/// A `Transaction` executes several writes(aka. operations) to meta store atomically with optional
18/// preconditions checked. It executes as follows:
19/// 1. If all `preconditions` are valid, all `operations` are executed; Otherwise no operation
20///    is executed.
21/// 2. Upon `commit` the transaction, the `TransactionAbort` error will be returned if
22///    any precondition was not met in previous step.
23#[derive(Default)]
24pub struct Transaction {
25    preconditions: Vec<Precondition>,
26    operations: Vec<Operation>,
27}
28
29impl Transaction {
30    /// Check whether the key exists.
31    ///
32    /// The check call will never fail, instead, it will only fail on commit.
33    #[inline(always)]
34    pub fn check_exists(&mut self, cf: ColumnFamily, key: Key) {
35        self.add_precondition(Precondition::KeyExists { cf, key })
36    }
37
38    /// Check whether the key exists.
39    ///
40    /// The check call will never fail, instead, it will only fail on commit.
41    #[inline(always)]
42    pub fn check_equal(&mut self, cf: ColumnFamily, key: Key, value: Value) {
43        self.add_precondition(Precondition::KeyEqual { cf, key, value })
44    }
45
46    /// Put the key/value pair if the preconditions satisfied.
47    #[inline(always)]
48    pub fn put(&mut self, cf: ColumnFamily, key: Key, value: Value) {
49        self.add_operation(Operation::Put { cf, key, value })
50    }
51
52    /// Delete the key if the preconditions satisfied.
53    #[inline(always)]
54    pub fn delete(&mut self, cf: ColumnFamily, key: Key) {
55        self.add_operation(Operation::Delete { cf, key })
56    }
57
58    #[inline(always)]
59    fn add_precondition(&mut self, precondition: Precondition) {
60        self.preconditions.push(precondition)
61    }
62
63    #[inline(always)]
64    fn add_operation(&mut self, operation: Operation) {
65        self.operations.push(operation)
66    }
67
68    /// Add a batch of preconditions.
69    #[inline(always)]
70    pub fn add_preconditions(&mut self, mut preconditions: impl AsMut<Vec<Precondition>>) {
71        self.preconditions.append(preconditions.as_mut());
72    }
73
74    /// Add a batch of operations.
75    #[inline(always)]
76    pub fn add_operations(&mut self, mut operations: impl AsMut<Vec<Operation>>) {
77        self.operations.append(operations.as_mut());
78    }
79
80    #[cfg(test)]
81    pub fn get_operations(&self) -> &Vec<Operation> {
82        &self.operations
83    }
84}
85
86pub enum Operation {
87    /// `put` key value pairs.
88    Put {
89        cf: ColumnFamily,
90        key: Key,
91        value: Value,
92    },
93    /// `delete` key value pairs.
94    Delete { cf: ColumnFamily, key: Key },
95}
96
97/// Preconditions are checked in the beginning of a transaction
98pub enum Precondition {
99    KeyExists {
100        cf: ColumnFamily,
101        key: Key,
102    },
103    KeyEqual {
104        cf: ColumnFamily,
105        key: Key,
106        value: Value,
107    },
108}