risingwave_frontend/session/
transaction.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Weak};
17
18use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
19use risingwave_common::session_config::VisibilityMode;
20use risingwave_hummock_sdk::EpochWithGap;
21
22use super::SessionImpl;
23use crate::catalog::catalog_service::CatalogWriter;
24use crate::error::{ErrorCode, Result};
25use crate::scheduler::ReadSnapshot;
26use crate::user::user_service::UserInfoWriter;
27
/// Identifier of a transaction, unique within this frontend instance.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Id(u64);

impl Id {
    /// Allocates a fresh transaction id.
    ///
    /// Ids are handed out from a counter shared by every caller in the process, starting at 0
    /// and advancing by one per call, so each call yields a different value.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        static COUNTER: AtomicU64 = AtomicU64::new(0);

        // Only the atomicity of the increment is relied upon here; the counter does not order
        // any other memory accesses.
        let raw = COUNTER.fetch_add(1, Ordering::Relaxed);
        Self(raw)
    }
}
40
/// Transaction access mode.
///
/// This is a plain, field-less enum, so it is cheap to copy and can be compared and
/// debug-printed directly.
// TODO: WriteOnly, CreateDdlOnly
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AccessMode {
    /// Read-write transaction. All operations are permitted.
    ///
    /// Since we cannot handle "read your own writes" in the current implementation, this mode is
    /// only used for single-statement implicit transactions.
    ReadWrite,

    /// Read-only transaction. Only read operations are permitted.
    ///
    /// All reads (except for the system table) are performed on a consistent snapshot acquired at
    /// the first read operation in the transaction.
    ReadOnly,
}
56
/// Transaction context.
///
/// The per-transaction data carried by both [`State::Implicit`] and [`State::Explicit`].
pub struct Context {
    /// The transaction id.
    id: Id,

    /// The access mode of the transaction, defined by the `START TRANSACTION` and the `SET
    /// TRANSACTION` statements.
    access_mode: AccessMode,

    /// The snapshot of the transaction, acquired lazily at the first read operation in the
    /// transaction. It is `None` until then, and again after the snapshot has been unpinned.
    snapshot: Option<ReadSnapshot>,
}
70
/// Transaction state.
///
/// The `Default` value is [`State::Initial`].
// TODO: failed state
#[derive(Default)]
pub enum State {
    /// Initial state, used as a placeholder. No transaction context exists in this state.
    #[default]
    Initial,

    /// Implicit single-statement transaction.
    ///
    /// Before handling each statement, the session always implicitly starts a transaction with
    /// this state. The state will be reset to `Initial` after the statement is handled unless
    /// the user explicitly starts a transaction with `START TRANSACTION`.
    // TODO: support implicit multi-statement transaction, see [55.2.2.1] Multiple Statements In A
    // Simple Query @ https://www.postgresql.org/docs/15/protocol-flow.html#id-1.10.6.7.4
    Implicit(Context),

    /// Explicit transaction started with `START TRANSACTION`.
    Explicit(Context),
}
91
/// A guard that auto commits an implicit transaction when dropped. Does nothing if an explicit
/// transaction is in progress.
///
/// The guard only holds a [`Weak`] reference to the shared transaction state, so it does not
/// keep that state alive by itself.
#[must_use]
pub struct ImplicitAutoCommitGuard(Weak<Mutex<State>>);
96
97impl Drop for ImplicitAutoCommitGuard {
98    fn drop(&mut self) {
99        if let Some(txn) = self.0.upgrade() {
100            let mut txn = txn.lock();
101            if let State::Implicit(_) = &*txn {
102                *txn = State::Initial;
103            }
104        }
105    }
106}
107
108impl SessionImpl {
109    /// Starts an implicit transaction if there's no explicit transaction in progress. Called at the
110    /// beginning of handling each statement.
111    ///
112    /// Returns a guard that auto commits the implicit transaction when dropped.
113    pub fn txn_begin_implicit(&self) -> ImplicitAutoCommitGuard {
114        let mut txn = self.txn.lock();
115
116        match &*txn {
117            State::Initial => {
118                *txn = State::Implicit(Context {
119                    id: Id::new(),
120                    access_mode: AccessMode::ReadWrite,
121                    snapshot: Default::default(),
122                })
123            }
124            State::Implicit(_) => unreachable!("implicit transaction is already in progress"),
125            State::Explicit(_) => {} /* do nothing since an explicit transaction is already in
126                                      * progress */
127        }
128
129        ImplicitAutoCommitGuard(Arc::downgrade(&self.txn))
130    }
131
132    /// Starts an explicit transaction with the specified access mode from `START TRANSACTION`.
133    pub fn txn_begin_explicit(&self, access_mode: AccessMode) {
134        let mut txn = self.txn.lock();
135
136        match &*txn {
137            // Since an implicit transaction is always started, we only need to upgrade it to an
138            // explicit transaction.
139            State::Initial => unreachable!("no implicit transaction in progress"),
140            State::Implicit(ctx) => {
141                if self.config().visibility_mode() == VisibilityMode::All {
142                    self.notice_to_user(
143                        "`visibility_mode` is set to `All`, and there is no consistency ensured in the transaction",
144                    );
145                }
146                *txn = State::Explicit(Context {
147                    id: ctx.id,
148                    access_mode,
149                    snapshot: ctx.snapshot.clone(),
150                })
151            }
152            State::Explicit(_) => {
153                // TODO: should be warning
154                self.notice_to_user("there is already a transaction in progress")
155            }
156        }
157    }
158
159    /// Commits an explicit transaction.
160    // TODO: handle failed transaction
161    pub fn txn_commit_explicit(&self) {
162        let mut txn = self.txn.lock();
163
164        match &*txn {
165            State::Initial => unreachable!("no transaction in progress"),
166            State::Implicit(_) => {
167                // TODO: should be warning
168                self.notice_to_user("there is no transaction in progress")
169            }
170            State::Explicit(ctx) => match ctx.access_mode {
171                AccessMode::ReadWrite => unimplemented!(),
172                AccessMode::ReadOnly => *txn = State::Initial,
173            },
174        }
175    }
176
177    /// Rollbacks an explicit transaction.
178    // TODO: handle failed transaction
179    pub fn txn_rollback_explicit(&self) {
180        let mut txn = self.txn.lock();
181
182        match &*txn {
183            State::Initial => unreachable!("no transaction in progress"),
184            State::Implicit(_) => {
185                // TODO: should be warning
186                self.notice_to_user("there is no transaction in progress")
187            }
188            State::Explicit(ctx) => match ctx.access_mode {
189                AccessMode::ReadWrite => unimplemented!(),
190                AccessMode::ReadOnly => *txn = State::Initial,
191            },
192        }
193    }
194
195    /// Returns the transaction context.
196    fn txn_ctx(&self) -> MappedMutexGuard<'_, Context> {
197        MutexGuard::map(self.txn.lock(), |txn| match txn {
198            State::Initial => unreachable!("no transaction in progress"),
199            State::Implicit(ctx) => ctx,
200            State::Explicit(ctx) => ctx,
201        })
202    }
203
204    pub fn get_pinned_snapshot(&self) -> Option<ReadSnapshot> {
205        self.txn_ctx().snapshot.clone()
206    }
207
208    /// Unpin snapshot by replacing the snapshot with None.
209    pub fn unpin_snapshot(&self) {
210        self.txn_ctx().snapshot = None;
211    }
212
213    /// Acquires and pins a snapshot for the current transaction.
214    ///
215    /// If a snapshot is already acquired, returns it directly.
216    pub fn pinned_snapshot(&self) -> ReadSnapshot {
217        self.txn_ctx()
218            .snapshot
219            .get_or_insert_with(|| {
220                // query_epoch must be pure epoch
221                let query_epoch = self
222                    .config()
223                    .query_epoch()
224                    .map(|epoch| EpochWithGap::from_u64(epoch.get()).pure_epoch().into());
225
226                if let Some(query_epoch) = query_epoch {
227                    ReadSnapshot::Other(query_epoch)
228                } else if self.is_barrier_read() {
229                    ReadSnapshot::ReadUncommitted
230                } else {
231                    // Acquire hummock snapshot for execution.
232                    let hummock_snapshot_manager = self.env().hummock_snapshot_manager();
233                    let pinned_snapshot = hummock_snapshot_manager.acquire();
234
235                    ReadSnapshot::FrontendPinned {
236                        snapshot: pinned_snapshot,
237                    }
238                }
239            })
240            .clone()
241    }
242}
243
/// A guard that permits write operations in the current transaction.
///
/// Currently, this is required for [`CatalogWriter`] (including all DDLs), [`UserInfoWriter`]
/// (including `USER` and `GRANT`), and DML operations.
///
/// The only field is private, so a value of this type can only be constructed inside this module.
pub struct WriteGuard {
    // Zero-sized private field: carries no data, it only blocks construction from other modules.
    _private: (),
}
251
252impl SessionImpl {
253    /// Returns a [`WriteGuard`], or an error if write operations are not permitted in the current
254    /// transaction.
255    pub fn txn_write_guard(&self) -> Result<WriteGuard> {
256        match self.txn_ctx().access_mode {
257            AccessMode::ReadWrite => Ok(WriteGuard { _private: () }),
258            AccessMode::ReadOnly => Err(ErrorCode::PermissionDenied(
259                "cannot execute in a read-only transaction".into(),
260            ))?,
261        }
262    }
263
264    /// Returns the catalog writer, if write operations are permitted in the current transaction.
265    pub fn catalog_writer(&self) -> Result<&dyn CatalogWriter> {
266        self.txn_write_guard()
267            .map(|guard| self.env().catalog_writer(guard))
268    }
269
270    /// Returns the user info writer, if write operations are permitted in the current transaction.
271    pub fn user_info_writer(&self) -> Result<&dyn UserInfoWriter> {
272        self.txn_write_guard()
273            .map(|guard| self.env().user_info_writer(guard))
274    }
275}