risingwave_frontend/handler/
transaction.rs1use pgwire::pg_response::StatementType;
16use risingwave_common::bail_not_implemented;
17use risingwave_common::types::Fields;
18use risingwave_sqlparser::ast::{TransactionAccessMode, TransactionMode, Value};
19
20use super::{HandlerArgs, RwPgResponse, RwPgResponseBuilderExt};
21use crate::error::Result;
22use crate::session::transaction::AccessMode;
23
/// Expands to `bail_not_implemented!` with `issue = 10376` and the given
/// expression rendered through `{}` as the message.
///
/// NOTE(review): 10376 is presumably the tracking issue for transaction
/// support — confirm before reusing this macro for unrelated features.
macro_rules! not_impl {
    ($feature:expr) => {
        bail_not_implemented!(issue = 10376, "{}", $feature)
    };
}
29
30#[expect(clippy::unused_async)]
31pub async fn handle_begin(
32 handler_args: HandlerArgs,
33 stmt_type: StatementType,
34 modes: Vec<TransactionMode>,
35) -> Result<RwPgResponse> {
36 let HandlerArgs { session, .. } = handler_args;
37
38 let mut builder = RwPgResponse::builder(stmt_type);
39
40 let access_mode = {
41 let mut access_mode = None;
42 for mode in modes {
43 match mode {
44 TransactionMode::AccessMode(mode) => {
45 let _ = access_mode.replace(mode);
46 }
47 TransactionMode::IsolationLevel(_) => {
48 const MESSAGE: &str = "\
51 Transaction with given Isolation Level is not supported yet.\n\
52 For compatibility, this statement will proceed with RepeatableRead.";
53 builder = builder.notice(MESSAGE);
54 }
55 }
56 }
57
58 match access_mode {
59 Some(TransactionAccessMode::ReadOnly) => AccessMode::ReadOnly,
60 Some(TransactionAccessMode::ReadWrite) | None => {
61 const MESSAGE: &str = "\
66 Read-write transaction is not supported yet. Please specify `READ ONLY` to start a read-only transaction.\n\
67 For compatibility, this statement will still succeed but no transaction is actually started.";
68 builder = builder.notice(MESSAGE);
69 return Ok(builder.into());
70 }
71 }
72 };
73
74 session.txn_begin_explicit(access_mode);
75 Ok(builder.into())
76}
77
78pub async fn handle_commit(
79 handler_args: HandlerArgs,
80 stmt_type: StatementType,
81 chain: bool,
82) -> Result<RwPgResponse> {
83 let HandlerArgs { session, .. } = handler_args;
84
85 if chain {
86 not_impl!("COMMIT AND CHAIN");
87 }
88
89 session.txn_commit_explicit();
90 session.get_cursor_manager().remove_all_query_cursor().await;
91
92 Ok(RwPgResponse::empty_result(stmt_type))
93}
94
95pub async fn handle_rollback(
96 handler_args: HandlerArgs,
97 stmt_type: StatementType,
98 chain: bool,
99) -> Result<RwPgResponse> {
100 let HandlerArgs { session, .. } = handler_args;
101
102 if chain {
103 not_impl!("ROLLBACK AND CHAIN");
104 }
105
106 session.txn_rollback_explicit();
107 session.get_cursor_manager().remove_all_query_cursor().await;
108
109 Ok(RwPgResponse::empty_result(stmt_type))
110}
111
112#[expect(clippy::unused_async)]
113pub async fn handle_set(
114 _handler_args: HandlerArgs,
115 _modes: Vec<TransactionMode>,
116 _snapshot: Option<Value>,
117 _session: bool,
118) -> Result<RwPgResponse> {
119 const MESSAGE: &str = "\
120 `SET TRANSACTION` is not supported yet.\n\
121 For compatibility, this statement will still succeed but no changes are actually made.";
122
123 Ok(RwPgResponse::builder(StatementType::SET_TRANSACTION)
124 .notice(MESSAGE)
125 .into())
126}
127
128#[derive(Fields)]
129#[fields(style = "Title Case")]
130struct ShowVariableRow {
131 name: String,
132}
133
134pub fn handle_show_isolation_level(handler_args: HandlerArgs) -> Result<RwPgResponse> {
135 let config_reader = handler_args.session.config();
136
137 let rows = [ShowVariableRow {
138 name: config_reader.get("transaction_isolation")?,
139 }];
140
141 Ok(RwPgResponse::builder(StatementType::SHOW_VARIABLE)
142 .rows(rows)
143 .into())
144}