risingwave_meta/storage/
transaction.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::storage::{ColumnFamily, Key, Value};

/// A `Transaction` executes several writes(aka. operations) to meta store atomically with optional
/// preconditions checked. It executes as follows:
/// 1. If all `preconditions` are valid, all `operations` are executed; Otherwise no operation
///    is executed.
/// 2. Upon `commit` the transaction, the `TransactionAbort` error will be returned if
///    any precondition was not met in previous step.
#[derive(Default)]
pub struct Transaction {
    preconditions: Vec<Precondition>,
    operations: Vec<Operation>,
}

impl Transaction {
    /// Check whether the key exists.
    ///
    /// The check call will never fail, instead, it will only fail on commit.
    #[inline(always)]
    pub fn check_exists(&mut self, cf: ColumnFamily, key: Key) {
        self.add_precondition(Precondition::KeyExists { cf, key })
    }

    /// Check whether the key exists.
    ///
    /// The check call will never fail, instead, it will only fail on commit.
    #[inline(always)]
    pub fn check_equal(&mut self, cf: ColumnFamily, key: Key, value: Value) {
        self.add_precondition(Precondition::KeyEqual { cf, key, value })
    }

    /// Put the key/value pair if the preconditions satisfied.
    #[inline(always)]
    pub fn put(&mut self, cf: ColumnFamily, key: Key, value: Value) {
        self.add_operation(Operation::Put { cf, key, value })
    }

    /// Delete the key if the preconditions satisfied.
    #[inline(always)]
    pub fn delete(&mut self, cf: ColumnFamily, key: Key) {
        self.add_operation(Operation::Delete { cf, key })
    }

    #[inline(always)]
    fn add_precondition(&mut self, precondition: Precondition) {
        self.preconditions.push(precondition)
    }

    #[inline(always)]
    fn add_operation(&mut self, operation: Operation) {
        self.operations.push(operation)
    }

    /// Add a batch of preconditions.
    #[inline(always)]
    pub fn add_preconditions(&mut self, mut preconditions: impl AsMut<Vec<Precondition>>) {
        self.preconditions.append(preconditions.as_mut());
    }

    /// Add a batch of operations.
    #[inline(always)]
    pub fn add_operations(&mut self, mut operations: impl AsMut<Vec<Operation>>) {
        self.operations.append(operations.as_mut());
    }

    #[cfg(test)]
    pub fn get_operations(&self) -> &Vec<Operation> {
        &self.operations
    }
}

pub enum Operation {
    /// `put` key value pairs.
    Put {
        cf: ColumnFamily,
        key: Key,
        value: Value,
    },
    /// `delete` key value pairs.
    Delete { cf: ColumnFamily, key: Key },
}

/// Preconditions are checked in the beginning of a transaction
pub enum Precondition {
    KeyExists {
        cf: ColumnFamily,
        key: Key,
    },
    KeyEqual {
        cf: ColumnFamily,
        key: Key,
        value: Value,
    },
}