risingwave_frontend/handler/alter_source_column.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::max_column_id;
use risingwave_connector::source::{SourceEncode, SourceStruct, extract_source_struct};
use risingwave_sqlparser::ast::{AlterSourceOperation, ObjectName};

use super::create_source::generate_stream_graph_for_source;
use super::create_table::bind_sql_columns;
use super::{HandlerArgs, RwPgResponse};
use crate::Binder;
use crate::catalog::root_catalog::SchemaPath;
use crate::error::{ErrorCode, Result, RwError};

// Note for future `DROP COLUMN` support:
// 1. Dependencies of generated columns

/// Handle `ALTER SOURCE [ADD] COLUMN` statements.
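///
/// A minimal example of the statement handled here (hypothetical source and
/// column names):
///
/// ```sql
/// ALTER SOURCE my_src ADD COLUMN v2 varchar;
/// ```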
pub async fn handle_alter_source_column(
    handler_args: HandlerArgs,
    source_name: ObjectName,
    operation: AlterSourceOperation,
) -> Result<RwPgResponse> {
    // Resolve the qualified source name against the session's database and search path.
    let session = handler_args.session.clone();
    let db_name = &session.database();
    let (schema_name, real_source_name) =
        Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
    let search_path = session.config().search_path();
    let user_name = &session.user_name();

    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

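    // Fetch the source catalog as an owned copy, so it can be mutated locally and
    // then committed through the catalog writer.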
    let mut catalog = {
        let reader = session.env().catalog_reader().read_guard();
        let (source, schema_name) =
            reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
        session.check_privilege_for_drop_alter(schema_name, &**source)?;

        (**source).clone()
    };

    if catalog.associated_table_id.is_some() {
        return Err(ErrorCode::NotSupported(
            "altering a table with connector via ALTER SOURCE".to_owned(),
            "try to use ALTER TABLE instead".to_owned(),
        )
        .into());
    }

    // Currently, only sources without a schema registry can be altered.
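    // With a schema registry, the column layout is derived from the external schema
    // rather than from user-supplied DDL, so in-place column changes are rejected;
    // the error hints point to `ALTER SOURCE .. FORMAT .. ENCODE .. (...)` instead.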
    let SourceStruct { encode, .. } = extract_source_struct(&catalog.info)?;
    match encode {
        SourceEncode::Avro | SourceEncode::Protobuf => {
            return Err(ErrorCode::NotSupported(
                "alter source with schema registry".to_owned(),
                "try `ALTER SOURCE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
            )
            .into());
        }
        SourceEncode::Json if catalog.info.use_schema_registry => {
            return Err(ErrorCode::NotSupported(
                "alter source with schema registry".to_owned(),
                "try `ALTER SOURCE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
            )
            .into());
        }
        SourceEncode::Invalid | SourceEncode::Native | SourceEncode::None => {
            return Err(RwError::from(ErrorCode::NotSupported(
                format!("alter source with encode {:?}", encode),
                "Only sources with encode JSON | BYTES | CSV | PARQUET can be altered".into(),
            )));
        }
        SourceEncode::Json | SourceEncode::Csv | SourceEncode::Bytes | SourceEncode::Parquet => {}
    }

    let columns = &mut catalog.columns;
    match operation {
        AlterSourceOperation::AddColumn { column_def } => {
            let new_column_name = column_def.name.real_value();
            if columns
                .iter()
                .any(|c| c.column_desc.name == new_column_name)
            {
                Err(ErrorCode::InvalidInputSyntax(format!(
                    "column \"{new_column_name}\" of source \"{source_name}\" already exists"
                )))?
            }

            // The new column name comes from the user, so we still need to check it
            // against reserved column names.
            let mut bound_column = bind_sql_columns(&[column_def], false)?.remove(0);
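            // Assign a fresh column ID, one past the current maximum, so the IDs of
            // existing columns remain stable.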
            bound_column.column_desc.column_id = max_column_id(columns).next();
            columns.push(bound_column);
            // No need to update the definition here. It will be done by purification later.
        }
        _ => unreachable!(),
    }

    // Bump the version and regenerate the purified `CREATE SOURCE` definition.
    catalog.version += 1;
    catalog.fill_purified_create_sql();

    let catalog_writer = session.catalog_writer()?;
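    // A shared source is backed by a running stream job, so its stream graph must be
    // replaced along with the catalog; a non-shared source only needs a catalog update.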
    if catalog.info.is_shared() {
        let graph = generate_stream_graph_for_source(handler_args, catalog.clone())?;
        catalog_writer
            .replace_source(catalog.to_prost(), graph)
            .await?
    } else {
        catalog_writer.alter_source(catalog.to_prost()).await?
    };

    Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
}

#[cfg(test)]
pub mod tests {
    use std::collections::BTreeMap;

    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    #[tokio::test]
    async fn test_alter_source_column_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let sql = r#"create source s_shared (v1 int) with (
            connector = 'kafka',
            topic = 'abc',
            properties.bootstrap.server = 'localhost:29092',
        ) FORMAT PLAIN ENCODE JSON;"#;

        frontend
            .run_sql_with_session(session.clone(), sql)
            .await
            .unwrap();

        frontend
            .run_sql_with_session(session.clone(), "SET streaming_use_shared_source TO false;")
            .await
            .unwrap();
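
        // With `streaming_use_shared_source` disabled, the following CREATE SOURCE
        // creates a non-shared source, so the later ALTER goes through the plain
        // `alter_source` path instead of replacing a stream graph.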
        let sql = r#"create source s (v1 int) with (
            connector = 'kafka',
            topic = 'abc',
            properties.bootstrap.server = 'localhost:29092',
        ) FORMAT PLAIN ENCODE JSON;"#;

        frontend
            .run_sql_with_session(session.clone(), sql)
            .await
            .unwrap();

        let get_source = |name: &str| {
            let catalog_reader = session.env().catalog_reader().read_guard();
            catalog_reader
                .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, name)
                .unwrap()
                .0
                .clone()
        };

        // Snapshot the source before altering it, so versions can be compared.
        let source = get_source("s_shared");

        let sql = "alter source s_shared add column v2 varchar;";
        frontend.run_sql(sql).await.unwrap();

        let altered_source = get_source("s_shared");
        let altered_columns: BTreeMap<_, _> = altered_source
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Check the new column is added.
        // Check the old columns and IDs are not changed.
        expect_test::expect![[r#"
            {
                "_row_id": (
                    Serial,
                    #0,
                ),
                "_rw_kafka_offset": (
                    Varchar,
                    #4,
                ),
                "_rw_kafka_partition": (
                    Varchar,
                    #3,
                ),
                "_rw_kafka_timestamp": (
                    Timestamptz,
                    #2,
                ),
                "v1": (
                    Int32,
                    #1,
                ),
                "v2": (
                    Varchar,
                    #5,
                ),
            }
        "#]]
        .assert_debug_eq(&altered_columns);

        // Check version
        assert_eq!(source.version + 1, altered_source.version);

        // Check definition
        expect_test::expect!["CREATE SOURCE s_shared (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"].assert_eq(&altered_source.definition);

        // Snapshot `s` before altering it.
        let source = get_source("s");
        let sql = "alter source s add column v2 varchar;";
        frontend.run_sql(sql).await.unwrap();

        let altered_source = get_source("s");
        let altered_columns: BTreeMap<_, _> = altered_source
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Check the new column is added.
        // Check the old columns and IDs are not changed.
        expect_test::expect![[r#"
            {
                "_row_id": (
                    Serial,
                    #0,
                ),
                "_rw_kafka_timestamp": (
                    Timestamptz,
                    #2,
                ),
                "v1": (
                    Int32,
                    #1,
                ),
                "v2": (
                    Varchar,
                    #3,
                ),
            }
        "#]]
        .assert_debug_eq(&altered_columns);

        // Check version
        assert_eq!(source.version + 1, altered_source.version);

        // Check definition
        expect_test::expect!["CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"].assert_eq(&altered_source.definition);
    }
}