risingwave_frontend/session/transaction.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Weak};
17
18use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
19use risingwave_common::session_config::VisibilityMode;
20use risingwave_hummock_sdk::EpochWithGap;
21
22use super::SessionImpl;
23use crate::catalog::catalog_service::CatalogWriter;
24use crate::error::{ErrorCode, Result};
25use crate::scheduler::ReadSnapshot;
26use crate::user::user_service::UserInfoWriter;
27
28/// Globally unique transaction id in this frontend instance.
29#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
30pub struct Id(u64);
31
32impl Id {
33 /// Creates a new transaction id.
34 #[allow(clippy::new_without_default)]
35 pub fn new() -> Self {
36 static NEXT_ID: AtomicU64 = AtomicU64::new(0);
37 Self(NEXT_ID.fetch_add(1, Ordering::Relaxed))
38 }
39}
40
41/// Transaction access mode.
42// TODO: WriteOnly, CreateDdlOnly
43pub enum AccessMode {
44 /// Read-write transaction. All operations are permitted.
45 ///
46 /// Since we cannot handle "read your own writes" in the current implementation, this mode is
47 /// only used for single-statement implicit transactions.
48 ReadWrite,
49
50 /// Read-only transaction. Only read operations are permitted.
51 ///
52 /// All reads (except for the system table) are performed on a consistent snapshot acquired at
53 /// the first read operation in the transaction.
54 ReadOnly,
55}
56
57/// Transaction context.
58pub struct Context {
59 /// The transaction id.
60 id: Id,
61
62 /// The access mode of the transaction, defined by the `START TRANSACTION` and the `SET
63 /// TRANSACTION` statements
64 access_mode: AccessMode,
65
66 /// The snapshot of the transaction, acquired lazily at the first read operation in the
67 /// transaction.
68 snapshot: Option<ReadSnapshot>,
69}
70
71/// Transaction state.
72// TODO: failed state
73#[derive(Default)]
74pub enum State {
75 /// Initial state, used as a placeholder.
76 #[default]
77 Initial,
78
79 /// Implicit single-statement transaction.
80 ///
81 /// Before handling each statement, the session always implicitly starts a transaction with
82 /// this state. The state will be reset to `Initial` after the statement is handled unless
83 /// the user explicitly starts a transaction with `START TRANSACTION`.
84 // TODO: support implicit multi-statement transaction, see [55.2.2.1] Multiple Statements In A
85 // Simple Query @ https://www.postgresql.org/docs/15/protocol-flow.html#id-1.10.6.7.4
86 Implicit(Context),
87
88 /// Explicit transaction started with `START TRANSACTION`.
89 Explicit(Context),
90}
91
92/// A guard that auto commits an implicit transaction when dropped. Do nothing if an explicit
93/// transaction is in progress.
94#[must_use]
95pub struct ImplicitAutoCommitGuard(Weak<Mutex<State>>);
96
97impl Drop for ImplicitAutoCommitGuard {
98 fn drop(&mut self) {
99 if let Some(txn) = self.0.upgrade() {
100 let mut txn = txn.lock();
101 if let State::Implicit(_) = &*txn {
102 *txn = State::Initial;
103 }
104 }
105 }
106}
107
108impl SessionImpl {
109 /// Starts an implicit transaction if there's no explicit transaction in progress. Called at the
110 /// beginning of handling each statement.
111 ///
112 /// Returns a guard that auto commits the implicit transaction when dropped.
113 pub fn txn_begin_implicit(&self) -> ImplicitAutoCommitGuard {
114 let mut txn = self.txn.lock();
115
116 match &*txn {
117 State::Initial => {
118 *txn = State::Implicit(Context {
119 id: Id::new(),
120 access_mode: AccessMode::ReadWrite,
121 snapshot: Default::default(),
122 })
123 }
124 State::Implicit(_) => unreachable!("implicit transaction is already in progress"),
125 State::Explicit(_) => {} /* do nothing since an explicit transaction is already in
126 * progress */
127 }
128
129 ImplicitAutoCommitGuard(Arc::downgrade(&self.txn))
130 }
131
132 /// Starts an explicit transaction with the specified access mode from `START TRANSACTION`.
133 pub fn txn_begin_explicit(&self, access_mode: AccessMode) {
134 let mut txn = self.txn.lock();
135
136 match &*txn {
137 // Since an implicit transaction is always started, we only need to upgrade it to an
138 // explicit transaction.
139 State::Initial => unreachable!("no implicit transaction in progress"),
140 State::Implicit(ctx) => {
141 if self.config().visibility_mode() == VisibilityMode::All {
142 self.notice_to_user(
143 "`visibility_mode` is set to `All`, and there is no consistency ensured in the transaction",
144 );
145 }
146 *txn = State::Explicit(Context {
147 id: ctx.id,
148 access_mode,
149 snapshot: ctx.snapshot.clone(),
150 })
151 }
152 State::Explicit(_) => {
153 // TODO: should be warning
154 self.notice_to_user("there is already a transaction in progress")
155 }
156 }
157 }
158
159 /// Commits an explicit transaction.
160 // TODO: handle failed transaction
161 pub fn txn_commit_explicit(&self) {
162 let mut txn = self.txn.lock();
163
164 match &*txn {
165 State::Initial => unreachable!("no transaction in progress"),
166 State::Implicit(_) => {
167 // TODO: should be warning
168 self.notice_to_user("there is no transaction in progress")
169 }
170 State::Explicit(ctx) => match ctx.access_mode {
171 AccessMode::ReadWrite => unimplemented!(),
172 AccessMode::ReadOnly => *txn = State::Initial,
173 },
174 }
175 }
176
177 /// Rollbacks an explicit transaction.
178 // TODO: handle failed transaction
179 pub fn txn_rollback_explicit(&self) {
180 let mut txn = self.txn.lock();
181
182 match &*txn {
183 State::Initial => unreachable!("no transaction in progress"),
184 State::Implicit(_) => {
185 // TODO: should be warning
186 self.notice_to_user("there is no transaction in progress")
187 }
188 State::Explicit(ctx) => match ctx.access_mode {
189 AccessMode::ReadWrite => unimplemented!(),
190 AccessMode::ReadOnly => *txn = State::Initial,
191 },
192 }
193 }
194
195 /// Returns the transaction context.
196 fn txn_ctx(&self) -> MappedMutexGuard<'_, Context> {
197 MutexGuard::map(self.txn.lock(), |txn| match txn {
198 State::Initial => unreachable!("no transaction in progress"),
199 State::Implicit(ctx) => ctx,
200 State::Explicit(ctx) => ctx,
201 })
202 }
203
204 pub fn get_pinned_snapshot(&self) -> Option<ReadSnapshot> {
205 self.txn_ctx().snapshot.clone()
206 }
207
208 /// Unpin snapshot by replacing the snapshot with None.
209 pub fn unpin_snapshot(&self) {
210 self.txn_ctx().snapshot = None;
211 }
212
213 /// Acquires and pins a snapshot for the current transaction.
214 ///
215 /// If a snapshot is already acquired, returns it directly.
216 pub fn pinned_snapshot(&self) -> ReadSnapshot {
217 self.txn_ctx()
218 .snapshot
219 .get_or_insert_with(|| {
220 // query_epoch must be pure epoch
221 let query_epoch = self
222 .config()
223 .query_epoch()
224 .map(|epoch| EpochWithGap::from_u64(epoch.get()).pure_epoch().into());
225
226 if let Some(query_epoch) = query_epoch {
227 ReadSnapshot::Other(query_epoch)
228 } else if self.is_barrier_read() {
229 ReadSnapshot::ReadUncommitted
230 } else {
231 // Acquire hummock snapshot for execution.
232 let hummock_snapshot_manager = self.env().hummock_snapshot_manager();
233 let pinned_snapshot = hummock_snapshot_manager.acquire();
234
235 ReadSnapshot::FrontendPinned {
236 snapshot: pinned_snapshot,
237 }
238 }
239 })
240 .clone()
241 }
242}
243
244/// A guard that permits write operations in the current transaction.
245///
246/// Currently, this is required for [`CatalogWriter`] (including all DDLs), [`UserInfoWriter`]
247/// (including `USER` and `GRANT`), and DML operations.
248pub struct WriteGuard {
249 _private: (),
250}
251
252impl SessionImpl {
253 /// Returns a [`WriteGuard`], or an error if write operations are not permitted in the current
254 /// transaction.
255 pub fn txn_write_guard(&self) -> Result<WriteGuard> {
256 match self.txn_ctx().access_mode {
257 AccessMode::ReadWrite => Ok(WriteGuard { _private: () }),
258 AccessMode::ReadOnly => Err(ErrorCode::PermissionDenied(
259 "cannot execute in a read-only transaction".into(),
260 ))?,
261 }
262 }
263
264 /// Returns the catalog writer, if write operations are permitted in the current transaction.
265 pub fn catalog_writer(&self) -> Result<&dyn CatalogWriter> {
266 self.txn_write_guard()
267 .map(|guard| self.env().catalog_writer(guard))
268 }
269
270 /// Returns the user info writer, if write operations are permitted in the current transaction.
271 pub fn user_info_writer(&self) -> Result<&dyn UserInfoWriter> {
272 self.txn_write_guard()
273 .map(|guard| self.env().user_info_writer(guard))
274 }
275}