1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_connection_props;
46mod alter_database_param;
47mod alter_mv;
48mod alter_owner;
49mod alter_parallelism;
50mod alter_rename;
51mod alter_resource_group;
52mod alter_secret;
53mod alter_set_schema;
54mod alter_sink_props;
55mod alter_source_column;
56mod alter_source_props;
57mod alter_source_with_sr;
58mod alter_streaming_config;
59mod alter_streaming_enable_unaligned_join;
60mod alter_streaming_rate_limit;
61mod alter_subscription_retention;
62mod alter_swap_rename;
63mod alter_system;
64mod alter_table_column;
65pub mod alter_table_drop_connector;
66pub mod alter_table_props;
67mod alter_table_with_sr;
68pub mod alter_user;
69mod alter_utils;
70pub mod cancel_job;
71pub mod close_cursor;
72mod comment;
73pub mod create_aggregate;
74pub mod create_connection;
75mod create_database;
76pub mod create_function;
77pub mod create_index;
78pub mod create_mv;
79pub mod create_schema;
80pub mod create_secret;
81pub mod create_sink;
82pub mod create_source;
83pub mod create_sql_function;
84pub mod create_subscription;
85pub mod create_table;
86pub mod create_table_as;
87pub mod create_user;
88pub mod create_view;
89pub mod declare_cursor;
90pub mod describe;
91pub mod discard;
92mod drop_connection;
93mod drop_database;
94pub mod drop_function;
95mod drop_index;
96pub mod drop_mv;
97mod drop_schema;
98pub mod drop_secret;
99pub mod drop_sink;
100pub mod drop_source;
101pub mod drop_subscription;
102pub mod drop_table;
103pub mod drop_user;
104mod drop_view;
105pub mod explain;
106pub mod explain_analyze_stream_job;
107pub mod extended_handle;
108pub mod fetch_cursor;
109mod flush;
110pub mod handle_privilege;
111pub mod kill_process;
112mod prepared_statement;
113pub mod privilege;
114pub mod query;
115mod recover;
116mod refresh;
117mod reset_source;
118pub mod show;
119mod transaction;
120mod use_db;
121pub mod util;
122pub mod vacuum;
123pub mod variable;
124mod wait;
125
126pub use alter_table_column::{
127 fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
128};
129
130pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
132
133pub type RwPgResponse = PgResponse<PgResponseStream>;
135
136#[easy_ext::ext(RwPgResponseBuilderExt)]
137impl RwPgResponseBuilder {
138 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
140 let fields = T::fields();
141 self.values(
142 rows.into_iter()
143 .map(|row| {
144 Row::new(
145 row.into_owned_row()
146 .into_iter()
147 .zip_eq_fast(&fields)
148 .map(|(datum, (_, ty))| {
149 datum.map(|scalar| {
150 scalar.as_scalar_ref_impl().text_format(ty).into()
151 })
152 })
153 .collect(),
154 )
155 })
156 .collect_vec()
157 .into(),
158 fields_to_descriptors(fields),
159 )
160 }
161}
162
163pub fn fields_to_descriptors(
164 fields: Vec<(&str, risingwave_common::types::DataType)>,
165) -> Vec<PgFieldDescriptor> {
166 fields
167 .iter()
168 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
169 .collect()
170}
171
172pub enum PgResponseStream {
173 LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
174 DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
175 Rows(BoxStream<'static, RowSetResult>),
176}
177
178impl Stream for PgResponseStream {
179 type Item = std::result::Result<Vec<Row>, BoxedError>;
180
181 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
182 match &mut *self {
183 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
184 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
185 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
186 }
187 }
188}
189
190impl From<Vec<Row>> for PgResponseStream {
191 fn from(rows: Vec<Row>) -> Self {
192 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
193 }
194}
195
196#[derive(Clone)]
197pub struct HandlerArgs {
198 pub session: Arc<SessionImpl>,
199 pub sql: Arc<str>,
200 pub normalized_sql: String,
201 pub with_options: WithOptions,
202}
203
204impl HandlerArgs {
205 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
206 Ok(Self {
207 session,
208 sql,
209 with_options: WithOptions::try_from(stmt)?,
210 normalized_sql: Self::normalize_sql(stmt),
211 })
212 }
213
214 fn normalize_sql(stmt: &Statement) -> String {
220 let mut stmt = stmt.clone();
221 match &mut stmt {
222 Statement::CreateView {
223 or_replace,
224 if_not_exists,
225 ..
226 } => {
227 *or_replace = false;
228 *if_not_exists = false;
229 }
230 Statement::CreateTable {
231 or_replace,
232 if_not_exists,
233 ..
234 } => {
235 *or_replace = false;
236 *if_not_exists = false;
237 }
238 Statement::CreateIndex { if_not_exists, .. } => {
239 *if_not_exists = false;
240 }
241 Statement::CreateSource {
242 stmt: CreateSourceStatement { if_not_exists, .. },
243 ..
244 } => {
245 *if_not_exists = false;
246 }
247 Statement::CreateSink {
248 stmt: CreateSinkStatement { if_not_exists, .. },
249 } => {
250 *if_not_exists = false;
251 }
252 Statement::CreateSubscription {
253 stmt: CreateSubscriptionStatement { if_not_exists, .. },
254 } => {
255 *if_not_exists = false;
256 }
257 Statement::CreateConnection {
258 stmt: CreateConnectionStatement { if_not_exists, .. },
259 } => {
260 *if_not_exists = false;
261 }
262 _ => {}
263 }
264 stmt.to_string()
265 }
266}
267
268#[allow(clippy::large_stack_frames)]
269pub async fn handle(
270 session: Arc<SessionImpl>,
271 stmt: Statement,
272 sql: Arc<str>,
273 formats: Vec<Format>,
274) -> Result<RwPgResponse> {
275 session.clear_cancel_query_flag();
276 let _guard = session.txn_begin_implicit();
277 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
278
279 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
280
281 match stmt {
282 Statement::Explain {
283 statement,
284 analyze,
285 options,
286 } => {
287 Box::pin(explain::handle_explain(
288 handler_args,
289 *statement,
290 options,
291 analyze,
292 ))
293 .await
294 }
295 Statement::ExplainAnalyzeStreamJob {
296 target,
297 duration_secs,
298 } => {
299 explain_analyze_stream_job::handle_explain_analyze_stream_job(
300 handler_args,
301 target,
302 duration_secs,
303 )
304 .await
305 }
306 Statement::CreateSource { stmt } => {
307 create_source::handle_create_source(handler_args, stmt).await
308 }
309 Statement::CreateSink { stmt } => {
310 create_sink::handle_create_sink(handler_args, stmt, false).await
311 }
312 Statement::CreateSubscription { stmt } => {
313 create_subscription::handle_create_subscription(handler_args, stmt).await
314 }
315 Statement::CreateConnection { stmt } => {
316 create_connection::handle_create_connection(handler_args, stmt).await
317 }
318 Statement::CreateSecret { stmt } => {
319 create_secret::handle_create_secret(handler_args, stmt).await
320 }
321 Statement::CreateFunction {
322 or_replace,
323 temporary,
324 if_not_exists,
325 name,
326 args,
327 returns,
328 params,
329 with_options,
330 } => {
331 if params.language.is_none()
334 || !params
335 .language
336 .as_ref()
337 .unwrap()
338 .real_value()
339 .eq_ignore_ascii_case("sql")
340 {
341 create_function::handle_create_function(
342 handler_args,
343 or_replace,
344 temporary,
345 if_not_exists,
346 name,
347 args,
348 returns,
349 params,
350 with_options,
351 )
352 .await
353 } else {
354 create_sql_function::handle_create_sql_function(
355 handler_args,
356 or_replace,
357 temporary,
358 if_not_exists,
359 name,
360 args,
361 returns,
362 params,
363 )
364 .await
365 }
366 }
367 Statement::CreateAggregate {
368 or_replace,
369 if_not_exists,
370 name,
371 args,
372 returns,
373 params,
374 ..
375 } => {
376 create_aggregate::handle_create_aggregate(
377 handler_args,
378 or_replace,
379 if_not_exists,
380 name,
381 args,
382 returns,
383 params,
384 )
385 .await
386 }
387 Statement::CreateTable {
388 name,
389 columns,
390 wildcard_idx,
391 constraints,
392 query,
393 with_options: _, or_replace,
396 temporary,
397 if_not_exists,
398 format_encode,
399 source_watermarks,
400 append_only,
401 on_conflict,
402 with_version_columns,
403 cdc_table_info,
404 include_column_options,
405 webhook_info,
406 engine,
407 } => {
408 if or_replace {
409 bail_not_implemented!("CREATE OR REPLACE TABLE");
410 }
411 if temporary {
412 bail_not_implemented!("CREATE TEMPORARY TABLE");
413 }
414 if let Some(query) = query {
415 return create_table_as::handle_create_as(
416 handler_args,
417 name,
418 if_not_exists,
419 query,
420 columns,
421 append_only,
422 on_conflict,
423 with_version_columns
424 .iter()
425 .map(|col| col.real_value())
426 .collect(),
427 engine,
428 )
429 .await;
430 }
431 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
432 Box::pin(create_table::handle_create_table(
433 handler_args,
434 name,
435 columns,
436 wildcard_idx,
437 constraints,
438 if_not_exists,
439 format_encode,
440 source_watermarks,
441 append_only,
442 on_conflict,
443 with_version_columns
444 .iter()
445 .map(|col| col.real_value())
446 .collect(),
447 cdc_table_info,
448 include_column_options,
449 webhook_info,
450 engine,
451 ))
452 .await
453 }
454 Statement::CreateDatabase {
455 db_name,
456 if_not_exists,
457 owner,
458 resource_group,
459 barrier_interval_ms,
460 checkpoint_frequency,
461 } => {
462 create_database::handle_create_database(
463 handler_args,
464 db_name,
465 if_not_exists,
466 owner,
467 resource_group,
468 barrier_interval_ms,
469 checkpoint_frequency,
470 )
471 .await
472 }
473 Statement::CreateSchema {
474 schema_name,
475 if_not_exists,
476 owner,
477 } => {
478 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
479 .await
480 }
481 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
482 Statement::DeclareCursor { stmt } => {
483 declare_cursor::handle_declare_cursor(handler_args, stmt).await
484 }
485 Statement::FetchCursor { stmt } => {
486 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
487 }
488 Statement::CloseCursor { stmt } => {
489 close_cursor::handle_close_cursor(handler_args, stmt).await
490 }
491 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
492 Statement::Grant { .. } => {
493 handle_privilege::handle_grant_privilege(handler_args, stmt).await
494 }
495 Statement::Revoke { .. } => {
496 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
497 }
498 Statement::Describe { name, kind } => match kind {
499 DescribeKind::Fragments => {
500 describe::handle_describe_fragments(handler_args, name).await
501 }
502 DescribeKind::Plain => describe::handle_describe(handler_args, name),
503 },
504 Statement::DescribeFragment { fragment_id } => {
505 describe::handle_describe_fragment(handler_args, fragment_id.into()).await
506 }
507 Statement::Discard(..) => discard::handle_discard(handler_args),
508 Statement::ShowObjects {
509 object: show_object,
510 filter,
511 } => show::handle_show_object(handler_args, show_object, filter).await,
512 Statement::ShowCreateObject { create_type, name } => {
513 show::handle_show_create_object(handler_args, create_type, name)
514 }
515 Statement::ShowTransactionIsolationLevel => {
516 transaction::handle_show_isolation_level(handler_args)
517 }
518 Statement::Drop(DropStatement {
519 object_type,
520 object_name,
521 if_exists,
522 drop_mode,
523 }) => {
524 let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
525 match object_type {
526 ObjectType::MaterializedView
527 | ObjectType::View
528 | ObjectType::Sink
529 | ObjectType::Source
530 | ObjectType::Subscription
531 | ObjectType::Index
532 | ObjectType::Table
533 | ObjectType::Schema
534 | ObjectType::Connection
535 | ObjectType::Secret => true,
536 ObjectType::Database | ObjectType::User => {
537 bail_not_implemented!("DROP CASCADE");
538 }
539 }
540 } else {
541 false
542 };
543 match object_type {
544 ObjectType::Table => {
545 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
546 .await
547 }
548 ObjectType::MaterializedView => {
549 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
550 }
551 ObjectType::Index => {
552 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
553 .await
554 }
555 ObjectType::Source => {
556 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
557 .await
558 }
559 ObjectType::Sink => {
560 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
561 }
562 ObjectType::Subscription => {
563 drop_subscription::handle_drop_subscription(
564 handler_args,
565 object_name,
566 if_exists,
567 cascade,
568 )
569 .await
570 }
571 ObjectType::Database => {
572 drop_database::handle_drop_database(handler_args, object_name, if_exists).await
573 }
574 ObjectType::Schema => {
575 drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
576 .await
577 }
578 ObjectType::User => {
579 drop_user::handle_drop_user(handler_args, object_name, if_exists).await
580 }
581 ObjectType::View => {
582 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
583 }
584 ObjectType::Connection => {
585 drop_connection::handle_drop_connection(
586 handler_args,
587 object_name,
588 if_exists,
589 cascade,
590 )
591 .await
592 }
593 ObjectType::Secret => {
594 drop_secret::handle_drop_secret(handler_args, object_name, if_exists, cascade)
595 .await
596 }
597 }
598 }
599 Statement::DropFunction {
601 if_exists,
602 func_desc,
603 option,
604 } => {
605 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
606 .await
607 }
608 Statement::DropAggregate {
609 if_exists,
610 func_desc,
611 option,
612 } => {
613 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
614 .await
615 }
616 Statement::Query(_)
617 | Statement::Insert { .. }
618 | Statement::Delete { .. }
619 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
620 Statement::Copy {
621 entity: CopyEntity::Query(query),
622 target: CopyTarget::Stdout,
623 } => {
624 let response =
625 query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
626 .await?;
627 Ok(response.into_copy_query_to_stdout())
628 }
629 Statement::CreateView {
630 materialized,
631 if_not_exists,
632 name,
633 columns,
634 query,
635 with_options: _, or_replace, emit_mode,
638 } => {
639 if or_replace {
640 bail_not_implemented!("CREATE OR REPLACE VIEW");
641 }
642 if materialized {
643 create_mv::handle_create_mv(
644 handler_args,
645 if_not_exists,
646 name,
647 *query,
648 columns,
649 emit_mode,
650 )
651 .await
652 } else {
653 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
654 .await
655 }
656 }
657 Statement::Flush => flush::handle_flush(handler_args).await,
658 Statement::Wait => wait::handle_wait(handler_args).await,
659 Statement::Recover => recover::handle_recover(handler_args).await,
660 Statement::SetVariable {
661 local: _,
662 variable,
663 value,
664 } => {
665 if variable.real_value().eq_ignore_ascii_case("database") {
667 let x = variable::set_var_to_param_str(&value);
668 let res = use_db::handle_use_db(
669 handler_args,
670 ObjectName::from(vec![Ident::from_real_value(
671 x.as_deref().unwrap_or("default"),
672 )]),
673 )?;
674 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
675 for notice in res.notices() {
676 builder = builder.notice(notice);
677 }
678 return Ok(builder.into());
679 }
680 variable::handle_set(handler_args, variable, value)
681 }
682 Statement::SetTimeZone { local: _, value } => {
683 variable::handle_set_time_zone(handler_args, value)
684 }
685 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
686 Statement::CreateIndex {
687 name,
688 table_name,
689 method,
690 columns,
691 include,
692 distributed_by,
693 unique,
694 if_not_exists,
695 with_properties: _,
696 } => {
697 if unique {
698 bail_not_implemented!("create unique index");
699 }
700
701 create_index::handle_create_index(
702 handler_args,
703 if_not_exists,
704 name,
705 table_name,
706 method,
707 columns.clone(),
708 include,
709 distributed_by,
710 )
711 .await
712 }
713 Statement::AlterDatabase { name, operation } => match operation {
714 AlterDatabaseOperation::RenameDatabase { database_name } => {
715 alter_rename::handle_rename_database(handler_args, name, database_name).await
716 }
717 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
718 alter_owner::handle_alter_owner(
719 handler_args,
720 name,
721 new_owner_name,
722 StatementType::ALTER_DATABASE,
723 None,
724 )
725 .await
726 }
727 AlterDatabaseOperation::SetParam(config_param) => {
728 let ConfigParam { param, value } = config_param;
729
730 let database_param = match param.real_value().to_uppercase().as_str() {
731 "BARRIER_INTERVAL_MS" => {
732 let barrier_interval_ms = match value {
733 SetVariableValue::Default => None,
734 SetVariableValue::Single(SetVariableValueSingle::Literal(
735 Value::Number(num),
736 )) => {
737 let num = num.parse::<u32>().map_err(|e| {
738 ErrorCode::InvalidInputSyntax(format!(
739 "barrier_interval_ms must be a u32 integer: {}",
740 e.as_report()
741 ))
742 })?;
743 Some(num)
744 }
745 _ => {
746 return Err(ErrorCode::InvalidInputSyntax(
747 "barrier_interval_ms must be a u32 integer or DEFAULT"
748 .to_owned(),
749 )
750 .into());
751 }
752 };
753 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
754 }
755 "CHECKPOINT_FREQUENCY" => {
756 let checkpoint_frequency = match value {
757 SetVariableValue::Default => None,
758 SetVariableValue::Single(SetVariableValueSingle::Literal(
759 Value::Number(num),
760 )) => {
761 let num = num.parse::<u64>().map_err(|e| {
762 ErrorCode::InvalidInputSyntax(format!(
763 "checkpoint_frequency must be a u64 integer: {}",
764 e.as_report()
765 ))
766 })?;
767 Some(num)
768 }
769 _ => {
770 return Err(ErrorCode::InvalidInputSyntax(
771 "checkpoint_frequency must be a u64 integer or DEFAULT"
772 .to_owned(),
773 )
774 .into());
775 }
776 };
777 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
778 }
779 _ => {
780 return Err(ErrorCode::InvalidInputSyntax(format!(
781 "Unsupported database config parameter: {}",
782 param.real_value()
783 ))
784 .into());
785 }
786 };
787
788 alter_database_param::handle_alter_database_param(
789 handler_args,
790 name,
791 database_param,
792 )
793 .await
794 }
795 },
796 Statement::AlterSchema { name, operation } => match operation {
797 AlterSchemaOperation::RenameSchema { schema_name } => {
798 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
799 }
800 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
801 alter_owner::handle_alter_owner(
802 handler_args,
803 name,
804 new_owner_name,
805 StatementType::ALTER_SCHEMA,
806 None,
807 )
808 .await
809 }
810 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
811 alter_swap_rename::handle_swap_rename(
812 handler_args,
813 name,
814 target_schema,
815 StatementType::ALTER_SCHEMA,
816 )
817 .await
818 }
819 },
820 Statement::AlterTable { name, operation } => match operation {
821 AlterTableOperation::AddColumn { .. }
822 | AlterTableOperation::DropColumn { .. }
823 | AlterTableOperation::AlterColumn { .. } => {
824 Box::pin(alter_table_column::handle_alter_table_column(
825 handler_args,
826 name,
827 operation,
828 ))
829 .await
830 }
831 AlterTableOperation::RenameTable { table_name } => {
832 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
833 .await
834 }
835 AlterTableOperation::ChangeOwner { new_owner_name } => {
836 alter_owner::handle_alter_owner(
837 handler_args,
838 name,
839 new_owner_name,
840 StatementType::ALTER_TABLE,
841 None,
842 )
843 .await
844 }
845 AlterTableOperation::SetParallelism {
846 parallelism,
847 deferred,
848 } => {
849 alter_parallelism::handle_alter_parallelism(
850 handler_args,
851 name,
852 parallelism,
853 StatementType::ALTER_TABLE,
854 deferred,
855 )
856 .await
857 }
858 AlterTableOperation::SetBackfillParallelism {
859 parallelism,
860 deferred,
861 } => {
862 alter_parallelism::handle_alter_backfill_parallelism(
863 handler_args,
864 name,
865 parallelism,
866 StatementType::ALTER_TABLE,
867 deferred,
868 )
869 .await
870 }
871 AlterTableOperation::SetSchema { new_schema_name } => {
872 alter_set_schema::handle_alter_set_schema(
873 handler_args,
874 name,
875 new_schema_name,
876 StatementType::ALTER_TABLE,
877 None,
878 )
879 .await
880 }
881 AlterTableOperation::RefreshSchema => {
882 Box::pin(alter_table_with_sr::handle_refresh_schema(
883 handler_args,
884 name,
885 ))
886 .await
887 }
888 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
889 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
890 handler_args,
891 PbThrottleTarget::Table,
892 risingwave_pb::common::PbThrottleType::Source,
893 name,
894 rate_limit,
895 )
896 .await
897 }
898 AlterTableOperation::DropConnector => {
899 Box::pin(
900 alter_table_drop_connector::handle_alter_table_drop_connector(
901 handler_args,
902 name,
903 ),
904 )
905 .await
906 }
907 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
908 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
909 handler_args,
910 PbThrottleTarget::Table,
911 risingwave_pb::common::PbThrottleType::Dml,
912 name,
913 rate_limit,
914 )
915 .await
916 }
917 AlterTableOperation::SetConfig { entries } => {
918 alter_streaming_config::handle_alter_streaming_set_config(
919 handler_args,
920 name,
921 entries,
922 StatementType::ALTER_TABLE,
923 )
924 .await
925 }
926 AlterTableOperation::ResetConfig { keys } => {
927 alter_streaming_config::handle_alter_streaming_reset_config(
928 handler_args,
929 name,
930 keys,
931 StatementType::ALTER_TABLE,
932 )
933 .await
934 }
935 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
936 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
937 handler_args,
938 PbThrottleTarget::Table,
939 risingwave_pb::common::PbThrottleType::Backfill,
940 name,
941 rate_limit,
942 )
943 .await
944 }
945 AlterTableOperation::SwapRenameTable { target_table } => {
946 alter_swap_rename::handle_swap_rename(
947 handler_args,
948 name,
949 target_table,
950 StatementType::ALTER_TABLE,
951 )
952 .await
953 }
954 AlterTableOperation::AlterConnectorProps { alter_props } => {
955 alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
956 }
957 AlterTableOperation::AddConstraint { .. }
958 | AlterTableOperation::DropConstraint { .. }
959 | AlterTableOperation::RenameColumn { .. }
960 | AlterTableOperation::ChangeColumn { .. }
961 | AlterTableOperation::RenameConstraint { .. } => {
962 bail_not_implemented!(
963 "Unhandled statement: {}",
964 Statement::AlterTable { name, operation }
965 )
966 }
967 },
968 Statement::AlterIndex { name, operation } => match operation {
969 AlterIndexOperation::RenameIndex { index_name } => {
970 alter_rename::handle_rename_index(handler_args, name, index_name).await
971 }
972 AlterIndexOperation::SetParallelism {
973 parallelism,
974 deferred,
975 } => {
976 alter_parallelism::handle_alter_parallelism(
977 handler_args,
978 name,
979 parallelism,
980 StatementType::ALTER_INDEX,
981 deferred,
982 )
983 .await
984 }
985 AlterIndexOperation::SetBackfillParallelism {
986 parallelism,
987 deferred,
988 } => {
989 alter_parallelism::handle_alter_backfill_parallelism(
990 handler_args,
991 name,
992 parallelism,
993 StatementType::ALTER_INDEX,
994 deferred,
995 )
996 .await
997 }
998 AlterIndexOperation::SetConfig { entries } => {
999 alter_streaming_config::handle_alter_streaming_set_config(
1000 handler_args,
1001 name,
1002 entries,
1003 StatementType::ALTER_INDEX,
1004 )
1005 .await
1006 }
1007 AlterIndexOperation::ResetConfig { keys } => {
1008 alter_streaming_config::handle_alter_streaming_reset_config(
1009 handler_args,
1010 name,
1011 keys,
1012 StatementType::ALTER_INDEX,
1013 )
1014 .await
1015 }
1016 },
1017 Statement::AlterView {
1018 materialized,
1019 name,
1020 operation,
1021 } => {
1022 let statement_type = if materialized {
1023 StatementType::ALTER_MATERIALIZED_VIEW
1024 } else {
1025 StatementType::ALTER_VIEW
1026 };
1027 match operation {
1028 AlterViewOperation::RenameView { view_name } => {
1029 if materialized {
1030 alter_rename::handle_rename_table(
1031 handler_args,
1032 TableType::MaterializedView,
1033 name,
1034 view_name,
1035 )
1036 .await
1037 } else {
1038 alter_rename::handle_rename_view(handler_args, name, view_name).await
1039 }
1040 }
1041 AlterViewOperation::SetParallelism {
1042 parallelism,
1043 deferred,
1044 } => {
1045 if !materialized {
1046 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
1047 }
1048 alter_parallelism::handle_alter_parallelism(
1049 handler_args,
1050 name,
1051 parallelism,
1052 statement_type,
1053 deferred,
1054 )
1055 .await
1056 }
1057 AlterViewOperation::SetBackfillParallelism {
1058 parallelism,
1059 deferred,
1060 } => {
1061 if !materialized {
1062 bail_not_implemented!("ALTER VIEW SET BACKFILL PARALLELISM");
1063 }
1064 alter_parallelism::handle_alter_backfill_parallelism(
1065 handler_args,
1066 name,
1067 parallelism,
1068 statement_type,
1069 deferred,
1070 )
1071 .await
1072 }
1073 AlterViewOperation::SetResourceGroup {
1074 resource_group,
1075 deferred,
1076 } => {
1077 if !materialized {
1078 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1079 }
1080 alter_resource_group::handle_alter_resource_group(
1081 handler_args,
1082 name,
1083 resource_group,
1084 statement_type,
1085 deferred,
1086 )
1087 .await
1088 }
1089 AlterViewOperation::ChangeOwner { new_owner_name } => {
1090 alter_owner::handle_alter_owner(
1091 handler_args,
1092 name,
1093 new_owner_name,
1094 statement_type,
1095 None,
1096 )
1097 .await
1098 }
1099 AlterViewOperation::SetSchema { new_schema_name } => {
1100 alter_set_schema::handle_alter_set_schema(
1101 handler_args,
1102 name,
1103 new_schema_name,
1104 statement_type,
1105 None,
1106 )
1107 .await
1108 }
1109 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1110 if !materialized {
1111 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1112 }
1113 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1114 handler_args,
1115 PbThrottleTarget::Mv,
1116 risingwave_pb::common::PbThrottleType::Backfill,
1117 name,
1118 rate_limit,
1119 )
1120 .await
1121 }
1122 AlterViewOperation::SwapRenameView { target_view } => {
1123 alter_swap_rename::handle_swap_rename(
1124 handler_args,
1125 name,
1126 target_view,
1127 statement_type,
1128 )
1129 .await
1130 }
1131 AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1132 if !materialized {
1133 bail!(
1134 "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1135 );
1136 }
1137 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1138 }
1139 AlterViewOperation::AsQuery { query } => {
1140 if !materialized {
1141 bail_not_implemented!("ALTER VIEW AS QUERY");
1142 }
1143 if !cfg!(debug_assertions) {
1145 bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1146 }
1147 alter_mv::handle_alter_mv(handler_args, name, query).await
1148 }
1149 AlterViewOperation::SetConfig { entries } => {
1150 if !materialized {
1151 bail!("SET CONFIG is only supported for materialized views");
1152 }
1153 alter_streaming_config::handle_alter_streaming_set_config(
1154 handler_args,
1155 name,
1156 entries,
1157 statement_type,
1158 )
1159 .await
1160 }
1161 AlterViewOperation::ResetConfig { keys } => {
1162 if !materialized {
1163 bail!("RESET CONFIG is only supported for materialized views");
1164 }
1165 alter_streaming_config::handle_alter_streaming_reset_config(
1166 handler_args,
1167 name,
1168 keys,
1169 statement_type,
1170 )
1171 .await
1172 }
1173 }
1174 }
1175
1176 Statement::AlterSink { name, operation } => match operation {
1177 AlterSinkOperation::AlterConnectorProps {
1178 alter_props: changed_props,
1179 } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1180 AlterSinkOperation::RenameSink { sink_name } => {
1181 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1182 }
1183 AlterSinkOperation::ChangeOwner { new_owner_name } => {
1184 alter_owner::handle_alter_owner(
1185 handler_args,
1186 name,
1187 new_owner_name,
1188 StatementType::ALTER_SINK,
1189 None,
1190 )
1191 .await
1192 }
1193 AlterSinkOperation::SetSchema { new_schema_name } => {
1194 alter_set_schema::handle_alter_set_schema(
1195 handler_args,
1196 name,
1197 new_schema_name,
1198 StatementType::ALTER_SINK,
1199 None,
1200 )
1201 .await
1202 }
1203 AlterSinkOperation::SetParallelism {
1204 parallelism,
1205 deferred,
1206 } => {
1207 alter_parallelism::handle_alter_parallelism(
1208 handler_args,
1209 name,
1210 parallelism,
1211 StatementType::ALTER_SINK,
1212 deferred,
1213 )
1214 .await
1215 }
1216 AlterSinkOperation::SetBackfillParallelism {
1217 parallelism,
1218 deferred,
1219 } => {
1220 alter_parallelism::handle_alter_backfill_parallelism(
1221 handler_args,
1222 name,
1223 parallelism,
1224 StatementType::ALTER_SINK,
1225 deferred,
1226 )
1227 .await
1228 }
1229 AlterSinkOperation::SetConfig { entries } => {
1230 alter_streaming_config::handle_alter_streaming_set_config(
1231 handler_args,
1232 name,
1233 entries,
1234 StatementType::ALTER_SINK,
1235 )
1236 .await
1237 }
1238 AlterSinkOperation::ResetConfig { keys } => {
1239 alter_streaming_config::handle_alter_streaming_reset_config(
1240 handler_args,
1241 name,
1242 keys,
1243 StatementType::ALTER_SINK,
1244 )
1245 .await
1246 }
1247 AlterSinkOperation::SwapRenameSink { target_sink } => {
1248 alter_swap_rename::handle_swap_rename(
1249 handler_args,
1250 name,
1251 target_sink,
1252 StatementType::ALTER_SINK,
1253 )
1254 .await
1255 }
1256 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1257 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1258 handler_args,
1259 PbThrottleTarget::Sink,
1260 risingwave_pb::common::PbThrottleType::Sink,
1261 name,
1262 rate_limit,
1263 )
1264 .await
1265 }
1266 AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
1267 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1268 handler_args,
1269 PbThrottleTarget::Sink,
1270 risingwave_pb::common::PbThrottleType::Backfill,
1271 name,
1272 rate_limit,
1273 )
1274 .await
1275 }
1276 AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1277 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1278 handler_args,
1279 name,
1280 enable,
1281 )
1282 .await
1283 }
1284 },
1285 Statement::AlterSubscription { name, operation } => match operation {
1286 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1287 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1288 .await
1289 }
1290 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1291 alter_owner::handle_alter_owner(
1292 handler_args,
1293 name,
1294 new_owner_name,
1295 StatementType::ALTER_SUBSCRIPTION,
1296 None,
1297 )
1298 .await
1299 }
1300 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1301 alter_set_schema::handle_alter_set_schema(
1302 handler_args,
1303 name,
1304 new_schema_name,
1305 StatementType::ALTER_SUBSCRIPTION,
1306 None,
1307 )
1308 .await
1309 }
1310 AlterSubscriptionOperation::SetRetention { retention } => {
1311 alter_subscription_retention::handle_alter_subscription_retention(
1312 handler_args,
1313 name,
1314 retention,
1315 )
1316 .await
1317 }
1318 AlterSubscriptionOperation::SwapRenameSubscription {
1319 target_subscription,
1320 } => {
1321 alter_swap_rename::handle_swap_rename(
1322 handler_args,
1323 name,
1324 target_subscription,
1325 StatementType::ALTER_SUBSCRIPTION,
1326 )
1327 .await
1328 }
1329 },
1330 Statement::AlterSource { name, operation } => match operation {
1331 AlterSourceOperation::AlterConnectorProps { alter_props } => {
1332 alter_source_props::handle_alter_source_connector_props(
1333 handler_args,
1334 name,
1335 alter_props,
1336 )
1337 .await
1338 }
1339 AlterSourceOperation::RenameSource { source_name } => {
1340 alter_rename::handle_rename_source(handler_args, name, source_name).await
1341 }
1342 AlterSourceOperation::AddColumn { .. } => {
1343 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1344 }
1345 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1346 alter_owner::handle_alter_owner(
1347 handler_args,
1348 name,
1349 new_owner_name,
1350 StatementType::ALTER_SOURCE,
1351 None,
1352 )
1353 .await
1354 }
1355 AlterSourceOperation::SetSchema { new_schema_name } => {
1356 alter_set_schema::handle_alter_set_schema(
1357 handler_args,
1358 name,
1359 new_schema_name,
1360 StatementType::ALTER_SOURCE,
1361 None,
1362 )
1363 .await
1364 }
1365 AlterSourceOperation::FormatEncode { format_encode } => {
1366 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1367 .await
1368 }
1369 AlterSourceOperation::RefreshSchema => {
1370 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1371 }
1372 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1373 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1374 handler_args,
1375 PbThrottleTarget::Source,
1376 risingwave_pb::common::PbThrottleType::Source,
1377 name,
1378 rate_limit,
1379 )
1380 .await
1381 }
1382 AlterSourceOperation::SwapRenameSource { target_source } => {
1383 alter_swap_rename::handle_swap_rename(
1384 handler_args,
1385 name,
1386 target_source,
1387 StatementType::ALTER_SOURCE,
1388 )
1389 .await
1390 }
1391 AlterSourceOperation::SetParallelism {
1392 parallelism,
1393 deferred,
1394 } => {
1395 alter_parallelism::handle_alter_parallelism(
1396 handler_args,
1397 name,
1398 parallelism,
1399 StatementType::ALTER_SOURCE,
1400 deferred,
1401 )
1402 .await
1403 }
1404 AlterSourceOperation::SetBackfillParallelism {
1405 parallelism,
1406 deferred,
1407 } => {
1408 alter_parallelism::handle_alter_backfill_parallelism(
1409 handler_args,
1410 name,
1411 parallelism,
1412 StatementType::ALTER_SOURCE,
1413 deferred,
1414 )
1415 .await
1416 }
1417 AlterSourceOperation::SetConfig { entries } => {
1418 alter_streaming_config::handle_alter_streaming_set_config(
1419 handler_args,
1420 name,
1421 entries,
1422 StatementType::ALTER_SOURCE,
1423 )
1424 .await
1425 }
1426 AlterSourceOperation::ResetConfig { keys } => {
1427 alter_streaming_config::handle_alter_streaming_reset_config(
1428 handler_args,
1429 name,
1430 keys,
1431 StatementType::ALTER_SOURCE,
1432 )
1433 .await
1434 }
1435 AlterSourceOperation::ResetSource => {
1436 reset_source::handle_reset_source(handler_args, name).await
1437 }
1438 },
1439 Statement::AlterFunction {
1440 name,
1441 args,
1442 operation,
1443 } => match operation {
1444 AlterFunctionOperation::SetSchema { new_schema_name } => {
1445 alter_set_schema::handle_alter_set_schema(
1446 handler_args,
1447 name,
1448 new_schema_name,
1449 StatementType::ALTER_FUNCTION,
1450 args,
1451 )
1452 .await
1453 }
1454 AlterFunctionOperation::ChangeOwner { new_owner_name } => {
1455 alter_owner::handle_alter_owner(
1456 handler_args,
1457 name,
1458 new_owner_name,
1459 StatementType::ALTER_FUNCTION,
1460 args,
1461 )
1462 .await
1463 }
1464 },
1465 Statement::AlterConnection { name, operation } => match operation {
1466 AlterConnectionOperation::SetSchema { new_schema_name } => {
1467 alter_set_schema::handle_alter_set_schema(
1468 handler_args,
1469 name,
1470 new_schema_name,
1471 StatementType::ALTER_CONNECTION,
1472 None,
1473 )
1474 .await
1475 }
1476 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1477 alter_owner::handle_alter_owner(
1478 handler_args,
1479 name,
1480 new_owner_name,
1481 StatementType::ALTER_CONNECTION,
1482 None,
1483 )
1484 .await
1485 }
1486 AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1487 alter_connection_props::handle_alter_connection_connector_props(
1488 handler_args,
1489 name,
1490 alter_props,
1491 )
1492 .await
1493 }
1494 },
1495 Statement::AlterSystem { param, value } => {
1496 alter_system::handle_alter_system(handler_args, param, value).await
1497 }
1498 Statement::AlterSecret { name, operation } => match operation {
1499 AlterSecretOperation::ChangeCredential {
1500 with_options,
1501 new_credential,
1502 } => {
1503 alter_secret::handle_alter_secret(handler_args, name, with_options, new_credential)
1504 .await
1505 }
1506 AlterSecretOperation::ChangeOwner { new_owner_name } => {
1507 alter_owner::handle_alter_owner(
1508 handler_args,
1509 name,
1510 new_owner_name,
1511 StatementType::ALTER_SECRET,
1512 None,
1513 )
1514 .await
1515 }
1516 },
1517 Statement::AlterFragment {
1518 fragment_ids,
1519 operation,
1520 } => match operation {
1521 AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1522 let [fragment_id] = fragment_ids.as_slice() else {
1523 return Err(ErrorCode::InvalidInputSyntax(
1524 "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1525 .to_owned(),
1526 )
1527 .into());
1528 };
1529 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1530 &handler_args.session,
1531 PbThrottleTarget::Fragment,
1532 risingwave_pb::common::PbThrottleType::Backfill,
1533 *fragment_id,
1534 rate_limit,
1535 StatementType::SET_VARIABLE,
1536 )
1537 .await
1538 }
1539 AlterFragmentOperation::SetParallelism { parallelism } => {
1540 alter_parallelism::handle_alter_fragment_parallelism(
1541 handler_args,
1542 fragment_ids.into_iter().map_into().collect(),
1543 parallelism,
1544 )
1545 .await
1546 }
1547 },
1548 Statement::AlterDefaultPrivileges { .. } => {
1549 handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1550 }
1551 Statement::StartTransaction { modes } => {
1552 transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1553 }
1554 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1555 Statement::Commit { chain } => {
1556 transaction::handle_commit(handler_args, COMMIT, chain).await
1557 }
1558 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1559 Statement::Rollback { chain } => {
1560 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1561 }
1562 Statement::SetTransaction {
1563 modes,
1564 snapshot,
1565 session,
1566 } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1567 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1568 Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1569 Statement::Comment {
1570 object_type,
1571 object_name,
1572 comment,
1573 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1574 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1575 Statement::Prepare {
1576 name,
1577 data_types,
1578 statement,
1579 } => prepared_statement::handle_prepare(name, data_types, statement).await,
1580 Statement::Deallocate { name, prepare } => {
1581 prepared_statement::handle_deallocate(name, prepare).await
1582 }
1583 Statement::Vacuum { object_name, full } => {
1584 vacuum::handle_vacuum(handler_args, object_name, full).await
1585 }
1586 Statement::Refresh { table_name } => {
1587 refresh::handle_refresh(handler_args, table_name).await
1588 }
1589 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1590 }
1591}
1592
1593fn check_ban_ddl_for_iceberg_engine_table(
1594 session: Arc<SessionImpl>,
1595 stmt: &Statement,
1596) -> Result<()> {
1597 match stmt {
1598 Statement::AlterTable {
1599 name,
1600 operation:
1601 operation @ (AlterTableOperation::AddColumn { .. }
1602 | AlterTableOperation::DropColumn { .. }),
1603 } => {
1604 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1605 if table.is_iceberg_engine_table() {
1606 bail!(
1607 "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1608 operation,
1609 schema_name,
1610 name
1611 );
1612 }
1613 }
1614
1615 Statement::AlterTable {
1616 name,
1617 operation: AlterTableOperation::RenameTable { .. },
1618 } => {
1619 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1620 if table.is_iceberg_engine_table() {
1621 bail!(
1622 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1623 schema_name,
1624 name
1625 );
1626 }
1627 }
1628
1629 Statement::AlterTable {
1630 name,
1631 operation: AlterTableOperation::SetParallelism { .. },
1632 } => {
1633 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1634 if table.is_iceberg_engine_table() {
1635 bail!(
1636 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1637 schema_name,
1638 name
1639 );
1640 }
1641 }
1642 Statement::AlterTable {
1643 name,
1644 operation: AlterTableOperation::SetBackfillParallelism { .. },
1645 } => {
1646 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1647 if table.is_iceberg_engine_table() {
1648 bail!(
1649 "ALTER TABLE SET BACKFILL PARALLELISM is not supported for iceberg table: {}.{}",
1650 schema_name,
1651 name
1652 );
1653 }
1654 }
1655
1656 Statement::AlterTable {
1657 name,
1658 operation: AlterTableOperation::SetSchema { .. },
1659 } => {
1660 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1661 if table.is_iceberg_engine_table() {
1662 bail!(
1663 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1664 schema_name,
1665 name
1666 );
1667 }
1668 }
1669
1670 Statement::AlterTable {
1671 name,
1672 operation: AlterTableOperation::RefreshSchema,
1673 } => {
1674 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1675 if table.is_iceberg_engine_table() {
1676 bail!(
1677 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1678 schema_name,
1679 name
1680 );
1681 }
1682 }
1683
1684 Statement::AlterTable {
1685 name,
1686 operation: AlterTableOperation::SetSourceRateLimit { .. },
1687 } => {
1688 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1689 if table.is_iceberg_engine_table() {
1690 bail!(
1691 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1692 schema_name,
1693 name
1694 );
1695 }
1696 }
1697
1698 _ => {}
1699 }
1700
1701 Ok(())
1702}