1use std::sync::Arc;
16
17use either::Either;
18use itertools::Itertools;
19use pgwire::pg_response::StatementType;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::catalog::{ColumnCatalog, max_column_id};
22use risingwave_common::util::column_index_mapping::ColIndexMapping;
23use risingwave_connector::WithPropertiesExt;
24use risingwave_pb::catalog::StreamSourceInfo;
25use risingwave_pb::plan_common::{EncodeType, FormatType};
26use risingwave_sqlparser::ast::{
27 CompatibleFormatEncode, CreateSourceStatement, Encode, Format, FormatEncodeOptions, ObjectName,
28 SqlOption, Statement,
29};
30
31use super::create_source::{
32 generate_stream_graph_for_source, schema_has_schema_registry, validate_compatibility,
33};
34use super::util::SourceSchemaCompatExt;
35use super::{HandlerArgs, RwPgResponse};
36use crate::catalog::root_catalog::SchemaPath;
37use crate::catalog::source_catalog::SourceCatalog;
38use crate::error::{ErrorCode, Result};
39use crate::handler::create_source::{CreateSourceType, bind_columns_from_source};
40use crate::session::SessionImpl;
41use crate::utils::resolve_secret_ref_in_with_options;
42use crate::{Binder, WithOptions};
43
44fn format_type_to_format(from: FormatType) -> Option<Format> {
45 Some(match from {
46 FormatType::Unspecified => return None,
47 FormatType::Native => Format::Native,
48 FormatType::Debezium => Format::Debezium,
49 FormatType::DebeziumMongo => Format::DebeziumMongo,
50 FormatType::Maxwell => Format::Maxwell,
51 FormatType::Canal => Format::Canal,
52 FormatType::Upsert => Format::Upsert,
53 FormatType::Plain => Format::Plain,
54 FormatType::None => Format::None,
55 })
56}
57
58fn encode_type_to_encode(from: EncodeType) -> Option<Encode> {
59 Some(match from {
60 EncodeType::Unspecified => return None,
61 EncodeType::Native => Encode::Native,
62 EncodeType::Avro => Encode::Avro,
63 EncodeType::Csv => Encode::Csv,
64 EncodeType::Protobuf => Encode::Protobuf,
65 EncodeType::Json => Encode::Json,
66 EncodeType::Bytes => Encode::Bytes,
67 EncodeType::Template => Encode::Template,
68 EncodeType::Parquet => Encode::Parquet,
69 EncodeType::None => Encode::None,
70 EncodeType::Text => Encode::Text,
71 })
72}
73
74fn columns_minus(columns_a: &[ColumnCatalog], columns_b: &[ColumnCatalog]) -> Vec<ColumnCatalog> {
85 columns_a
86 .iter()
87 .filter(|col_a| {
88 !col_a.is_hidden()
89 && !col_a.is_connector_additional_column()
90 && !columns_b.iter().any(|col_b| {
91 col_a.name() == col_b.name() && col_a.data_type() == col_b.data_type()
92 })
93 })
94 .cloned()
95 .collect()
96}
97
98pub fn fetch_source_catalog_with_db_schema_id(
100 session: &SessionImpl,
101 name: &ObjectName,
102) -> Result<Arc<SourceCatalog>> {
103 let db_name = &session.database();
104 let (schema_name, real_source_name) =
105 Binder::resolve_schema_qualified_name(db_name, name.clone())?;
106 let search_path = session.config().search_path();
107 let user_name = &session.user_name();
108
109 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
110
111 let reader = session.env().catalog_reader().read_guard();
112 let (source, schema_name) =
113 reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
114
115 session.check_privilege_for_drop_alter(schema_name, &**source)?;
116
117 Ok(Arc::clone(source))
118}
119
120pub fn check_format_encode(
123 original_source: &SourceCatalog,
124 new_format_encode: &FormatEncodeOptions,
125) -> Result<()> {
126 let StreamSourceInfo {
127 format, row_encode, ..
128 } = original_source.info;
129 let (Some(old_format), Some(old_row_encode)) = (
130 format_type_to_format(FormatType::try_from(format).unwrap()),
131 encode_type_to_encode(EncodeType::try_from(row_encode).unwrap()),
132 ) else {
133 return Err(ErrorCode::NotSupported(
134 "altering a legacy source which is not created using `FORMAT .. ENCODE ..` Clause"
135 .to_owned(),
136 "try this feature by creating a fresh source".to_owned(),
137 )
138 .into());
139 };
140
141 if new_format_encode.format != old_format || new_format_encode.row_encode != old_row_encode {
142 bail_not_implemented!(
143 "the original definition is FORMAT {:?} ENCODE {:?}, and altering them is not supported yet",
144 &old_format,
145 &old_row_encode,
146 );
147 }
148
149 Ok(())
150}
151
152pub async fn refresh_sr_and_get_columns_diff(
154 original_source: &SourceCatalog,
155 format_encode: &FormatEncodeOptions,
156 session: &Arc<SessionImpl>,
157) -> Result<(StreamSourceInfo, Vec<ColumnCatalog>, Vec<ColumnCatalog>)> {
158 let mut with_properties = original_source.with_properties.clone();
159 validate_compatibility(format_encode, &mut with_properties)?;
160
161 if with_properties.is_cdc_connector() {
162 bail_not_implemented!("altering a cdc source is not supported");
163 }
164
165 let (Some(columns_from_resolve_source), source_info) = bind_columns_from_source(
166 session,
167 format_encode,
168 Either::Right(&with_properties),
169 CreateSourceType::for_replace(original_source),
170 )
171 .await?
172 else {
173 unreachable!("source without schema registry is rejected")
175 };
176
177 let mut added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns);
178 let mut next_col_id = max_column_id(&original_source.columns).next();
180 for col in &mut added_columns {
181 col.column_desc.column_id = next_col_id;
182 next_col_id = next_col_id.next();
183 }
184 let dropped_columns = columns_minus(&original_source.columns, &columns_from_resolve_source);
185 tracing::debug!(
186 ?added_columns,
187 ?dropped_columns,
188 ?columns_from_resolve_source,
189 original_source = ?original_source.columns
190 );
191
192 Ok((source_info, added_columns, dropped_columns))
193}
194
195fn get_format_encode_from_source(source: &SourceCatalog) -> Result<FormatEncodeOptions> {
196 let stmt = source.create_sql_ast()?;
197 let Statement::CreateSource {
198 stmt: CreateSourceStatement { format_encode, .. },
199 } = stmt
200 else {
201 unreachable!()
202 };
203 Ok(format_encode.into_v2_with_warning())
204}
205
206pub async fn handler_refresh_schema(
207 handler_args: HandlerArgs,
208 name: ObjectName,
209) -> Result<RwPgResponse> {
210 let source = fetch_source_catalog_with_db_schema_id(&handler_args.session, &name)?;
211 let format_encode = get_format_encode_from_source(&source)?;
212 handle_alter_source_with_sr(handler_args, name, format_encode).await
213}
214
215pub async fn handle_alter_source_with_sr(
216 handler_args: HandlerArgs,
217 name: ObjectName,
218 format_encode: FormatEncodeOptions,
219) -> Result<RwPgResponse> {
220 let session = handler_args.session.clone();
221 let source = fetch_source_catalog_with_db_schema_id(&session, &name)?;
222 let mut source = source.as_ref().clone();
223 let old_columns = source.columns.clone();
224
225 if source.associated_table_id.is_some() {
226 return Err(ErrorCode::NotSupported(
227 "alter table with connector using ALTER SOURCE statement".to_owned(),
228 "try to use ALTER TABLE instead".to_owned(),
229 )
230 .into());
231 };
232
233 check_format_encode(&source, &format_encode)?;
234
235 if !schema_has_schema_registry(&format_encode) {
236 return Err(ErrorCode::NotSupported(
237 "altering a source without schema registry".to_owned(),
238 "try `ALTER SOURCE .. ADD COLUMN ...` instead".to_owned(),
239 )
240 .into());
241 }
242
243 let (source_info, added_columns, dropped_columns) =
244 refresh_sr_and_get_columns_diff(&source, &format_encode, &session).await?;
245
246 if !dropped_columns.is_empty() {
247 bail_not_implemented!(
248 "this altering statement will drop columns, which is not supported yet: {}",
249 dropped_columns
250 .iter()
251 .map(|col| format!("({}: {})", col.name(), col.data_type()))
252 .join(", ")
253 );
254 }
255
256 source.info = source_info;
257 source.columns.extend(added_columns);
258 source.definition = alter_definition_format_encode(
259 source.create_sql_ast_purified()?,
260 format_encode.row_options.clone(),
261 )?;
262
263 let (format_encode_options, format_encode_secret_ref) = resolve_secret_ref_in_with_options(
264 WithOptions::try_from(format_encode.row_options())?,
265 session.as_ref(),
266 )?
267 .into_parts();
268 source
269 .info
270 .format_encode_options
271 .extend(format_encode_options);
272
273 source
274 .info
275 .format_encode_secret_refs
276 .extend(format_encode_secret_ref);
277
278 source.version += 1;
280
281 let pb_source = source.to_prost();
282 let catalog_writer = session.catalog_writer()?;
283 if source.info.is_shared() {
284 let graph = generate_stream_graph_for_source(handler_args, source.clone())?;
285
286 let col_index_mapping = ColIndexMapping::new(
288 old_columns
289 .iter()
290 .map(|old_c| {
291 source
292 .columns
293 .iter()
294 .position(|new_c| new_c.column_id() == old_c.column_id())
295 })
296 .collect(),
297 source.columns.len(),
298 );
299 catalog_writer
300 .replace_source(pb_source, graph, col_index_mapping)
301 .await?
302 } else {
303 catalog_writer.alter_source(pb_source).await?;
304 }
305 Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
306}
307
308pub fn alter_definition_format_encode(
310 mut stmt: Statement,
311 format_encode_options: Vec<SqlOption>,
312) -> Result<String> {
313 match &mut stmt {
314 Statement::CreateSource {
315 stmt: CreateSourceStatement { format_encode, .. },
316 }
317 | Statement::CreateTable {
318 format_encode: Some(format_encode),
319 ..
320 } => {
321 match format_encode {
322 CompatibleFormatEncode::V2(schema) => {
323 schema.row_options = format_encode_options;
324 }
325 CompatibleFormatEncode::RowFormat(_schema) => unreachable!(),
328 }
329 }
330 _ => unreachable!(),
331 }
332
333 Ok(stmt.to_string())
334}
335
#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
    use risingwave_common::types::DataType;

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// End-to-end check of `ALTER SOURCE ... FORMAT ... ENCODE ...` on a
    /// non-shared protobuf source: changing the format is refused, an alter
    /// that would drop columns is refused, and an alter that only adds a
    /// column succeeds and shows up in the purified definition.
    #[tokio::test]
    async fn test_alter_source_with_sr_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        // The temp-file path differs per run, so it is stripped before comparing definitions.
        let proto_path = proto_file.path().to_str().unwrap().to_owned();

        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let create_sql = format!(
            r#"CREATE SOURCE src
            WITH (
                connector = 'kafka',
                topic = 'test-topic',
                properties.bootstrap.server = 'localhost:29092'
            )
            FORMAT PLAIN ENCODE PROTOBUF (
                message = '.test.TestRecord',
                schema.location = 'file://{}'
            )"#,
            proto_path
        );

        frontend
            .run_sql_with_session(session.clone(), "SET streaming_use_shared_source TO false;")
            .await
            .unwrap();
        frontend
            .run_sql_with_session(session.clone(), create_sql)
            .await
            .unwrap();

        let get_source = || {
            let catalog_reader = session.env().catalog_reader().read_guard();
            catalog_reader
                .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "src")
                .unwrap()
                .0
                .clone()
        };

        let created_source = get_source();
        expect_test::expect!["CREATE SOURCE src (id INT, country STRUCT<address CHARACTER VARYING, city STRUCT<address CHARACTER VARYING, zipcode CHARACTER VARYING>, zipcode CHARACTER VARYING>, zipcode BIGINT, rate REAL) WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://')"].assert_eq(&created_source.create_sql_purified().replace(proto_path.as_str(), ""));

        // Changing FORMAT is refused.
        let alter_format_sql = format!(
            r#"ALTER SOURCE src FORMAT UPSERT ENCODE PROTOBUF (
                message = '.test.TestRecord',
                schema.location = 'file://{}'
            )"#,
            proto_path
        );
        let format_err = frontend
            .run_sql(alter_format_sql)
            .await
            .unwrap_err()
            .to_string();
        assert!(format_err.contains("the original definition is FORMAT Plain ENCODE Protobuf"));

        // The error for this message type names the existing columns it would drop.
        let alter_type_sql = format!(
            r#"ALTER SOURCE src FORMAT PLAIN ENCODE PROTOBUF (
                message = '.test.TestRecordAlterType',
                schema.location = 'file://{}'
            )"#,
            proto_path
        );
        let drop_err = frontend
            .run_sql(alter_type_sql)
            .await
            .unwrap_err()
            .to_string();
        assert!(drop_err.contains("id: integer"));
        assert!(drop_err.contains("zipcode: bigint"));

        // This message type only adds a column, so the alter succeeds.
        let alter_ext_sql = format!(
            r#"ALTER SOURCE src FORMAT PLAIN ENCODE PROTOBUF (
                message = '.test.TestRecordExt',
                schema.location = 'file://{}'
            )"#,
            proto_path
        );
        frontend.run_sql(alter_ext_sql).await.unwrap();

        let altered_source = get_source();

        let name_column = altered_source
            .columns
            .iter()
            .find(|col| col.column_desc.name == "name")
            .unwrap();
        assert_eq!(name_column.column_desc.data_type, DataType::Varchar);

        expect_test::expect!["CREATE SOURCE src (id INT, country STRUCT<address CHARACTER VARYING, city STRUCT<address CHARACTER VARYING, zipcode CHARACTER VARYING>, zipcode CHARACTER VARYING>, zipcode BIGINT, rate REAL, name CHARACTER VARYING) WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecordExt', schema.location = 'file://')"].assert_eq(&altered_source.create_sql_purified().replace(proto_path.as_str(), ""));
    }
}