risingwave_frontend/handler/
alter_streaming_config.rs1use std::collections::HashMap;
16
17use anyhow::Context;
18use pgwire::pg_response::StatementType;
19use risingwave_sqlparser::ast::{ObjectName, SqlOption, SqlOptionValue, Value as AstValue};
20use toml::Value as TomlValue;
21use toml::map::Map as TomlMap;
22
23use crate::error::{Result, bail_invalid_input_syntax};
24use crate::handler::alter_utils::resolve_streaming_job_id_for_alter;
25use crate::handler::{HandlerArgs, RwPgResponse};
26
/// A set of config changes keyed by option name.
///
/// `Some(value)` sets the key to `value`; `None` marks the key for removal.
type TomlMapDiff = TomlMap<String, Option<TomlValue>>;
29
30fn collect_options(entries: Vec<SqlOption>) -> Result<TomlMapDiff> {
31 let mut map = TomlMap::new();
32
33 for SqlOption { name, value } in entries {
34 let name = name.real_value();
35 if !name.starts_with("streaming.") {
36 bail_invalid_input_syntax!(
37 "ALTER CONFIG only accepts options starting with `streaming.`"
38 );
39 }
40 let SqlOptionValue::Value(value) = value else {
41 bail_invalid_input_syntax!("ALTER CONFIG only accepts value options");
42 };
43
44 let value = match value {
45 AstValue::Number(n) => {
46 let n: TomlValue = n.parse().context("Invalid number for ALTER CONFIG")?;
47 Some(n)
48 }
49 AstValue::SingleQuotedString(s) | AstValue::DoubleQuotedString(s) => {
50 Some(TomlValue::String(s))
51 }
52 AstValue::Boolean(b) => Some(TomlValue::Boolean(b)),
53 AstValue::Null => None,
54 _ => bail_invalid_input_syntax!("Unsupported value for ALTER CONFIG: {}", value),
55 };
56
57 let old = map.insert(name.clone(), value);
58 if old.is_some() {
59 bail_invalid_input_syntax!("Duplicate option for ALTER CONFIG: {}", name);
60 }
61 }
62
63 Ok(map)
64}
65
66pub async fn handle_alter_streaming_set_config(
67 handler_args: HandlerArgs,
68 obj_name: ObjectName,
69 entries: Vec<SqlOption>,
70 stmt_type: StatementType,
71) -> Result<RwPgResponse> {
72 let session = handler_args.session;
73
74 let job_id = resolve_streaming_job_id_for_alter(&session, obj_name, stmt_type, "config")?;
75 let map_diff = collect_options(entries)?;
76
77 let mut entries_to_add = HashMap::new();
78 let mut keys_to_remove = Vec::new();
79
80 for (k, v) in map_diff {
81 if let Some(v) = v {
82 entries_to_add.insert(k, v.to_string());
83 } else {
84 keys_to_remove.push(k);
85 }
86 }
87
88 let catalog_writer = session.catalog_writer()?;
89 catalog_writer
90 .alter_config(job_id, entries_to_add, keys_to_remove)
91 .await?;
92
93 Ok(RwPgResponse::builder(stmt_type)
94 .notice("ALTER CONFIG requires a RECOVER on the specified streaming job to take effect.")
95 .into())
96}
97
98pub async fn handle_alter_streaming_reset_config(
99 handler_args: HandlerArgs,
100 obj_name: ObjectName,
101 keys: Vec<ObjectName>,
102 stmt_type: StatementType,
103) -> Result<RwPgResponse> {
104 let entries = keys
105 .into_iter()
106 .map(|k| SqlOption {
107 name: k,
108 value: SqlOptionValue::null(),
109 })
110 .collect();
111
112 handle_alter_streaming_set_config(handler_args, obj_name, entries, stmt_type).await
114}