risingwave_frontend/handler/
alter_streaming_config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::Context;
18use pgwire::pg_response::StatementType;
19use risingwave_sqlparser::ast::{ObjectName, SqlOption, SqlOptionValue, Value as AstValue};
20use toml::Value as TomlValue;
21use toml::map::Map as TomlMap;
22
23use crate::error::{Result, bail_invalid_input_syntax};
24use crate::handler::alter_utils::resolve_streaming_job_id_for_alter;
25use crate::handler::{HandlerArgs, RwPgResponse};
26
27/// A diff of a TOML map. `None` means the key should be removed.
28type TomlMapDiff = TomlMap<String, Option<TomlValue>>;
29
30fn collect_options(entries: Vec<SqlOption>) -> Result<TomlMapDiff> {
31    let mut map = TomlMap::new();
32
33    for SqlOption { name, value } in entries {
34        let name = name.real_value();
35        if !name.starts_with("streaming.") {
36            bail_invalid_input_syntax!(
37                "ALTER CONFIG only accepts options starting with `streaming.`"
38            );
39        }
40        let SqlOptionValue::Value(value) = value else {
41            bail_invalid_input_syntax!("ALTER CONFIG only accepts value options");
42        };
43
44        let value = match value {
45            AstValue::Number(n) => {
46                let n: TomlValue = n.parse().context("Invalid number for ALTER CONFIG")?;
47                Some(n)
48            }
49            AstValue::SingleQuotedString(s) | AstValue::DoubleQuotedString(s) => {
50                Some(TomlValue::String(s))
51            }
52            AstValue::Boolean(b) => Some(TomlValue::Boolean(b)),
53            AstValue::Null => None,
54            _ => bail_invalid_input_syntax!("Unsupported value for ALTER CONFIG: {}", value),
55        };
56
57        let old = map.insert(name.clone(), value);
58        if old.is_some() {
59            bail_invalid_input_syntax!("Duplicate option for ALTER CONFIG: {}", name);
60        }
61    }
62
63    Ok(map)
64}
65
66pub async fn handle_alter_streaming_set_config(
67    handler_args: HandlerArgs,
68    obj_name: ObjectName,
69    entries: Vec<SqlOption>,
70    stmt_type: StatementType,
71) -> Result<RwPgResponse> {
72    let session = handler_args.session;
73
74    let job_id = resolve_streaming_job_id_for_alter(&session, obj_name, stmt_type, "config")?;
75    let map_diff = collect_options(entries)?;
76
77    let mut entries_to_add = HashMap::new();
78    let mut keys_to_remove = Vec::new();
79
80    for (k, v) in map_diff {
81        if let Some(v) = v {
82            entries_to_add.insert(k, v.to_string());
83        } else {
84            keys_to_remove.push(k);
85        }
86    }
87
88    let catalog_writer = session.catalog_writer()?;
89    catalog_writer
90        .alter_config(job_id, entries_to_add, keys_to_remove)
91        .await?;
92
93    Ok(RwPgResponse::builder(stmt_type)
94        .notice("ALTER CONFIG requires a RECOVER on the specified streaming job to take effect.")
95        .into())
96}
97
98pub async fn handle_alter_streaming_reset_config(
99    handler_args: HandlerArgs,
100    obj_name: ObjectName,
101    keys: Vec<ObjectName>,
102    stmt_type: StatementType,
103) -> Result<RwPgResponse> {
104    let entries = keys
105        .into_iter()
106        .map(|k| SqlOption {
107            name: k,
108            value: SqlOptionValue::null(),
109        })
110        .collect();
111
112    // Simply delegate to `handle_alter_streaming_set_config` with all values set to `NULL`.
113    handle_alter_streaming_set_config(handler_args, obj_name, entries, stmt_type).await
114}