risingwave_frontend/handler/
alter_table_drop_connector.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::{Arc, LazyLock};
17
18use risingwave_connector::parser::additional_columns::gen_default_addition_col_name;
19use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
20use risingwave_pb::ddl_service::TableJobType;
21use risingwave_sqlparser::ast::{ColumnDef, Ident};
22
23use crate::catalog::root_catalog::SchemaPath;
24use crate::catalog::source_catalog::SourceCatalog;
25use crate::error::{ErrorCode, Result};
26use crate::handler::create_source::SqlColumnStrategy;
27use crate::handler::{
28    HandlerArgs, ObjectName, PgResponse, RwPgResponse, Statement, StatementType,
29    get_replace_table_plan,
30};
31use crate::session::SessionImpl;
32use crate::utils::data_type::DataTypeToAst;
33use crate::utils::options::RETENTION_SECONDS;
34use crate::{Binder, TableCatalog, bind_data_type};
35
36// allowed in with clause but irrelevant to connector
37static TABLE_PROPS: LazyLock<HashSet<&str>> =
38    LazyLock::new(|| HashSet::from([COMMIT_CHECKPOINT_INTERVAL, RETENTION_SECONDS]));
39
40fn fetch_schema_info(
41    session: &Arc<SessionImpl>,
42    table_name: ObjectName,
43) -> Result<(Arc<TableCatalog>, Arc<SourceCatalog>)> {
44    let db_name = session.database();
45    let (schema_name, real_table_name) =
46        Binder::resolve_schema_qualified_name(db_name.as_str(), &table_name)?;
47    let search_path = session.config().search_path();
48    let user_name = &session.auth_context().user_name;
49
50    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
51    let reader = session.env().catalog_reader().read_guard();
52
53    let (table_def, schema_name) =
54        reader.get_any_table_by_name(db_name.as_str(), schema_path, &real_table_name)?;
55    session.check_privilege_for_drop_alter(schema_name, &**table_def)?;
56
57    let Some(source_id) = table_def.associated_source_id else {
58        return Err(ErrorCode::ProtocolError(format!(
59            "Table {} is not associated with a connector",
60            real_table_name
61        ))
62        .into());
63    };
64    let (source_def, _) = reader.get_source_by_id(db_name.as_str(), schema_path, source_id)?;
65    Ok((table_def.clone(), source_def.clone()))
66}
67
68fn rewrite_table_definition(
69    original_table_def: &Arc<TableCatalog>,
70    original_source_def: &Arc<SourceCatalog>,
71    original_statement: Statement,
72) -> Result<Statement> {
73    let Statement::CreateTable {
74        mut columns,
75        include_column_options,
76        or_replace,
77        temporary,
78        if_not_exists,
79        name,
80        wildcard_idx,
81        constraints,
82        mut with_options,
83        append_only,
84        on_conflict,
85        with_version_columns,
86        query,
87        engine,
88        ..
89    } = original_statement
90    else {
91        panic!("unexpected statement: {:?}", original_statement);
92    };
93
94    // identical logic with func `handle_addition_columns`, reverse the order to keep the original order of additional columns
95    for item in include_column_options.iter().rev() {
96        let col_name = if let Some(col_alias) = &item.column_alias {
97            col_alias.real_value()
98        } else {
99            let data_type = if let Some(dt) = &item.header_inner_expect_type {
100                Some(bind_data_type(dt)?)
101            } else {
102                None
103            };
104            gen_default_addition_col_name(
105                original_source_def.connector_name().as_str(),
106                item.column_type.real_value().as_str(),
107                item.inner_field.as_deref(),
108                data_type.as_ref(),
109            )
110        };
111        // find the column def in the catalog
112        if let Some(col_def) = original_table_def
113            .columns
114            .iter()
115            .find(|col_def| col_def.name() == col_name)
116        {
117            columns.push(ColumnDef {
118                name: Ident::from(col_name.as_str()),
119                data_type: Some(col_def.data_type().to_ast()),
120                collation: None,
121                options: vec![],
122            });
123        }
124    }
125
126    let new_statement = Statement::CreateTable {
127        or_replace,
128        temporary,
129        if_not_exists,
130        name,
131        columns: columns.clone(),
132        wildcard_idx,
133        constraints,
134        with_options: {
135            with_options.retain(|item| {
136                TABLE_PROPS.contains(item.name.real_value().to_lowercase().as_str())
137            });
138            with_options
139        },
140        format_encode: None,
141        source_watermarks: vec![], // no source, no watermark
142        append_only,
143        on_conflict,
144        with_version_columns,
145        query,
146        cdc_table_info: None,
147        include_column_options: vec![],
148        webhook_info: None,
149        engine,
150    };
151    Ok(new_statement)
152}
153
154pub async fn handle_alter_table_drop_connector(
155    handler_args: HandlerArgs,
156    table_name: ObjectName,
157) -> Result<RwPgResponse> {
158    let session = handler_args.session;
159    let (table_def, source_def) = fetch_schema_info(&session, table_name.clone())?;
160    let original_definition = table_def.create_sql_ast_purified()?;
161
162    let new_statement = rewrite_table_definition(&table_def, &source_def, original_definition)?;
163    let (_, table, graph, _) = get_replace_table_plan(
164        &session,
165        table_name,
166        new_statement,
167        &table_def,
168        SqlColumnStrategy::FollowUnchecked,
169    )
170    .await?;
171
172    let catalog_writer = session.catalog_writer()?;
173    catalog_writer
174        .replace_table(None, table.to_prost(), graph, TableJobType::General as _)
175        .await?;
176
177    Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
178}