1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::{AlterDatabaseParam, ICEBERG_SINK_PREFIX};
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_compaction_group;
46mod alter_connection_props;
47mod alter_database_param;
48mod alter_mv;
49mod alter_owner;
50mod alter_parallelism;
51mod alter_rename;
52mod alter_resource_group;
53mod alter_secret;
54mod alter_set_schema;
55mod alter_sink_props;
56mod alter_source_column;
57mod alter_source_props;
58mod alter_source_with_sr;
59mod alter_streaming_config;
60mod alter_streaming_enable_unaligned_join;
61mod alter_streaming_rate_limit;
62mod alter_subscription_retention;
63mod alter_swap_rename;
64mod alter_system;
65mod alter_table_column;
66pub mod alter_table_drop_connector;
67pub mod alter_table_props;
68mod alter_table_with_sr;
69pub mod alter_user;
70mod alter_utils;
71mod alter_watermark;
72mod backup;
73pub mod cancel_job;
74pub mod close_cursor;
75mod comment;
76pub mod create_aggregate;
77pub mod create_connection;
78mod create_database;
79pub mod create_function;
80pub mod create_index;
81pub mod create_mv;
82pub mod create_schema;
83pub mod create_secret;
84pub mod create_sink;
85pub mod create_source;
86pub mod create_sql_function;
87pub mod create_subscription;
88pub mod create_table;
89pub mod create_table_as;
90pub mod create_user;
91pub mod create_view;
92pub mod declare_cursor;
93mod delete_meta_snapshot;
94pub mod describe;
95pub mod discard;
96mod drop_connection;
97mod drop_database;
98pub mod drop_function;
99mod drop_index;
100pub mod drop_mv;
101mod drop_schema;
102pub mod drop_secret;
103pub mod drop_sink;
104pub mod drop_source;
105pub mod drop_subscription;
106pub mod drop_table;
107pub mod drop_user;
108mod drop_view;
109pub mod explain;
110pub mod explain_analyze_stream_job;
111pub mod extended_handle;
112pub mod fetch_cursor;
113mod flush;
114pub mod handle_privilege;
115pub mod kill_process;
116mod prepared_statement;
117pub mod privilege;
118pub mod query;
119mod recover;
120mod refresh;
121mod reset_source;
122pub mod show;
123mod transaction;
124mod use_db;
125pub mod util;
126pub mod vacuum;
127pub mod variable;
128mod wait;
129
130pub use alter_table_column::{
131 fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
132};
133
134pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
136
137pub type RwPgResponse = PgResponse<PgResponseStream>;
139
140#[easy_ext::ext(RwPgResponseBuilderExt)]
141impl RwPgResponseBuilder {
142 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
144 let fields = T::fields();
145 self.values(
146 rows.into_iter()
147 .map(|row| {
148 Row::new(
149 row.into_owned_row()
150 .into_iter()
151 .zip_eq_fast(&fields)
152 .map(|(datum, (_, ty))| {
153 datum.map(|scalar| {
154 scalar.as_scalar_ref_impl().text_format(ty).into()
155 })
156 })
157 .collect(),
158 )
159 })
160 .collect_vec()
161 .into(),
162 fields_to_descriptors(fields),
163 )
164 }
165}
166
167pub fn fields_to_descriptors(
168 fields: Vec<(&str, risingwave_common::types::DataType)>,
169) -> Vec<PgFieldDescriptor> {
170 fields
171 .iter()
172 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
173 .collect()
174}
175
176pub enum PgResponseStream {
177 LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
178 DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
179 Rows(BoxStream<'static, RowSetResult>),
180}
181
182impl Stream for PgResponseStream {
183 type Item = std::result::Result<Vec<Row>, BoxedError>;
184
185 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
186 match &mut *self {
187 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
188 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
189 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
190 }
191 }
192}
193
194impl From<Vec<Row>> for PgResponseStream {
195 fn from(rows: Vec<Row>) -> Self {
196 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
197 }
198}
199
200#[derive(Clone)]
201pub struct HandlerArgs {
202 pub session: Arc<SessionImpl>,
203 pub sql: Arc<str>,
204 pub normalized_sql: String,
205 pub with_options: WithOptions,
206}
207
208impl HandlerArgs {
209 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
210 Ok(Self {
211 session,
212 sql,
213 with_options: WithOptions::try_from(stmt)?,
214 normalized_sql: Self::normalize_sql(stmt),
215 })
216 }
217
218 fn normalize_sql(stmt: &Statement) -> String {
224 let mut stmt = stmt.clone();
225 match &mut stmt {
226 Statement::CreateView {
227 or_replace,
228 if_not_exists,
229 ..
230 } => {
231 *or_replace = false;
232 *if_not_exists = false;
233 }
234 Statement::CreateTable {
235 or_replace,
236 if_not_exists,
237 ..
238 } => {
239 *or_replace = false;
240 *if_not_exists = false;
241 }
242 Statement::CreateIndex { if_not_exists, .. } => {
243 *if_not_exists = false;
244 }
245 Statement::CreateSource {
246 stmt: CreateSourceStatement { if_not_exists, .. },
247 ..
248 } => {
249 *if_not_exists = false;
250 }
251 Statement::CreateSink {
252 stmt: CreateSinkStatement { if_not_exists, .. },
253 } => {
254 *if_not_exists = false;
255 }
256 Statement::CreateSubscription {
257 stmt: CreateSubscriptionStatement { if_not_exists, .. },
258 } => {
259 *if_not_exists = false;
260 }
261 Statement::CreateConnection {
262 stmt: CreateConnectionStatement { if_not_exists, .. },
263 } => {
264 *if_not_exists = false;
265 }
266 _ => {}
267 }
268 stmt.to_string()
269 }
270}
271
272#[expect(clippy::large_stack_frames)]
273pub async fn handle(
274 session: Arc<SessionImpl>,
275 stmt: Statement,
276 sql: Arc<str>,
277 formats: Vec<Format>,
278) -> Result<RwPgResponse> {
279 session.clear_cancel_query_flag();
280 let _guard = session.txn_begin_implicit();
281 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
282
283 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
284
285 match stmt {
286 Statement::Explain {
287 statement,
288 analyze,
289 options,
290 } => {
291 Box::pin(explain::handle_explain(
292 handler_args,
293 *statement,
294 options,
295 analyze,
296 ))
297 .await
298 }
299 Statement::ExplainAnalyzeStreamJob {
300 target,
301 duration_secs,
302 } => {
303 explain_analyze_stream_job::handle_explain_analyze_stream_job(
304 handler_args,
305 target,
306 duration_secs,
307 )
308 .await
309 }
310 Statement::CreateSource { stmt } => {
311 create_source::handle_create_source(handler_args, stmt).await
312 }
313 Statement::CreateSink { stmt } => {
314 create_sink::handle_create_sink(handler_args, stmt, false).await
315 }
316 Statement::CreateSubscription { stmt } => {
317 create_subscription::handle_create_subscription(handler_args, stmt).await
318 }
319 Statement::CreateConnection { stmt } => {
320 create_connection::handle_create_connection(handler_args, stmt).await
321 }
322 Statement::CreateSecret { stmt } => {
323 create_secret::handle_create_secret(handler_args, stmt).await
324 }
325 Statement::CreateFunction {
326 or_replace,
327 temporary,
328 if_not_exists,
329 name,
330 args,
331 returns,
332 params,
333 with_options,
334 } => {
335 if params.language.is_none()
338 || !params
339 .language
340 .as_ref()
341 .unwrap()
342 .real_value()
343 .eq_ignore_ascii_case("sql")
344 {
345 create_function::handle_create_function(
346 handler_args,
347 or_replace,
348 temporary,
349 if_not_exists,
350 name,
351 args,
352 returns,
353 params,
354 with_options,
355 )
356 .await
357 } else {
358 create_sql_function::handle_create_sql_function(
359 handler_args,
360 or_replace,
361 temporary,
362 if_not_exists,
363 name,
364 args,
365 returns,
366 params,
367 )
368 .await
369 }
370 }
371 Statement::CreateAggregate {
372 or_replace,
373 if_not_exists,
374 name,
375 args,
376 returns,
377 params,
378 ..
379 } => {
380 create_aggregate::handle_create_aggregate(
381 handler_args,
382 or_replace,
383 if_not_exists,
384 name,
385 args,
386 returns,
387 params,
388 )
389 .await
390 }
391 Statement::CreateTable {
392 name,
393 columns,
394 wildcard_idx,
395 constraints,
396 query,
397 with_options: _, or_replace,
400 temporary,
401 if_not_exists,
402 format_encode,
403 source_watermarks,
404 append_only,
405 on_conflict,
406 with_version_columns,
407 cdc_table_info,
408 include_column_options,
409 webhook_info,
410 engine,
411 } => {
412 if or_replace {
413 bail_not_implemented!("CREATE OR REPLACE TABLE");
414 }
415 if temporary {
416 bail_not_implemented!("CREATE TEMPORARY TABLE");
417 }
418 if let Some(query) = query {
419 return create_table_as::handle_create_as(
420 handler_args,
421 name,
422 if_not_exists,
423 query,
424 columns,
425 append_only,
426 on_conflict,
427 with_version_columns
428 .iter()
429 .map(|col| col.real_value())
430 .collect(),
431 engine,
432 )
433 .await;
434 }
435 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
436 Box::pin(create_table::handle_create_table(
437 handler_args,
438 name,
439 columns,
440 wildcard_idx,
441 constraints,
442 if_not_exists,
443 format_encode,
444 source_watermarks,
445 append_only,
446 on_conflict,
447 with_version_columns
448 .iter()
449 .map(|col| col.real_value())
450 .collect(),
451 cdc_table_info,
452 include_column_options,
453 webhook_info,
454 engine,
455 ))
456 .await
457 }
458 Statement::CreateDatabase {
459 db_name,
460 if_not_exists,
461 owner,
462 resource_group,
463 barrier_interval_ms,
464 checkpoint_frequency,
465 } => {
466 create_database::handle_create_database(
467 handler_args,
468 db_name,
469 if_not_exists,
470 owner,
471 resource_group,
472 barrier_interval_ms,
473 checkpoint_frequency,
474 )
475 .await
476 }
477 Statement::CreateSchema {
478 schema_name,
479 if_not_exists,
480 owner,
481 } => {
482 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
483 .await
484 }
485 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
486 Statement::DeclareCursor { stmt } => {
487 declare_cursor::handle_declare_cursor(handler_args, stmt).await
488 }
489 Statement::FetchCursor { stmt } => {
490 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
491 }
492 Statement::CloseCursor { stmt } => {
493 close_cursor::handle_close_cursor(handler_args, stmt).await
494 }
495 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
496 Statement::Grant { .. } => {
497 handle_privilege::handle_grant_privilege(handler_args, stmt).await
498 }
499 Statement::Revoke { .. } => {
500 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
501 }
502 Statement::Describe { name, kind } => match kind {
503 DescribeKind::Fragments => {
504 describe::handle_describe_fragments(handler_args, name).await
505 }
506 DescribeKind::Plain => describe::handle_describe(handler_args, name),
507 },
508 Statement::DescribeFragment { fragment_id } => {
509 describe::handle_describe_fragment(handler_args, fragment_id.into()).await
510 }
511 Statement::Discard(..) => discard::handle_discard(handler_args),
512 Statement::ShowObjects {
513 object: show_object,
514 filter,
515 } => show::handle_show_object(handler_args, show_object, filter).await,
516 Statement::ShowCreateObject { create_type, name } => {
517 show::handle_show_create_object(handler_args, create_type, name)
518 }
519 Statement::ShowTransactionIsolationLevel => {
520 transaction::handle_show_isolation_level(handler_args)
521 }
522 Statement::Drop(DropStatement {
523 object_type,
524 object_name,
525 if_exists,
526 drop_mode,
527 }) => {
528 let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
529 match object_type {
530 ObjectType::MaterializedView
531 | ObjectType::View
532 | ObjectType::Sink
533 | ObjectType::Source
534 | ObjectType::Subscription
535 | ObjectType::Index
536 | ObjectType::Table
537 | ObjectType::Schema
538 | ObjectType::Connection
539 | ObjectType::Secret => true,
540 ObjectType::Database | ObjectType::User => {
541 bail_not_implemented!("DROP CASCADE");
542 }
543 }
544 } else {
545 false
546 };
547 match object_type {
548 ObjectType::Table => {
549 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
550 .await
551 }
552 ObjectType::MaterializedView => {
553 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
554 }
555 ObjectType::Index => {
556 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
557 .await
558 }
559 ObjectType::Source => {
560 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
561 .await
562 }
563 ObjectType::Sink => {
564 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
565 }
566 ObjectType::Subscription => {
567 drop_subscription::handle_drop_subscription(
568 handler_args,
569 object_name,
570 if_exists,
571 cascade,
572 )
573 .await
574 }
575 ObjectType::Database => {
576 drop_database::handle_drop_database(handler_args, object_name, if_exists).await
577 }
578 ObjectType::Schema => {
579 drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
580 .await
581 }
582 ObjectType::User => {
583 drop_user::handle_drop_user(handler_args, object_name, if_exists).await
584 }
585 ObjectType::View => {
586 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
587 }
588 ObjectType::Connection => {
589 drop_connection::handle_drop_connection(
590 handler_args,
591 object_name,
592 if_exists,
593 cascade,
594 )
595 .await
596 }
597 ObjectType::Secret => {
598 drop_secret::handle_drop_secret(handler_args, object_name, if_exists, cascade)
599 .await
600 }
601 }
602 }
603 Statement::DropFunction {
605 if_exists,
606 func_desc,
607 option,
608 } => {
609 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
610 .await
611 }
612 Statement::DropAggregate {
613 if_exists,
614 func_desc,
615 option,
616 } => {
617 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
618 .await
619 }
620 Statement::Query(_)
621 | Statement::Insert { .. }
622 | Statement::Delete { .. }
623 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
624 Statement::Copy {
625 entity: CopyEntity::Query(query),
626 target: CopyTarget::Stdout,
627 } => {
628 let response =
629 query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
630 .await?;
631 Ok(response.into_copy_query_to_stdout())
632 }
633 Statement::CreateView {
634 materialized,
635 if_not_exists,
636 name,
637 columns,
638 query,
639 with_options: _, or_replace, emit_mode,
642 } => {
643 if or_replace {
644 bail_not_implemented!("CREATE OR REPLACE VIEW");
645 }
646 if materialized {
647 create_mv::handle_create_mv(
648 handler_args,
649 if_not_exists,
650 name,
651 *query,
652 columns,
653 emit_mode,
654 )
655 .await
656 } else {
657 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
658 .await
659 }
660 }
661 Statement::Flush => flush::handle_flush(handler_args).await,
662 Statement::Wait(target) => wait::handle_wait(handler_args, target).await,
663 Statement::Backup => backup::handle_backup(handler_args).await,
664 Statement::DeleteMetaSnapshots { snapshot_ids } => {
665 delete_meta_snapshot::handle_delete_meta_snapshots(handler_args, snapshot_ids).await
666 }
667 Statement::Recover => recover::handle_recover(handler_args).await,
668 Statement::SetVariable {
669 local: _,
670 variable,
671 value,
672 } => {
673 if variable.real_value().eq_ignore_ascii_case("database") {
675 let x = variable::set_var_to_param_str(&value);
676 let res = use_db::handle_use_db(
677 handler_args,
678 ObjectName::from(vec![Ident::from_real_value(
679 x.as_deref().unwrap_or("default"),
680 )]),
681 )?;
682 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
683 for notice in res.notices() {
684 builder = builder.notice(notice);
685 }
686 return Ok(builder.into());
687 }
688 variable::handle_set(handler_args, variable, value)
689 }
690 Statement::SetTimeZone { local: _, value } => {
691 variable::handle_set_time_zone(handler_args, value)
692 }
693 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
694 Statement::CreateIndex {
695 name,
696 table_name,
697 method,
698 columns,
699 include,
700 distributed_by,
701 unique,
702 if_not_exists,
703 with_properties: _,
704 } => {
705 if unique {
706 bail_not_implemented!("create unique index");
707 }
708
709 create_index::handle_create_index(
710 handler_args,
711 if_not_exists,
712 name,
713 table_name,
714 method,
715 columns.clone(),
716 include,
717 distributed_by,
718 )
719 .await
720 }
721 Statement::AlterDatabase { name, operation } => match operation {
722 AlterDatabaseOperation::RenameDatabase { database_name } => {
723 alter_rename::handle_rename_database(handler_args, name, database_name).await
724 }
725 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
726 alter_owner::handle_alter_owner(
727 handler_args,
728 name,
729 new_owner_name,
730 StatementType::ALTER_DATABASE,
731 None,
732 )
733 .await
734 }
735 AlterDatabaseOperation::SetParam(config_param) => {
736 let ConfigParam { param, value } = config_param;
737
738 let database_param = match param.real_value().to_uppercase().as_str() {
739 "BARRIER_INTERVAL_MS" => {
740 let barrier_interval_ms = match value {
741 SetVariableValue::Default => None,
742 SetVariableValue::Single(SetVariableValueSingle::Literal(
743 Value::Number(num),
744 )) => {
745 let num = num.parse::<u32>().map_err(|e| {
746 ErrorCode::InvalidInputSyntax(format!(
747 "barrier_interval_ms must be a u32 integer: {}",
748 e.as_report()
749 ))
750 })?;
751 Some(num)
752 }
753 _ => {
754 return Err(ErrorCode::InvalidInputSyntax(
755 "barrier_interval_ms must be a u32 integer or DEFAULT"
756 .to_owned(),
757 )
758 .into());
759 }
760 };
761 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
762 }
763 "CHECKPOINT_FREQUENCY" => {
764 let checkpoint_frequency = match value {
765 SetVariableValue::Default => None,
766 SetVariableValue::Single(SetVariableValueSingle::Literal(
767 Value::Number(num),
768 )) => {
769 let num = num.parse::<u64>().map_err(|e| {
770 ErrorCode::InvalidInputSyntax(format!(
771 "checkpoint_frequency must be a u64 integer: {}",
772 e.as_report()
773 ))
774 })?;
775 Some(num)
776 }
777 _ => {
778 return Err(ErrorCode::InvalidInputSyntax(
779 "checkpoint_frequency must be a u64 integer or DEFAULT"
780 .to_owned(),
781 )
782 .into());
783 }
784 };
785 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
786 }
787 _ => {
788 return Err(ErrorCode::InvalidInputSyntax(format!(
789 "Unsupported database config parameter: {}",
790 param.real_value()
791 ))
792 .into());
793 }
794 };
795
796 alter_database_param::handle_alter_database_param(
797 handler_args,
798 name,
799 database_param,
800 )
801 .await
802 }
803 },
804 Statement::AlterSchema { name, operation } => match operation {
805 AlterSchemaOperation::RenameSchema { schema_name } => {
806 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
807 }
808 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
809 alter_owner::handle_alter_owner(
810 handler_args,
811 name,
812 new_owner_name,
813 StatementType::ALTER_SCHEMA,
814 None,
815 )
816 .await
817 }
818 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
819 alter_swap_rename::handle_swap_rename(
820 handler_args,
821 name,
822 target_schema,
823 StatementType::ALTER_SCHEMA,
824 )
825 .await
826 }
827 },
828 Statement::AlterTable { name, operation } => match operation {
829 AlterTableOperation::AddColumn { .. }
830 | AlterTableOperation::DropColumn { .. }
831 | AlterTableOperation::AlterColumn { .. } => {
832 Box::pin(alter_table_column::handle_alter_table_column(
833 handler_args,
834 name,
835 operation,
836 ))
837 .await
838 }
839 AlterTableOperation::AlterWatermark {
840 column_name,
841 expr,
842 with_ttl,
843 } => {
844 Box::pin(alter_watermark::handle_alter_watermark(
845 handler_args,
846 name,
847 column_name,
848 expr,
849 with_ttl,
850 ))
851 .await
852 }
853 AlterTableOperation::RenameTable { table_name } => {
854 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
855 .await
856 }
857 AlterTableOperation::ChangeOwner { new_owner_name } => {
858 alter_owner::handle_alter_owner(
859 handler_args,
860 name,
861 new_owner_name,
862 StatementType::ALTER_TABLE,
863 None,
864 )
865 .await
866 }
867 AlterTableOperation::SetParallelism {
868 parallelism,
869 deferred,
870 } => {
871 alter_parallelism::handle_alter_parallelism(
872 handler_args,
873 name,
874 parallelism,
875 StatementType::ALTER_TABLE,
876 deferred,
877 )
878 .await
879 }
880 AlterTableOperation::SetBackfillParallelism {
881 parallelism,
882 deferred,
883 } => {
884 alter_parallelism::handle_alter_backfill_parallelism(
885 handler_args,
886 name,
887 parallelism,
888 StatementType::ALTER_TABLE,
889 deferred,
890 )
891 .await
892 }
893 AlterTableOperation::SetSchema { new_schema_name } => {
894 alter_set_schema::handle_alter_set_schema(
895 handler_args,
896 name,
897 new_schema_name,
898 StatementType::ALTER_TABLE,
899 None,
900 )
901 .await
902 }
903 AlterTableOperation::RefreshSchema => {
904 Box::pin(alter_table_with_sr::handle_refresh_schema(
905 handler_args,
906 name,
907 ))
908 .await
909 }
910 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
911 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
912 handler_args,
913 PbThrottleTarget::Table,
914 risingwave_pb::common::PbThrottleType::Source,
915 name,
916 rate_limit,
917 )
918 .await
919 }
920 AlterTableOperation::DropConnector => {
921 Box::pin(
922 alter_table_drop_connector::handle_alter_table_drop_connector(
923 handler_args,
924 name,
925 ),
926 )
927 .await
928 }
929 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
930 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
931 handler_args,
932 PbThrottleTarget::Table,
933 risingwave_pb::common::PbThrottleType::Dml,
934 name,
935 rate_limit,
936 )
937 .await
938 }
939 AlterTableOperation::SetConfig { entries } => {
940 alter_streaming_config::handle_alter_streaming_set_config(
941 handler_args,
942 name,
943 entries,
944 StatementType::ALTER_TABLE,
945 )
946 .await
947 }
948 AlterTableOperation::ResetConfig { keys } => {
949 alter_streaming_config::handle_alter_streaming_reset_config(
950 handler_args,
951 name,
952 keys,
953 StatementType::ALTER_TABLE,
954 )
955 .await
956 }
957 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
958 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
959 handler_args,
960 PbThrottleTarget::Table,
961 risingwave_pb::common::PbThrottleType::Backfill,
962 name,
963 rate_limit,
964 )
965 .await
966 }
967 AlterTableOperation::SwapRenameTable { target_table } => {
968 alter_swap_rename::handle_swap_rename(
969 handler_args,
970 name,
971 target_table,
972 StatementType::ALTER_TABLE,
973 )
974 .await
975 }
976 AlterTableOperation::AlterConnectorProps { alter_props } => {
977 alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
978 }
979 AlterTableOperation::AddConstraint { .. }
980 | AlterTableOperation::DropConstraint { .. }
981 | AlterTableOperation::RenameColumn { .. }
982 | AlterTableOperation::ChangeColumn { .. }
983 | AlterTableOperation::RenameConstraint { .. } => {
984 bail_not_implemented!(
985 "Unhandled statement: {}",
986 Statement::AlterTable { name, operation }
987 )
988 }
989 },
990 Statement::AlterIndex { name, operation } => match operation {
991 AlterIndexOperation::RenameIndex { index_name } => {
992 alter_rename::handle_rename_index(handler_args, name, index_name).await
993 }
994 AlterIndexOperation::SetParallelism {
995 parallelism,
996 deferred,
997 } => {
998 alter_parallelism::handle_alter_parallelism(
999 handler_args,
1000 name,
1001 parallelism,
1002 StatementType::ALTER_INDEX,
1003 deferred,
1004 )
1005 .await
1006 }
1007 AlterIndexOperation::SetBackfillParallelism {
1008 parallelism,
1009 deferred,
1010 } => {
1011 alter_parallelism::handle_alter_backfill_parallelism(
1012 handler_args,
1013 name,
1014 parallelism,
1015 StatementType::ALTER_INDEX,
1016 deferred,
1017 )
1018 .await
1019 }
1020 AlterIndexOperation::SetConfig { entries } => {
1021 alter_streaming_config::handle_alter_streaming_set_config(
1022 handler_args,
1023 name,
1024 entries,
1025 StatementType::ALTER_INDEX,
1026 )
1027 .await
1028 }
1029 AlterIndexOperation::ResetConfig { keys } => {
1030 alter_streaming_config::handle_alter_streaming_reset_config(
1031 handler_args,
1032 name,
1033 keys,
1034 StatementType::ALTER_INDEX,
1035 )
1036 .await
1037 }
1038 },
1039 Statement::AlterView {
1040 materialized,
1041 name,
1042 operation,
1043 } => {
1044 let statement_type = if materialized {
1045 StatementType::ALTER_MATERIALIZED_VIEW
1046 } else {
1047 StatementType::ALTER_VIEW
1048 };
1049 match operation {
1050 AlterViewOperation::RenameView { view_name } => {
1051 if materialized {
1052 alter_rename::handle_rename_table(
1053 handler_args,
1054 TableType::MaterializedView,
1055 name,
1056 view_name,
1057 )
1058 .await
1059 } else {
1060 alter_rename::handle_rename_view(handler_args, name, view_name).await
1061 }
1062 }
1063 AlterViewOperation::SetParallelism {
1064 parallelism,
1065 deferred,
1066 } => {
1067 if !materialized {
1068 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
1069 }
1070 alter_parallelism::handle_alter_parallelism(
1071 handler_args,
1072 name,
1073 parallelism,
1074 statement_type,
1075 deferred,
1076 )
1077 .await
1078 }
1079 AlterViewOperation::SetBackfillParallelism {
1080 parallelism,
1081 deferred,
1082 } => {
1083 if !materialized {
1084 bail_not_implemented!("ALTER VIEW SET BACKFILL PARALLELISM");
1085 }
1086 alter_parallelism::handle_alter_backfill_parallelism(
1087 handler_args,
1088 name,
1089 parallelism,
1090 statement_type,
1091 deferred,
1092 )
1093 .await
1094 }
1095 AlterViewOperation::SetResourceGroup {
1096 resource_group,
1097 deferred,
1098 } => {
1099 if !materialized {
1100 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1101 }
1102 alter_resource_group::handle_alter_resource_group(
1103 handler_args,
1104 name,
1105 resource_group,
1106 statement_type,
1107 deferred,
1108 )
1109 .await
1110 }
1111 AlterViewOperation::ChangeOwner { new_owner_name } => {
1112 alter_owner::handle_alter_owner(
1113 handler_args,
1114 name,
1115 new_owner_name,
1116 statement_type,
1117 None,
1118 )
1119 .await
1120 }
1121 AlterViewOperation::SetSchema { new_schema_name } => {
1122 alter_set_schema::handle_alter_set_schema(
1123 handler_args,
1124 name,
1125 new_schema_name,
1126 statement_type,
1127 None,
1128 )
1129 .await
1130 }
1131 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1132 if !materialized {
1133 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1134 }
1135 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1136 handler_args,
1137 PbThrottleTarget::Mv,
1138 risingwave_pb::common::PbThrottleType::Backfill,
1139 name,
1140 rate_limit,
1141 )
1142 .await
1143 }
1144 AlterViewOperation::SwapRenameView { target_view } => {
1145 alter_swap_rename::handle_swap_rename(
1146 handler_args,
1147 name,
1148 target_view,
1149 statement_type,
1150 )
1151 .await
1152 }
1153 AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1154 if !materialized {
1155 bail!(
1156 "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1157 );
1158 }
1159 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1160 }
1161 AlterViewOperation::AsQuery { query } => {
1162 if !materialized {
1163 bail_not_implemented!("ALTER VIEW AS QUERY");
1164 }
1165 if !cfg!(debug_assertions) {
1167 bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1168 }
1169 alter_mv::handle_alter_mv(handler_args, name, query).await
1170 }
1171 AlterViewOperation::SetConfig { entries } => {
1172 if !materialized {
1173 bail!("SET CONFIG is only supported for materialized views");
1174 }
1175 alter_streaming_config::handle_alter_streaming_set_config(
1176 handler_args,
1177 name,
1178 entries,
1179 statement_type,
1180 )
1181 .await
1182 }
1183 AlterViewOperation::ResetConfig { keys } => {
1184 if !materialized {
1185 bail!("RESET CONFIG is only supported for materialized views");
1186 }
1187 alter_streaming_config::handle_alter_streaming_reset_config(
1188 handler_args,
1189 name,
1190 keys,
1191 statement_type,
1192 )
1193 .await
1194 }
1195 }
1196 }
1197
1198 Statement::AlterSink { name, operation } => match operation {
1199 AlterSinkOperation::AlterConnectorProps {
1200 alter_props: changed_props,
1201 } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1202 AlterSinkOperation::RenameSink { sink_name } => {
1203 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1204 }
1205 AlterSinkOperation::ChangeOwner { new_owner_name } => {
1206 alter_owner::handle_alter_owner(
1207 handler_args,
1208 name,
1209 new_owner_name,
1210 StatementType::ALTER_SINK,
1211 None,
1212 )
1213 .await
1214 }
1215 AlterSinkOperation::SetSchema { new_schema_name } => {
1216 alter_set_schema::handle_alter_set_schema(
1217 handler_args,
1218 name,
1219 new_schema_name,
1220 StatementType::ALTER_SINK,
1221 None,
1222 )
1223 .await
1224 }
1225 AlterSinkOperation::SetParallelism {
1226 parallelism,
1227 deferred,
1228 } => {
1229 alter_parallelism::handle_alter_parallelism(
1230 handler_args,
1231 name,
1232 parallelism,
1233 StatementType::ALTER_SINK,
1234 deferred,
1235 )
1236 .await
1237 }
1238 AlterSinkOperation::SetBackfillParallelism {
1239 parallelism,
1240 deferred,
1241 } => {
1242 alter_parallelism::handle_alter_backfill_parallelism(
1243 handler_args,
1244 name,
1245 parallelism,
1246 StatementType::ALTER_SINK,
1247 deferred,
1248 )
1249 .await
1250 }
1251 AlterSinkOperation::SetConfig { entries } => {
1252 alter_streaming_config::handle_alter_streaming_set_config(
1253 handler_args,
1254 name,
1255 entries,
1256 StatementType::ALTER_SINK,
1257 )
1258 .await
1259 }
1260 AlterSinkOperation::ResetConfig { keys } => {
1261 alter_streaming_config::handle_alter_streaming_reset_config(
1262 handler_args,
1263 name,
1264 keys,
1265 StatementType::ALTER_SINK,
1266 )
1267 .await
1268 }
1269 AlterSinkOperation::SwapRenameSink { target_sink } => {
1270 alter_swap_rename::handle_swap_rename(
1271 handler_args,
1272 name,
1273 target_sink,
1274 StatementType::ALTER_SINK,
1275 )
1276 .await
1277 }
1278 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1279 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1280 handler_args,
1281 PbThrottleTarget::Sink,
1282 risingwave_pb::common::PbThrottleType::Sink,
1283 name,
1284 rate_limit,
1285 )
1286 .await
1287 }
1288 AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
1289 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1290 handler_args,
1291 PbThrottleTarget::Sink,
1292 risingwave_pb::common::PbThrottleType::Backfill,
1293 name,
1294 rate_limit,
1295 )
1296 .await
1297 }
1298 AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1299 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1300 handler_args,
1301 name,
1302 enable,
1303 )
1304 .await
1305 }
1306 },
1307 Statement::AlterSubscription { name, operation } => match operation {
1308 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1309 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1310 .await
1311 }
1312 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1313 alter_owner::handle_alter_owner(
1314 handler_args,
1315 name,
1316 new_owner_name,
1317 StatementType::ALTER_SUBSCRIPTION,
1318 None,
1319 )
1320 .await
1321 }
1322 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1323 alter_set_schema::handle_alter_set_schema(
1324 handler_args,
1325 name,
1326 new_schema_name,
1327 StatementType::ALTER_SUBSCRIPTION,
1328 None,
1329 )
1330 .await
1331 }
1332 AlterSubscriptionOperation::SetRetention { retention } => {
1333 alter_subscription_retention::handle_alter_subscription_retention(
1334 handler_args,
1335 name,
1336 retention,
1337 )
1338 .await
1339 }
1340 AlterSubscriptionOperation::SwapRenameSubscription {
1341 target_subscription,
1342 } => {
1343 alter_swap_rename::handle_swap_rename(
1344 handler_args,
1345 name,
1346 target_subscription,
1347 StatementType::ALTER_SUBSCRIPTION,
1348 )
1349 .await
1350 }
1351 },
1352 Statement::AlterSource { name, operation } => match operation {
1353 AlterSourceOperation::AlterConnectorProps { alter_props } => {
1354 alter_source_props::handle_alter_source_connector_props(
1355 handler_args,
1356 name,
1357 alter_props,
1358 )
1359 .await
1360 }
1361 AlterSourceOperation::RenameSource { source_name } => {
1362 alter_rename::handle_rename_source(handler_args, name, source_name).await
1363 }
1364 AlterSourceOperation::AddColumn { .. } => {
1365 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1366 }
1367 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1368 alter_owner::handle_alter_owner(
1369 handler_args,
1370 name,
1371 new_owner_name,
1372 StatementType::ALTER_SOURCE,
1373 None,
1374 )
1375 .await
1376 }
1377 AlterSourceOperation::SetSchema { new_schema_name } => {
1378 alter_set_schema::handle_alter_set_schema(
1379 handler_args,
1380 name,
1381 new_schema_name,
1382 StatementType::ALTER_SOURCE,
1383 None,
1384 )
1385 .await
1386 }
1387 AlterSourceOperation::FormatEncode { format_encode } => {
1388 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1389 .await
1390 }
1391 AlterSourceOperation::RefreshSchema => {
1392 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1393 }
1394 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1395 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1396 handler_args,
1397 PbThrottleTarget::Source,
1398 risingwave_pb::common::PbThrottleType::Source,
1399 name,
1400 rate_limit,
1401 )
1402 .await
1403 }
1404 AlterSourceOperation::SwapRenameSource { target_source } => {
1405 alter_swap_rename::handle_swap_rename(
1406 handler_args,
1407 name,
1408 target_source,
1409 StatementType::ALTER_SOURCE,
1410 )
1411 .await
1412 }
1413 AlterSourceOperation::SetParallelism {
1414 parallelism,
1415 deferred,
1416 } => {
1417 alter_parallelism::handle_alter_parallelism(
1418 handler_args,
1419 name,
1420 parallelism,
1421 StatementType::ALTER_SOURCE,
1422 deferred,
1423 )
1424 .await
1425 }
1426 AlterSourceOperation::SetBackfillParallelism {
1427 parallelism,
1428 deferred,
1429 } => {
1430 alter_parallelism::handle_alter_backfill_parallelism(
1431 handler_args,
1432 name,
1433 parallelism,
1434 StatementType::ALTER_SOURCE,
1435 deferred,
1436 )
1437 .await
1438 }
1439 AlterSourceOperation::SetConfig { entries } => {
1440 alter_streaming_config::handle_alter_streaming_set_config(
1441 handler_args,
1442 name,
1443 entries,
1444 StatementType::ALTER_SOURCE,
1445 )
1446 .await
1447 }
1448 AlterSourceOperation::ResetConfig { keys } => {
1449 alter_streaming_config::handle_alter_streaming_reset_config(
1450 handler_args,
1451 name,
1452 keys,
1453 StatementType::ALTER_SOURCE,
1454 )
1455 .await
1456 }
1457 AlterSourceOperation::ResetSource => {
1458 reset_source::handle_reset_source(handler_args, name).await
1459 }
1460 },
1461 Statement::AlterFunction {
1462 name,
1463 args,
1464 operation,
1465 } => match operation {
1466 AlterFunctionOperation::SetSchema { new_schema_name } => {
1467 alter_set_schema::handle_alter_set_schema(
1468 handler_args,
1469 name,
1470 new_schema_name,
1471 StatementType::ALTER_FUNCTION,
1472 args,
1473 )
1474 .await
1475 }
1476 AlterFunctionOperation::ChangeOwner { new_owner_name } => {
1477 alter_owner::handle_alter_owner(
1478 handler_args,
1479 name,
1480 new_owner_name,
1481 StatementType::ALTER_FUNCTION,
1482 args,
1483 )
1484 .await
1485 }
1486 },
1487 Statement::AlterConnection { name, operation } => match operation {
1488 AlterConnectionOperation::SetSchema { new_schema_name } => {
1489 alter_set_schema::handle_alter_set_schema(
1490 handler_args,
1491 name,
1492 new_schema_name,
1493 StatementType::ALTER_CONNECTION,
1494 None,
1495 )
1496 .await
1497 }
1498 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1499 alter_owner::handle_alter_owner(
1500 handler_args,
1501 name,
1502 new_owner_name,
1503 StatementType::ALTER_CONNECTION,
1504 None,
1505 )
1506 .await
1507 }
1508 AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1509 alter_connection_props::handle_alter_connection_connector_props(
1510 handler_args,
1511 name,
1512 alter_props,
1513 )
1514 .await
1515 }
1516 },
1517 Statement::AlterSystem { param, value } => {
1518 alter_system::handle_alter_system(handler_args, param, value).await
1519 }
1520 Statement::AlterSecret { name, operation } => match operation {
1521 AlterSecretOperation::ChangeCredential {
1522 with_options,
1523 new_credential,
1524 } => {
1525 alter_secret::handle_alter_secret(handler_args, name, with_options, new_credential)
1526 .await
1527 }
1528 AlterSecretOperation::ChangeOwner { new_owner_name } => {
1529 alter_owner::handle_alter_owner(
1530 handler_args,
1531 name,
1532 new_owner_name,
1533 StatementType::ALTER_SECRET,
1534 None,
1535 )
1536 .await
1537 }
1538 },
1539 Statement::AlterFragment {
1540 fragment_ids,
1541 operation,
1542 } => match operation {
1543 AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1544 let [fragment_id] = fragment_ids.as_slice() else {
1545 return Err(ErrorCode::InvalidInputSyntax(
1546 "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1547 .to_owned(),
1548 )
1549 .into());
1550 };
1551 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1552 &handler_args.session,
1553 PbThrottleTarget::Fragment,
1554 risingwave_pb::common::PbThrottleType::Backfill,
1555 *fragment_id,
1556 rate_limit,
1557 StatementType::SET_VARIABLE,
1558 )
1559 .await
1560 }
1561 AlterFragmentOperation::SetParallelism { parallelism } => {
1562 alter_parallelism::handle_alter_fragment_parallelism(
1563 handler_args,
1564 fragment_ids.into_iter().map_into().collect(),
1565 parallelism,
1566 )
1567 .await
1568 }
1569 },
1570 Statement::AlterDefaultPrivileges { .. } => {
1571 handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1572 }
1573 Statement::AlterCompactionGroup {
1574 group_ids,
1575 operation,
1576 } => {
1577 alter_compaction_group::handle_alter_compaction_group(
1578 handler_args,
1579 group_ids,
1580 operation,
1581 )
1582 .await
1583 }
1584 Statement::StartTransaction { modes } => {
1585 transaction::handle_begin(handler_args, START_TRANSACTION, modes)
1586 }
1587 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes),
1588 Statement::Commit { chain } => {
1589 transaction::handle_commit(handler_args, COMMIT, chain).await
1590 }
1591 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1592 Statement::Rollback { chain } => {
1593 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1594 }
1595 Statement::SetTransaction {
1596 modes,
1597 snapshot,
1598 session,
1599 } => transaction::handle_set(handler_args, modes, snapshot, session),
1600 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1601 Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1602 Statement::Comment {
1603 object_type,
1604 object_name,
1605 comment,
1606 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1607 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1608 Statement::Prepare {
1609 name,
1610 data_types,
1611 statement,
1612 } => prepared_statement::handle_prepare(name, data_types, statement),
1613 Statement::Deallocate { name, prepare } => {
1614 prepared_statement::handle_deallocate(name, prepare)
1615 }
1616 Statement::Vacuum { object_name, full } => {
1617 vacuum::handle_vacuum(handler_args, object_name, full).await
1618 }
1619 Statement::Refresh { table_name } => {
1620 refresh::handle_refresh(handler_args, table_name).await
1621 }
1622 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1623 }
1624}
1625
1626fn check_ban_ddl_for_iceberg_engine_table(
1627 session: Arc<SessionImpl>,
1628 stmt: &Statement,
1629) -> Result<()> {
1630 if let Statement::AlterTable { name, operation } = stmt {
1631 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1632 if table.is_iceberg_engine_table() {
1633 let has_auto_refresh_schema_sink =
1634 if matches!(operation, AlterTableOperation::AddColumn { .. }) {
1635 let catalog_reader = session.env().catalog_reader().read_guard();
1636 let db_name = session.database();
1637 let sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name());
1638 let sink = catalog_reader
1639 .get_schema_by_name(&db_name, &schema_name)
1640 .ok()
1641 .and_then(|schema| schema.get_created_sink_by_name(&sink_name));
1642 sink.and_then(|s| s.auto_refresh_schema_from_table)
1643 .is_some()
1644 } else {
1645 false
1646 };
1647
1648 check_ban_alter_table_operation_for_iceberg_engine_table(
1649 operation,
1650 &schema_name,
1651 name,
1652 has_auto_refresh_schema_sink,
1653 )?;
1654 }
1655 }
1656
1657 Ok(())
1658}
1659
1660fn check_ban_alter_table_operation_for_iceberg_engine_table(
1661 operation: &AlterTableOperation,
1662 schema_name: &str,
1663 table_name: &ObjectName,
1664 has_auto_refresh_schema_sink: bool,
1665) -> Result<()> {
1666 match operation {
1667 AlterTableOperation::AddColumn { .. } => {
1668 if !has_auto_refresh_schema_sink {
1669 bail!(
1670 "ALTER TABLE {} is not supported for iceberg table without auto schema change sink: {}.{}",
1671 operation,
1672 schema_name,
1673 table_name
1674 );
1675 }
1676 }
1677 AlterTableOperation::DropColumn { .. } => {
1678 bail!(
1680 "ALTER TABLE DROP COLUMN is not supported for iceberg table: {}.{}",
1681 schema_name,
1682 table_name
1683 );
1684 }
1685 AlterTableOperation::RenameColumn { .. }
1686 | AlterTableOperation::ChangeColumn { .. }
1687 | AlterTableOperation::AlterColumn { .. } => {
1688 bail!(
1689 "ALTER TABLE {} is not supported for iceberg table: {}.{}. Existing column schema mutation is not supported currently",
1690 operation,
1691 schema_name,
1692 table_name
1693 );
1694 }
1695 AlterTableOperation::RenameTable { .. } => {
1696 bail!(
1697 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1698 schema_name,
1699 table_name
1700 );
1701 }
1702 AlterTableOperation::SetParallelism { .. } => {
1703 bail!(
1704 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1705 schema_name,
1706 table_name
1707 );
1708 }
1709 AlterTableOperation::SetBackfillParallelism { .. } => {
1710 bail!(
1711 "ALTER TABLE SET BACKFILL PARALLELISM is not supported for iceberg table: {}.{}",
1712 schema_name,
1713 table_name
1714 );
1715 }
1716 AlterTableOperation::SetSchema { .. } => {
1717 bail!(
1718 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1719 schema_name,
1720 table_name
1721 );
1722 }
1723 AlterTableOperation::RefreshSchema => {
1724 bail!(
1725 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1726 schema_name,
1727 table_name
1728 );
1729 }
1730 AlterTableOperation::SetSourceRateLimit { .. } => {
1731 bail!(
1732 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1733 schema_name,
1734 table_name
1735 );
1736 }
1737 AlterTableOperation::AlterWatermark { .. } => {
1738 bail!(
1739 "ALTER TABLE ALTER WATERMARK is not supported for iceberg table: {}.{}",
1740 schema_name,
1741 table_name
1742 );
1743 }
1744 _ => {}
1745 }
1746 Ok(())
1747}