risingwave_frontend/handler/
variable.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use itertools::Itertools;
17use pgwire::pg_field_descriptor::PgFieldDescriptor;
18use pgwire::pg_protocol::ParameterStatus;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::session_config::{ConfigReporter, SESSION_CONFIG_LIST_SEP};
21use risingwave_common::system_param::reader::SystemParamsRead;
22use risingwave_common::types::Fields;
23use risingwave_sqlparser::ast::{Ident, SetTimeZoneValue, SetVariableValue, Value};
24use risingwave_sqlparser::keywords::Keyword;
25
26use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors};
27use crate::error::Result;
28use crate::handler::HandlerArgs;
29
30/// convert `SetVariableValue` to string while remove the quotes on literals.
31pub(crate) fn set_var_to_param_str(value: &SetVariableValue) -> Option<String> {
32    match value {
33        SetVariableValue::Single(var) => Some(var.to_string_unquoted()),
34        SetVariableValue::List(list) => Some(
35            list.iter()
36                .map(|var| var.to_string_unquoted())
37                .join(SESSION_CONFIG_LIST_SEP),
38        ),
39        SetVariableValue::Default => None,
40    }
41}
42
43pub fn handle_set(
44    handler_args: HandlerArgs,
45    name: Ident,
46    value: SetVariableValue,
47) -> Result<RwPgResponse> {
48    // Strip double and single quotes
49    let mut string_val = set_var_to_param_str(&value);
50
51    let mut status = ParameterStatus::default();
52
53    struct Reporter<'a> {
54        status: &'a mut ParameterStatus,
55    }
56
57    impl ConfigReporter for Reporter<'_> {
58        fn report_status(&mut self, key: &str, new_val: String) {
59            if key == "APPLICATION_NAME" {
60                self.status.application_name = Some(new_val);
61            }
62        }
63    }
64
65    // special handle for streaming parallelism,
66    if name
67        .real_value()
68        .eq_ignore_ascii_case("streaming_parallelism")
69        && string_val
70            .as_ref()
71            .map(|val| val.eq_ignore_ascii_case(Keyword::ADAPTIVE.to_string().as_str()))
72            .unwrap_or(false)
73    {
74        string_val = None;
75    }
76
77    // Currently store the config variable simply as String -> ConfigEntry(String).
78    // In future we can add converter/parser to make the API more robust.
79    // We remark that the name of session parameter is always case-insensitive.
80    handler_args.session.set_config_report(
81        &name.real_value().to_lowercase(),
82        string_val,
83        Reporter {
84            status: &mut status,
85        },
86    )?;
87
88    Ok(PgResponse::builder(StatementType::SET_VARIABLE)
89        .status(status)
90        .into())
91}
92
93pub(super) fn handle_set_time_zone(
94    handler_args: HandlerArgs,
95    value: SetTimeZoneValue,
96) -> Result<RwPgResponse> {
97    let tz_info = match value {
98        SetTimeZoneValue::Local => {
99            iana_time_zone::get_timezone().context("Failed to get local time zone")
100        }
101        SetTimeZoneValue::Default => Ok("UTC".to_owned()),
102        SetTimeZoneValue::Ident(ident) => Ok(ident.real_value()),
103        SetTimeZoneValue::Literal(Value::DoubleQuotedString(s))
104        | SetTimeZoneValue::Literal(Value::SingleQuotedString(s)) => Ok(s),
105        _ => Ok(value.to_string()),
106    }?;
107
108    handler_args.session.set_config("timezone", tz_info)?;
109
110    Ok(PgResponse::empty_result(StatementType::SET_VARIABLE))
111}
112
113pub(super) async fn handle_show(
114    handler_args: HandlerArgs,
115    variable: Vec<Ident>,
116) -> Result<RwPgResponse> {
117    // TODO: Verify that the name used in `show` command is indeed always case-insensitive.
118    let name = variable.iter().map(|e| e.real_value()).join(" ");
119    if name.eq_ignore_ascii_case("PARAMETERS") {
120        handle_show_system_params(handler_args).await
121    } else if name.eq_ignore_ascii_case("ALL") {
122        handle_show_all(handler_args.clone())
123    } else {
124        let config_reader = handler_args.session.config();
125        Ok(PgResponse::builder(StatementType::SHOW_VARIABLE)
126            .rows([ShowVariableRow {
127                name: config_reader.get(&name)?,
128            }])
129            .into())
130    }
131}
132
133fn handle_show_all(handler_args: HandlerArgs) -> Result<RwPgResponse> {
134    let config_reader = handler_args.session.config();
135
136    let all_variables = config_reader.show_all();
137
138    let rows = all_variables.iter().map(|info| ShowVariableAllRow {
139        name: info.name.clone(),
140        setting: info.setting.clone(),
141        description: info.description.clone(),
142    });
143    Ok(PgResponse::builder(StatementType::SHOW_VARIABLE)
144        .rows(rows)
145        .into())
146}
147
148async fn handle_show_system_params(handler_args: HandlerArgs) -> Result<RwPgResponse> {
149    let params = handler_args
150        .session
151        .env()
152        .meta_client()
153        .get_system_params()
154        .await?;
155    let rows = params
156        .get_all()
157        .into_iter()
158        .map(|info| ShowVariableParamsRow {
159            name: info.name.into(),
160            value: info.value,
161            description: info.description.into(),
162            mutable: info.mutable,
163        });
164    Ok(PgResponse::builder(StatementType::SHOW_VARIABLE)
165        .rows(rows)
166        .into())
167}
168
169pub fn infer_show_variable(name: &str) -> Vec<PgFieldDescriptor> {
170    fields_to_descriptors(if name.eq_ignore_ascii_case("ALL") {
171        ShowVariableAllRow::fields()
172    } else if name.eq_ignore_ascii_case("PARAMETERS") {
173        ShowVariableParamsRow::fields()
174    } else {
175        ShowVariableRow::fields()
176    })
177}
178
179#[derive(Fields)]
180#[fields(style = "Title Case")]
181struct ShowVariableRow {
182    name: String,
183}
184
185#[derive(Fields)]
186#[fields(style = "Title Case")]
187struct ShowVariableAllRow {
188    name: String,
189    setting: String,
190    description: String,
191}
192
193#[derive(Fields)]
194#[fields(style = "Title Case")]
195struct ShowVariableParamsRow {
196    name: String,
197    value: String,
198    description: String,
199    mutable: bool,
200}