risingwave_frontend/handler/alter_source_with_sr.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use either::Either;
use itertools::Itertools;
use pgwire::pg_response::StatementType;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{ColumnCatalog, max_column_id};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::{EncodeType, FormatType};
use risingwave_sqlparser::ast::{
    CompatibleFormatEncode, CreateSourceStatement, Encode, Format, FormatEncodeOptions, ObjectName,
    SqlOption, Statement,
};

use super::create_source::{
    generate_stream_graph_for_source, schema_has_schema_registry, validate_compatibility,
};
use super::util::SourceSchemaCompatExt;
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::{ErrorCode, Result};
use crate::handler::create_source::{CreateSourceType, bind_columns_from_source};
use crate::session::SessionImpl;
use crate::utils::resolve_secret_ref_in_with_options;
use crate::{Binder, WithOptions};

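/// Converts a protobuf [`FormatType`] into the SQL AST [`Format`].
/// Returns `None` for `Unspecified`.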
fn format_type_to_format(from: FormatType) -> Option<Format> {
    Some(match from {
        FormatType::Unspecified => return None,
        FormatType::Native => Format::Native,
        FormatType::Debezium => Format::Debezium,
        FormatType::DebeziumMongo => Format::DebeziumMongo,
        FormatType::Maxwell => Format::Maxwell,
        FormatType::Canal => Format::Canal,
        FormatType::Upsert => Format::Upsert,
        FormatType::Plain => Format::Plain,
        FormatType::None => Format::None,
    })
}

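/// Converts a protobuf [`EncodeType`] into the SQL AST [`Encode`].
/// Returns `None` for `Unspecified`.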
fn encode_type_to_encode(from: EncodeType) -> Option<Encode> {
    Some(match from {
        EncodeType::Unspecified => return None,
        EncodeType::Native => Encode::Native,
        EncodeType::Avro => Encode::Avro,
        EncodeType::Csv => Encode::Csv,
        EncodeType::Protobuf => Encode::Protobuf,
        EncodeType::Json => Encode::Json,
        EncodeType::Bytes => Encode::Bytes,
        EncodeType::Template => Encode::Template,
        EncodeType::Parquet => Encode::Parquet,
        EncodeType::None => Encode::None,
        EncodeType::Text => Encode::Text,
    })
}

/// Returns the columns in `columns_a` but not in `columns_b`.
///
/// Note:
/// - The comparison is done by name and data type, without checking `ColumnId`.
/// - Hidden columns and `INCLUDE ... AS ...` columns are ignored, because this
///   function only exists for the special handling of `ALTER SOURCE` with a
///   schema registry: the newly resolved `columns_from_resolve_source` (created
///   by [`bind_columns_from_source`]) contains neither hidden columns
///   (`_row_id`) nor `INCLUDE ... AS ...` columns. This is fragile and should
///   really be refactored later.
/// - A column with the same name but a different data type is considered a
///   different column, i.e., altering the data type of a column is treated as
///   dropping the old column and adding a new one. Unlike
///   `ALTER TABLE .. REFRESH SCHEMA`, we don't reject this case here, because a
///   source has no data persistence (and thus no compatibility concern).
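///
/// # Example
///
/// A minimal sketch of the diff semantics (not compiled; the catalogs here are
/// hypothetical and their construction is elided):
///
/// ```ignore
/// // columns_a: [id: Int32, zipcode: Varchar]
/// // columns_b: [id: Int32, zipcode: Int64]
/// let diff = columns_minus(&columns_a, &columns_b);
/// // `zipcode` matches by name but not by type, so it survives the diff.
/// assert_eq!(diff.len(), 1);
/// assert_eq!(diff[0].name(), "zipcode");
/// ```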
fn columns_minus(columns_a: &[ColumnCatalog], columns_b: &[ColumnCatalog]) -> Vec<ColumnCatalog> {
    columns_a
        .iter()
        .filter(|col_a| {
            !col_a.is_hidden()
                && !col_a.is_connector_additional_column()
                && !columns_b.iter().any(|col_b| {
                    col_a.name() == col_b.name() && col_a.data_type() == col_b.data_type()
                })
        })
        .cloned()
        .collect()
}

/// Fetches the source catalog and checks the privilege to drop or alter it.
pub fn fetch_source_catalog_with_db_schema_id(
    session: &SessionImpl,
    name: &ObjectName,
) -> Result<Arc<SourceCatalog>> {
    let db_name = &session.database();
    let (schema_name, real_source_name) =
        Binder::resolve_schema_qualified_name(db_name, name.clone())?;
    let search_path = session.config().search_path();
    let user_name = &session.user_name();

    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

    let reader = session.env().catalog_reader().read_guard();
    let (source, schema_name) =
        reader.get_source_by_name(db_name, schema_path, &real_source_name)?;

    session.check_privilege_for_drop_alter(schema_name, &**source)?;

    Ok(Arc::clone(source))
}

/// Checks that the original source was created with a `FORMAT .. ENCODE ..`
/// clause, and that the new options do not change its FORMAT or ENCODE.
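///
/// For example, a source created with `FORMAT PLAIN ENCODE PROTOBUF (..)` can
/// only be altered with another `FORMAT PLAIN ENCODE PROTOBUF (..)` clause;
/// switching it to `FORMAT UPSERT` is rejected as not supported yet.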
pub fn check_format_encode(
    original_source: &SourceCatalog,
    new_format_encode: &FormatEncodeOptions,
) -> Result<()> {
    let StreamSourceInfo {
        format, row_encode, ..
    } = original_source.info;
    let (Some(old_format), Some(old_row_encode)) = (
        format_type_to_format(FormatType::try_from(format).unwrap()),
        encode_type_to_encode(EncodeType::try_from(row_encode).unwrap()),
    ) else {
        return Err(ErrorCode::NotSupported(
            "altering a legacy source which was not created using the `FORMAT .. ENCODE ..` clause"
                .to_owned(),
            "try this feature by creating a fresh source".to_owned(),
        )
        .into());
    };

    if new_format_encode.format != old_format || new_format_encode.row_encode != old_row_encode {
        bail_not_implemented!(
            "the original definition is FORMAT {:?} ENCODE {:?}, and altering them is not supported yet",
            &old_format,
            &old_row_encode,
        );
    }

    Ok(())
}

/// Refreshes the schema from the schema registry and computes the columns diff.
/// Returns the new `StreamSourceInfo`, the added columns (with freshly assigned
/// column IDs), and the dropped columns.
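///
/// A minimal usage sketch (not compiled; the source catalog, options, and
/// session setup are elided):
///
/// ```ignore
/// let (new_info, added, dropped) =
///     refresh_sr_and_get_columns_diff(&original_source, &format_encode, &session).await?;
/// // The caller decides how to handle the diff; `handle_alter_source_with_sr`
/// // rejects dropped columns and appends the added ones.
/// ```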
pub async fn refresh_sr_and_get_columns_diff(
    original_source: &SourceCatalog,
    format_encode: &FormatEncodeOptions,
    session: &Arc<SessionImpl>,
) -> Result<(StreamSourceInfo, Vec<ColumnCatalog>, Vec<ColumnCatalog>)> {
    let mut with_properties = original_source.with_properties.clone();
    validate_compatibility(format_encode, &mut with_properties)?;

    if with_properties.is_cdc_connector() {
        bail_not_implemented!("altering a cdc source is not supported");
    }

    let (Some(columns_from_resolve_source), source_info) = bind_columns_from_source(
        session,
        format_encode,
        Either::Right(&with_properties),
        CreateSourceType::for_replace(original_source),
    )
    .await?
    else {
        // The caller has already verified that a schema registry is used,
        // so the columns are always resolved here.
        unreachable!("source without schema registry is rejected")
    };

    let mut added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns);
    // The newly resolved columns' column IDs also start from 1, so they cannot be
    // used directly; reassign them to continue after the original max column ID.
    let mut next_col_id = max_column_id(&original_source.columns).next();
    for col in &mut added_columns {
        col.column_desc.column_id = next_col_id;
        next_col_id = next_col_id.next();
    }
    let dropped_columns = columns_minus(&original_source.columns, &columns_from_resolve_source);
    tracing::debug!(
        ?added_columns,
        ?dropped_columns,
        ?columns_from_resolve_source,
        original_source = ?original_source.columns
    );

    Ok((source_info, added_columns, dropped_columns))
}

fn get_format_encode_from_source(source: &SourceCatalog) -> Result<FormatEncodeOptions> {
    let stmt = source.create_sql_ast()?;
    let Statement::CreateSource {
        stmt: CreateSourceStatement { format_encode, .. },
    } = stmt
    else {
        unreachable!()
    };
    Ok(format_encode.into_v2_with_warning())
}

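/// Handles `ALTER SOURCE .. REFRESH SCHEMA` by re-resolving the schema with the
/// `FORMAT .. ENCODE ..` options recorded in the original definition.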
pub async fn handler_refresh_schema(
    handler_args: HandlerArgs,
    name: ObjectName,
) -> Result<RwPgResponse> {
    let source = fetch_source_catalog_with_db_schema_id(&handler_args.session, &name)?;
    let format_encode = get_format_encode_from_source(&source)?;
    handle_alter_source_with_sr(handler_args, name, format_encode).await
}

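/// Handles `ALTER SOURCE .. FORMAT .. ENCODE ..`: refreshes the schema from the
/// schema registry, diffs the columns against the current catalog, and persists
/// the updated source. Dropped columns are rejected; added columns are appended.
/// For shared sources, the stream graph is replaced as well.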
pub async fn handle_alter_source_with_sr(
    handler_args: HandlerArgs,
    name: ObjectName,
    format_encode: FormatEncodeOptions,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let source = fetch_source_catalog_with_db_schema_id(&session, &name)?;
    let mut source = source.as_ref().clone();
    let old_columns = source.columns.clone();

    if source.associated_table_id.is_some() {
        return Err(ErrorCode::NotSupported(
            "alter table with connector using ALTER SOURCE statement".to_owned(),
            "try to use ALTER TABLE instead".to_owned(),
        )
        .into());
    }

    check_format_encode(&source, &format_encode)?;

    if !schema_has_schema_registry(&format_encode) {
        return Err(ErrorCode::NotSupported(
            "altering a source without schema registry".to_owned(),
            "try `ALTER SOURCE .. ADD COLUMN ...` instead".to_owned(),
        )
        .into());
    }

    let (source_info, added_columns, dropped_columns) =
        refresh_sr_and_get_columns_diff(&source, &format_encode, &session).await?;

    if !dropped_columns.is_empty() {
        bail_not_implemented!(
            "this altering statement will drop columns, which is not supported yet: {}",
            dropped_columns
                .iter()
                .map(|col| format!("({}: {})", col.name(), col.data_type()))
                .join(", ")
        );
    }

    source.info = source_info;
    source.columns.extend(added_columns);
    source.definition = alter_definition_format_encode(
        source.create_sql_ast_purified()?,
        format_encode.row_options.clone(),
    )?;

    let (format_encode_options, format_encode_secret_ref) = resolve_secret_ref_in_with_options(
        WithOptions::try_from(format_encode.row_options())?,
        session.as_ref(),
    )?
    .into_parts();
    source
        .info
        .format_encode_options
        .extend(format_encode_options);

    source
        .info
        .format_encode_secret_refs
        .extend(format_encode_secret_ref);

    // update version
    source.version += 1;

    let pb_source = source.to_prost();
    let catalog_writer = session.catalog_writer()?;
    if source.info.is_shared() {
        let graph = generate_stream_graph_for_source(handler_args, source.clone())?;

        // Calculate the mapping from the original columns to the new columns.
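        // Since dropped columns are rejected above and new columns are only
        // appended, every original column keeps its original index.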
        let col_index_mapping = ColIndexMapping::new(
            old_columns
                .iter()
                .map(|old_c| {
                    source
                        .columns
                        .iter()
                        .position(|new_c| new_c.column_id() == old_c.column_id())
                })
                .collect(),
            source.columns.len(),
        );
        catalog_writer
            .replace_source(pb_source, graph, col_index_mapping)
            .await?
    } else {
        catalog_writer.alter_source(pb_source).await?;
    }
    Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
}

/// Apply the new `format_encode_options` to the source/table definition.
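///
/// For example (with illustrative values), given a definition ending in
///
/// ```text
/// FORMAT PLAIN ENCODE PROTOBUF (message = '.test.Old', schema.location = 'file://old')
/// ```
///
/// and `format_encode_options` of `(message = '.test.New', schema.location = 'file://new')`,
/// the returned definition ends in
///
/// ```text
/// FORMAT PLAIN ENCODE PROTOBUF (message = '.test.New', schema.location = 'file://new')
/// ```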
pub fn alter_definition_format_encode(
    mut stmt: Statement,
    format_encode_options: Vec<SqlOption>,
) -> Result<String> {
    match &mut stmt {
        Statement::CreateSource {
            stmt: CreateSourceStatement { format_encode, .. },
        }
        | Statement::CreateTable {
            format_encode: Some(format_encode),
            ..
        } => {
            match format_encode {
                CompatibleFormatEncode::V2(schema) => {
                    schema.row_options = format_encode_options;
                }
                // TODO: confirm the behavior for the legacy source schema.
                // A legacy source schema should be rejected by the handler and never reach here.
                CompatibleFormatEncode::RowFormat(_schema) => unreachable!(),
            }
        }
        _ => unreachable!(),
    }

    Ok(stmt.to_string())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
    use risingwave_common::types::DataType;

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_alter_source_with_sr_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE src
            WITH (
                connector = 'kafka',
                topic = 'test-topic',
                properties.bootstrap.server = 'localhost:29092'
            )
            FORMAT PLAIN ENCODE PROTOBUF (
                message = '.test.TestRecord',
                schema.location = 'file://{}'
            )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        frontend
            .run_sql_with_session(session.clone(), "SET streaming_use_shared_source TO false;")
            .await
            .unwrap();
        frontend
            .run_sql_with_session(session.clone(), sql)
            .await
            .unwrap();

        let get_source = || {
            let catalog_reader = session.env().catalog_reader().read_guard();
            catalog_reader
                .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "src")
                .unwrap()
                .0
                .clone()
        };

        let source = get_source();
        expect_test::expect!["CREATE SOURCE src (id INT, country STRUCT<address CHARACTER VARYING, city STRUCT<address CHARACTER VARYING, zipcode CHARACTER VARYING>, zipcode CHARACTER VARYING>, zipcode BIGINT, rate REAL) WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://')"].assert_eq(&source.create_sql_purified().replace(proto_file.path().to_str().unwrap(), ""));

        let sql = format!(
            r#"ALTER SOURCE src FORMAT UPSERT ENCODE PROTOBUF (
                message = '.test.TestRecord',
                schema.location = 'file://{}'
            )"#,
            proto_file.path().to_str().unwrap()
        );
        assert!(
            frontend
                .run_sql(sql)
                .await
                .unwrap_err()
                .to_string()
                .contains("the original definition is FORMAT Plain ENCODE Protobuf")
        );

        let sql = format!(
            r#"ALTER SOURCE src FORMAT PLAIN ENCODE PROTOBUF (
                message = '.test.TestRecordAlterType',
                schema.location = 'file://{}'
            )"#,
            proto_file.path().to_str().unwrap()
        );
        let res_str = frontend.run_sql(sql).await.unwrap_err().to_string();
        assert!(res_str.contains("id: integer"));
        assert!(res_str.contains("zipcode: bigint"));

        let sql = format!(
            r#"ALTER SOURCE src FORMAT PLAIN ENCODE PROTOBUF (
                message = '.test.TestRecordExt',
                schema.location = 'file://{}'
            )"#,
            proto_file.path().to_str().unwrap()
        );
        frontend.run_sql(sql).await.unwrap();

        let altered_source = get_source();

        let name_column = altered_source
            .columns
            .iter()
            .find(|col| col.column_desc.name == "name")
            .unwrap();
        assert_eq!(name_column.column_desc.data_type, DataType::Varchar);

        expect_test::expect!["CREATE SOURCE src (id INT, country STRUCT<address CHARACTER VARYING, city STRUCT<address CHARACTER VARYING, zipcode CHARACTER VARYING>, zipcode CHARACTER VARYING>, zipcode BIGINT, rate REAL, name CHARACTER VARYING) WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecordExt', schema.location = 'file://')"].assert_eq(&altered_source.create_sql_purified().replace(proto_file.path().to_str().unwrap(), ""));
    }
}