// risingwave_frontend/handler/transaction.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_common::bail_not_implemented;
17use risingwave_common::types::Fields;
18use risingwave_sqlparser::ast::{TransactionAccessMode, TransactionMode, Value};
19
20use super::{HandlerArgs, RwPgResponse, RwPgResponseBuilderExt};
21use crate::error::Result;
22use crate::session::transaction::AccessMode;
23
/// Shorthand for `bail_not_implemented!` tagged with tracking issue 10376.
///
/// `$feature` is any expression that can be rendered with `{}`; it becomes the
/// message passed on to `bail_not_implemented!`.
macro_rules! not_impl {
    ($feature:expr) => {
        bail_not_implemented!(issue = 10376, "{}", $feature)
    };
}
29
30#[expect(clippy::unused_async)]
31pub async fn handle_begin(
32    handler_args: HandlerArgs,
33    stmt_type: StatementType,
34    modes: Vec<TransactionMode>,
35) -> Result<RwPgResponse> {
36    let HandlerArgs { session, .. } = handler_args;
37
38    let mut builder = RwPgResponse::builder(stmt_type);
39
40    let access_mode = {
41        let mut access_mode = None;
42        for mode in modes {
43            match mode {
44                TransactionMode::AccessMode(mode) => {
45                    let _ = access_mode.replace(mode);
46                }
47                TransactionMode::IsolationLevel(_) => {
48                    // Note: This is for compatibility with some external drivers (like postgres_fdw) that
49                    // always start a transaction with an Isolation Level.
50                    const MESSAGE: &str = "\
51                        Transaction with given Isolation Level is not supported yet.\n\
52                        For compatibility, this statement will proceed with RepeatableRead.";
53                    builder = builder.notice(MESSAGE);
54                }
55            }
56        }
57
58        match access_mode {
59            Some(TransactionAccessMode::ReadOnly) => AccessMode::ReadOnly,
60            Some(TransactionAccessMode::ReadWrite) | None => {
61                // Note: This is for compatibility with some external drivers (like psycopg2) that
62                // issue `BEGIN` implicitly for users. Not actually starting a transaction is okay
63                // since `COMMIT` and `ROLLBACK` are no-ops (except for warnings) when there is no
64                // active transaction.
65                const MESSAGE: &str = "\
66                    Read-write transaction is not supported yet. Please specify `READ ONLY` to start a read-only transaction.\n\
67                    For compatibility, this statement will still succeed but no transaction is actually started.";
68                builder = builder.notice(MESSAGE);
69                return Ok(builder.into());
70            }
71        }
72    };
73
74    session.txn_begin_explicit(access_mode);
75    Ok(builder.into())
76}
77
78pub async fn handle_commit(
79    handler_args: HandlerArgs,
80    stmt_type: StatementType,
81    chain: bool,
82) -> Result<RwPgResponse> {
83    let HandlerArgs { session, .. } = handler_args;
84
85    if chain {
86        not_impl!("COMMIT AND CHAIN");
87    }
88
89    session.txn_commit_explicit();
90    session.get_cursor_manager().remove_all_query_cursor().await;
91
92    Ok(RwPgResponse::empty_result(stmt_type))
93}
94
95pub async fn handle_rollback(
96    handler_args: HandlerArgs,
97    stmt_type: StatementType,
98    chain: bool,
99) -> Result<RwPgResponse> {
100    let HandlerArgs { session, .. } = handler_args;
101
102    if chain {
103        not_impl!("ROLLBACK AND CHAIN");
104    }
105
106    session.txn_rollback_explicit();
107    session.get_cursor_manager().remove_all_query_cursor().await;
108
109    Ok(RwPgResponse::empty_result(stmt_type))
110}
111
112#[expect(clippy::unused_async)]
113pub async fn handle_set(
114    _handler_args: HandlerArgs,
115    _modes: Vec<TransactionMode>,
116    _snapshot: Option<Value>,
117    _session: bool,
118) -> Result<RwPgResponse> {
119    const MESSAGE: &str = "\
120        `SET TRANSACTION` is not supported yet.\n\
121        For compatibility, this statement will still succeed but no changes are actually made.";
122
123    Ok(RwPgResponse::builder(StatementType::SET_TRANSACTION)
124        .notice(MESSAGE)
125        .into())
126}
127
128#[derive(Fields)]
129#[fields(style = "Title Case")]
130struct ShowVariableRow {
131    name: String,
132}
133
134pub fn handle_show_isolation_level(handler_args: HandlerArgs) -> Result<RwPgResponse> {
135    let config_reader = handler_args.session.config();
136
137    let rows = [ShowVariableRow {
138        name: config_reader.get("transaction_isolation")?,
139    }];
140
141    Ok(RwPgResponse::builder(StatementType::SHOW_VARIABLE)
142        .rows(rows)
143        .into())
144}