risingwave_frontend/handler/
alter_subscription_retention.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_sqlparser::ast::{
17 Ident, ObjectName, SqlOption, SqlOptionValue, Statement, Value, WithProperties,
18};
19use risingwave_sqlparser::parser::Parser;
20
21use super::{HandlerArgs, RwPgResponse};
22use crate::Binder;
23use crate::catalog::root_catalog::SchemaPath;
24use crate::error::{ErrorCode, Result};
25use crate::handler::util::convert_interval_to_u64_seconds;
26
27fn retention_value_to_string(retention: &Value) -> Result<String> {
28 match retention {
29 Value::SingleQuotedString(value) => Ok(value.clone()),
30 Value::CstyleEscapedString(value) => Ok(value.value.clone()),
31 Value::Number(value) => Ok(value.clone()),
32 _ => Err(ErrorCode::InvalidParameterValue(
33 "ALTER SUBSCRIPTION SET RETENTION only supports string or numeric literals".to_owned(),
34 )
35 .into()),
36 }
37}
38
39fn update_retention_in_with_properties(with_properties: &mut WithProperties, retention: Value) {
40 if let Some(option) = with_properties
41 .0
42 .iter_mut()
43 .find(|option| option.name.real_value().eq_ignore_ascii_case("retention"))
44 {
45 option.value = SqlOptionValue::Value(retention);
46 return;
47 }
48
49 with_properties.0.push(SqlOption {
50 name: ObjectName(vec![Ident::new_unchecked("retention")]),
51 value: SqlOptionValue::Value(retention),
52 });
53}
54
55fn update_subscription_definition(definition: &str, retention: Value) -> Result<String> {
56 let mut statements = Parser::parse_sql(definition)?;
57 let statement = match statements.as_mut_slice() {
58 [statement] => statement,
59 _ => {
60 return Err(ErrorCode::InternalError(
61 "Subscription definition should contain a single statement.".to_owned(),
62 )
63 .into());
64 }
65 };
66
67 match statement {
68 Statement::CreateSubscription { stmt } => {
69 update_retention_in_with_properties(&mut stmt.with_properties, retention);
70 }
71 _ => {
72 return Err(ErrorCode::InternalError(
73 "Unexpected statement in subscription definition.".to_owned(),
74 )
75 .into());
76 }
77 }
78
79 Ok(statement.to_string())
80}
81
82pub async fn handle_alter_subscription_retention(
83 handler_args: HandlerArgs,
84 subscription_name: ObjectName,
85 retention: Value,
86) -> Result<RwPgResponse> {
87 let session = handler_args.session;
88 let db_name = &session.database();
89 let (schema_name, real_subscription_name) =
90 Binder::resolve_schema_qualified_name(db_name, &subscription_name)?;
91 let search_path = session.config().search_path();
92 let user_name = &session.user_name();
93 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
94
95 let subscription = {
96 let reader = session.env().catalog_reader().read_guard();
97 let (subscription, schema_name) =
98 reader.get_subscription_by_name(db_name, schema_path, &real_subscription_name)?;
99 session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
100 subscription.clone()
101 };
102
103 let retention_value = retention_value_to_string(&retention)?;
104 let retention_seconds = convert_interval_to_u64_seconds(&retention_value)?;
105 let new_definition = update_subscription_definition(&subscription.definition, retention)?;
106
107 let catalog_writer = session.catalog_writer()?;
108 catalog_writer
109 .alter_subscription_retention(subscription.id, retention_seconds, new_definition)
110 .await?;
111
112 Ok(PgResponse::empty_result(StatementType::ALTER_SUBSCRIPTION))
113}
114
115#[cfg(test)]
116mod tests {
117 use risingwave_sqlparser::ast::Value;
118
119 use super::update_subscription_definition;
120
121 #[test]
122 fn test_update_subscription_definition() {
123 let definition = "CREATE SUBSCRIPTION sub FROM mv WITH (retention = '1D')";
124 let updated =
125 update_subscription_definition(definition, Value::SingleQuotedString("2H".to_owned()))
126 .unwrap();
127 assert_eq!(
128 updated,
129 "CREATE SUBSCRIPTION sub FROM mv WITH (retention = '2H')"
130 );
131 }
132}