risingwave_frontend/handler/
alter_table_drop_connector.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::{Arc, LazyLock};
17
18use risingwave_connector::parser::additional_columns::gen_default_addition_col_name;
19use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
20use risingwave_pb::ddl_service::TableJobType;
21use risingwave_sqlparser::ast::{ColumnDef, Ident};
22
23use crate::catalog::root_catalog::SchemaPath;
24use crate::catalog::source_catalog::SourceCatalog;
25use crate::error::{ErrorCode, Result};
26use crate::handler::create_source::SqlColumnStrategy;
27use crate::handler::{
28    HandlerArgs, ObjectName, PgResponse, RwPgResponse, Statement, StatementType,
29    get_replace_table_plan,
30};
31use crate::session::SessionImpl;
32use crate::utils::data_type::DataTypeToAst;
33use crate::utils::options::RETENTION_SECONDS;
34use crate::{Binder, TableCatalog, bind_data_type};
35
36// allowed in with clause but irrelevant to connector
37static TABLE_PROPS: LazyLock<HashSet<&str>> =
38    LazyLock::new(|| HashSet::from([COMMIT_CHECKPOINT_INTERVAL, RETENTION_SECONDS]));
39
40fn fetch_schema_info(
41    session: &Arc<SessionImpl>,
42    table_name: ObjectName,
43) -> Result<(Arc<TableCatalog>, Arc<SourceCatalog>)> {
44    let db_name = session.database();
45    let (schema_name, real_table_name) =
46        Binder::resolve_schema_qualified_name(db_name.as_str(), table_name.clone())?;
47    let search_path = session.config().search_path();
48    let user_name = &session.auth_context().user_name;
49
50    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
51    let reader = session.env().catalog_reader().read_guard();
52
53    let (table_def, schema_name) =
54        reader.get_any_table_by_name(db_name.as_str(), schema_path, &real_table_name)?;
55    session.check_privilege_for_drop_alter(schema_name, &**table_def)?;
56
57    let Some(source_id) = table_def.associated_source_id else {
58        return Err(ErrorCode::ProtocolError(format!(
59            "Table {} is not associated with a connector",
60            real_table_name
61        ))
62        .into());
63    };
64    let (source_def, _) =
65        reader.get_source_by_id(db_name.as_str(), schema_path, &source_id.table_id())?;
66    Ok((table_def.clone(), source_def.clone()))
67}
68
69fn rewrite_table_definition(
70    original_table_def: &Arc<TableCatalog>,
71    original_source_def: &Arc<SourceCatalog>,
72    original_statement: Statement,
73) -> Result<Statement> {
74    let Statement::CreateTable {
75        mut columns,
76        include_column_options,
77        or_replace,
78        temporary,
79        if_not_exists,
80        name,
81        wildcard_idx,
82        constraints,
83        mut with_options,
84        append_only,
85        on_conflict,
86        with_version_column,
87        query,
88        engine,
89        ..
90    } = original_statement
91    else {
92        panic!("unexpected statement: {:?}", original_statement);
93    };
94
95    // identical logic with func `handle_addition_columns`, reverse the order to keep the original order of additional columns
96    for item in include_column_options.iter().rev() {
97        let col_name = if let Some(col_alias) = &item.column_alias {
98            col_alias.real_value()
99        } else {
100            let data_type = if let Some(dt) = &item.header_inner_expect_type {
101                Some(bind_data_type(dt)?)
102            } else {
103                None
104            };
105            gen_default_addition_col_name(
106                original_source_def.connector_name().as_str(),
107                item.column_type.real_value().as_str(),
108                item.inner_field.as_deref(),
109                data_type.as_ref(),
110            )
111        };
112        // find the column def in the catalog
113        if let Some(col_def) = original_table_def
114            .columns
115            .iter()
116            .find(|col_def| col_def.name() == col_name)
117        {
118            columns.push(ColumnDef {
119                name: Ident::from(col_name.as_str()),
120                data_type: Some(col_def.data_type().to_ast()),
121                collation: None,
122                options: vec![],
123            });
124        }
125    }
126
127    let new_statement = Statement::CreateTable {
128        or_replace,
129        temporary,
130        if_not_exists,
131        name: name.clone(),
132        columns: columns.clone(),
133        wildcard_idx,
134        constraints: constraints.clone(),
135        with_options: {
136            with_options.retain(|item| {
137                TABLE_PROPS.contains(item.name.real_value().to_lowercase().as_str())
138            });
139            with_options
140        },
141        format_encode: None,
142        source_watermarks: vec![], // no source, no watermark
143        append_only,
144        on_conflict,
145        with_version_column,
146        query,
147        cdc_table_info: None,
148        include_column_options: vec![],
149        webhook_info: None,
150        engine,
151    };
152    Ok(new_statement)
153}
154
155pub async fn handle_alter_table_drop_connector(
156    handler_args: HandlerArgs,
157    table_name: ObjectName,
158) -> Result<RwPgResponse> {
159    let session = handler_args.session;
160    let (table_def, source_def) = fetch_schema_info(&session, table_name.clone())?;
161    let original_definition = table_def.create_sql_ast_purified()?;
162
163    let new_statement = rewrite_table_definition(&table_def, &source_def, original_definition)?;
164    let (_, table, graph, col_index_mapping, _) = get_replace_table_plan(
165        &session,
166        table_name,
167        new_statement,
168        &table_def,
169        SqlColumnStrategy::FollowUnchecked,
170    )
171    .await?;
172
173    let catalog_writer = session.catalog_writer()?;
174    catalog_writer
175        .replace_table(
176            None,
177            table,
178            graph,
179            col_index_mapping,
180            TableJobType::General as _,
181        )
182        .await?;
183
184    Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
185}