risingwave_frontend/handler/
variable.rs1use anyhow::Context;
16use itertools::Itertools;
17use pgwire::pg_field_descriptor::PgFieldDescriptor;
18use pgwire::pg_protocol::ParameterStatus;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::session_config::{ConfigReporter, SESSION_CONFIG_LIST_SEP};
21use risingwave_common::system_param::reader::SystemParamsRead;
22use risingwave_common::types::Fields;
23use risingwave_sqlparser::ast::{Ident, SetTimeZoneValue, SetVariableValue, Value};
24use risingwave_sqlparser::keywords::Keyword;
25
26use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
27use crate::error::Result;
28use crate::handler::HandlerArgs;
29
30pub(crate) fn set_var_to_param_str(value: &SetVariableValue) -> Option<String> {
32 match value {
33 SetVariableValue::Single(var) => Some(var.to_string_unquoted()),
34 SetVariableValue::List(list) => Some(
35 list.iter()
36 .map(|var| var.to_string_unquoted())
37 .join(SESSION_CONFIG_LIST_SEP),
38 ),
39 SetVariableValue::Default => None,
40 }
41}
42
43pub fn handle_set(
44 handler_args: HandlerArgs,
45 name: Ident,
46 value: SetVariableValue,
47) -> Result<RwPgResponse> {
48 let mut string_val = set_var_to_param_str(&value);
50
51 let mut status = ParameterStatus::default();
52
53 struct Reporter<'a> {
54 status: &'a mut ParameterStatus,
55 }
56
57 impl ConfigReporter for Reporter<'_> {
58 fn report_status(&mut self, key: &str, new_val: String) {
59 if key == "APPLICATION_NAME" {
60 self.status.application_name = Some(new_val);
61 }
62 }
63 }
64
65 if name
67 .real_value()
68 .eq_ignore_ascii_case("streaming_parallelism")
69 && string_val
70 .as_ref()
71 .map(|val| val.eq_ignore_ascii_case(Keyword::ADAPTIVE.to_string().as_str()))
72 .unwrap_or(false)
73 {
74 string_val = None;
75 }
76
77 handler_args.session.set_config_report(
81 &name.real_value().to_lowercase(),
82 string_val,
83 Reporter {
84 status: &mut status,
85 },
86 )?;
87
88 Ok(PgResponse::builder(StatementType::SET_VARIABLE)
89 .status(status)
90 .into())
91}
92
93pub(super) fn handle_set_time_zone(
94 handler_args: HandlerArgs,
95 value: SetTimeZoneValue,
96) -> Result<RwPgResponse> {
97 let tz_info = match value {
98 SetTimeZoneValue::Local => {
99 iana_time_zone::get_timezone().context("Failed to get local time zone")
100 }
101 SetTimeZoneValue::Default => Ok("UTC".to_owned()),
102 SetTimeZoneValue::Ident(ident) => Ok(ident.real_value()),
103 SetTimeZoneValue::Literal(Value::DoubleQuotedString(s))
104 | SetTimeZoneValue::Literal(Value::SingleQuotedString(s)) => Ok(s),
105 _ => Ok(value.to_string()),
106 }?;
107
108 handler_args.session.set_config("timezone", tz_info)?;
109
110 Ok(PgResponse::empty_result(StatementType::SET_VARIABLE))
111}
112
113pub(super) async fn handle_show(
114 handler_args: HandlerArgs,
115 variable: Vec<Ident>,
116) -> Result<RwPgResponse> {
117 let name = variable.iter().map(|e| e.real_value()).join(" ");
119 if name.eq_ignore_ascii_case("PARAMETERS") {
120 handle_show_system_params(handler_args).await
121 } else if name.eq_ignore_ascii_case("ALL") {
122 handle_show_all(handler_args.clone())
123 } else {
124 let config_reader = handler_args.session.config();
125 Ok(PgResponse::builder(StatementType::SHOW_VARIABLE)
126 .rows([ShowVariableRow {
127 name: config_reader.get(&name)?,
128 }])
129 .into())
130 }
131}
132
133fn handle_show_all(handler_args: HandlerArgs) -> Result<RwPgResponse> {
134 let config_reader = handler_args.session.config();
135
136 let all_variables = config_reader.show_all();
137
138 let rows = all_variables.iter().map(|info| ShowVariableAllRow {
139 name: info.name.clone(),
140 setting: info.setting.clone(),
141 description: info.description.clone(),
142 });
143 Ok(PgResponse::builder(StatementType::SHOW_VARIABLE)
144 .rows(rows)
145 .into())
146}
147
148async fn handle_show_system_params(handler_args: HandlerArgs) -> Result<RwPgResponse> {
149 let params = handler_args
150 .session
151 .env()
152 .meta_client()
153 .get_system_params()
154 .await?;
155 let rows = params
156 .get_all()
157 .into_iter()
158 .map(|info| ShowVariableParamsRow {
159 name: info.name.into(),
160 value: info.value,
161 description: info.description.into(),
162 mutable: info.mutable,
163 });
164 Ok(PgResponse::builder(StatementType::SHOW_VARIABLE)
165 .rows(rows)
166 .into())
167}
168
169pub fn infer_show_variable(name: &str) -> Vec<PgFieldDescriptor> {
170 fields_to_descriptors(if name.eq_ignore_ascii_case("ALL") {
171 ShowVariableAllRow::fields()
172 } else if name.eq_ignore_ascii_case("PARAMETERS") {
173 ShowVariableParamsRow::fields()
174 } else {
175 ShowVariableRow::fields()
176 })
177}
178
/// Single-column row returned by `SHOW <variable>` for one session variable.
// NOTE(review): `style = "Title Case"` presumably controls how the derived column names are
// rendered — confirm against the `Fields` derive documentation.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowVariableRow {
    /// Filled by `handle_show` with the result of the session config lookup for the
    /// requested variable.
    name: String,
}
184
/// Row returned by `SHOW ALL`: one per entry of the session config's `show_all()`.
// NOTE(review): `style = "Title Case"` presumably controls how the derived column names are
// rendered — confirm against the `Fields` derive documentation.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowVariableAllRow {
    /// Copied from the entry's `name`.
    name: String,
    /// Copied from the entry's `setting`.
    setting: String,
    /// Copied from the entry's `description`.
    description: String,
}
192
/// Row returned by `SHOW PARAMETERS`: one per entry of the system parameters' `get_all()`.
// NOTE(review): `style = "Title Case"` presumably controls how the derived column names are
// rendered — confirm against the `Fields` derive documentation.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowVariableParamsRow {
    /// Converted from the entry's `name`.
    name: String,
    /// Taken from the entry's `value`.
    value: String,
    /// Converted from the entry's `description`.
    description: String,
    /// Taken from the entry's `mutable` flag.
    mutable: bool,
}