risingwave_frontend/handler/
alter_source_column.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::catalog::max_column_id;
17use risingwave_common::util::column_index_mapping::ColIndexMapping;
18use risingwave_connector::source::{SourceEncode, SourceStruct, extract_source_struct};
19use risingwave_sqlparser::ast::{AlterSourceOperation, ObjectName};
20
21use super::create_source::generate_stream_graph_for_source;
22use super::create_table::bind_sql_columns;
23use super::{HandlerArgs, RwPgResponse};
24use crate::Binder;
25use crate::catalog::root_catalog::SchemaPath;
26use crate::error::{ErrorCode, Result, RwError};
27
28pub async fn handle_alter_source_column(
33 handler_args: HandlerArgs,
34 source_name: ObjectName,
35 operation: AlterSourceOperation,
36) -> Result<RwPgResponse> {
37 let session = handler_args.session.clone();
39 let db_name = &session.database();
40 let (schema_name, real_source_name) =
41 Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
42 let search_path = session.config().search_path();
43 let user_name = &session.user_name();
44
45 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
46
47 let mut catalog = {
48 let reader = session.env().catalog_reader().read_guard();
49 let (source, schema_name) =
50 reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
51 session.check_privilege_for_drop_alter(schema_name, &**source)?;
52
53 (**source).clone()
54 };
55
56 if catalog.associated_table_id.is_some() {
57 return Err(ErrorCode::NotSupported(
58 "alter table with connector with ALTER SOURCE statement".to_owned(),
59 "try to use ALTER TABLE instead".to_owned(),
60 )
61 .into());
62 };
63
64 let SourceStruct { encode, .. } = extract_source_struct(&catalog.info)?;
66 match encode {
67 SourceEncode::Avro | SourceEncode::Protobuf => {
68 return Err(ErrorCode::NotSupported(
69 "alter source with schema registry".to_owned(),
70 "try `ALTER SOURCE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
71 )
72 .into());
73 }
74 SourceEncode::Json if catalog.info.use_schema_registry => {
75 return Err(ErrorCode::NotSupported(
76 "alter source with schema registry".to_owned(),
77 "try `ALTER SOURCE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
78 )
79 .into());
80 }
81 SourceEncode::Invalid | SourceEncode::Native | SourceEncode::None => {
82 return Err(RwError::from(ErrorCode::NotSupported(
83 format!("alter source with encode {:?}", encode),
84 "Only source with encode JSON | BYTES | CSV | PARQUET can be altered".into(),
85 )));
86 }
87 SourceEncode::Json | SourceEncode::Csv | SourceEncode::Bytes | SourceEncode::Parquet => {}
88 }
89
90 let old_columns = catalog.columns.clone();
91 let columns = &mut catalog.columns;
92 match operation {
93 AlterSourceOperation::AddColumn { column_def } => {
94 let new_column_name = column_def.name.real_value();
95 if columns
96 .iter()
97 .any(|c| c.column_desc.name == new_column_name)
98 {
99 Err(ErrorCode::InvalidInputSyntax(format!(
100 "column \"{new_column_name}\" of source \"{source_name}\" already exists"
101 )))?
102 }
103
104 let mut bound_column = bind_sql_columns(&[column_def], false)?.remove(0);
106 bound_column.column_desc.column_id = max_column_id(columns).next();
107 columns.push(bound_column);
108 }
110 _ => unreachable!(),
111 }
112
113 catalog.version += 1;
115 catalog.fill_purified_create_sql();
116
117 let catalog_writer = session.catalog_writer()?;
118 if catalog.info.is_shared() {
119 let graph = generate_stream_graph_for_source(handler_args, catalog.clone())?;
120
121 let col_index_mapping = ColIndexMapping::new(
123 old_columns
124 .iter()
125 .map(|old_c| {
126 catalog
127 .columns
128 .iter()
129 .position(|new_c| new_c.column_id() == old_c.column_id())
130 })
131 .collect(),
132 catalog.columns.len(),
133 );
134 catalog_writer
135 .replace_source(catalog.to_prost(), graph, col_index_mapping)
136 .await?
137 } else {
138 catalog_writer.alter_source(catalog.to_prost()).await?
139 };
140
141 Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
142}
143
#[cfg(test)]
pub mod tests {
    use std::collections::BTreeMap;

    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    /// Exercises `ALTER SOURCE .. ADD COLUMN` on two Kafka JSON sources: `s_shared`, created
    /// with the session defaults, and `s`, created after `streaming_use_shared_source` is
    /// switched off. For each source the test checks the resulting columns and column ids,
    /// that the version grew by one, and the regenerated `CREATE SOURCE` definition.
    #[tokio::test]
    async fn test_alter_source_column_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let sql = r#"create source s_shared (v1 int) with (
            connector = 'kafka',
            topic = 'abc',
            properties.bootstrap.server = 'localhost:29092',
        ) FORMAT PLAIN ENCODE JSON;"#;

        frontend
            .run_sql_with_session(session.clone(), sql)
            .await
            .unwrap();

        frontend
            .run_sql_with_session(session.clone(), "SET streaming_use_shared_source TO false;")
            .await
            .unwrap();
        let sql = r#"create source s (v1 int) with (
            connector = 'kafka',
            topic = 'abc',
            properties.bootstrap.server = 'localhost:29092',
        ) FORMAT PLAIN ENCODE JSON;"#;

        frontend
            .run_sql_with_session(session.clone(), sql)
            .await
            .unwrap();

        // Fetches a snapshot of the named source's catalog entry from the default schema.
        let get_source = |name: &str| {
            let catalog_reader = session.env().catalog_reader().read_guard();
            catalog_reader
                .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, name)
                .unwrap()
                .0
                .clone()
        };

        // Baselines taken before any ALTER, one per source, so that each version check
        // below compares a source against its own previous state.
        let shared_source = get_source("s_shared");
        let source = get_source("s");

        let sql = "alter source s_shared add column v2 varchar;";
        frontend.run_sql(sql).await.unwrap();

        let altered_source = get_source("s_shared");
        let altered_columns: BTreeMap<_, _> = altered_source
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // The new column is appended; existing columns keep their types and ids.
        expect_test::expect![[r#"
            {
                "_row_id": (
                    Serial,
                    #0,
                ),
                "_rw_kafka_offset": (
                    Varchar,
                    #4,
                ),
                "_rw_kafka_partition": (
                    Varchar,
                    #3,
                ),
                "_rw_kafka_timestamp": (
                    Timestamptz,
                    #2,
                ),
                "v1": (
                    Int32,
                    #1,
                ),
                "v2": (
                    Varchar,
                    #5,
                ),
            }
        "#]]
        .assert_debug_eq(&altered_columns);

        // The version of `s_shared` itself must have been bumped by exactly one.
        assert_eq!(shared_source.version + 1, altered_source.version);

        // The stored definition reflects the added column.
        expect_test::expect!["CREATE SOURCE s_shared (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"].assert_eq(&altered_source.definition);

        let sql = "alter source s add column v2 varchar;";
        frontend.run_sql(sql).await.unwrap();

        let altered_source = get_source("s");
        let altered_columns: BTreeMap<_, _> = altered_source
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // The new column is appended; existing columns keep their types and ids.
        expect_test::expect![[r#"
            {
                "_row_id": (
                    Serial,
                    #0,
                ),
                "_rw_kafka_timestamp": (
                    Timestamptz,
                    #2,
                ),
                "v1": (
                    Int32,
                    #1,
                ),
                "v2": (
                    Varchar,
                    #3,
                ),
            }
        "#]]
        .assert_debug_eq(&altered_columns);

        // The version of `s` must have been bumped by exactly one.
        assert_eq!(source.version + 1, altered_source.version);

        // The stored definition reflects the added column.
        expect_test::expect!["CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"].assert_eq(&altered_source.definition);
    }
}