risingwave_frontend/handler/alter_source_column.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::max_column_id;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::source::{SourceEncode, SourceStruct, extract_source_struct};
use risingwave_sqlparser::ast::{AlterSourceOperation, ObjectName};

use super::create_source::generate_stream_graph_for_source;
use super::create_table::bind_sql_columns;
use super::{HandlerArgs, RwPgResponse};
use crate::Binder;
use crate::catalog::root_catalog::SchemaPath;
use crate::error::{ErrorCode, Result, RwError};

// Note for future drop column:
// 1. Dependencies of generated columns
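//    (hypothetical example: if a column `v2 INT AS v1 + 1` exists, dropping `v1`
//    must be rejected)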
30
/// Handle `ALTER SOURCE [ADD] COLUMN` statements.
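///
/// For example (the same statement shape exercised by the test below):
///
/// ```sql
/// ALTER SOURCE s ADD COLUMN v2 varchar;
/// ```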
pub async fn handle_alter_source_column(
    handler_args: HandlerArgs,
    source_name: ObjectName,
    operation: AlterSourceOperation,
) -> Result<RwPgResponse> {
    // Get the original source catalog.
    let session = handler_args.session.clone();
    let db_name = &session.database();
    let (schema_name, real_source_name) =
        Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
    let search_path = session.config().search_path();
    let user_name = &session.user_name();

    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

    let mut catalog = {
        let reader = session.env().catalog_reader().read_guard();
        let (source, schema_name) =
            reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
        session.check_privilege_for_drop_alter(schema_name, &**source)?;

        (**source).clone()
    };
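
    // A source with an associated table is a "table with connector"
    // (`CREATE TABLE .. WITH (connector = ..)`) and must be altered via `ALTER TABLE`.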
    if catalog.associated_table_id.is_some() {
        return Err(ErrorCode::NotSupported(
            "alter table with connector with ALTER SOURCE statement".to_owned(),
            "try to use ALTER TABLE instead".to_owned(),
        )
        .into());
    };

    // Currently we only allow altering sources without a schema registry.
    let SourceStruct { encode, .. } = extract_source_struct(&catalog.info)?;
    match encode {
        SourceEncode::Avro | SourceEncode::Protobuf => {
            return Err(ErrorCode::NotSupported(
                "alter source with schema registry".to_owned(),
                "try `ALTER SOURCE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
            )
            .into());
        }
        SourceEncode::Json if catalog.info.use_schema_registry => {
            return Err(ErrorCode::NotSupported(
                "alter source with schema registry".to_owned(),
                "try `ALTER SOURCE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
            )
            .into());
        }
        SourceEncode::Invalid | SourceEncode::Native | SourceEncode::None => {
            return Err(RwError::from(ErrorCode::NotSupported(
                format!("alter source with encode {:?}", encode),
                "Only sources with encode JSON | BYTES | CSV | PARQUET can be altered".into(),
            )));
        }
        SourceEncode::Json | SourceEncode::Csv | SourceEncode::Bytes | SourceEncode::Parquet => {}
    }

    let old_columns = catalog.columns.clone();
    let columns = &mut catalog.columns;
    match operation {
        AlterSourceOperation::AddColumn { column_def } => {
            let new_column_name = column_def.name.real_value();
            if columns
                .iter()
                .any(|c| c.column_desc.name == new_column_name)
            {
                Err(ErrorCode::InvalidInputSyntax(format!(
                    "column \"{new_column_name}\" of source \"{source_name}\" already exists"
                )))?
            }

            // The column name comes from the user, so we still need to check it
            // against reserved column names.
            let mut bound_column = bind_sql_columns(&[column_def], false)?.remove(0);
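            // Assign a fresh column ID one past the current maximum, keeping the IDs
            // of existing columns stable.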
            bound_column.column_desc.column_id = max_column_id(columns).next();
            columns.push(bound_column);
            // No need to update the definition here. It will be done by purification later.
        }
        _ => unreachable!(),
    }

    // update version
    catalog.version += 1;
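    // Regenerate the stored `CREATE SOURCE` definition from the (purified) catalog so
    // that it includes the new column; see the purification note above.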
    catalog.fill_purified_create_sql();

    let catalog_writer = session.catalog_writer()?;
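    // A shared source is backed by a streaming job, so we regenerate its stream graph
    // and replace the source; a non-shared source only needs a catalog update.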
    if catalog.info.is_shared() {
        let graph = generate_stream_graph_for_source(handler_args, catalog.clone())?;

        // Calculate the mapping from the original columns to the new columns.
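        // Column IDs are stable across the ALTER, so each old column's new index can be
        // looked up by ID; since we only append columns, every old column maps to `Some(_)`.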
        let col_index_mapping = ColIndexMapping::new(
            old_columns
                .iter()
                .map(|old_c| {
                    catalog
                        .columns
                        .iter()
                        .position(|new_c| new_c.column_id() == old_c.column_id())
                })
                .collect(),
            catalog.columns.len(),
        );
        catalog_writer
            .replace_source(catalog.to_prost(), graph, col_index_mapping)
            .await?
    } else {
        catalog_writer.alter_source(catalog.to_prost()).await?
    };

    Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
}

#[cfg(test)]
pub mod tests {
    use std::collections::BTreeMap;

    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    #[tokio::test]
    async fn test_alter_source_column_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
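
        // Create one shared source (`s_shared`) and, after disabling shared sources
        // for the session, one non-shared source (`s`).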
        let sql = r#"create source s_shared (v1 int) with (
            connector = 'kafka',
            topic = 'abc',
            properties.bootstrap.server = 'localhost:29092',
        ) FORMAT PLAIN ENCODE JSON;"#;

        frontend
            .run_sql_with_session(session.clone(), sql)
            .await
            .unwrap();

        frontend
            .run_sql_with_session(session.clone(), "SET streaming_use_shared_source TO false;")
            .await
            .unwrap();
        let sql = r#"create source s (v1 int) with (
            connector = 'kafka',
            topic = 'abc',
            properties.bootstrap.server = 'localhost:29092',
        ) FORMAT PLAIN ENCODE JSON;"#;

        frontend
            .run_sql_with_session(session.clone(), sql)
            .await
            .unwrap();

        let get_source = |name: &str| {
            let catalog_reader = session.env().catalog_reader().read_guard();
            catalog_reader
                .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, name)
                .unwrap()
                .0
                .clone()
        };

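        // Snapshot the catalog entry of `s` before any ALTER; both version checks below
        // compare against this original version.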
        let source = get_source("s");

        let sql = "alter source s_shared add column v2 varchar;";
        frontend.run_sql(sql).await.unwrap();

        let altered_source = get_source("s_shared");
        let altered_columns: BTreeMap<_, _> = altered_source
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Check that the new column is added and that the old columns and their IDs
        // are unchanged.
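        // The shared Kafka source also exposes the hidden connector columns
        // `_rw_kafka_partition` and `_rw_kafka_offset`, so the new `v2` takes ID #5.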
        expect_test::expect![[r#"
            {
                "_row_id": (
                    Serial,
                    #0,
                ),
                "_rw_kafka_offset": (
                    Varchar,
                    #4,
                ),
                "_rw_kafka_partition": (
                    Varchar,
                    #3,
                ),
                "_rw_kafka_timestamp": (
                    Timestamptz,
                    #2,
                ),
                "v1": (
                    Int32,
                    #1,
                ),
                "v2": (
                    Varchar,
                    #5,
                ),
            }
        "#]]
        .assert_debug_eq(&altered_columns);

        // Check version
        assert_eq!(source.version + 1, altered_source.version);

        // Check definition
        expect_test::expect!["CREATE SOURCE s_shared (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"].assert_eq(&altered_source.definition);

        let sql = "alter source s add column v2 varchar;";
        frontend.run_sql(sql).await.unwrap();

        let altered_source = get_source("s");
        let altered_columns: BTreeMap<_, _> = altered_source
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Check that the new column is added and that the old columns and their IDs
        // are unchanged.
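        // The non-shared source has no partition/offset columns, so `v2` takes ID #3.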
        expect_test::expect![[r#"
            {
                "_row_id": (
                    Serial,
                    #0,
                ),
                "_rw_kafka_timestamp": (
                    Timestamptz,
                    #2,
                ),
                "v1": (
                    Int32,
                    #1,
                ),
                "v2": (
                    Varchar,
                    #3,
                ),
            }
        "#]]
        .assert_debug_eq(&altered_columns);

        // Check version
        assert_eq!(source.version + 1, altered_source.version);

        // Check definition
        expect_test::expect!["CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"].assert_eq(&altered_source.definition);
    }
}