1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::{AlterDatabaseParam, ICEBERG_SINK_PREFIX};
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_compaction_group;
46mod alter_connection_props;
47mod alter_database_param;
48mod alter_mv;
49mod alter_owner;
50mod alter_parallelism;
51mod alter_rename;
52mod alter_resource_group;
53mod alter_secret;
54mod alter_set_schema;
55mod alter_sink_props;
56mod alter_source_column;
57mod alter_source_props;
58mod alter_source_with_sr;
59mod alter_streaming_config;
60mod alter_streaming_enable_unaligned_join;
61mod alter_streaming_rate_limit;
62mod alter_subscription_retention;
63mod alter_swap_rename;
64mod alter_system;
65mod alter_table_column;
66pub mod alter_table_drop_connector;
67pub mod alter_table_props;
68mod alter_table_with_sr;
69pub mod alter_user;
70mod alter_utils;
71mod alter_watermark;
72mod backup;
73pub mod cancel_job;
74pub mod close_cursor;
75mod comment;
76pub mod create_aggregate;
77pub mod create_connection;
78mod create_database;
79pub mod create_function;
80pub mod create_index;
81pub mod create_mv;
82pub mod create_schema;
83pub mod create_secret;
84pub mod create_sink;
85pub mod create_source;
86pub mod create_sql_function;
87pub mod create_subscription;
88pub mod create_table;
89pub mod create_table_as;
90pub mod create_user;
91pub mod create_view;
92pub mod declare_cursor;
93mod delete_meta_snapshot;
94pub mod describe;
95pub mod discard;
96mod drop_connection;
97mod drop_database;
98pub mod drop_function;
99mod drop_index;
100pub mod drop_mv;
101mod drop_schema;
102pub mod drop_secret;
103pub mod drop_sink;
104pub mod drop_source;
105pub mod drop_subscription;
106pub mod drop_table;
107pub mod drop_user;
108mod drop_view;
109pub mod explain;
110pub mod explain_analyze_stream_job;
111pub mod extended_handle;
112pub mod fetch_cursor;
113mod flush;
114pub mod handle_privilege;
115pub mod kill_process;
116mod prepared_statement;
117pub mod privilege;
118pub mod query;
119mod recover;
120mod refresh;
121mod reset_source;
122pub mod show;
123mod transaction;
124mod use_db;
125pub mod util;
126pub mod vacuum;
127pub mod variable;
128mod wait;
129
130pub use alter_table_column::{
131 fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
132};
133
/// [`PgResponseBuilder`] specialized to use [`PgResponseStream`] as its row stream type.
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
136
/// [`PgResponse`] specialized to use [`PgResponseStream`] as its row stream type.
pub type RwPgResponse = PgResponse<PgResponseStream>;
139
140#[easy_ext::ext(RwPgResponseBuilderExt)]
141impl RwPgResponseBuilder {
142 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
144 let fields = T::fields();
145 self.values(
146 rows.into_iter()
147 .map(|row| {
148 Row::new(
149 row.into_owned_row()
150 .into_iter()
151 .zip_eq_fast(&fields)
152 .map(|(datum, (_, ty))| {
153 datum.map(|scalar| {
154 scalar.as_scalar_ref_impl().text_format(ty).into()
155 })
156 })
157 .collect(),
158 )
159 })
160 .collect_vec()
161 .into(),
162 fields_to_descriptors(fields),
163 )
164 }
165}
166
167pub fn fields_to_descriptors(
168 fields: Vec<(&str, risingwave_common::types::DataType)>,
169) -> Vec<PgFieldDescriptor> {
170 fields
171 .iter()
172 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
173 .collect()
174}
175
/// Row stream type used by [`RwPgResponse`] and [`RwPgResponseBuilder`].
///
/// Each variant wraps a different underlying stream.
pub enum PgResponseStream {
    /// A [`LocalQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// A [`DistributedQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// A boxed stream of [`RowSetResult`] items.
    Rows(BoxStream<'static, RowSetResult>),
}
181
182impl Stream for PgResponseStream {
183 type Item = std::result::Result<Vec<Row>, BoxedError>;
184
185 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
186 match &mut *self {
187 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
188 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
189 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
190 }
191 }
192}
193
194impl From<Vec<Row>> for PgResponseStream {
195 fn from(rows: Vec<Row>) -> Self {
196 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
197 }
198}
199
/// Arguments bundled together and passed to the individual statement handlers.
#[derive(Clone)]
pub struct HandlerArgs {
    /// The session the statement is executed in.
    pub session: Arc<SessionImpl>,
    /// The SQL text as supplied to `HandlerArgs::new`.
    pub sql: Arc<str>,
    /// The statement rendered back to a string by `HandlerArgs::normalize_sql`.
    pub normalized_sql: String,
    /// Options obtained via `WithOptions::try_from` on the statement.
    pub with_options: WithOptions,
}
207
208impl HandlerArgs {
209 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
210 Ok(Self {
211 session,
212 sql,
213 with_options: WithOptions::try_from(stmt)?,
214 normalized_sql: Self::normalize_sql(stmt),
215 })
216 }
217
218 fn normalize_sql(stmt: &Statement) -> String {
224 let mut stmt = stmt.clone();
225 match &mut stmt {
226 Statement::CreateView {
227 or_replace,
228 if_not_exists,
229 ..
230 } => {
231 *or_replace = false;
232 *if_not_exists = false;
233 }
234 Statement::CreateTable {
235 or_replace,
236 if_not_exists,
237 ..
238 } => {
239 *or_replace = false;
240 *if_not_exists = false;
241 }
242 Statement::CreateIndex { if_not_exists, .. } => {
243 *if_not_exists = false;
244 }
245 Statement::CreateSource {
246 stmt: CreateSourceStatement { if_not_exists, .. },
247 ..
248 } => {
249 *if_not_exists = false;
250 }
251 Statement::CreateSink {
252 stmt: CreateSinkStatement { if_not_exists, .. },
253 } => {
254 *if_not_exists = false;
255 }
256 Statement::CreateSubscription {
257 stmt: CreateSubscriptionStatement { if_not_exists, .. },
258 } => {
259 *if_not_exists = false;
260 }
261 Statement::CreateConnection {
262 stmt: CreateConnectionStatement { if_not_exists, .. },
263 } => {
264 *if_not_exists = false;
265 }
266 _ => {}
267 }
268 stmt.to_string()
269 }
270}
271
272#[expect(clippy::large_stack_frames)]
273pub async fn handle(
274 session: Arc<SessionImpl>,
275 stmt: Statement,
276 sql: Arc<str>,
277 formats: Vec<Format>,
278) -> Result<RwPgResponse> {
279 session.clear_cancel_query_flag();
280 let _guard = session.txn_begin_implicit();
281 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
282
283 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
284
285 match stmt {
286 Statement::Explain {
287 statement,
288 analyze,
289 options,
290 } => {
291 Box::pin(explain::handle_explain(
292 handler_args,
293 *statement,
294 options,
295 analyze,
296 ))
297 .await
298 }
299 Statement::ExplainAnalyzeStreamJob {
300 target,
301 duration_secs,
302 } => {
303 explain_analyze_stream_job::handle_explain_analyze_stream_job(
304 handler_args,
305 target,
306 duration_secs,
307 )
308 .await
309 }
310 Statement::CreateSource { stmt } => {
311 create_source::handle_create_source(handler_args, stmt).await
312 }
313 Statement::CreateSink { stmt } => {
314 create_sink::handle_create_sink(handler_args, stmt, false).await
315 }
316 Statement::CreateSubscription { stmt } => {
317 create_subscription::handle_create_subscription(handler_args, stmt).await
318 }
319 Statement::CreateConnection { stmt } => {
320 create_connection::handle_create_connection(handler_args, stmt).await
321 }
322 Statement::CreateSecret { stmt } => {
323 create_secret::handle_create_secret(handler_args, stmt).await
324 }
325 Statement::CreateFunction {
326 or_replace,
327 temporary,
328 if_not_exists,
329 name,
330 args,
331 returns,
332 params,
333 with_options,
334 } => {
335 if params.language.is_none()
338 || !params
339 .language
340 .as_ref()
341 .unwrap()
342 .real_value()
343 .eq_ignore_ascii_case("sql")
344 {
345 create_function::handle_create_function(
346 handler_args,
347 or_replace,
348 temporary,
349 if_not_exists,
350 name,
351 args,
352 returns,
353 params,
354 with_options,
355 )
356 .await
357 } else {
358 create_sql_function::handle_create_sql_function(
359 handler_args,
360 or_replace,
361 temporary,
362 if_not_exists,
363 name,
364 args,
365 returns,
366 params,
367 )
368 .await
369 }
370 }
371 Statement::CreateAggregate {
372 or_replace,
373 if_not_exists,
374 name,
375 args,
376 returns,
377 params,
378 ..
379 } => {
380 create_aggregate::handle_create_aggregate(
381 handler_args,
382 or_replace,
383 if_not_exists,
384 name,
385 args,
386 returns,
387 params,
388 )
389 .await
390 }
391 Statement::CreateTable {
392 name,
393 columns,
394 wildcard_idx,
395 constraints,
396 query,
397 with_options: _, or_replace,
400 temporary,
401 if_not_exists,
402 format_encode,
403 source_watermarks,
404 append_only,
405 on_conflict,
406 with_version_columns,
407 cdc_table_info,
408 include_column_options,
409 webhook_info,
410 engine,
411 } => {
412 if or_replace {
413 bail_not_implemented!("CREATE OR REPLACE TABLE");
414 }
415 if temporary {
416 bail_not_implemented!("CREATE TEMPORARY TABLE");
417 }
418 if let Some(query) = query {
419 return create_table_as::handle_create_as(
420 handler_args,
421 name,
422 if_not_exists,
423 query,
424 columns,
425 append_only,
426 on_conflict,
427 with_version_columns
428 .iter()
429 .map(|col| col.real_value())
430 .collect(),
431 engine,
432 )
433 .await;
434 }
435 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
436 Box::pin(create_table::handle_create_table(
437 handler_args,
438 name,
439 columns,
440 wildcard_idx,
441 constraints,
442 if_not_exists,
443 format_encode,
444 source_watermarks,
445 append_only,
446 on_conflict,
447 with_version_columns
448 .iter()
449 .map(|col| col.real_value())
450 .collect(),
451 cdc_table_info,
452 include_column_options,
453 webhook_info,
454 engine,
455 ))
456 .await
457 }
458 Statement::CreateDatabase {
459 db_name,
460 if_not_exists,
461 owner,
462 resource_group,
463 barrier_interval_ms,
464 checkpoint_frequency,
465 } => {
466 create_database::handle_create_database(
467 handler_args,
468 db_name,
469 if_not_exists,
470 owner,
471 resource_group,
472 barrier_interval_ms,
473 checkpoint_frequency,
474 )
475 .await
476 }
477 Statement::CreateSchema {
478 schema_name,
479 if_not_exists,
480 owner,
481 } => {
482 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
483 .await
484 }
485 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
486 Statement::DeclareCursor { stmt } => {
487 declare_cursor::handle_declare_cursor(handler_args, stmt).await
488 }
489 Statement::FetchCursor { stmt } => {
490 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
491 }
492 Statement::CloseCursor { stmt } => {
493 close_cursor::handle_close_cursor(handler_args, stmt).await
494 }
495 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
496 Statement::Grant { .. } => {
497 handle_privilege::handle_grant_privilege(handler_args, stmt).await
498 }
499 Statement::Revoke { .. } => {
500 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
501 }
502 Statement::Describe { name, kind } => match kind {
503 DescribeKind::Fragments => {
504 describe::handle_describe_fragments(handler_args, name).await
505 }
506 DescribeKind::Plain => describe::handle_describe(handler_args, name),
507 },
508 Statement::DescribeFragment { fragment_id } => {
509 describe::handle_describe_fragment(handler_args, fragment_id.into()).await
510 }
511 Statement::Discard(..) => discard::handle_discard(handler_args),
512 Statement::ShowObjects {
513 object: show_object,
514 filter,
515 } => show::handle_show_object(handler_args, show_object, filter).await,
516 Statement::ShowCreateObject { create_type, name } => {
517 show::handle_show_create_object(handler_args, create_type, name)
518 }
519 Statement::ShowTransactionIsolationLevel => {
520 transaction::handle_show_isolation_level(handler_args)
521 }
522 Statement::Drop(DropStatement {
523 object_type,
524 object_name,
525 if_exists,
526 drop_mode,
527 }) => {
528 let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
529 match object_type {
530 ObjectType::MaterializedView
531 | ObjectType::View
532 | ObjectType::Sink
533 | ObjectType::Source
534 | ObjectType::Subscription
535 | ObjectType::Index
536 | ObjectType::Table
537 | ObjectType::Schema
538 | ObjectType::Connection
539 | ObjectType::Secret => true,
540 ObjectType::Database | ObjectType::User => {
541 bail_not_implemented!("DROP CASCADE");
542 }
543 }
544 } else {
545 false
546 };
547 match object_type {
548 ObjectType::Table => {
549 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
550 .await
551 }
552 ObjectType::MaterializedView => {
553 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
554 }
555 ObjectType::Index => {
556 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
557 .await
558 }
559 ObjectType::Source => {
560 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
561 .await
562 }
563 ObjectType::Sink => {
564 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
565 }
566 ObjectType::Subscription => {
567 drop_subscription::handle_drop_subscription(
568 handler_args,
569 object_name,
570 if_exists,
571 cascade,
572 )
573 .await
574 }
575 ObjectType::Database => {
576 drop_database::handle_drop_database(handler_args, object_name, if_exists).await
577 }
578 ObjectType::Schema => {
579 drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
580 .await
581 }
582 ObjectType::User => {
583 drop_user::handle_drop_user(handler_args, object_name, if_exists).await
584 }
585 ObjectType::View => {
586 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
587 }
588 ObjectType::Connection => {
589 drop_connection::handle_drop_connection(
590 handler_args,
591 object_name,
592 if_exists,
593 cascade,
594 )
595 .await
596 }
597 ObjectType::Secret => {
598 drop_secret::handle_drop_secret(handler_args, object_name, if_exists, cascade)
599 .await
600 }
601 }
602 }
603 Statement::DropFunction {
605 if_exists,
606 func_desc,
607 option,
608 } => {
609 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
610 .await
611 }
612 Statement::DropAggregate {
613 if_exists,
614 func_desc,
615 option,
616 } => {
617 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
618 .await
619 }
620 Statement::Query(_)
621 | Statement::Insert { .. }
622 | Statement::Delete { .. }
623 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
624 Statement::Copy {
625 entity: CopyEntity::Query(query),
626 target: CopyTarget::Stdout,
627 } => {
628 let response =
629 query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
630 .await?;
631 Ok(response.into_copy_query_to_stdout())
632 }
633 Statement::CreateView {
634 materialized,
635 if_not_exists,
636 name,
637 columns,
638 query,
639 with_options: _, or_replace, emit_mode,
642 } => {
643 if or_replace {
644 bail_not_implemented!("CREATE OR REPLACE VIEW");
645 }
646 if materialized {
647 create_mv::handle_create_mv(
648 handler_args,
649 if_not_exists,
650 name,
651 *query,
652 columns,
653 emit_mode,
654 )
655 .await
656 } else {
657 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
658 .await
659 }
660 }
661 Statement::Flush => flush::handle_flush(handler_args).await,
662 Statement::Wait(target) => wait::handle_wait(handler_args, target).await,
663 Statement::Backup => backup::handle_backup(handler_args).await,
664 Statement::DeleteMetaSnapshots { snapshot_ids } => {
665 delete_meta_snapshot::handle_delete_meta_snapshots(handler_args, snapshot_ids).await
666 }
667 Statement::Recover => recover::handle_recover(handler_args).await,
668 Statement::SetVariable {
669 local: _,
670 variable,
671 value,
672 } => {
673 if variable.real_value().eq_ignore_ascii_case("database") {
675 let x = variable::set_var_to_param_str(&value);
676 let res = use_db::handle_use_db(
677 handler_args,
678 ObjectName::from(vec![Ident::from_real_value(
679 x.as_deref().unwrap_or("default"),
680 )]),
681 )?;
682 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
683 for notice in res.notices() {
684 builder = builder.notice(notice);
685 }
686 return Ok(builder.into());
687 }
688 variable::handle_set(handler_args, variable, value)
689 }
690 Statement::SetTimeZone { local: _, value } => {
691 variable::handle_set_time_zone(handler_args, value)
692 }
693 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
694 Statement::CreateIndex {
695 name,
696 table_name,
697 method,
698 columns,
699 include,
700 distributed_by,
701 unique,
702 if_not_exists,
703 with_properties: _,
704 } => {
705 if unique {
706 bail_not_implemented!("create unique index");
707 }
708
709 create_index::handle_create_index(
710 handler_args,
711 if_not_exists,
712 name,
713 table_name,
714 method,
715 columns.clone(),
716 include,
717 distributed_by,
718 )
719 .await
720 }
721 Statement::AlterDatabase { name, operation } => match operation {
722 AlterDatabaseOperation::RenameDatabase { database_name } => {
723 alter_rename::handle_rename_database(handler_args, name, database_name).await
724 }
725 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
726 alter_owner::handle_alter_owner(
727 handler_args,
728 name,
729 new_owner_name,
730 StatementType::ALTER_DATABASE,
731 None,
732 )
733 .await
734 }
735 AlterDatabaseOperation::SetParam(config_param) => {
736 let ConfigParam { param, value } = config_param;
737
738 let database_param = match param.real_value().to_uppercase().as_str() {
739 "BARRIER_INTERVAL_MS" => {
740 let barrier_interval_ms = match value {
741 SetVariableValue::Default => None,
742 SetVariableValue::Single(SetVariableValueSingle::Literal(
743 Value::Number(num),
744 )) => {
745 let num = num.parse::<u32>().map_err(|e| {
746 ErrorCode::InvalidInputSyntax(format!(
747 "barrier_interval_ms must be a u32 integer: {}",
748 e.as_report()
749 ))
750 })?;
751 Some(num)
752 }
753 _ => {
754 return Err(ErrorCode::InvalidInputSyntax(
755 "barrier_interval_ms must be a u32 integer or DEFAULT"
756 .to_owned(),
757 )
758 .into());
759 }
760 };
761 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
762 }
763 "CHECKPOINT_FREQUENCY" => {
764 let checkpoint_frequency = match value {
765 SetVariableValue::Default => None,
766 SetVariableValue::Single(SetVariableValueSingle::Literal(
767 Value::Number(num),
768 )) => {
769 let num = num.parse::<u64>().map_err(|e| {
770 ErrorCode::InvalidInputSyntax(format!(
771 "checkpoint_frequency must be a u64 integer: {}",
772 e.as_report()
773 ))
774 })?;
775 Some(num)
776 }
777 _ => {
778 return Err(ErrorCode::InvalidInputSyntax(
779 "checkpoint_frequency must be a u64 integer or DEFAULT"
780 .to_owned(),
781 )
782 .into());
783 }
784 };
785 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
786 }
787 _ => {
788 return Err(ErrorCode::InvalidInputSyntax(format!(
789 "Unsupported database config parameter: {}",
790 param.real_value()
791 ))
792 .into());
793 }
794 };
795
796 alter_database_param::handle_alter_database_param(
797 handler_args,
798 name,
799 database_param,
800 )
801 .await
802 }
803 AlterDatabaseOperation::SetResourceGroup {
804 resource_group,
805 deferred,
806 } => {
807 alter_database_param::handle_alter_database_resource_group(
808 handler_args,
809 name,
810 resource_group,
811 deferred,
812 )
813 .await
814 }
815 },
816 Statement::AlterSchema { name, operation } => match operation {
817 AlterSchemaOperation::RenameSchema { schema_name } => {
818 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
819 }
820 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
821 alter_owner::handle_alter_owner(
822 handler_args,
823 name,
824 new_owner_name,
825 StatementType::ALTER_SCHEMA,
826 None,
827 )
828 .await
829 }
830 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
831 alter_swap_rename::handle_swap_rename(
832 handler_args,
833 name,
834 target_schema,
835 StatementType::ALTER_SCHEMA,
836 )
837 .await
838 }
839 },
840 Statement::AlterTable { name, operation } => match operation {
841 AlterTableOperation::AddColumn { .. }
842 | AlterTableOperation::DropColumn { .. }
843 | AlterTableOperation::AlterColumn { .. } => {
844 Box::pin(alter_table_column::handle_alter_table_column(
845 handler_args,
846 name,
847 operation,
848 ))
849 .await
850 }
851 AlterTableOperation::AlterWatermark {
852 column_name,
853 expr,
854 with_ttl,
855 } => {
856 Box::pin(alter_watermark::handle_alter_watermark(
857 handler_args,
858 name,
859 column_name,
860 expr,
861 with_ttl,
862 ))
863 .await
864 }
865 AlterTableOperation::RenameTable { table_name } => {
866 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
867 .await
868 }
869 AlterTableOperation::ChangeOwner { new_owner_name } => {
870 alter_owner::handle_alter_owner(
871 handler_args,
872 name,
873 new_owner_name,
874 StatementType::ALTER_TABLE,
875 None,
876 )
877 .await
878 }
879 AlterTableOperation::SetParallelism {
880 parallelism,
881 deferred,
882 } => {
883 alter_parallelism::handle_alter_parallelism(
884 handler_args,
885 name,
886 parallelism,
887 StatementType::ALTER_TABLE,
888 deferred,
889 )
890 .await
891 }
892 AlterTableOperation::SetBackfillParallelism {
893 parallelism,
894 deferred,
895 } => {
896 alter_parallelism::handle_alter_backfill_parallelism(
897 handler_args,
898 name,
899 parallelism,
900 StatementType::ALTER_TABLE,
901 deferred,
902 )
903 .await
904 }
905 AlterTableOperation::SetSchema { new_schema_name } => {
906 alter_set_schema::handle_alter_set_schema(
907 handler_args,
908 name,
909 new_schema_name,
910 StatementType::ALTER_TABLE,
911 None,
912 )
913 .await
914 }
915 AlterTableOperation::RefreshSchema => {
916 Box::pin(alter_table_with_sr::handle_refresh_schema(
917 handler_args,
918 name,
919 ))
920 .await
921 }
922 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
923 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
924 handler_args,
925 PbThrottleTarget::Table,
926 risingwave_pb::common::PbThrottleType::Source,
927 name,
928 rate_limit,
929 )
930 .await
931 }
932 AlterTableOperation::DropConnector => {
933 Box::pin(
934 alter_table_drop_connector::handle_alter_table_drop_connector(
935 handler_args,
936 name,
937 ),
938 )
939 .await
940 }
941 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
942 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
943 handler_args,
944 PbThrottleTarget::Table,
945 risingwave_pb::common::PbThrottleType::Dml,
946 name,
947 rate_limit,
948 )
949 .await
950 }
951 AlterTableOperation::SetConfig { entries } => {
952 alter_streaming_config::handle_alter_streaming_set_config(
953 handler_args,
954 name,
955 entries,
956 StatementType::ALTER_TABLE,
957 )
958 .await
959 }
960 AlterTableOperation::ResetConfig { keys } => {
961 alter_streaming_config::handle_alter_streaming_reset_config(
962 handler_args,
963 name,
964 keys,
965 StatementType::ALTER_TABLE,
966 )
967 .await
968 }
969 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
970 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
971 handler_args,
972 PbThrottleTarget::Table,
973 risingwave_pb::common::PbThrottleType::Backfill,
974 name,
975 rate_limit,
976 )
977 .await
978 }
979 AlterTableOperation::SwapRenameTable { target_table } => {
980 alter_swap_rename::handle_swap_rename(
981 handler_args,
982 name,
983 target_table,
984 StatementType::ALTER_TABLE,
985 )
986 .await
987 }
988 AlterTableOperation::AlterConnectorProps { alter_props } => {
989 alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
990 }
991 AlterTableOperation::AddConstraint { .. }
992 | AlterTableOperation::DropConstraint { .. }
993 | AlterTableOperation::RenameColumn { .. }
994 | AlterTableOperation::ChangeColumn { .. }
995 | AlterTableOperation::RenameConstraint { .. } => {
996 bail_not_implemented!(
997 "Unhandled statement: {}",
998 Statement::AlterTable { name, operation }
999 )
1000 }
1001 },
1002 Statement::AlterIndex { name, operation } => match operation {
1003 AlterIndexOperation::RenameIndex { index_name } => {
1004 alter_rename::handle_rename_index(handler_args, name, index_name).await
1005 }
1006 AlterIndexOperation::SetParallelism {
1007 parallelism,
1008 deferred,
1009 } => {
1010 alter_parallelism::handle_alter_parallelism(
1011 handler_args,
1012 name,
1013 parallelism,
1014 StatementType::ALTER_INDEX,
1015 deferred,
1016 )
1017 .await
1018 }
1019 AlterIndexOperation::SetBackfillParallelism {
1020 parallelism,
1021 deferred,
1022 } => {
1023 alter_parallelism::handle_alter_backfill_parallelism(
1024 handler_args,
1025 name,
1026 parallelism,
1027 StatementType::ALTER_INDEX,
1028 deferred,
1029 )
1030 .await
1031 }
1032 AlterIndexOperation::SetResourceGroup {
1033 resource_group,
1034 deferred,
1035 } => {
1036 alter_resource_group::handle_alter_resource_group(
1037 handler_args,
1038 name,
1039 resource_group,
1040 StatementType::ALTER_INDEX,
1041 deferred,
1042 )
1043 .await
1044 }
1045 AlterIndexOperation::SetConfig { entries } => {
1046 alter_streaming_config::handle_alter_streaming_set_config(
1047 handler_args,
1048 name,
1049 entries,
1050 StatementType::ALTER_INDEX,
1051 )
1052 .await
1053 }
1054 AlterIndexOperation::ResetConfig { keys } => {
1055 alter_streaming_config::handle_alter_streaming_reset_config(
1056 handler_args,
1057 name,
1058 keys,
1059 StatementType::ALTER_INDEX,
1060 )
1061 .await
1062 }
1063 },
1064 Statement::AlterView {
1065 materialized,
1066 name,
1067 operation,
1068 } => {
1069 let statement_type = if materialized {
1070 StatementType::ALTER_MATERIALIZED_VIEW
1071 } else {
1072 StatementType::ALTER_VIEW
1073 };
1074 match operation {
1075 AlterViewOperation::RenameView { view_name } => {
1076 if materialized {
1077 alter_rename::handle_rename_table(
1078 handler_args,
1079 TableType::MaterializedView,
1080 name,
1081 view_name,
1082 )
1083 .await
1084 } else {
1085 alter_rename::handle_rename_view(handler_args, name, view_name).await
1086 }
1087 }
1088 AlterViewOperation::SetParallelism {
1089 parallelism,
1090 deferred,
1091 } => {
1092 if !materialized {
1093 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
1094 }
1095 alter_parallelism::handle_alter_parallelism(
1096 handler_args,
1097 name,
1098 parallelism,
1099 statement_type,
1100 deferred,
1101 )
1102 .await
1103 }
1104 AlterViewOperation::SetBackfillParallelism {
1105 parallelism,
1106 deferred,
1107 } => {
1108 if !materialized {
1109 bail_not_implemented!("ALTER VIEW SET BACKFILL PARALLELISM");
1110 }
1111 alter_parallelism::handle_alter_backfill_parallelism(
1112 handler_args,
1113 name,
1114 parallelism,
1115 statement_type,
1116 deferred,
1117 )
1118 .await
1119 }
1120 AlterViewOperation::SetResourceGroup {
1121 resource_group,
1122 deferred,
1123 } => {
1124 if !materialized {
1125 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1126 }
1127 alter_resource_group::handle_alter_resource_group(
1128 handler_args,
1129 name,
1130 resource_group,
1131 statement_type,
1132 deferred,
1133 )
1134 .await
1135 }
1136 AlterViewOperation::ChangeOwner { new_owner_name } => {
1137 alter_owner::handle_alter_owner(
1138 handler_args,
1139 name,
1140 new_owner_name,
1141 statement_type,
1142 None,
1143 )
1144 .await
1145 }
1146 AlterViewOperation::SetSchema { new_schema_name } => {
1147 alter_set_schema::handle_alter_set_schema(
1148 handler_args,
1149 name,
1150 new_schema_name,
1151 statement_type,
1152 None,
1153 )
1154 .await
1155 }
1156 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1157 if !materialized {
1158 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1159 }
1160 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1161 handler_args,
1162 PbThrottleTarget::Mv,
1163 risingwave_pb::common::PbThrottleType::Backfill,
1164 name,
1165 rate_limit,
1166 )
1167 .await
1168 }
1169 AlterViewOperation::SwapRenameView { target_view } => {
1170 alter_swap_rename::handle_swap_rename(
1171 handler_args,
1172 name,
1173 target_view,
1174 statement_type,
1175 )
1176 .await
1177 }
1178 AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1179 if !materialized {
1180 bail!(
1181 "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1182 );
1183 }
1184 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1185 }
1186 AlterViewOperation::AsQuery { query } => {
1187 if !materialized {
1188 bail_not_implemented!("ALTER VIEW AS QUERY");
1189 }
1190 if !cfg!(debug_assertions) {
1192 bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1193 }
1194 alter_mv::handle_alter_mv(handler_args, name, query).await
1195 }
1196 AlterViewOperation::SetConfig { entries } => {
1197 if !materialized {
1198 bail!("SET CONFIG is only supported for materialized views");
1199 }
1200 alter_streaming_config::handle_alter_streaming_set_config(
1201 handler_args,
1202 name,
1203 entries,
1204 statement_type,
1205 )
1206 .await
1207 }
1208 AlterViewOperation::ResetConfig { keys } => {
1209 if !materialized {
1210 bail!("RESET CONFIG is only supported for materialized views");
1211 }
1212 alter_streaming_config::handle_alter_streaming_reset_config(
1213 handler_args,
1214 name,
1215 keys,
1216 statement_type,
1217 )
1218 .await
1219 }
1220 }
1221 }
1222
1223 Statement::AlterSink { name, operation } => match operation {
1224 AlterSinkOperation::AlterConnectorProps {
1225 alter_props: changed_props,
1226 } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1227 AlterSinkOperation::RenameSink { sink_name } => {
1228 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1229 }
1230 AlterSinkOperation::ChangeOwner { new_owner_name } => {
1231 alter_owner::handle_alter_owner(
1232 handler_args,
1233 name,
1234 new_owner_name,
1235 StatementType::ALTER_SINK,
1236 None,
1237 )
1238 .await
1239 }
1240 AlterSinkOperation::SetSchema { new_schema_name } => {
1241 alter_set_schema::handle_alter_set_schema(
1242 handler_args,
1243 name,
1244 new_schema_name,
1245 StatementType::ALTER_SINK,
1246 None,
1247 )
1248 .await
1249 }
1250 AlterSinkOperation::SetParallelism {
1251 parallelism,
1252 deferred,
1253 } => {
1254 alter_parallelism::handle_alter_parallelism(
1255 handler_args,
1256 name,
1257 parallelism,
1258 StatementType::ALTER_SINK,
1259 deferred,
1260 )
1261 .await
1262 }
1263 AlterSinkOperation::SetBackfillParallelism {
1264 parallelism,
1265 deferred,
1266 } => {
1267 alter_parallelism::handle_alter_backfill_parallelism(
1268 handler_args,
1269 name,
1270 parallelism,
1271 StatementType::ALTER_SINK,
1272 deferred,
1273 )
1274 .await
1275 }
1276 AlterSinkOperation::SetResourceGroup {
1277 resource_group,
1278 deferred,
1279 } => {
1280 alter_resource_group::handle_alter_resource_group(
1281 handler_args,
1282 name,
1283 resource_group,
1284 StatementType::ALTER_SINK,
1285 deferred,
1286 )
1287 .await
1288 }
1289 AlterSinkOperation::SetConfig { entries } => {
1290 alter_streaming_config::handle_alter_streaming_set_config(
1291 handler_args,
1292 name,
1293 entries,
1294 StatementType::ALTER_SINK,
1295 )
1296 .await
1297 }
1298 AlterSinkOperation::ResetConfig { keys } => {
1299 alter_streaming_config::handle_alter_streaming_reset_config(
1300 handler_args,
1301 name,
1302 keys,
1303 StatementType::ALTER_SINK,
1304 )
1305 .await
1306 }
1307 AlterSinkOperation::SwapRenameSink { target_sink } => {
1308 alter_swap_rename::handle_swap_rename(
1309 handler_args,
1310 name,
1311 target_sink,
1312 StatementType::ALTER_SINK,
1313 )
1314 .await
1315 }
1316 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1317 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1318 handler_args,
1319 PbThrottleTarget::Sink,
1320 risingwave_pb::common::PbThrottleType::Sink,
1321 name,
1322 rate_limit,
1323 )
1324 .await
1325 }
1326 AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
1327 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1328 handler_args,
1329 PbThrottleTarget::Sink,
1330 risingwave_pb::common::PbThrottleType::Backfill,
1331 name,
1332 rate_limit,
1333 )
1334 .await
1335 }
1336 AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1337 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1338 handler_args,
1339 name,
1340 enable,
1341 )
1342 .await
1343 }
1344 },
1345 Statement::AlterSubscription { name, operation } => match operation {
1346 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1347 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1348 .await
1349 }
1350 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1351 alter_owner::handle_alter_owner(
1352 handler_args,
1353 name,
1354 new_owner_name,
1355 StatementType::ALTER_SUBSCRIPTION,
1356 None,
1357 )
1358 .await
1359 }
1360 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1361 alter_set_schema::handle_alter_set_schema(
1362 handler_args,
1363 name,
1364 new_schema_name,
1365 StatementType::ALTER_SUBSCRIPTION,
1366 None,
1367 )
1368 .await
1369 }
1370 AlterSubscriptionOperation::SetRetention { retention } => {
1371 alter_subscription_retention::handle_alter_subscription_retention(
1372 handler_args,
1373 name,
1374 retention,
1375 )
1376 .await
1377 }
1378 AlterSubscriptionOperation::SwapRenameSubscription {
1379 target_subscription,
1380 } => {
1381 alter_swap_rename::handle_swap_rename(
1382 handler_args,
1383 name,
1384 target_subscription,
1385 StatementType::ALTER_SUBSCRIPTION,
1386 )
1387 .await
1388 }
1389 },
1390 Statement::AlterSource { name, operation } => match operation {
1391 AlterSourceOperation::AlterConnectorProps { alter_props } => {
1392 alter_source_props::handle_alter_source_connector_props(
1393 handler_args,
1394 name,
1395 alter_props,
1396 )
1397 .await
1398 }
1399 AlterSourceOperation::RenameSource { source_name } => {
1400 alter_rename::handle_rename_source(handler_args, name, source_name).await
1401 }
1402 AlterSourceOperation::AddColumn { .. } => {
1403 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1404 }
1405 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1406 alter_owner::handle_alter_owner(
1407 handler_args,
1408 name,
1409 new_owner_name,
1410 StatementType::ALTER_SOURCE,
1411 None,
1412 )
1413 .await
1414 }
1415 AlterSourceOperation::SetSchema { new_schema_name } => {
1416 alter_set_schema::handle_alter_set_schema(
1417 handler_args,
1418 name,
1419 new_schema_name,
1420 StatementType::ALTER_SOURCE,
1421 None,
1422 )
1423 .await
1424 }
1425 AlterSourceOperation::FormatEncode { format_encode } => {
1426 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1427 .await
1428 }
1429 AlterSourceOperation::RefreshSchema => {
1430 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1431 }
1432 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1433 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1434 handler_args,
1435 PbThrottleTarget::Source,
1436 risingwave_pb::common::PbThrottleType::Source,
1437 name,
1438 rate_limit,
1439 )
1440 .await
1441 }
1442 AlterSourceOperation::SwapRenameSource { target_source } => {
1443 alter_swap_rename::handle_swap_rename(
1444 handler_args,
1445 name,
1446 target_source,
1447 StatementType::ALTER_SOURCE,
1448 )
1449 .await
1450 }
1451 AlterSourceOperation::SetParallelism {
1452 parallelism,
1453 deferred,
1454 } => {
1455 alter_parallelism::handle_alter_parallelism(
1456 handler_args,
1457 name,
1458 parallelism,
1459 StatementType::ALTER_SOURCE,
1460 deferred,
1461 )
1462 .await
1463 }
1464 AlterSourceOperation::SetBackfillParallelism {
1465 parallelism,
1466 deferred,
1467 } => {
1468 alter_parallelism::handle_alter_backfill_parallelism(
1469 handler_args,
1470 name,
1471 parallelism,
1472 StatementType::ALTER_SOURCE,
1473 deferred,
1474 )
1475 .await
1476 }
1477 AlterSourceOperation::SetConfig { entries } => {
1478 alter_streaming_config::handle_alter_streaming_set_config(
1479 handler_args,
1480 name,
1481 entries,
1482 StatementType::ALTER_SOURCE,
1483 )
1484 .await
1485 }
1486 AlterSourceOperation::ResetConfig { keys } => {
1487 alter_streaming_config::handle_alter_streaming_reset_config(
1488 handler_args,
1489 name,
1490 keys,
1491 StatementType::ALTER_SOURCE,
1492 )
1493 .await
1494 }
1495 AlterSourceOperation::ResetSource => {
1496 reset_source::handle_reset_source(handler_args, name).await
1497 }
1498 },
1499 Statement::AlterFunction {
1500 name,
1501 args,
1502 operation,
1503 } => match operation {
1504 AlterFunctionOperation::SetSchema { new_schema_name } => {
1505 alter_set_schema::handle_alter_set_schema(
1506 handler_args,
1507 name,
1508 new_schema_name,
1509 StatementType::ALTER_FUNCTION,
1510 args,
1511 )
1512 .await
1513 }
1514 AlterFunctionOperation::ChangeOwner { new_owner_name } => {
1515 alter_owner::handle_alter_owner(
1516 handler_args,
1517 name,
1518 new_owner_name,
1519 StatementType::ALTER_FUNCTION,
1520 args,
1521 )
1522 .await
1523 }
1524 },
1525 Statement::AlterConnection { name, operation } => match operation {
1526 AlterConnectionOperation::SetSchema { new_schema_name } => {
1527 alter_set_schema::handle_alter_set_schema(
1528 handler_args,
1529 name,
1530 new_schema_name,
1531 StatementType::ALTER_CONNECTION,
1532 None,
1533 )
1534 .await
1535 }
1536 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1537 alter_owner::handle_alter_owner(
1538 handler_args,
1539 name,
1540 new_owner_name,
1541 StatementType::ALTER_CONNECTION,
1542 None,
1543 )
1544 .await
1545 }
1546 AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1547 alter_connection_props::handle_alter_connection_connector_props(
1548 handler_args,
1549 name,
1550 alter_props,
1551 )
1552 .await
1553 }
1554 },
1555 Statement::AlterSystem { param, value } => {
1556 alter_system::handle_alter_system(handler_args, param, value).await
1557 }
1558 Statement::AlterSecret { name, operation } => match operation {
1559 AlterSecretOperation::ChangeCredential {
1560 with_options,
1561 new_credential,
1562 } => {
1563 alter_secret::handle_alter_secret(handler_args, name, with_options, new_credential)
1564 .await
1565 }
1566 AlterSecretOperation::ChangeOwner { new_owner_name } => {
1567 alter_owner::handle_alter_owner(
1568 handler_args,
1569 name,
1570 new_owner_name,
1571 StatementType::ALTER_SECRET,
1572 None,
1573 )
1574 .await
1575 }
1576 },
1577 Statement::AlterFragment {
1578 fragment_ids,
1579 operation,
1580 } => match operation {
1581 AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1582 let [fragment_id] = fragment_ids.as_slice() else {
1583 return Err(ErrorCode::InvalidInputSyntax(
1584 "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1585 .to_owned(),
1586 )
1587 .into());
1588 };
1589 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1590 &handler_args.session,
1591 PbThrottleTarget::Fragment,
1592 risingwave_pb::common::PbThrottleType::Backfill,
1593 *fragment_id,
1594 rate_limit,
1595 StatementType::SET_VARIABLE,
1596 )
1597 .await
1598 }
1599 AlterFragmentOperation::SetParallelism { parallelism } => {
1600 alter_parallelism::handle_alter_fragment_parallelism(
1601 handler_args,
1602 fragment_ids.into_iter().map_into().collect(),
1603 parallelism,
1604 )
1605 .await
1606 }
1607 },
1608 Statement::AlterDefaultPrivileges { .. } => {
1609 handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1610 }
1611 Statement::AlterCompactionGroup {
1612 group_ids,
1613 operation,
1614 } => {
1615 alter_compaction_group::handle_alter_compaction_group(
1616 handler_args,
1617 group_ids,
1618 operation,
1619 )
1620 .await
1621 }
1622 Statement::StartTransaction { modes } => {
1623 transaction::handle_begin(handler_args, START_TRANSACTION, modes)
1624 }
1625 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes),
1626 Statement::Commit { chain } => {
1627 transaction::handle_commit(handler_args, COMMIT, chain).await
1628 }
1629 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1630 Statement::Rollback { chain } => {
1631 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1632 }
1633 Statement::SetTransaction {
1634 modes,
1635 snapshot,
1636 session,
1637 } => transaction::handle_set(handler_args, modes, snapshot, session),
1638 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1639 Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1640 Statement::Comment {
1641 object_type,
1642 object_name,
1643 comment,
1644 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1645 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1646 Statement::Prepare {
1647 name,
1648 data_types,
1649 statement,
1650 } => prepared_statement::handle_prepare(name, data_types, statement),
1651 Statement::Deallocate { name, prepare } => {
1652 prepared_statement::handle_deallocate(name, prepare)
1653 }
1654 Statement::Vacuum { object_name, full } => {
1655 vacuum::handle_vacuum(handler_args, object_name, full).await
1656 }
1657 Statement::Refresh { table_name } => {
1658 refresh::handle_refresh(handler_args, table_name).await
1659 }
1660 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1661 }
1662}
1663
1664fn check_ban_ddl_for_iceberg_engine_table(
1665 session: Arc<SessionImpl>,
1666 stmt: &Statement,
1667) -> Result<()> {
1668 if let Statement::AlterTable { name, operation } = stmt {
1669 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1670 if table.is_iceberg_engine_table() {
1671 let has_auto_refresh_schema_sink = if matches!(
1672 operation,
1673 AlterTableOperation::AddColumn { .. } | AlterTableOperation::DropColumn { .. }
1674 ) {
1675 let catalog_reader = session.env().catalog_reader().read_guard();
1676 let db_name = session.database();
1677 let sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name());
1678 let sink = catalog_reader
1679 .get_schema_by_name(&db_name, &schema_name)
1680 .ok()
1681 .and_then(|schema| schema.get_created_sink_by_name(&sink_name));
1682 sink.and_then(|s| s.auto_refresh_schema_from_table)
1683 .is_some()
1684 } else {
1685 false
1686 };
1687
1688 check_ban_alter_table_operation_for_iceberg_engine_table(
1689 operation,
1690 &schema_name,
1691 name,
1692 has_auto_refresh_schema_sink,
1693 )?;
1694 }
1695 }
1696
1697 Ok(())
1698}
1699
1700fn check_ban_alter_table_operation_for_iceberg_engine_table(
1701 operation: &AlterTableOperation,
1702 schema_name: &str,
1703 table_name: &ObjectName,
1704 has_auto_refresh_schema_sink: bool,
1705) -> Result<()> {
1706 match operation {
1707 AlterTableOperation::AddColumn { .. } => {
1708 if !has_auto_refresh_schema_sink {
1709 bail!(
1710 "ALTER TABLE {} is not supported for iceberg table without auto schema change sink: {}.{}",
1711 operation,
1712 schema_name,
1713 table_name
1714 );
1715 }
1716 }
1717 AlterTableOperation::DropColumn { .. } => {
1718 if !has_auto_refresh_schema_sink {
1719 bail!(
1720 "ALTER TABLE {} is not supported for iceberg table without auto schema change sink: {}.{}",
1721 operation,
1722 schema_name,
1723 table_name
1724 );
1725 }
1726 }
1727 AlterTableOperation::RenameColumn { .. }
1728 | AlterTableOperation::ChangeColumn { .. }
1729 | AlterTableOperation::AlterColumn { .. } => {
1730 bail!(
1731 "ALTER TABLE {} is not supported for iceberg table: {}.{}. Existing column schema mutation is not supported currently",
1732 operation,
1733 schema_name,
1734 table_name
1735 );
1736 }
1737 AlterTableOperation::RenameTable { .. } => {
1738 bail!(
1739 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1740 schema_name,
1741 table_name
1742 );
1743 }
1744 AlterTableOperation::SetParallelism { .. } => {
1745 bail!(
1746 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1747 schema_name,
1748 table_name
1749 );
1750 }
1751 AlterTableOperation::SetBackfillParallelism { .. } => {
1752 bail!(
1753 "ALTER TABLE SET BACKFILL PARALLELISM is not supported for iceberg table: {}.{}",
1754 schema_name,
1755 table_name
1756 );
1757 }
1758 AlterTableOperation::SetSchema { .. } => {
1759 bail!(
1760 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1761 schema_name,
1762 table_name
1763 );
1764 }
1765 AlterTableOperation::RefreshSchema => {
1766 bail!(
1767 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1768 schema_name,
1769 table_name
1770 );
1771 }
1772 AlterTableOperation::SetSourceRateLimit { .. } => {
1773 bail!(
1774 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1775 schema_name,
1776 table_name
1777 );
1778 }
1779 AlterTableOperation::AlterWatermark { .. } => {
1780 bail!(
1781 "ALTER TABLE ALTER WATERMARK is not supported for iceberg table: {}.{}",
1782 schema_name,
1783 table_name
1784 );
1785 }
1786 _ => {}
1787 }
1788 Ok(())
1789}