risingwave_frontend/handler/
alter_table_drop_connector.rs1use std::collections::HashSet;
16use std::sync::{Arc, LazyLock};
17
18use risingwave_connector::parser::additional_columns::gen_default_addition_col_name;
19use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
20use risingwave_pb::ddl_service::TableJobType;
21use risingwave_sqlparser::ast::{ColumnDef, Ident};
22
23use crate::catalog::root_catalog::SchemaPath;
24use crate::catalog::source_catalog::SourceCatalog;
25use crate::error::{ErrorCode, Result};
26use crate::handler::create_source::SqlColumnStrategy;
27use crate::handler::{
28 HandlerArgs, ObjectName, PgResponse, RwPgResponse, Statement, StatementType,
29 get_replace_table_plan,
30};
31use crate::session::SessionImpl;
32use crate::utils::data_type::DataTypeToAst;
33use crate::utils::options::RETENTION_SECONDS;
34use crate::{Binder, TableCatalog, bind_data_type};
35
36static TABLE_PROPS: LazyLock<HashSet<&str>> =
38 LazyLock::new(|| HashSet::from([COMMIT_CHECKPOINT_INTERVAL, RETENTION_SECONDS]));
39
40fn fetch_schema_info(
41 session: &Arc<SessionImpl>,
42 table_name: ObjectName,
43) -> Result<(Arc<TableCatalog>, Arc<SourceCatalog>)> {
44 let db_name = session.database();
45 let (schema_name, real_table_name) =
46 Binder::resolve_schema_qualified_name(db_name.as_str(), &table_name)?;
47 let search_path = session.config().search_path();
48 let user_name = &session.auth_context().user_name;
49
50 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
51 let reader = session.env().catalog_reader().read_guard();
52
53 let (table_def, schema_name) =
54 reader.get_any_table_by_name(db_name.as_str(), schema_path, &real_table_name)?;
55 session.check_privilege_for_drop_alter(schema_name, &**table_def)?;
56
57 let Some(source_id) = table_def.associated_source_id else {
58 return Err(ErrorCode::ProtocolError(format!(
59 "Table {} is not associated with a connector",
60 real_table_name
61 ))
62 .into());
63 };
64 let (source_def, _) = reader.get_source_by_id(db_name.as_str(), schema_path, source_id)?;
65 Ok((table_def.clone(), source_def.clone()))
66}
67
68fn rewrite_table_definition(
69 original_table_def: &Arc<TableCatalog>,
70 original_source_def: &Arc<SourceCatalog>,
71 original_statement: Statement,
72) -> Result<Statement> {
73 let Statement::CreateTable {
74 mut columns,
75 include_column_options,
76 or_replace,
77 temporary,
78 if_not_exists,
79 name,
80 wildcard_idx,
81 constraints,
82 mut with_options,
83 append_only,
84 on_conflict,
85 with_version_columns,
86 query,
87 engine,
88 ..
89 } = original_statement
90 else {
91 panic!("unexpected statement: {:?}", original_statement);
92 };
93
94 for item in include_column_options.iter().rev() {
96 let col_name = if let Some(col_alias) = &item.column_alias {
97 col_alias.real_value()
98 } else {
99 let data_type = if let Some(dt) = &item.header_inner_expect_type {
100 Some(bind_data_type(dt)?)
101 } else {
102 None
103 };
104 gen_default_addition_col_name(
105 original_source_def.connector_name().as_str(),
106 item.column_type.real_value().as_str(),
107 item.inner_field.as_deref(),
108 data_type.as_ref(),
109 )
110 };
111 if let Some(col_def) = original_table_def
113 .columns
114 .iter()
115 .find(|col_def| col_def.name() == col_name)
116 {
117 columns.push(ColumnDef {
118 name: Ident::from(col_name.as_str()),
119 data_type: Some(col_def.data_type().to_ast()),
120 collation: None,
121 options: vec![],
122 });
123 }
124 }
125
126 let new_statement = Statement::CreateTable {
127 or_replace,
128 temporary,
129 if_not_exists,
130 name,
131 columns: columns.clone(),
132 wildcard_idx,
133 constraints,
134 with_options: {
135 with_options.retain(|item| {
136 TABLE_PROPS.contains(item.name.real_value().to_lowercase().as_str())
137 });
138 with_options
139 },
140 format_encode: None,
141 source_watermarks: vec![], append_only,
143 on_conflict,
144 with_version_columns,
145 query,
146 cdc_table_info: None,
147 include_column_options: vec![],
148 webhook_info: None,
149 engine,
150 };
151 Ok(new_statement)
152}
153
154pub async fn handle_alter_table_drop_connector(
155 handler_args: HandlerArgs,
156 table_name: ObjectName,
157) -> Result<RwPgResponse> {
158 let session = handler_args.session;
159 let (table_def, source_def) = fetch_schema_info(&session, table_name.clone())?;
160 let original_definition = table_def.create_sql_ast_purified()?;
161
162 let new_statement = rewrite_table_definition(&table_def, &source_def, original_definition)?;
163 let (_, table, graph, _) = get_replace_table_plan(
164 &session,
165 table_name,
166 new_statement,
167 &table_def,
168 SqlColumnStrategy::FollowUnchecked,
169 )
170 .await?;
171
172 let catalog_writer = session.catalog_writer()?;
173 catalog_writer
174 .replace_table(None, table.to_prost(), graph, TableJobType::General as _)
175 .await?;
176
177 Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
178}