risingwave_frontend/handler/
transaction.rs1use pgwire::pg_response::StatementType;
16use risingwave_common::bail_not_implemented;
17use risingwave_common::types::Fields;
18use risingwave_sqlparser::ast::{TransactionAccessMode, TransactionMode, Value};
19
20use super::{HandlerArgs, RwPgResponse, RwPgResponseBuilderExt};
21use crate::error::Result;
22use crate::session::transaction::AccessMode;
23
/// Reports an unsupported transaction feature.
///
/// Expands to `bail_not_implemented!` tagged with issue number 10376, using the
/// given expression (formatted with `{}`) as the message. Because it expands to a
/// `bail_*` macro, it is expected to return early from the enclosing function.
macro_rules! not_impl {
    ($feature:expr) => {
        bail_not_implemented!(issue = 10376, "{}", $feature)
    };
}
29
30pub fn handle_begin(
31 handler_args: HandlerArgs,
32 stmt_type: StatementType,
33 modes: Vec<TransactionMode>,
34) -> Result<RwPgResponse> {
35 let HandlerArgs { session, .. } = handler_args;
36
37 let mut builder = RwPgResponse::builder(stmt_type);
38
39 let access_mode = {
40 let mut access_mode = None;
41 for mode in modes {
42 match mode {
43 TransactionMode::AccessMode(mode) => {
44 let _ = access_mode.replace(mode);
45 }
46 TransactionMode::IsolationLevel(_) => {
47 const MESSAGE: &str = "\
50 Transaction with given Isolation Level is not supported yet.\n\
51 For compatibility, this statement will proceed with RepeatableRead.";
52 builder = builder.notice(MESSAGE);
53 }
54 }
55 }
56
57 match access_mode {
58 Some(TransactionAccessMode::ReadOnly) => AccessMode::ReadOnly,
59 Some(TransactionAccessMode::ReadWrite) | None => {
60 const MESSAGE: &str = "\
65 Read-write transaction is not supported yet. Please specify `READ ONLY` to start a read-only transaction.\n\
66 For compatibility, this statement will still succeed but no transaction is actually started.";
67 builder = builder.notice(MESSAGE);
68 return Ok(builder.into());
69 }
70 }
71 };
72
73 session.txn_begin_explicit(access_mode);
74 Ok(builder.into())
75}
76
77pub async fn handle_commit(
78 handler_args: HandlerArgs,
79 stmt_type: StatementType,
80 chain: bool,
81) -> Result<RwPgResponse> {
82 let HandlerArgs { session, .. } = handler_args;
83
84 if chain {
85 not_impl!("COMMIT AND CHAIN");
86 }
87
88 session.txn_commit_explicit();
89 session.get_cursor_manager().remove_all_query_cursor().await;
90
91 Ok(RwPgResponse::empty_result(stmt_type))
92}
93
94pub async fn handle_rollback(
95 handler_args: HandlerArgs,
96 stmt_type: StatementType,
97 chain: bool,
98) -> Result<RwPgResponse> {
99 let HandlerArgs { session, .. } = handler_args;
100
101 if chain {
102 not_impl!("ROLLBACK AND CHAIN");
103 }
104
105 session.txn_rollback_explicit();
106 session.get_cursor_manager().remove_all_query_cursor().await;
107
108 Ok(RwPgResponse::empty_result(stmt_type))
109}
110
111pub fn handle_set(
112 _handler_args: HandlerArgs,
113 _modes: Vec<TransactionMode>,
114 _snapshot: Option<Value>,
115 _session: bool,
116) -> Result<RwPgResponse> {
117 const MESSAGE: &str = "\
118 `SET TRANSACTION` is not supported yet.\n\
119 For compatibility, this statement will still succeed but no changes are actually made.";
120
121 Ok(RwPgResponse::builder(StatementType::SET_TRANSACTION)
122 .notice(MESSAGE)
123 .into())
124}
125
126#[derive(Fields)]
127#[fields(style = "Title Case")]
128struct ShowVariableRow {
129 name: String,
130}
131
132pub fn handle_show_isolation_level(handler_args: HandlerArgs) -> Result<RwPgResponse> {
133 let config_reader = handler_args.session.config();
134
135 let rows = [ShowVariableRow {
136 name: config_reader.get("transaction_isolation")?,
137 }];
138
139 Ok(RwPgResponse::builder(StatementType::SHOW_VARIABLE)
140 .rows(rows)
141 .into())
142}