risingwave_frontend/handler/
alter_subscription_retention.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_sqlparser::ast::{
17    Ident, ObjectName, SqlOption, SqlOptionValue, Statement, Value, WithProperties,
18};
19use risingwave_sqlparser::parser::Parser;
20
21use super::{HandlerArgs, RwPgResponse};
22use crate::Binder;
23use crate::catalog::root_catalog::SchemaPath;
24use crate::error::{ErrorCode, Result};
25use crate::handler::util::convert_interval_to_u64_seconds;
26
27fn retention_value_to_string(retention: &Value) -> Result<String> {
28    match retention {
29        Value::SingleQuotedString(value) => Ok(value.clone()),
30        Value::CstyleEscapedString(value) => Ok(value.value.clone()),
31        Value::Number(value) => Ok(value.clone()),
32        _ => Err(ErrorCode::InvalidParameterValue(
33            "ALTER SUBSCRIPTION SET RETENTION only supports string or numeric literals".to_owned(),
34        )
35        .into()),
36    }
37}
38
39fn update_retention_in_with_properties(with_properties: &mut WithProperties, retention: Value) {
40    if let Some(option) = with_properties
41        .0
42        .iter_mut()
43        .find(|option| option.name.real_value().eq_ignore_ascii_case("retention"))
44    {
45        option.value = SqlOptionValue::Value(retention);
46        return;
47    }
48
49    with_properties.0.push(SqlOption {
50        name: ObjectName(vec![Ident::new_unchecked("retention")]),
51        value: SqlOptionValue::Value(retention),
52    });
53}
54
55fn update_subscription_definition(definition: &str, retention: Value) -> Result<String> {
56    let mut statements = Parser::parse_sql(definition)?;
57    let statement = match statements.as_mut_slice() {
58        [statement] => statement,
59        _ => {
60            return Err(ErrorCode::InternalError(
61                "Subscription definition should contain a single statement.".to_owned(),
62            )
63            .into());
64        }
65    };
66
67    match statement {
68        Statement::CreateSubscription { stmt } => {
69            update_retention_in_with_properties(&mut stmt.with_properties, retention);
70        }
71        _ => {
72            return Err(ErrorCode::InternalError(
73                "Unexpected statement in subscription definition.".to_owned(),
74            )
75            .into());
76        }
77    }
78
79    Ok(statement.to_string())
80}
81
82pub async fn handle_alter_subscription_retention(
83    handler_args: HandlerArgs,
84    subscription_name: ObjectName,
85    retention: Value,
86) -> Result<RwPgResponse> {
87    let session = handler_args.session;
88    let db_name = &session.database();
89    let (schema_name, real_subscription_name) =
90        Binder::resolve_schema_qualified_name(db_name, &subscription_name)?;
91    let search_path = session.config().search_path();
92    let user_name = &session.user_name();
93    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
94
95    let subscription = {
96        let reader = session.env().catalog_reader().read_guard();
97        let (subscription, schema_name) =
98            reader.get_subscription_by_name(db_name, schema_path, &real_subscription_name)?;
99        session.check_privilege_for_drop_alter(schema_name, &**subscription)?;
100        subscription.clone()
101    };
102
103    let retention_value = retention_value_to_string(&retention)?;
104    let retention_seconds = convert_interval_to_u64_seconds(&retention_value)?;
105    let new_definition = update_subscription_definition(&subscription.definition, retention)?;
106
107    let catalog_writer = session.catalog_writer()?;
108    catalog_writer
109        .alter_subscription_retention(subscription.id, retention_seconds, new_definition)
110        .await?;
111
112    Ok(PgResponse::empty_result(StatementType::ALTER_SUBSCRIPTION))
113}
114
115#[cfg(test)]
116mod tests {
117    use risingwave_sqlparser::ast::Value;
118
119    use super::update_subscription_definition;
120
121    #[test]
122    fn test_update_subscription_definition() {
123        let definition = "CREATE SUBSCRIPTION sub FROM mv WITH (retention = '1D')";
124        let updated =
125            update_subscription_definition(definition, Value::SingleQuotedString("2H".to_owned()))
126                .unwrap();
127        assert_eq!(
128            updated,
129            "CREATE SUBSCRIPTION sub FROM mv WITH (retention = '2H')"
130        );
131    }
132}