risingwave_meta/storage/transaction.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::storage::{ColumnFamily, Key, Value};
16
17/// A `Transaction` executes several writes(aka. operations) to meta store atomically with optional
18/// preconditions checked. It executes as follows:
19/// 1. If all `preconditions` are valid, all `operations` are executed; Otherwise no operation
20/// is executed.
21/// 2. Upon `commit` the transaction, the `TransactionAbort` error will be returned if
22/// any precondition was not met in previous step.
23#[derive(Default)]
24pub struct Transaction {
25 preconditions: Vec<Precondition>,
26 operations: Vec<Operation>,
27}
28
29impl Transaction {
30 /// Check whether the key exists.
31 ///
32 /// The check call will never fail, instead, it will only fail on commit.
33 #[inline(always)]
34 pub fn check_exists(&mut self, cf: ColumnFamily, key: Key) {
35 self.add_precondition(Precondition::KeyExists { cf, key })
36 }
37
38 /// Check whether the key exists.
39 ///
40 /// The check call will never fail, instead, it will only fail on commit.
41 #[inline(always)]
42 pub fn check_equal(&mut self, cf: ColumnFamily, key: Key, value: Value) {
43 self.add_precondition(Precondition::KeyEqual { cf, key, value })
44 }
45
46 /// Put the key/value pair if the preconditions satisfied.
47 #[inline(always)]
48 pub fn put(&mut self, cf: ColumnFamily, key: Key, value: Value) {
49 self.add_operation(Operation::Put { cf, key, value })
50 }
51
52 /// Delete the key if the preconditions satisfied.
53 #[inline(always)]
54 pub fn delete(&mut self, cf: ColumnFamily, key: Key) {
55 self.add_operation(Operation::Delete { cf, key })
56 }
57
58 #[inline(always)]
59 fn add_precondition(&mut self, precondition: Precondition) {
60 self.preconditions.push(precondition)
61 }
62
63 #[inline(always)]
64 fn add_operation(&mut self, operation: Operation) {
65 self.operations.push(operation)
66 }
67
68 /// Add a batch of preconditions.
69 #[inline(always)]
70 pub fn add_preconditions(&mut self, mut preconditions: impl AsMut<Vec<Precondition>>) {
71 self.preconditions.append(preconditions.as_mut());
72 }
73
74 /// Add a batch of operations.
75 #[inline(always)]
76 pub fn add_operations(&mut self, mut operations: impl AsMut<Vec<Operation>>) {
77 self.operations.append(operations.as_mut());
78 }
79
80 #[cfg(test)]
81 pub fn get_operations(&self) -> &Vec<Operation> {
82 &self.operations
83 }
84}
85
86pub enum Operation {
87 /// `put` key value pairs.
88 Put {
89 cf: ColumnFamily,
90 key: Key,
91 value: Value,
92 },
93 /// `delete` key value pairs.
94 Delete { cf: ColumnFamily, key: Key },
95}
96
97/// Preconditions are checked in the beginning of a transaction
98pub enum Precondition {
99 KeyExists {
100 cf: ColumnFamily,
101 key: Key,
102 },
103 KeyEqual {
104 cf: ColumnFamily,
105 key: Key,
106 value: Value,
107 },
108}