risingwave_frontend/handler/
alter_table_drop_connector.rs1use std::collections::HashSet;
16use std::sync::{Arc, LazyLock};
17
18use risingwave_connector::parser::additional_columns::gen_default_addition_col_name;
19use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
20use risingwave_pb::ddl_service::TableJobType;
21use risingwave_sqlparser::ast::{ColumnDef, Ident};
22
23use crate::catalog::root_catalog::SchemaPath;
24use crate::catalog::source_catalog::SourceCatalog;
25use crate::error::{ErrorCode, Result};
26use crate::handler::create_source::SqlColumnStrategy;
27use crate::handler::{
28 HandlerArgs, ObjectName, PgResponse, RwPgResponse, Statement, StatementType,
29 get_replace_table_plan,
30};
31use crate::session::SessionImpl;
32use crate::utils::data_type::DataTypeToAst;
33use crate::utils::options::RETENTION_SECONDS;
34use crate::{Binder, TableCatalog, bind_data_type};
35
36static TABLE_PROPS: LazyLock<HashSet<&str>> =
38 LazyLock::new(|| HashSet::from([COMMIT_CHECKPOINT_INTERVAL, RETENTION_SECONDS]));
39
40fn fetch_schema_info(
41 session: &Arc<SessionImpl>,
42 table_name: ObjectName,
43) -> Result<(Arc<TableCatalog>, Arc<SourceCatalog>)> {
44 let db_name = session.database();
45 let (schema_name, real_table_name) =
46 Binder::resolve_schema_qualified_name(db_name.as_str(), table_name.clone())?;
47 let search_path = session.config().search_path();
48 let user_name = &session.auth_context().user_name;
49
50 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
51 let reader = session.env().catalog_reader().read_guard();
52
53 let (table_def, schema_name) =
54 reader.get_any_table_by_name(db_name.as_str(), schema_path, &real_table_name)?;
55 session.check_privilege_for_drop_alter(schema_name, &**table_def)?;
56
57 let Some(source_id) = table_def.associated_source_id else {
58 return Err(ErrorCode::ProtocolError(format!(
59 "Table {} is not associated with a connector",
60 real_table_name
61 ))
62 .into());
63 };
64 let (source_def, _) =
65 reader.get_source_by_id(db_name.as_str(), schema_path, &source_id.table_id())?;
66 Ok((table_def.clone(), source_def.clone()))
67}
68
69fn rewrite_table_definition(
70 original_table_def: &Arc<TableCatalog>,
71 original_source_def: &Arc<SourceCatalog>,
72 original_statement: Statement,
73) -> Result<Statement> {
74 let Statement::CreateTable {
75 mut columns,
76 include_column_options,
77 or_replace,
78 temporary,
79 if_not_exists,
80 name,
81 wildcard_idx,
82 constraints,
83 mut with_options,
84 append_only,
85 on_conflict,
86 with_version_column,
87 query,
88 engine,
89 ..
90 } = original_statement
91 else {
92 panic!("unexpected statement: {:?}", original_statement);
93 };
94
95 for item in include_column_options.iter().rev() {
97 let col_name = if let Some(col_alias) = &item.column_alias {
98 col_alias.real_value()
99 } else {
100 let data_type = if let Some(dt) = &item.header_inner_expect_type {
101 Some(bind_data_type(dt)?)
102 } else {
103 None
104 };
105 gen_default_addition_col_name(
106 original_source_def.connector_name().as_str(),
107 item.column_type.real_value().as_str(),
108 item.inner_field.as_deref(),
109 data_type.as_ref(),
110 )
111 };
112 if let Some(col_def) = original_table_def
114 .columns
115 .iter()
116 .find(|col_def| col_def.name() == col_name)
117 {
118 columns.push(ColumnDef {
119 name: Ident::from(col_name.as_str()),
120 data_type: Some(col_def.data_type().to_ast()),
121 collation: None,
122 options: vec![],
123 });
124 }
125 }
126
127 let new_statement = Statement::CreateTable {
128 or_replace,
129 temporary,
130 if_not_exists,
131 name: name.clone(),
132 columns: columns.clone(),
133 wildcard_idx,
134 constraints: constraints.clone(),
135 with_options: {
136 with_options.retain(|item| {
137 TABLE_PROPS.contains(item.name.real_value().to_lowercase().as_str())
138 });
139 with_options
140 },
141 format_encode: None,
142 source_watermarks: vec![], append_only,
144 on_conflict,
145 with_version_column,
146 query,
147 cdc_table_info: None,
148 include_column_options: vec![],
149 webhook_info: None,
150 engine,
151 };
152 Ok(new_statement)
153}
154
155pub async fn handle_alter_table_drop_connector(
156 handler_args: HandlerArgs,
157 table_name: ObjectName,
158) -> Result<RwPgResponse> {
159 let session = handler_args.session;
160 let (table_def, source_def) = fetch_schema_info(&session, table_name.clone())?;
161 let original_definition = table_def.create_sql_ast_purified()?;
162
163 let new_statement = rewrite_table_definition(&table_def, &source_def, original_definition)?;
164 let (_, table, graph, col_index_mapping, _) = get_replace_table_plan(
165 &session,
166 table_name,
167 new_statement,
168 &table_def,
169 SqlColumnStrategy::FollowUnchecked,
170 )
171 .await?;
172
173 let catalog_writer = session.catalog_writer()?;
174 catalog_writer
175 .replace_table(
176 None,
177 table,
178 graph,
179 col_index_mapping,
180 TableJobType::General as _,
181 )
182 .await?;
183
184 Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
185}