// risingwave_frontend/handler/alter_source_column.rs

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::max_column_id;
use risingwave_connector::source::{SourceEncode, SourceStruct, extract_source_struct};
use risingwave_sqlparser::ast::{AlterSourceOperation, ObjectName};

use super::create_source::generate_stream_graph_for_source;
use super::create_table::bind_sql_columns;
use super::{HandlerArgs, RwPgResponse};
use crate::Binder;
use crate::catalog::root_catalog::SchemaPath;
use crate::error::{ErrorCode, Result, RwError};

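/// Handles `ALTER SOURCE .. ADD COLUMN ..`, e.g.
///
/// ```sql
/// ALTER SOURCE my_source ADD COLUMN v2 varchar;
/// ```
///
/// Only sources whose columns are user-defined (JSON without a schema registry,
/// CSV, BYTES, PARQUET) can be altered this way; other encodes are rejected below.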
pub async fn handle_alter_source_column(
    handler_args: HandlerArgs,
    source_name: ObjectName,
    operation: AlterSourceOperation,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let db_name = &session.database();
    let (schema_name, real_source_name) =
        Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
    let search_path = session.config().search_path();
    let user_name = &session.user_name();

    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

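    // Snapshot the source catalog under a read guard; we mutate a clone and persist
    // it through the catalog writer at the end.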
    let mut catalog = {
        let reader = session.env().catalog_reader().read_guard();
        let (source, schema_name) =
            reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
        session.check_privilege_for_drop_alter(schema_name, &**source)?;

        (**source).clone()
    };

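    // A source with an associated table id backs a `CREATE TABLE .. WITH (connector = ..)`
    // table, whose columns must be changed with ALTER TABLE instead.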
    if catalog.associated_table_id.is_some() {
        return Err(ErrorCode::NotSupported(
            "alter table with connector via ALTER SOURCE statement".to_owned(),
            "try to use ALTER TABLE instead".to_owned(),
        )
        .into());
    }

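    // Columns of schema-registry-backed encodes are derived from the registry, so
    // adding one by hand is rejected and the user is pointed at
    // `ALTER SOURCE .. FORMAT .. ENCODE ..` instead.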
    let SourceStruct { encode, .. } = extract_source_struct(&catalog.info)?;
    match encode {
        SourceEncode::Avro | SourceEncode::Protobuf => {
            return Err(ErrorCode::NotSupported(
                "alter source with schema registry".to_owned(),
                "try `ALTER SOURCE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
            )
            .into());
        }
        SourceEncode::Json if catalog.info.use_schema_registry => {
            return Err(ErrorCode::NotSupported(
                "alter source with schema registry".to_owned(),
                "try `ALTER SOURCE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
            )
            .into());
        }
        SourceEncode::Invalid | SourceEncode::Native | SourceEncode::None => {
            return Err(RwError::from(ErrorCode::NotSupported(
                format!("alter source with encode {:?}", encode),
                "Only sources with encode JSON | BYTES | CSV | PARQUET can be altered".into(),
            )));
        }
        SourceEncode::Json | SourceEncode::Csv | SourceEncode::Bytes | SourceEncode::Parquet => {}
    }

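    // Apply the operation to the cloned catalog. Only `AddColumn` is expected to
    // reach this handler, hence the `unreachable!` arm below.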
    let columns = &mut catalog.columns;
    match operation {
        AlterSourceOperation::AddColumn { column_def } => {
            let new_column_name = column_def.name.real_value();
            if columns
                .iter()
                .any(|c| c.column_desc.name == new_column_name)
            {
                Err(ErrorCode::InvalidInputSyntax(format!(
                    "column \"{new_column_name}\" of source \"{source_name}\" already exists"
                )))?
            }

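            // Bind the new column and assign it the next free column id.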
            let mut bound_column = bind_sql_columns(&[column_def], false)?.remove(0);
            bound_column.column_desc.column_id = max_column_id(columns).next();
            columns.push(bound_column);
        }
        _ => unreachable!(),
    }

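    // Bump the catalog version and regenerate the purified `CREATE SOURCE`
    // definition so it reflects the added column.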
    catalog.version += 1;
    catalog.fill_purified_create_sql();

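    // A shared source is backed by a running stream job, so altering it replaces the
    // stream graph; a non-shared source only needs its catalog entry updated.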
    let catalog_writer = session.catalog_writer()?;
    if catalog.info.is_shared() {
        let graph = generate_stream_graph_for_source(handler_args, catalog.clone())?;
        catalog_writer
            .replace_source(catalog.to_prost(), graph)
            .await?
    } else {
        catalog_writer.alter_source(catalog.to_prost()).await?
    };

    Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
}

#[cfg(test)]
pub mod tests {
    use std::collections::BTreeMap;

    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    #[tokio::test]
    async fn test_alter_source_column_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

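        // Create one shared and one non-shared Kafka source; the second is created
        // with `streaming_use_shared_source` disabled for the session.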
        let sql = r#"create source s_shared (v1 int) with (
            connector = 'kafka',
            topic = 'abc',
            properties.bootstrap.server = 'localhost:29092',
        ) FORMAT PLAIN ENCODE JSON;"#;

        frontend
            .run_sql_with_session(session.clone(), sql)
            .await
            .unwrap();

        frontend
            .run_sql_with_session(session.clone(), "SET streaming_use_shared_source TO false;")
            .await
            .unwrap();
        let sql = r#"create source s (v1 int) with (
            connector = 'kafka',
            topic = 'abc',
            properties.bootstrap.server = 'localhost:29092',
        ) FORMAT PLAIN ENCODE JSON;"#;

        frontend
            .run_sql_with_session(session.clone(), sql)
            .await
            .unwrap();

        let get_source = |name: &str| {
            let catalog_reader = session.env().catalog_reader().read_guard();
            catalog_reader
                .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, name)
                .unwrap()
                .0
                .clone()
        };

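        // Capture the pre-alter catalogs so the version bumps can be checked below.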
        let source = get_source("s");
        let source_shared = get_source("s_shared");

        let sql = "alter source s_shared add column v2 varchar;";
        frontend.run_sql(sql).await.unwrap();

        let altered_source = get_source("s_shared");
        let altered_columns: BTreeMap<_, _> = altered_source
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

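        // Besides `v1`, the shared source carries hidden `_row_id`, Kafka timestamp,
        // partition, and offset columns; the new `v2` takes the next column id, #5.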
        expect_test::expect![[r#"
            {
                "_row_id": (
                    Serial,
                    #0,
                ),
                "_rw_kafka_offset": (
                    Varchar,
                    #4,
                ),
                "_rw_kafka_partition": (
                    Varchar,
                    #3,
                ),
                "_rw_kafka_timestamp": (
                    Timestamptz,
                    #2,
                ),
                "v1": (
                    Int32,
                    #1,
                ),
                "v2": (
                    Varchar,
                    #5,
                ),
            }
        "#]]
        .assert_debug_eq(&altered_columns);

        assert_eq!(source_shared.version + 1, altered_source.version);

        expect_test::expect!["CREATE SOURCE s_shared (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"].assert_eq(&altered_source.definition);

        let sql = "alter source s add column v2 varchar;";
        frontend.run_sql(sql).await.unwrap();

        let altered_source = get_source("s");
        let altered_columns: BTreeMap<_, _> = altered_source
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

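        // The non-shared source has no hidden partition/offset columns, so `v2`
        // takes column id #3 here.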
        expect_test::expect![[r#"
            {
                "_row_id": (
                    Serial,
                    #0,
                ),
                "_rw_kafka_timestamp": (
                    Timestamptz,
                    #2,
                ),
                "v1": (
                    Int32,
                    #1,
                ),
                "v2": (
                    Varchar,
                    #3,
                ),
            }
        "#]]
        .assert_debug_eq(&altered_columns);

        assert_eq!(source.version + 1, altered_source.version);

        expect_test::expect!["CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"].assert_eq(&altered_source.definition);
    }
}