risingwave_frontend/handler/
transaction.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_common::bail_not_implemented;
17use risingwave_common::types::Fields;
18use risingwave_sqlparser::ast::{TransactionAccessMode, TransactionMode, Value};
19
20use super::{HandlerArgs, RwPgResponse, RwPgResponseBuilderExt};
21use crate::error::Result;
22use crate::session::transaction::AccessMode;
23
/// Shorthand for reporting an unsupported transaction feature.
///
/// Forwards the given description to `bail_not_implemented!`, always tagged with
/// `issue = 10376`, so every caller in this module points at the same tracking issue.
macro_rules! not_impl {
    ($feature:expr) => {
        bail_not_implemented!(issue = 10376, "{}", $feature)
    };
}
29
30pub fn handle_begin(
31    handler_args: HandlerArgs,
32    stmt_type: StatementType,
33    modes: Vec<TransactionMode>,
34) -> Result<RwPgResponse> {
35    let HandlerArgs { session, .. } = handler_args;
36
37    let mut builder = RwPgResponse::builder(stmt_type);
38
39    let access_mode = {
40        let mut access_mode = None;
41        for mode in modes {
42            match mode {
43                TransactionMode::AccessMode(mode) => {
44                    let _ = access_mode.replace(mode);
45                }
46                TransactionMode::IsolationLevel(_) => {
47                    // Note: This is for compatibility with some external drivers (like postgres_fdw) that
48                    // always start a transaction with an Isolation Level.
49                    const MESSAGE: &str = "\
50                        Transaction with given Isolation Level is not supported yet.\n\
51                        For compatibility, this statement will proceed with RepeatableRead.";
52                    builder = builder.notice(MESSAGE);
53                }
54            }
55        }
56
57        match access_mode {
58            Some(TransactionAccessMode::ReadOnly) => AccessMode::ReadOnly,
59            Some(TransactionAccessMode::ReadWrite) | None => {
60                // Note: This is for compatibility with some external drivers (like psycopg2) that
61                // issue `BEGIN` implicitly for users. Not actually starting a transaction is okay
62                // since `COMMIT` and `ROLLBACK` are no-ops (except for warnings) when there is no
63                // active transaction.
64                const MESSAGE: &str = "\
65                    Read-write transaction is not supported yet. Please specify `READ ONLY` to start a read-only transaction.\n\
66                    For compatibility, this statement will still succeed but no transaction is actually started.";
67                builder = builder.notice(MESSAGE);
68                return Ok(builder.into());
69            }
70        }
71    };
72
73    session.txn_begin_explicit(access_mode);
74    Ok(builder.into())
75}
76
77pub async fn handle_commit(
78    handler_args: HandlerArgs,
79    stmt_type: StatementType,
80    chain: bool,
81) -> Result<RwPgResponse> {
82    let HandlerArgs { session, .. } = handler_args;
83
84    if chain {
85        not_impl!("COMMIT AND CHAIN");
86    }
87
88    session.txn_commit_explicit();
89    session.get_cursor_manager().remove_all_query_cursor().await;
90
91    Ok(RwPgResponse::empty_result(stmt_type))
92}
93
94pub async fn handle_rollback(
95    handler_args: HandlerArgs,
96    stmt_type: StatementType,
97    chain: bool,
98) -> Result<RwPgResponse> {
99    let HandlerArgs { session, .. } = handler_args;
100
101    if chain {
102        not_impl!("ROLLBACK AND CHAIN");
103    }
104
105    session.txn_rollback_explicit();
106    session.get_cursor_manager().remove_all_query_cursor().await;
107
108    Ok(RwPgResponse::empty_result(stmt_type))
109}
110
111pub fn handle_set(
112    _handler_args: HandlerArgs,
113    _modes: Vec<TransactionMode>,
114    _snapshot: Option<Value>,
115    _session: bool,
116) -> Result<RwPgResponse> {
117    const MESSAGE: &str = "\
118        `SET TRANSACTION` is not supported yet.\n\
119        For compatibility, this statement will still succeed but no changes are actually made.";
120
121    Ok(RwPgResponse::builder(StatementType::SET_TRANSACTION)
122        .notice(MESSAGE)
123        .into())
124}
125
126#[derive(Fields)]
127#[fields(style = "Title Case")]
128struct ShowVariableRow {
129    name: String,
130}
131
132pub fn handle_show_isolation_level(handler_args: HandlerArgs) -> Result<RwPgResponse> {
133    let config_reader = handler_args.session.config();
134
135    let rows = [ShowVariableRow {
136        name: config_reader.get("transaction_isolation")?,
137    }];
138
139    Ok(RwPgResponse::builder(StatementType::SHOW_VARIABLE)
140        .rows(rows)
141        .into())
142}