1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::{AlterDatabaseParam, ICEBERG_SINK_PREFIX};
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_connection_props;
46mod alter_database_param;
47mod alter_mv;
48mod alter_owner;
49mod alter_parallelism;
50mod alter_rename;
51mod alter_resource_group;
52mod alter_secret;
53mod alter_set_schema;
54mod alter_sink_props;
55mod alter_source_column;
56mod alter_source_props;
57mod alter_source_with_sr;
58mod alter_streaming_config;
59mod alter_streaming_enable_unaligned_join;
60mod alter_streaming_rate_limit;
61mod alter_subscription_retention;
62mod alter_swap_rename;
63mod alter_system;
64mod alter_table_column;
65pub mod alter_table_drop_connector;
66pub mod alter_table_props;
67mod alter_table_with_sr;
68pub mod alter_user;
69mod alter_utils;
70mod backup;
71pub mod cancel_job;
72pub mod close_cursor;
73mod comment;
74pub mod create_aggregate;
75pub mod create_connection;
76mod create_database;
77pub mod create_function;
78pub mod create_index;
79pub mod create_mv;
80pub mod create_schema;
81pub mod create_secret;
82pub mod create_sink;
83pub mod create_source;
84pub mod create_sql_function;
85pub mod create_subscription;
86pub mod create_table;
87pub mod create_table_as;
88pub mod create_user;
89pub mod create_view;
90pub mod declare_cursor;
91mod delete_meta_snapshot;
92pub mod describe;
93pub mod discard;
94mod drop_connection;
95mod drop_database;
96pub mod drop_function;
97mod drop_index;
98pub mod drop_mv;
99mod drop_schema;
100pub mod drop_secret;
101pub mod drop_sink;
102pub mod drop_source;
103pub mod drop_subscription;
104pub mod drop_table;
105pub mod drop_user;
106mod drop_view;
107pub mod explain;
108pub mod explain_analyze_stream_job;
109pub mod extended_handle;
110pub mod fetch_cursor;
111mod flush;
112pub mod handle_privilege;
113pub mod kill_process;
114mod prepared_statement;
115pub mod privilege;
116pub mod query;
117mod recover;
118mod refresh;
119mod reset_source;
120pub mod show;
121mod transaction;
122mod use_db;
123pub mod util;
124pub mod vacuum;
125pub mod variable;
126mod wait;
127
128pub use alter_table_column::{
129 fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
130};
131
132pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
134
135pub type RwPgResponse = PgResponse<PgResponseStream>;
137
138#[easy_ext::ext(RwPgResponseBuilderExt)]
139impl RwPgResponseBuilder {
140 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
142 let fields = T::fields();
143 self.values(
144 rows.into_iter()
145 .map(|row| {
146 Row::new(
147 row.into_owned_row()
148 .into_iter()
149 .zip_eq_fast(&fields)
150 .map(|(datum, (_, ty))| {
151 datum.map(|scalar| {
152 scalar.as_scalar_ref_impl().text_format(ty).into()
153 })
154 })
155 .collect(),
156 )
157 })
158 .collect_vec()
159 .into(),
160 fields_to_descriptors(fields),
161 )
162 }
163}
164
165pub fn fields_to_descriptors(
166 fields: Vec<(&str, risingwave_common::types::DataType)>,
167) -> Vec<PgFieldDescriptor> {
168 fields
169 .iter()
170 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
171 .collect()
172}
173
174pub enum PgResponseStream {
175 LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
176 DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
177 Rows(BoxStream<'static, RowSetResult>),
178}
179
180impl Stream for PgResponseStream {
181 type Item = std::result::Result<Vec<Row>, BoxedError>;
182
183 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
184 match &mut *self {
185 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
186 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
187 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
188 }
189 }
190}
191
192impl From<Vec<Row>> for PgResponseStream {
193 fn from(rows: Vec<Row>) -> Self {
194 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
195 }
196}
197
198#[derive(Clone)]
199pub struct HandlerArgs {
200 pub session: Arc<SessionImpl>,
201 pub sql: Arc<str>,
202 pub normalized_sql: String,
203 pub with_options: WithOptions,
204}
205
206impl HandlerArgs {
207 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
208 Ok(Self {
209 session,
210 sql,
211 with_options: WithOptions::try_from(stmt)?,
212 normalized_sql: Self::normalize_sql(stmt),
213 })
214 }
215
216 fn normalize_sql(stmt: &Statement) -> String {
222 let mut stmt = stmt.clone();
223 match &mut stmt {
224 Statement::CreateView {
225 or_replace,
226 if_not_exists,
227 ..
228 } => {
229 *or_replace = false;
230 *if_not_exists = false;
231 }
232 Statement::CreateTable {
233 or_replace,
234 if_not_exists,
235 ..
236 } => {
237 *or_replace = false;
238 *if_not_exists = false;
239 }
240 Statement::CreateIndex { if_not_exists, .. } => {
241 *if_not_exists = false;
242 }
243 Statement::CreateSource {
244 stmt: CreateSourceStatement { if_not_exists, .. },
245 ..
246 } => {
247 *if_not_exists = false;
248 }
249 Statement::CreateSink {
250 stmt: CreateSinkStatement { if_not_exists, .. },
251 } => {
252 *if_not_exists = false;
253 }
254 Statement::CreateSubscription {
255 stmt: CreateSubscriptionStatement { if_not_exists, .. },
256 } => {
257 *if_not_exists = false;
258 }
259 Statement::CreateConnection {
260 stmt: CreateConnectionStatement { if_not_exists, .. },
261 } => {
262 *if_not_exists = false;
263 }
264 _ => {}
265 }
266 stmt.to_string()
267 }
268}
269
270#[allow(clippy::large_stack_frames)]
271pub async fn handle(
272 session: Arc<SessionImpl>,
273 stmt: Statement,
274 sql: Arc<str>,
275 formats: Vec<Format>,
276) -> Result<RwPgResponse> {
277 session.clear_cancel_query_flag();
278 let _guard = session.txn_begin_implicit();
279 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
280
281 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
282
283 match stmt {
284 Statement::Explain {
285 statement,
286 analyze,
287 options,
288 } => {
289 Box::pin(explain::handle_explain(
290 handler_args,
291 *statement,
292 options,
293 analyze,
294 ))
295 .await
296 }
297 Statement::ExplainAnalyzeStreamJob {
298 target,
299 duration_secs,
300 } => {
301 explain_analyze_stream_job::handle_explain_analyze_stream_job(
302 handler_args,
303 target,
304 duration_secs,
305 )
306 .await
307 }
308 Statement::CreateSource { stmt } => {
309 create_source::handle_create_source(handler_args, stmt).await
310 }
311 Statement::CreateSink { stmt } => {
312 create_sink::handle_create_sink(handler_args, stmt, false).await
313 }
314 Statement::CreateSubscription { stmt } => {
315 create_subscription::handle_create_subscription(handler_args, stmt).await
316 }
317 Statement::CreateConnection { stmt } => {
318 create_connection::handle_create_connection(handler_args, stmt).await
319 }
320 Statement::CreateSecret { stmt } => {
321 create_secret::handle_create_secret(handler_args, stmt).await
322 }
323 Statement::CreateFunction {
324 or_replace,
325 temporary,
326 if_not_exists,
327 name,
328 args,
329 returns,
330 params,
331 with_options,
332 } => {
333 if params.language.is_none()
336 || !params
337 .language
338 .as_ref()
339 .unwrap()
340 .real_value()
341 .eq_ignore_ascii_case("sql")
342 {
343 create_function::handle_create_function(
344 handler_args,
345 or_replace,
346 temporary,
347 if_not_exists,
348 name,
349 args,
350 returns,
351 params,
352 with_options,
353 )
354 .await
355 } else {
356 create_sql_function::handle_create_sql_function(
357 handler_args,
358 or_replace,
359 temporary,
360 if_not_exists,
361 name,
362 args,
363 returns,
364 params,
365 )
366 .await
367 }
368 }
369 Statement::CreateAggregate {
370 or_replace,
371 if_not_exists,
372 name,
373 args,
374 returns,
375 params,
376 ..
377 } => {
378 create_aggregate::handle_create_aggregate(
379 handler_args,
380 or_replace,
381 if_not_exists,
382 name,
383 args,
384 returns,
385 params,
386 )
387 .await
388 }
389 Statement::CreateTable {
390 name,
391 columns,
392 wildcard_idx,
393 constraints,
394 query,
395 with_options: _, or_replace,
398 temporary,
399 if_not_exists,
400 format_encode,
401 source_watermarks,
402 append_only,
403 on_conflict,
404 with_version_columns,
405 cdc_table_info,
406 include_column_options,
407 webhook_info,
408 engine,
409 } => {
410 if or_replace {
411 bail_not_implemented!("CREATE OR REPLACE TABLE");
412 }
413 if temporary {
414 bail_not_implemented!("CREATE TEMPORARY TABLE");
415 }
416 if let Some(query) = query {
417 return create_table_as::handle_create_as(
418 handler_args,
419 name,
420 if_not_exists,
421 query,
422 columns,
423 append_only,
424 on_conflict,
425 with_version_columns
426 .iter()
427 .map(|col| col.real_value())
428 .collect(),
429 engine,
430 )
431 .await;
432 }
433 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
434 Box::pin(create_table::handle_create_table(
435 handler_args,
436 name,
437 columns,
438 wildcard_idx,
439 constraints,
440 if_not_exists,
441 format_encode,
442 source_watermarks,
443 append_only,
444 on_conflict,
445 with_version_columns
446 .iter()
447 .map(|col| col.real_value())
448 .collect(),
449 cdc_table_info,
450 include_column_options,
451 webhook_info,
452 engine,
453 ))
454 .await
455 }
456 Statement::CreateDatabase {
457 db_name,
458 if_not_exists,
459 owner,
460 resource_group,
461 barrier_interval_ms,
462 checkpoint_frequency,
463 } => {
464 create_database::handle_create_database(
465 handler_args,
466 db_name,
467 if_not_exists,
468 owner,
469 resource_group,
470 barrier_interval_ms,
471 checkpoint_frequency,
472 )
473 .await
474 }
475 Statement::CreateSchema {
476 schema_name,
477 if_not_exists,
478 owner,
479 } => {
480 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
481 .await
482 }
483 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
484 Statement::DeclareCursor { stmt } => {
485 declare_cursor::handle_declare_cursor(handler_args, stmt).await
486 }
487 Statement::FetchCursor { stmt } => {
488 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
489 }
490 Statement::CloseCursor { stmt } => {
491 close_cursor::handle_close_cursor(handler_args, stmt).await
492 }
493 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
494 Statement::Grant { .. } => {
495 handle_privilege::handle_grant_privilege(handler_args, stmt).await
496 }
497 Statement::Revoke { .. } => {
498 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
499 }
500 Statement::Describe { name, kind } => match kind {
501 DescribeKind::Fragments => {
502 describe::handle_describe_fragments(handler_args, name).await
503 }
504 DescribeKind::Plain => describe::handle_describe(handler_args, name),
505 },
506 Statement::DescribeFragment { fragment_id } => {
507 describe::handle_describe_fragment(handler_args, fragment_id.into()).await
508 }
509 Statement::Discard(..) => discard::handle_discard(handler_args),
510 Statement::ShowObjects {
511 object: show_object,
512 filter,
513 } => show::handle_show_object(handler_args, show_object, filter).await,
514 Statement::ShowCreateObject { create_type, name } => {
515 show::handle_show_create_object(handler_args, create_type, name)
516 }
517 Statement::ShowTransactionIsolationLevel => {
518 transaction::handle_show_isolation_level(handler_args)
519 }
520 Statement::Drop(DropStatement {
521 object_type,
522 object_name,
523 if_exists,
524 drop_mode,
525 }) => {
526 let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
527 match object_type {
528 ObjectType::MaterializedView
529 | ObjectType::View
530 | ObjectType::Sink
531 | ObjectType::Source
532 | ObjectType::Subscription
533 | ObjectType::Index
534 | ObjectType::Table
535 | ObjectType::Schema
536 | ObjectType::Connection
537 | ObjectType::Secret => true,
538 ObjectType::Database | ObjectType::User => {
539 bail_not_implemented!("DROP CASCADE");
540 }
541 }
542 } else {
543 false
544 };
545 match object_type {
546 ObjectType::Table => {
547 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
548 .await
549 }
550 ObjectType::MaterializedView => {
551 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
552 }
553 ObjectType::Index => {
554 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
555 .await
556 }
557 ObjectType::Source => {
558 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
559 .await
560 }
561 ObjectType::Sink => {
562 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
563 }
564 ObjectType::Subscription => {
565 drop_subscription::handle_drop_subscription(
566 handler_args,
567 object_name,
568 if_exists,
569 cascade,
570 )
571 .await
572 }
573 ObjectType::Database => {
574 drop_database::handle_drop_database(handler_args, object_name, if_exists).await
575 }
576 ObjectType::Schema => {
577 drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
578 .await
579 }
580 ObjectType::User => {
581 drop_user::handle_drop_user(handler_args, object_name, if_exists).await
582 }
583 ObjectType::View => {
584 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
585 }
586 ObjectType::Connection => {
587 drop_connection::handle_drop_connection(
588 handler_args,
589 object_name,
590 if_exists,
591 cascade,
592 )
593 .await
594 }
595 ObjectType::Secret => {
596 drop_secret::handle_drop_secret(handler_args, object_name, if_exists, cascade)
597 .await
598 }
599 }
600 }
601 Statement::DropFunction {
603 if_exists,
604 func_desc,
605 option,
606 } => {
607 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
608 .await
609 }
610 Statement::DropAggregate {
611 if_exists,
612 func_desc,
613 option,
614 } => {
615 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
616 .await
617 }
618 Statement::Query(_)
619 | Statement::Insert { .. }
620 | Statement::Delete { .. }
621 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
622 Statement::Copy {
623 entity: CopyEntity::Query(query),
624 target: CopyTarget::Stdout,
625 } => {
626 let response =
627 query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
628 .await?;
629 Ok(response.into_copy_query_to_stdout())
630 }
631 Statement::CreateView {
632 materialized,
633 if_not_exists,
634 name,
635 columns,
636 query,
637 with_options: _, or_replace, emit_mode,
640 } => {
641 if or_replace {
642 bail_not_implemented!("CREATE OR REPLACE VIEW");
643 }
644 if materialized {
645 create_mv::handle_create_mv(
646 handler_args,
647 if_not_exists,
648 name,
649 *query,
650 columns,
651 emit_mode,
652 )
653 .await
654 } else {
655 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
656 .await
657 }
658 }
659 Statement::Flush => flush::handle_flush(handler_args).await,
660 Statement::Wait => wait::handle_wait(handler_args).await,
661 Statement::Backup => backup::handle_backup(handler_args).await,
662 Statement::DeleteMetaSnapshots { snapshot_ids } => {
663 delete_meta_snapshot::handle_delete_meta_snapshots(handler_args, snapshot_ids).await
664 }
665 Statement::Recover => recover::handle_recover(handler_args).await,
666 Statement::SetVariable {
667 local: _,
668 variable,
669 value,
670 } => {
671 if variable.real_value().eq_ignore_ascii_case("database") {
673 let x = variable::set_var_to_param_str(&value);
674 let res = use_db::handle_use_db(
675 handler_args,
676 ObjectName::from(vec![Ident::from_real_value(
677 x.as_deref().unwrap_or("default"),
678 )]),
679 )?;
680 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
681 for notice in res.notices() {
682 builder = builder.notice(notice);
683 }
684 return Ok(builder.into());
685 }
686 variable::handle_set(handler_args, variable, value)
687 }
688 Statement::SetTimeZone { local: _, value } => {
689 variable::handle_set_time_zone(handler_args, value)
690 }
691 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
692 Statement::CreateIndex {
693 name,
694 table_name,
695 method,
696 columns,
697 include,
698 distributed_by,
699 unique,
700 if_not_exists,
701 with_properties: _,
702 } => {
703 if unique {
704 bail_not_implemented!("create unique index");
705 }
706
707 create_index::handle_create_index(
708 handler_args,
709 if_not_exists,
710 name,
711 table_name,
712 method,
713 columns.clone(),
714 include,
715 distributed_by,
716 )
717 .await
718 }
719 Statement::AlterDatabase { name, operation } => match operation {
720 AlterDatabaseOperation::RenameDatabase { database_name } => {
721 alter_rename::handle_rename_database(handler_args, name, database_name).await
722 }
723 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
724 alter_owner::handle_alter_owner(
725 handler_args,
726 name,
727 new_owner_name,
728 StatementType::ALTER_DATABASE,
729 None,
730 )
731 .await
732 }
733 AlterDatabaseOperation::SetParam(config_param) => {
734 let ConfigParam { param, value } = config_param;
735
736 let database_param = match param.real_value().to_uppercase().as_str() {
737 "BARRIER_INTERVAL_MS" => {
738 let barrier_interval_ms = match value {
739 SetVariableValue::Default => None,
740 SetVariableValue::Single(SetVariableValueSingle::Literal(
741 Value::Number(num),
742 )) => {
743 let num = num.parse::<u32>().map_err(|e| {
744 ErrorCode::InvalidInputSyntax(format!(
745 "barrier_interval_ms must be a u32 integer: {}",
746 e.as_report()
747 ))
748 })?;
749 Some(num)
750 }
751 _ => {
752 return Err(ErrorCode::InvalidInputSyntax(
753 "barrier_interval_ms must be a u32 integer or DEFAULT"
754 .to_owned(),
755 )
756 .into());
757 }
758 };
759 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
760 }
761 "CHECKPOINT_FREQUENCY" => {
762 let checkpoint_frequency = match value {
763 SetVariableValue::Default => None,
764 SetVariableValue::Single(SetVariableValueSingle::Literal(
765 Value::Number(num),
766 )) => {
767 let num = num.parse::<u64>().map_err(|e| {
768 ErrorCode::InvalidInputSyntax(format!(
769 "checkpoint_frequency must be a u64 integer: {}",
770 e.as_report()
771 ))
772 })?;
773 Some(num)
774 }
775 _ => {
776 return Err(ErrorCode::InvalidInputSyntax(
777 "checkpoint_frequency must be a u64 integer or DEFAULT"
778 .to_owned(),
779 )
780 .into());
781 }
782 };
783 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
784 }
785 _ => {
786 return Err(ErrorCode::InvalidInputSyntax(format!(
787 "Unsupported database config parameter: {}",
788 param.real_value()
789 ))
790 .into());
791 }
792 };
793
794 alter_database_param::handle_alter_database_param(
795 handler_args,
796 name,
797 database_param,
798 )
799 .await
800 }
801 },
802 Statement::AlterSchema { name, operation } => match operation {
803 AlterSchemaOperation::RenameSchema { schema_name } => {
804 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
805 }
806 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
807 alter_owner::handle_alter_owner(
808 handler_args,
809 name,
810 new_owner_name,
811 StatementType::ALTER_SCHEMA,
812 None,
813 )
814 .await
815 }
816 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
817 alter_swap_rename::handle_swap_rename(
818 handler_args,
819 name,
820 target_schema,
821 StatementType::ALTER_SCHEMA,
822 )
823 .await
824 }
825 },
826 Statement::AlterTable { name, operation } => match operation {
827 AlterTableOperation::AddColumn { .. }
828 | AlterTableOperation::DropColumn { .. }
829 | AlterTableOperation::AlterColumn { .. } => {
830 Box::pin(alter_table_column::handle_alter_table_column(
831 handler_args,
832 name,
833 operation,
834 ))
835 .await
836 }
837 AlterTableOperation::RenameTable { table_name } => {
838 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
839 .await
840 }
841 AlterTableOperation::ChangeOwner { new_owner_name } => {
842 alter_owner::handle_alter_owner(
843 handler_args,
844 name,
845 new_owner_name,
846 StatementType::ALTER_TABLE,
847 None,
848 )
849 .await
850 }
851 AlterTableOperation::SetParallelism {
852 parallelism,
853 deferred,
854 } => {
855 alter_parallelism::handle_alter_parallelism(
856 handler_args,
857 name,
858 parallelism,
859 StatementType::ALTER_TABLE,
860 deferred,
861 )
862 .await
863 }
864 AlterTableOperation::SetBackfillParallelism {
865 parallelism,
866 deferred,
867 } => {
868 alter_parallelism::handle_alter_backfill_parallelism(
869 handler_args,
870 name,
871 parallelism,
872 StatementType::ALTER_TABLE,
873 deferred,
874 )
875 .await
876 }
877 AlterTableOperation::SetSchema { new_schema_name } => {
878 alter_set_schema::handle_alter_set_schema(
879 handler_args,
880 name,
881 new_schema_name,
882 StatementType::ALTER_TABLE,
883 None,
884 )
885 .await
886 }
887 AlterTableOperation::RefreshSchema => {
888 Box::pin(alter_table_with_sr::handle_refresh_schema(
889 handler_args,
890 name,
891 ))
892 .await
893 }
894 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
895 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
896 handler_args,
897 PbThrottleTarget::Table,
898 risingwave_pb::common::PbThrottleType::Source,
899 name,
900 rate_limit,
901 )
902 .await
903 }
904 AlterTableOperation::DropConnector => {
905 Box::pin(
906 alter_table_drop_connector::handle_alter_table_drop_connector(
907 handler_args,
908 name,
909 ),
910 )
911 .await
912 }
913 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
914 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
915 handler_args,
916 PbThrottleTarget::Table,
917 risingwave_pb::common::PbThrottleType::Dml,
918 name,
919 rate_limit,
920 )
921 .await
922 }
923 AlterTableOperation::SetConfig { entries } => {
924 alter_streaming_config::handle_alter_streaming_set_config(
925 handler_args,
926 name,
927 entries,
928 StatementType::ALTER_TABLE,
929 )
930 .await
931 }
932 AlterTableOperation::ResetConfig { keys } => {
933 alter_streaming_config::handle_alter_streaming_reset_config(
934 handler_args,
935 name,
936 keys,
937 StatementType::ALTER_TABLE,
938 )
939 .await
940 }
941 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
942 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
943 handler_args,
944 PbThrottleTarget::Table,
945 risingwave_pb::common::PbThrottleType::Backfill,
946 name,
947 rate_limit,
948 )
949 .await
950 }
951 AlterTableOperation::SwapRenameTable { target_table } => {
952 alter_swap_rename::handle_swap_rename(
953 handler_args,
954 name,
955 target_table,
956 StatementType::ALTER_TABLE,
957 )
958 .await
959 }
960 AlterTableOperation::AlterConnectorProps { alter_props } => {
961 alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
962 }
963 AlterTableOperation::AddConstraint { .. }
964 | AlterTableOperation::DropConstraint { .. }
965 | AlterTableOperation::RenameColumn { .. }
966 | AlterTableOperation::ChangeColumn { .. }
967 | AlterTableOperation::RenameConstraint { .. } => {
968 bail_not_implemented!(
969 "Unhandled statement: {}",
970 Statement::AlterTable { name, operation }
971 )
972 }
973 },
974 Statement::AlterIndex { name, operation } => match operation {
975 AlterIndexOperation::RenameIndex { index_name } => {
976 alter_rename::handle_rename_index(handler_args, name, index_name).await
977 }
978 AlterIndexOperation::SetParallelism {
979 parallelism,
980 deferred,
981 } => {
982 alter_parallelism::handle_alter_parallelism(
983 handler_args,
984 name,
985 parallelism,
986 StatementType::ALTER_INDEX,
987 deferred,
988 )
989 .await
990 }
991 AlterIndexOperation::SetBackfillParallelism {
992 parallelism,
993 deferred,
994 } => {
995 alter_parallelism::handle_alter_backfill_parallelism(
996 handler_args,
997 name,
998 parallelism,
999 StatementType::ALTER_INDEX,
1000 deferred,
1001 )
1002 .await
1003 }
1004 AlterIndexOperation::SetConfig { entries } => {
1005 alter_streaming_config::handle_alter_streaming_set_config(
1006 handler_args,
1007 name,
1008 entries,
1009 StatementType::ALTER_INDEX,
1010 )
1011 .await
1012 }
1013 AlterIndexOperation::ResetConfig { keys } => {
1014 alter_streaming_config::handle_alter_streaming_reset_config(
1015 handler_args,
1016 name,
1017 keys,
1018 StatementType::ALTER_INDEX,
1019 )
1020 .await
1021 }
1022 },
1023 Statement::AlterView {
1024 materialized,
1025 name,
1026 operation,
1027 } => {
1028 let statement_type = if materialized {
1029 StatementType::ALTER_MATERIALIZED_VIEW
1030 } else {
1031 StatementType::ALTER_VIEW
1032 };
1033 match operation {
1034 AlterViewOperation::RenameView { view_name } => {
1035 if materialized {
1036 alter_rename::handle_rename_table(
1037 handler_args,
1038 TableType::MaterializedView,
1039 name,
1040 view_name,
1041 )
1042 .await
1043 } else {
1044 alter_rename::handle_rename_view(handler_args, name, view_name).await
1045 }
1046 }
1047 AlterViewOperation::SetParallelism {
1048 parallelism,
1049 deferred,
1050 } => {
1051 if !materialized {
1052 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
1053 }
1054 alter_parallelism::handle_alter_parallelism(
1055 handler_args,
1056 name,
1057 parallelism,
1058 statement_type,
1059 deferred,
1060 )
1061 .await
1062 }
1063 AlterViewOperation::SetBackfillParallelism {
1064 parallelism,
1065 deferred,
1066 } => {
1067 if !materialized {
1068 bail_not_implemented!("ALTER VIEW SET BACKFILL PARALLELISM");
1069 }
1070 alter_parallelism::handle_alter_backfill_parallelism(
1071 handler_args,
1072 name,
1073 parallelism,
1074 statement_type,
1075 deferred,
1076 )
1077 .await
1078 }
1079 AlterViewOperation::SetResourceGroup {
1080 resource_group,
1081 deferred,
1082 } => {
1083 if !materialized {
1084 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1085 }
1086 alter_resource_group::handle_alter_resource_group(
1087 handler_args,
1088 name,
1089 resource_group,
1090 statement_type,
1091 deferred,
1092 )
1093 .await
1094 }
1095 AlterViewOperation::ChangeOwner { new_owner_name } => {
1096 alter_owner::handle_alter_owner(
1097 handler_args,
1098 name,
1099 new_owner_name,
1100 statement_type,
1101 None,
1102 )
1103 .await
1104 }
1105 AlterViewOperation::SetSchema { new_schema_name } => {
1106 alter_set_schema::handle_alter_set_schema(
1107 handler_args,
1108 name,
1109 new_schema_name,
1110 statement_type,
1111 None,
1112 )
1113 .await
1114 }
1115 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1116 if !materialized {
1117 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1118 }
1119 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1120 handler_args,
1121 PbThrottleTarget::Mv,
1122 risingwave_pb::common::PbThrottleType::Backfill,
1123 name,
1124 rate_limit,
1125 )
1126 .await
1127 }
1128 AlterViewOperation::SwapRenameView { target_view } => {
1129 alter_swap_rename::handle_swap_rename(
1130 handler_args,
1131 name,
1132 target_view,
1133 statement_type,
1134 )
1135 .await
1136 }
1137 AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1138 if !materialized {
1139 bail!(
1140 "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1141 );
1142 }
1143 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1144 }
1145 AlterViewOperation::AsQuery { query } => {
1146 if !materialized {
1147 bail_not_implemented!("ALTER VIEW AS QUERY");
1148 }
1149 if !cfg!(debug_assertions) {
1151 bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1152 }
1153 alter_mv::handle_alter_mv(handler_args, name, query).await
1154 }
1155 AlterViewOperation::SetConfig { entries } => {
1156 if !materialized {
1157 bail!("SET CONFIG is only supported for materialized views");
1158 }
1159 alter_streaming_config::handle_alter_streaming_set_config(
1160 handler_args,
1161 name,
1162 entries,
1163 statement_type,
1164 )
1165 .await
1166 }
1167 AlterViewOperation::ResetConfig { keys } => {
1168 if !materialized {
1169 bail!("RESET CONFIG is only supported for materialized views");
1170 }
1171 alter_streaming_config::handle_alter_streaming_reset_config(
1172 handler_args,
1173 name,
1174 keys,
1175 statement_type,
1176 )
1177 .await
1178 }
1179 }
1180 }
1181
1182 Statement::AlterSink { name, operation } => match operation {
1183 AlterSinkOperation::AlterConnectorProps {
1184 alter_props: changed_props,
1185 } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1186 AlterSinkOperation::RenameSink { sink_name } => {
1187 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1188 }
1189 AlterSinkOperation::ChangeOwner { new_owner_name } => {
1190 alter_owner::handle_alter_owner(
1191 handler_args,
1192 name,
1193 new_owner_name,
1194 StatementType::ALTER_SINK,
1195 None,
1196 )
1197 .await
1198 }
1199 AlterSinkOperation::SetSchema { new_schema_name } => {
1200 alter_set_schema::handle_alter_set_schema(
1201 handler_args,
1202 name,
1203 new_schema_name,
1204 StatementType::ALTER_SINK,
1205 None,
1206 )
1207 .await
1208 }
1209 AlterSinkOperation::SetParallelism {
1210 parallelism,
1211 deferred,
1212 } => {
1213 alter_parallelism::handle_alter_parallelism(
1214 handler_args,
1215 name,
1216 parallelism,
1217 StatementType::ALTER_SINK,
1218 deferred,
1219 )
1220 .await
1221 }
1222 AlterSinkOperation::SetBackfillParallelism {
1223 parallelism,
1224 deferred,
1225 } => {
1226 alter_parallelism::handle_alter_backfill_parallelism(
1227 handler_args,
1228 name,
1229 parallelism,
1230 StatementType::ALTER_SINK,
1231 deferred,
1232 )
1233 .await
1234 }
1235 AlterSinkOperation::SetConfig { entries } => {
1236 alter_streaming_config::handle_alter_streaming_set_config(
1237 handler_args,
1238 name,
1239 entries,
1240 StatementType::ALTER_SINK,
1241 )
1242 .await
1243 }
1244 AlterSinkOperation::ResetConfig { keys } => {
1245 alter_streaming_config::handle_alter_streaming_reset_config(
1246 handler_args,
1247 name,
1248 keys,
1249 StatementType::ALTER_SINK,
1250 )
1251 .await
1252 }
1253 AlterSinkOperation::SwapRenameSink { target_sink } => {
1254 alter_swap_rename::handle_swap_rename(
1255 handler_args,
1256 name,
1257 target_sink,
1258 StatementType::ALTER_SINK,
1259 )
1260 .await
1261 }
1262 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1263 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1264 handler_args,
1265 PbThrottleTarget::Sink,
1266 risingwave_pb::common::PbThrottleType::Sink,
1267 name,
1268 rate_limit,
1269 )
1270 .await
1271 }
1272 AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
1273 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1274 handler_args,
1275 PbThrottleTarget::Sink,
1276 risingwave_pb::common::PbThrottleType::Backfill,
1277 name,
1278 rate_limit,
1279 )
1280 .await
1281 }
1282 AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1283 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1284 handler_args,
1285 name,
1286 enable,
1287 )
1288 .await
1289 }
1290 },
1291 Statement::AlterSubscription { name, operation } => match operation {
1292 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1293 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1294 .await
1295 }
1296 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1297 alter_owner::handle_alter_owner(
1298 handler_args,
1299 name,
1300 new_owner_name,
1301 StatementType::ALTER_SUBSCRIPTION,
1302 None,
1303 )
1304 .await
1305 }
1306 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1307 alter_set_schema::handle_alter_set_schema(
1308 handler_args,
1309 name,
1310 new_schema_name,
1311 StatementType::ALTER_SUBSCRIPTION,
1312 None,
1313 )
1314 .await
1315 }
1316 AlterSubscriptionOperation::SetRetention { retention } => {
1317 alter_subscription_retention::handle_alter_subscription_retention(
1318 handler_args,
1319 name,
1320 retention,
1321 )
1322 .await
1323 }
1324 AlterSubscriptionOperation::SwapRenameSubscription {
1325 target_subscription,
1326 } => {
1327 alter_swap_rename::handle_swap_rename(
1328 handler_args,
1329 name,
1330 target_subscription,
1331 StatementType::ALTER_SUBSCRIPTION,
1332 )
1333 .await
1334 }
1335 },
1336 Statement::AlterSource { name, operation } => match operation {
1337 AlterSourceOperation::AlterConnectorProps { alter_props } => {
1338 alter_source_props::handle_alter_source_connector_props(
1339 handler_args,
1340 name,
1341 alter_props,
1342 )
1343 .await
1344 }
1345 AlterSourceOperation::RenameSource { source_name } => {
1346 alter_rename::handle_rename_source(handler_args, name, source_name).await
1347 }
1348 AlterSourceOperation::AddColumn { .. } => {
1349 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1350 }
1351 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1352 alter_owner::handle_alter_owner(
1353 handler_args,
1354 name,
1355 new_owner_name,
1356 StatementType::ALTER_SOURCE,
1357 None,
1358 )
1359 .await
1360 }
1361 AlterSourceOperation::SetSchema { new_schema_name } => {
1362 alter_set_schema::handle_alter_set_schema(
1363 handler_args,
1364 name,
1365 new_schema_name,
1366 StatementType::ALTER_SOURCE,
1367 None,
1368 )
1369 .await
1370 }
1371 AlterSourceOperation::FormatEncode { format_encode } => {
1372 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1373 .await
1374 }
1375 AlterSourceOperation::RefreshSchema => {
1376 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1377 }
1378 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1379 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1380 handler_args,
1381 PbThrottleTarget::Source,
1382 risingwave_pb::common::PbThrottleType::Source,
1383 name,
1384 rate_limit,
1385 )
1386 .await
1387 }
1388 AlterSourceOperation::SwapRenameSource { target_source } => {
1389 alter_swap_rename::handle_swap_rename(
1390 handler_args,
1391 name,
1392 target_source,
1393 StatementType::ALTER_SOURCE,
1394 )
1395 .await
1396 }
1397 AlterSourceOperation::SetParallelism {
1398 parallelism,
1399 deferred,
1400 } => {
1401 alter_parallelism::handle_alter_parallelism(
1402 handler_args,
1403 name,
1404 parallelism,
1405 StatementType::ALTER_SOURCE,
1406 deferred,
1407 )
1408 .await
1409 }
1410 AlterSourceOperation::SetBackfillParallelism {
1411 parallelism,
1412 deferred,
1413 } => {
1414 alter_parallelism::handle_alter_backfill_parallelism(
1415 handler_args,
1416 name,
1417 parallelism,
1418 StatementType::ALTER_SOURCE,
1419 deferred,
1420 )
1421 .await
1422 }
1423 AlterSourceOperation::SetConfig { entries } => {
1424 alter_streaming_config::handle_alter_streaming_set_config(
1425 handler_args,
1426 name,
1427 entries,
1428 StatementType::ALTER_SOURCE,
1429 )
1430 .await
1431 }
1432 AlterSourceOperation::ResetConfig { keys } => {
1433 alter_streaming_config::handle_alter_streaming_reset_config(
1434 handler_args,
1435 name,
1436 keys,
1437 StatementType::ALTER_SOURCE,
1438 )
1439 .await
1440 }
1441 AlterSourceOperation::ResetSource => {
1442 reset_source::handle_reset_source(handler_args, name).await
1443 }
1444 },
1445 Statement::AlterFunction {
1446 name,
1447 args,
1448 operation,
1449 } => match operation {
1450 AlterFunctionOperation::SetSchema { new_schema_name } => {
1451 alter_set_schema::handle_alter_set_schema(
1452 handler_args,
1453 name,
1454 new_schema_name,
1455 StatementType::ALTER_FUNCTION,
1456 args,
1457 )
1458 .await
1459 }
1460 AlterFunctionOperation::ChangeOwner { new_owner_name } => {
1461 alter_owner::handle_alter_owner(
1462 handler_args,
1463 name,
1464 new_owner_name,
1465 StatementType::ALTER_FUNCTION,
1466 args,
1467 )
1468 .await
1469 }
1470 },
1471 Statement::AlterConnection { name, operation } => match operation {
1472 AlterConnectionOperation::SetSchema { new_schema_name } => {
1473 alter_set_schema::handle_alter_set_schema(
1474 handler_args,
1475 name,
1476 new_schema_name,
1477 StatementType::ALTER_CONNECTION,
1478 None,
1479 )
1480 .await
1481 }
1482 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1483 alter_owner::handle_alter_owner(
1484 handler_args,
1485 name,
1486 new_owner_name,
1487 StatementType::ALTER_CONNECTION,
1488 None,
1489 )
1490 .await
1491 }
1492 AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1493 alter_connection_props::handle_alter_connection_connector_props(
1494 handler_args,
1495 name,
1496 alter_props,
1497 )
1498 .await
1499 }
1500 },
1501 Statement::AlterSystem { param, value } => {
1502 alter_system::handle_alter_system(handler_args, param, value).await
1503 }
1504 Statement::AlterSecret { name, operation } => match operation {
1505 AlterSecretOperation::ChangeCredential {
1506 with_options,
1507 new_credential,
1508 } => {
1509 alter_secret::handle_alter_secret(handler_args, name, with_options, new_credential)
1510 .await
1511 }
1512 AlterSecretOperation::ChangeOwner { new_owner_name } => {
1513 alter_owner::handle_alter_owner(
1514 handler_args,
1515 name,
1516 new_owner_name,
1517 StatementType::ALTER_SECRET,
1518 None,
1519 )
1520 .await
1521 }
1522 },
1523 Statement::AlterFragment {
1524 fragment_ids,
1525 operation,
1526 } => match operation {
1527 AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1528 let [fragment_id] = fragment_ids.as_slice() else {
1529 return Err(ErrorCode::InvalidInputSyntax(
1530 "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1531 .to_owned(),
1532 )
1533 .into());
1534 };
1535 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1536 &handler_args.session,
1537 PbThrottleTarget::Fragment,
1538 risingwave_pb::common::PbThrottleType::Backfill,
1539 *fragment_id,
1540 rate_limit,
1541 StatementType::SET_VARIABLE,
1542 )
1543 .await
1544 }
1545 AlterFragmentOperation::SetParallelism { parallelism } => {
1546 alter_parallelism::handle_alter_fragment_parallelism(
1547 handler_args,
1548 fragment_ids.into_iter().map_into().collect(),
1549 parallelism,
1550 )
1551 .await
1552 }
1553 },
1554 Statement::AlterDefaultPrivileges { .. } => {
1555 handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1556 }
1557 Statement::StartTransaction { modes } => {
1558 transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1559 }
1560 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1561 Statement::Commit { chain } => {
1562 transaction::handle_commit(handler_args, COMMIT, chain).await
1563 }
1564 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1565 Statement::Rollback { chain } => {
1566 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1567 }
1568 Statement::SetTransaction {
1569 modes,
1570 snapshot,
1571 session,
1572 } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1573 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1574 Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1575 Statement::Comment {
1576 object_type,
1577 object_name,
1578 comment,
1579 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1580 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1581 Statement::Prepare {
1582 name,
1583 data_types,
1584 statement,
1585 } => prepared_statement::handle_prepare(name, data_types, statement).await,
1586 Statement::Deallocate { name, prepare } => {
1587 prepared_statement::handle_deallocate(name, prepare).await
1588 }
1589 Statement::Vacuum { object_name, full } => {
1590 vacuum::handle_vacuum(handler_args, object_name, full).await
1591 }
1592 Statement::Refresh { table_name } => {
1593 refresh::handle_refresh(handler_args, table_name).await
1594 }
1595 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1596 }
1597}
1598
1599fn check_ban_ddl_for_iceberg_engine_table(
1600 session: Arc<SessionImpl>,
1601 stmt: &Statement,
1602) -> Result<()> {
1603 if let Statement::AlterTable { name, operation } = stmt {
1604 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1605 if table.is_iceberg_engine_table() {
1606 let has_auto_refresh_schema_sink =
1607 if matches!(operation, AlterTableOperation::AddColumn { .. }) {
1608 let catalog_reader = session.env().catalog_reader().read_guard();
1609 let db_name = session.database();
1610 let sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name());
1611 let sink = catalog_reader
1612 .get_schema_by_name(&db_name, &schema_name)
1613 .ok()
1614 .and_then(|schema| schema.get_created_sink_by_name(&sink_name));
1615 sink.and_then(|s| s.auto_refresh_schema_from_table)
1616 .is_some()
1617 } else {
1618 false
1619 };
1620
1621 check_ban_alter_table_operation_for_iceberg_engine_table(
1622 operation,
1623 &schema_name,
1624 name,
1625 has_auto_refresh_schema_sink,
1626 )?;
1627 }
1628 }
1629
1630 Ok(())
1631}
1632
1633fn check_ban_alter_table_operation_for_iceberg_engine_table(
1634 operation: &AlterTableOperation,
1635 schema_name: &str,
1636 table_name: &ObjectName,
1637 has_auto_refresh_schema_sink: bool,
1638) -> Result<()> {
1639 match operation {
1640 AlterTableOperation::AddColumn { .. } => {
1641 if !has_auto_refresh_schema_sink {
1642 bail!(
1643 "ALTER TABLE {} is not supported for iceberg table without auto schema change sink: {}.{}",
1644 operation,
1645 schema_name,
1646 table_name
1647 );
1648 }
1649 }
1650 AlterTableOperation::DropColumn { .. } => {
1651 bail!(
1653 "ALTER TABLE DROP COLUMN is not supported for iceberg table: {}.{}",
1654 schema_name,
1655 table_name
1656 );
1657 }
1658 AlterTableOperation::RenameColumn { .. }
1659 | AlterTableOperation::ChangeColumn { .. }
1660 | AlterTableOperation::AlterColumn { .. } => {
1661 bail!(
1662 "ALTER TABLE {} is not supported for iceberg table: {}.{}. Existing column schema mutation is not supported currently",
1663 operation,
1664 schema_name,
1665 table_name
1666 );
1667 }
1668 AlterTableOperation::RenameTable { .. } => {
1669 bail!(
1670 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1671 schema_name,
1672 table_name
1673 );
1674 }
1675 AlterTableOperation::SetParallelism { .. } => {
1676 bail!(
1677 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1678 schema_name,
1679 table_name
1680 );
1681 }
1682 AlterTableOperation::SetBackfillParallelism { .. } => {
1683 bail!(
1684 "ALTER TABLE SET BACKFILL PARALLELISM is not supported for iceberg table: {}.{}",
1685 schema_name,
1686 table_name
1687 );
1688 }
1689 AlterTableOperation::SetSchema { .. } => {
1690 bail!(
1691 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1692 schema_name,
1693 table_name
1694 );
1695 }
1696 AlterTableOperation::RefreshSchema => {
1697 bail!(
1698 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1699 schema_name,
1700 table_name
1701 );
1702 }
1703 AlterTableOperation::SetSourceRateLimit { .. } => {
1704 bail!(
1705 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1706 schema_name,
1707 table_name
1708 );
1709 }
1710 _ => {}
1711 }
1712 Ok(())
1713}