1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_connection_props;
46mod alter_database_param;
47mod alter_mv;
48mod alter_owner;
49mod alter_parallelism;
50mod alter_rename;
51mod alter_resource_group;
52mod alter_secret;
53mod alter_set_schema;
54mod alter_sink_props;
55mod alter_source_column;
56mod alter_source_props;
57mod alter_source_with_sr;
58mod alter_streaming_config;
59mod alter_streaming_enable_unaligned_join;
60mod alter_streaming_rate_limit;
61mod alter_subscription_retention;
62mod alter_swap_rename;
63mod alter_system;
64mod alter_table_column;
65pub mod alter_table_drop_connector;
66pub mod alter_table_props;
67mod alter_table_with_sr;
68pub mod alter_user;
69mod alter_utils;
70pub mod cancel_job;
71pub mod close_cursor;
72mod comment;
73pub mod create_aggregate;
74pub mod create_connection;
75mod create_database;
76pub mod create_function;
77pub mod create_index;
78pub mod create_mv;
79pub mod create_schema;
80pub mod create_secret;
81pub mod create_sink;
82pub mod create_source;
83pub mod create_sql_function;
84pub mod create_subscription;
85pub mod create_table;
86pub mod create_table_as;
87pub mod create_user;
88pub mod create_view;
89pub mod declare_cursor;
90pub mod describe;
91pub mod discard;
92mod drop_connection;
93mod drop_database;
94pub mod drop_function;
95mod drop_index;
96pub mod drop_mv;
97mod drop_schema;
98pub mod drop_secret;
99pub mod drop_sink;
100pub mod drop_source;
101pub mod drop_subscription;
102pub mod drop_table;
103pub mod drop_user;
104mod drop_view;
105pub mod explain;
106pub mod explain_analyze_stream_job;
107pub mod extended_handle;
108pub mod fetch_cursor;
109mod flush;
110pub mod handle_privilege;
111pub mod kill_process;
112mod prepared_statement;
113pub mod privilege;
114pub mod query;
115mod recover;
116mod refresh;
117mod reset_source;
118pub mod show;
119mod transaction;
120mod use_db;
121pub mod util;
122pub mod vacuum;
123pub mod variable;
124mod wait;
125
126pub use alter_table_column::{
127 fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
128};
129
130pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
132
133pub type RwPgResponse = PgResponse<PgResponseStream>;
135
136#[easy_ext::ext(RwPgResponseBuilderExt)]
137impl RwPgResponseBuilder {
138 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
140 let fields = T::fields();
141 self.values(
142 rows.into_iter()
143 .map(|row| {
144 Row::new(
145 row.into_owned_row()
146 .into_iter()
147 .zip_eq_fast(&fields)
148 .map(|(datum, (_, ty))| {
149 datum.map(|scalar| {
150 scalar.as_scalar_ref_impl().text_format(ty).into()
151 })
152 })
153 .collect(),
154 )
155 })
156 .collect_vec()
157 .into(),
158 fields_to_descriptors(fields),
159 )
160 }
161}
162
163pub fn fields_to_descriptors(
164 fields: Vec<(&str, risingwave_common::types::DataType)>,
165) -> Vec<PgFieldDescriptor> {
166 fields
167 .iter()
168 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
169 .collect()
170}
171
172pub enum PgResponseStream {
173 LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
174 DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
175 Rows(BoxStream<'static, RowSetResult>),
176}
177
178impl Stream for PgResponseStream {
179 type Item = std::result::Result<Vec<Row>, BoxedError>;
180
181 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
182 match &mut *self {
183 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
184 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
185 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
186 }
187 }
188}
189
190impl From<Vec<Row>> for PgResponseStream {
191 fn from(rows: Vec<Row>) -> Self {
192 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
193 }
194}
195
196#[derive(Clone)]
197pub struct HandlerArgs {
198 pub session: Arc<SessionImpl>,
199 pub sql: Arc<str>,
200 pub normalized_sql: String,
201 pub with_options: WithOptions,
202}
203
204impl HandlerArgs {
205 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
206 Ok(Self {
207 session,
208 sql,
209 with_options: WithOptions::try_from(stmt)?,
210 normalized_sql: Self::normalize_sql(stmt),
211 })
212 }
213
214 fn normalize_sql(stmt: &Statement) -> String {
220 let mut stmt = stmt.clone();
221 match &mut stmt {
222 Statement::CreateView {
223 or_replace,
224 if_not_exists,
225 ..
226 } => {
227 *or_replace = false;
228 *if_not_exists = false;
229 }
230 Statement::CreateTable {
231 or_replace,
232 if_not_exists,
233 ..
234 } => {
235 *or_replace = false;
236 *if_not_exists = false;
237 }
238 Statement::CreateIndex { if_not_exists, .. } => {
239 *if_not_exists = false;
240 }
241 Statement::CreateSource {
242 stmt: CreateSourceStatement { if_not_exists, .. },
243 ..
244 } => {
245 *if_not_exists = false;
246 }
247 Statement::CreateSink {
248 stmt: CreateSinkStatement { if_not_exists, .. },
249 } => {
250 *if_not_exists = false;
251 }
252 Statement::CreateSubscription {
253 stmt: CreateSubscriptionStatement { if_not_exists, .. },
254 } => {
255 *if_not_exists = false;
256 }
257 Statement::CreateConnection {
258 stmt: CreateConnectionStatement { if_not_exists, .. },
259 } => {
260 *if_not_exists = false;
261 }
262 _ => {}
263 }
264 stmt.to_string()
265 }
266}
267
268pub async fn handle(
269 session: Arc<SessionImpl>,
270 stmt: Statement,
271 sql: Arc<str>,
272 formats: Vec<Format>,
273) -> Result<RwPgResponse> {
274 session.clear_cancel_query_flag();
275 let _guard = session.txn_begin_implicit();
276 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
277
278 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
279
280 match stmt {
281 Statement::Explain {
282 statement,
283 analyze,
284 options,
285 } => explain::handle_explain(handler_args, *statement, options, analyze).await,
286 Statement::ExplainAnalyzeStreamJob {
287 target,
288 duration_secs,
289 } => {
290 explain_analyze_stream_job::handle_explain_analyze_stream_job(
291 handler_args,
292 target,
293 duration_secs,
294 )
295 .await
296 }
297 Statement::CreateSource { stmt } => {
298 create_source::handle_create_source(handler_args, stmt).await
299 }
300 Statement::CreateSink { stmt } => {
301 create_sink::handle_create_sink(handler_args, stmt, false).await
302 }
303 Statement::CreateSubscription { stmt } => {
304 create_subscription::handle_create_subscription(handler_args, stmt).await
305 }
306 Statement::CreateConnection { stmt } => {
307 create_connection::handle_create_connection(handler_args, stmt).await
308 }
309 Statement::CreateSecret { stmt } => {
310 create_secret::handle_create_secret(handler_args, stmt).await
311 }
312 Statement::CreateFunction {
313 or_replace,
314 temporary,
315 if_not_exists,
316 name,
317 args,
318 returns,
319 params,
320 with_options,
321 } => {
322 if params.language.is_none()
325 || !params
326 .language
327 .as_ref()
328 .unwrap()
329 .real_value()
330 .eq_ignore_ascii_case("sql")
331 {
332 create_function::handle_create_function(
333 handler_args,
334 or_replace,
335 temporary,
336 if_not_exists,
337 name,
338 args,
339 returns,
340 params,
341 with_options,
342 )
343 .await
344 } else {
345 create_sql_function::handle_create_sql_function(
346 handler_args,
347 or_replace,
348 temporary,
349 if_not_exists,
350 name,
351 args,
352 returns,
353 params,
354 )
355 .await
356 }
357 }
358 Statement::CreateAggregate {
359 or_replace,
360 if_not_exists,
361 name,
362 args,
363 returns,
364 params,
365 ..
366 } => {
367 create_aggregate::handle_create_aggregate(
368 handler_args,
369 or_replace,
370 if_not_exists,
371 name,
372 args,
373 returns,
374 params,
375 )
376 .await
377 }
378 Statement::CreateTable {
379 name,
380 columns,
381 wildcard_idx,
382 constraints,
383 query,
384 with_options: _, or_replace,
387 temporary,
388 if_not_exists,
389 format_encode,
390 source_watermarks,
391 append_only,
392 on_conflict,
393 with_version_columns,
394 cdc_table_info,
395 include_column_options,
396 webhook_info,
397 engine,
398 } => {
399 if or_replace {
400 bail_not_implemented!("CREATE OR REPLACE TABLE");
401 }
402 if temporary {
403 bail_not_implemented!("CREATE TEMPORARY TABLE");
404 }
405 if let Some(query) = query {
406 return create_table_as::handle_create_as(
407 handler_args,
408 name,
409 if_not_exists,
410 query,
411 columns,
412 append_only,
413 on_conflict,
414 with_version_columns
415 .iter()
416 .map(|col| col.real_value())
417 .collect(),
418 engine,
419 )
420 .await;
421 }
422 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
423 create_table::handle_create_table(
424 handler_args,
425 name,
426 columns,
427 wildcard_idx,
428 constraints,
429 if_not_exists,
430 format_encode,
431 source_watermarks,
432 append_only,
433 on_conflict,
434 with_version_columns
435 .iter()
436 .map(|col| col.real_value())
437 .collect(),
438 cdc_table_info,
439 include_column_options,
440 webhook_info,
441 engine,
442 )
443 .await
444 }
445 Statement::CreateDatabase {
446 db_name,
447 if_not_exists,
448 owner,
449 resource_group,
450 barrier_interval_ms,
451 checkpoint_frequency,
452 } => {
453 create_database::handle_create_database(
454 handler_args,
455 db_name,
456 if_not_exists,
457 owner,
458 resource_group,
459 barrier_interval_ms,
460 checkpoint_frequency,
461 )
462 .await
463 }
464 Statement::CreateSchema {
465 schema_name,
466 if_not_exists,
467 owner,
468 } => {
469 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
470 .await
471 }
472 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
473 Statement::DeclareCursor { stmt } => {
474 declare_cursor::handle_declare_cursor(handler_args, stmt).await
475 }
476 Statement::FetchCursor { stmt } => {
477 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
478 }
479 Statement::CloseCursor { stmt } => {
480 close_cursor::handle_close_cursor(handler_args, stmt).await
481 }
482 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
483 Statement::Grant { .. } => {
484 handle_privilege::handle_grant_privilege(handler_args, stmt).await
485 }
486 Statement::Revoke { .. } => {
487 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
488 }
489 Statement::Describe { name, kind } => match kind {
490 DescribeKind::Fragments => {
491 describe::handle_describe_fragments(handler_args, name).await
492 }
493 DescribeKind::Plain => describe::handle_describe(handler_args, name),
494 },
495 Statement::DescribeFragment { fragment_id } => {
496 describe::handle_describe_fragment(handler_args, fragment_id.into()).await
497 }
498 Statement::Discard(..) => discard::handle_discard(handler_args),
499 Statement::ShowObjects {
500 object: show_object,
501 filter,
502 } => show::handle_show_object(handler_args, show_object, filter).await,
503 Statement::ShowCreateObject { create_type, name } => {
504 show::handle_show_create_object(handler_args, create_type, name)
505 }
506 Statement::ShowTransactionIsolationLevel => {
507 transaction::handle_show_isolation_level(handler_args)
508 }
509 Statement::Drop(DropStatement {
510 object_type,
511 object_name,
512 if_exists,
513 drop_mode,
514 }) => {
515 let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
516 match object_type {
517 ObjectType::MaterializedView
518 | ObjectType::View
519 | ObjectType::Sink
520 | ObjectType::Source
521 | ObjectType::Subscription
522 | ObjectType::Index
523 | ObjectType::Table
524 | ObjectType::Schema
525 | ObjectType::Connection
526 | ObjectType::Secret => true,
527 ObjectType::Database | ObjectType::User => {
528 bail_not_implemented!("DROP CASCADE");
529 }
530 }
531 } else {
532 false
533 };
534 match object_type {
535 ObjectType::Table => {
536 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
537 .await
538 }
539 ObjectType::MaterializedView => {
540 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
541 }
542 ObjectType::Index => {
543 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
544 .await
545 }
546 ObjectType::Source => {
547 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
548 .await
549 }
550 ObjectType::Sink => {
551 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
552 }
553 ObjectType::Subscription => {
554 drop_subscription::handle_drop_subscription(
555 handler_args,
556 object_name,
557 if_exists,
558 cascade,
559 )
560 .await
561 }
562 ObjectType::Database => {
563 drop_database::handle_drop_database(handler_args, object_name, if_exists).await
564 }
565 ObjectType::Schema => {
566 drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
567 .await
568 }
569 ObjectType::User => {
570 drop_user::handle_drop_user(handler_args, object_name, if_exists).await
571 }
572 ObjectType::View => {
573 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
574 }
575 ObjectType::Connection => {
576 drop_connection::handle_drop_connection(
577 handler_args,
578 object_name,
579 if_exists,
580 cascade,
581 )
582 .await
583 }
584 ObjectType::Secret => {
585 drop_secret::handle_drop_secret(handler_args, object_name, if_exists, cascade)
586 .await
587 }
588 }
589 }
590 Statement::DropFunction {
592 if_exists,
593 func_desc,
594 option,
595 } => {
596 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
597 .await
598 }
599 Statement::DropAggregate {
600 if_exists,
601 func_desc,
602 option,
603 } => {
604 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
605 .await
606 }
607 Statement::Query(_)
608 | Statement::Insert { .. }
609 | Statement::Delete { .. }
610 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
611 Statement::Copy {
612 entity: CopyEntity::Query(query),
613 target: CopyTarget::Stdout,
614 } => {
615 let response =
616 query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
617 .await?;
618 Ok(response.into_copy_query_to_stdout())
619 }
620 Statement::CreateView {
621 materialized,
622 if_not_exists,
623 name,
624 columns,
625 query,
626 with_options: _, or_replace, emit_mode,
629 } => {
630 if or_replace {
631 bail_not_implemented!("CREATE OR REPLACE VIEW");
632 }
633 if materialized {
634 create_mv::handle_create_mv(
635 handler_args,
636 if_not_exists,
637 name,
638 *query,
639 columns,
640 emit_mode,
641 )
642 .await
643 } else {
644 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
645 .await
646 }
647 }
648 Statement::Flush => flush::handle_flush(handler_args).await,
649 Statement::Wait => wait::handle_wait(handler_args).await,
650 Statement::Recover => recover::handle_recover(handler_args).await,
651 Statement::SetVariable {
652 local: _,
653 variable,
654 value,
655 } => {
656 if variable.real_value().eq_ignore_ascii_case("database") {
658 let x = variable::set_var_to_param_str(&value);
659 let res = use_db::handle_use_db(
660 handler_args,
661 ObjectName::from(vec![Ident::from_real_value(
662 x.as_deref().unwrap_or("default"),
663 )]),
664 )?;
665 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
666 for notice in res.notices() {
667 builder = builder.notice(notice);
668 }
669 return Ok(builder.into());
670 }
671 variable::handle_set(handler_args, variable, value)
672 }
673 Statement::SetTimeZone { local: _, value } => {
674 variable::handle_set_time_zone(handler_args, value)
675 }
676 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
677 Statement::CreateIndex {
678 name,
679 table_name,
680 method,
681 columns,
682 include,
683 distributed_by,
684 unique,
685 if_not_exists,
686 with_properties: _,
687 } => {
688 if unique {
689 bail_not_implemented!("create unique index");
690 }
691
692 create_index::handle_create_index(
693 handler_args,
694 if_not_exists,
695 name,
696 table_name,
697 method,
698 columns.clone(),
699 include,
700 distributed_by,
701 )
702 .await
703 }
704 Statement::AlterDatabase { name, operation } => match operation {
705 AlterDatabaseOperation::RenameDatabase { database_name } => {
706 alter_rename::handle_rename_database(handler_args, name, database_name).await
707 }
708 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
709 alter_owner::handle_alter_owner(
710 handler_args,
711 name,
712 new_owner_name,
713 StatementType::ALTER_DATABASE,
714 None,
715 )
716 .await
717 }
718 AlterDatabaseOperation::SetParam(config_param) => {
719 let ConfigParam { param, value } = config_param;
720
721 let database_param = match param.real_value().to_uppercase().as_str() {
722 "BARRIER_INTERVAL_MS" => {
723 let barrier_interval_ms = match value {
724 SetVariableValue::Default => None,
725 SetVariableValue::Single(SetVariableValueSingle::Literal(
726 Value::Number(num),
727 )) => {
728 let num = num.parse::<u32>().map_err(|e| {
729 ErrorCode::InvalidInputSyntax(format!(
730 "barrier_interval_ms must be a u32 integer: {}",
731 e.as_report()
732 ))
733 })?;
734 Some(num)
735 }
736 _ => {
737 return Err(ErrorCode::InvalidInputSyntax(
738 "barrier_interval_ms must be a u32 integer or DEFAULT"
739 .to_owned(),
740 )
741 .into());
742 }
743 };
744 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
745 }
746 "CHECKPOINT_FREQUENCY" => {
747 let checkpoint_frequency = match value {
748 SetVariableValue::Default => None,
749 SetVariableValue::Single(SetVariableValueSingle::Literal(
750 Value::Number(num),
751 )) => {
752 let num = num.parse::<u64>().map_err(|e| {
753 ErrorCode::InvalidInputSyntax(format!(
754 "checkpoint_frequency must be a u64 integer: {}",
755 e.as_report()
756 ))
757 })?;
758 Some(num)
759 }
760 _ => {
761 return Err(ErrorCode::InvalidInputSyntax(
762 "checkpoint_frequency must be a u64 integer or DEFAULT"
763 .to_owned(),
764 )
765 .into());
766 }
767 };
768 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
769 }
770 _ => {
771 return Err(ErrorCode::InvalidInputSyntax(format!(
772 "Unsupported database config parameter: {}",
773 param.real_value()
774 ))
775 .into());
776 }
777 };
778
779 alter_database_param::handle_alter_database_param(
780 handler_args,
781 name,
782 database_param,
783 )
784 .await
785 }
786 },
787 Statement::AlterSchema { name, operation } => match operation {
788 AlterSchemaOperation::RenameSchema { schema_name } => {
789 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
790 }
791 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
792 alter_owner::handle_alter_owner(
793 handler_args,
794 name,
795 new_owner_name,
796 StatementType::ALTER_SCHEMA,
797 None,
798 )
799 .await
800 }
801 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
802 alter_swap_rename::handle_swap_rename(
803 handler_args,
804 name,
805 target_schema,
806 StatementType::ALTER_SCHEMA,
807 )
808 .await
809 }
810 },
811 Statement::AlterTable { name, operation } => match operation {
812 AlterTableOperation::AddColumn { .. }
813 | AlterTableOperation::DropColumn { .. }
814 | AlterTableOperation::AlterColumn { .. } => {
815 alter_table_column::handle_alter_table_column(handler_args, name, operation).await
816 }
817 AlterTableOperation::RenameTable { table_name } => {
818 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
819 .await
820 }
821 AlterTableOperation::ChangeOwner { new_owner_name } => {
822 alter_owner::handle_alter_owner(
823 handler_args,
824 name,
825 new_owner_name,
826 StatementType::ALTER_TABLE,
827 None,
828 )
829 .await
830 }
831 AlterTableOperation::SetParallelism {
832 parallelism,
833 deferred,
834 } => {
835 alter_parallelism::handle_alter_parallelism(
836 handler_args,
837 name,
838 parallelism,
839 StatementType::ALTER_TABLE,
840 deferred,
841 )
842 .await
843 }
844 AlterTableOperation::SetBackfillParallelism {
845 parallelism,
846 deferred,
847 } => {
848 alter_parallelism::handle_alter_backfill_parallelism(
849 handler_args,
850 name,
851 parallelism,
852 StatementType::ALTER_TABLE,
853 deferred,
854 )
855 .await
856 }
857 AlterTableOperation::SetSchema { new_schema_name } => {
858 alter_set_schema::handle_alter_set_schema(
859 handler_args,
860 name,
861 new_schema_name,
862 StatementType::ALTER_TABLE,
863 None,
864 )
865 .await
866 }
867 AlterTableOperation::RefreshSchema => {
868 alter_table_with_sr::handle_refresh_schema(handler_args, name).await
869 }
870 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
871 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
872 handler_args,
873 PbThrottleTarget::Table,
874 risingwave_pb::common::PbThrottleType::Source,
875 name,
876 rate_limit,
877 )
878 .await
879 }
880 AlterTableOperation::DropConnector => {
881 alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
882 .await
883 }
884 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
885 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
886 handler_args,
887 PbThrottleTarget::Table,
888 risingwave_pb::common::PbThrottleType::Dml,
889 name,
890 rate_limit,
891 )
892 .await
893 }
894 AlterTableOperation::SetConfig { entries } => {
895 alter_streaming_config::handle_alter_streaming_set_config(
896 handler_args,
897 name,
898 entries,
899 StatementType::ALTER_TABLE,
900 )
901 .await
902 }
903 AlterTableOperation::ResetConfig { keys } => {
904 alter_streaming_config::handle_alter_streaming_reset_config(
905 handler_args,
906 name,
907 keys,
908 StatementType::ALTER_TABLE,
909 )
910 .await
911 }
912 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
913 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
914 handler_args,
915 PbThrottleTarget::Table,
916 risingwave_pb::common::PbThrottleType::Backfill,
917 name,
918 rate_limit,
919 )
920 .await
921 }
922 AlterTableOperation::SwapRenameTable { target_table } => {
923 alter_swap_rename::handle_swap_rename(
924 handler_args,
925 name,
926 target_table,
927 StatementType::ALTER_TABLE,
928 )
929 .await
930 }
931 AlterTableOperation::AlterConnectorProps { alter_props } => {
932 alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
933 }
934 AlterTableOperation::AddConstraint { .. }
935 | AlterTableOperation::DropConstraint { .. }
936 | AlterTableOperation::RenameColumn { .. }
937 | AlterTableOperation::ChangeColumn { .. }
938 | AlterTableOperation::RenameConstraint { .. } => {
939 bail_not_implemented!(
940 "Unhandled statement: {}",
941 Statement::AlterTable { name, operation }
942 )
943 }
944 },
945 Statement::AlterIndex { name, operation } => match operation {
946 AlterIndexOperation::RenameIndex { index_name } => {
947 alter_rename::handle_rename_index(handler_args, name, index_name).await
948 }
949 AlterIndexOperation::SetParallelism {
950 parallelism,
951 deferred,
952 } => {
953 alter_parallelism::handle_alter_parallelism(
954 handler_args,
955 name,
956 parallelism,
957 StatementType::ALTER_INDEX,
958 deferred,
959 )
960 .await
961 }
962 AlterIndexOperation::SetBackfillParallelism {
963 parallelism,
964 deferred,
965 } => {
966 alter_parallelism::handle_alter_backfill_parallelism(
967 handler_args,
968 name,
969 parallelism,
970 StatementType::ALTER_INDEX,
971 deferred,
972 )
973 .await
974 }
975 AlterIndexOperation::SetConfig { entries } => {
976 alter_streaming_config::handle_alter_streaming_set_config(
977 handler_args,
978 name,
979 entries,
980 StatementType::ALTER_INDEX,
981 )
982 .await
983 }
984 AlterIndexOperation::ResetConfig { keys } => {
985 alter_streaming_config::handle_alter_streaming_reset_config(
986 handler_args,
987 name,
988 keys,
989 StatementType::ALTER_INDEX,
990 )
991 .await
992 }
993 },
994 Statement::AlterView {
995 materialized,
996 name,
997 operation,
998 } => {
999 let statement_type = if materialized {
1000 StatementType::ALTER_MATERIALIZED_VIEW
1001 } else {
1002 StatementType::ALTER_VIEW
1003 };
1004 match operation {
1005 AlterViewOperation::RenameView { view_name } => {
1006 if materialized {
1007 alter_rename::handle_rename_table(
1008 handler_args,
1009 TableType::MaterializedView,
1010 name,
1011 view_name,
1012 )
1013 .await
1014 } else {
1015 alter_rename::handle_rename_view(handler_args, name, view_name).await
1016 }
1017 }
1018 AlterViewOperation::SetParallelism {
1019 parallelism,
1020 deferred,
1021 } => {
1022 if !materialized {
1023 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
1024 }
1025 alter_parallelism::handle_alter_parallelism(
1026 handler_args,
1027 name,
1028 parallelism,
1029 statement_type,
1030 deferred,
1031 )
1032 .await
1033 }
1034 AlterViewOperation::SetBackfillParallelism {
1035 parallelism,
1036 deferred,
1037 } => {
1038 if !materialized {
1039 bail_not_implemented!("ALTER VIEW SET BACKFILL PARALLELISM");
1040 }
1041 alter_parallelism::handle_alter_backfill_parallelism(
1042 handler_args,
1043 name,
1044 parallelism,
1045 statement_type,
1046 deferred,
1047 )
1048 .await
1049 }
1050 AlterViewOperation::SetResourceGroup {
1051 resource_group,
1052 deferred,
1053 } => {
1054 if !materialized {
1055 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1056 }
1057 alter_resource_group::handle_alter_resource_group(
1058 handler_args,
1059 name,
1060 resource_group,
1061 statement_type,
1062 deferred,
1063 )
1064 .await
1065 }
1066 AlterViewOperation::ChangeOwner { new_owner_name } => {
1067 alter_owner::handle_alter_owner(
1068 handler_args,
1069 name,
1070 new_owner_name,
1071 statement_type,
1072 None,
1073 )
1074 .await
1075 }
1076 AlterViewOperation::SetSchema { new_schema_name } => {
1077 alter_set_schema::handle_alter_set_schema(
1078 handler_args,
1079 name,
1080 new_schema_name,
1081 statement_type,
1082 None,
1083 )
1084 .await
1085 }
1086 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1087 if !materialized {
1088 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1089 }
1090 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1091 handler_args,
1092 PbThrottleTarget::Mv,
1093 risingwave_pb::common::PbThrottleType::Backfill,
1094 name,
1095 rate_limit,
1096 )
1097 .await
1098 }
1099 AlterViewOperation::SwapRenameView { target_view } => {
1100 alter_swap_rename::handle_swap_rename(
1101 handler_args,
1102 name,
1103 target_view,
1104 statement_type,
1105 )
1106 .await
1107 }
1108 AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1109 if !materialized {
1110 bail!(
1111 "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1112 );
1113 }
1114 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1115 }
1116 AlterViewOperation::AsQuery { query } => {
1117 if !materialized {
1118 bail_not_implemented!("ALTER VIEW AS QUERY");
1119 }
1120 if !cfg!(debug_assertions) {
1122 bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1123 }
1124 alter_mv::handle_alter_mv(handler_args, name, query).await
1125 }
1126 AlterViewOperation::SetConfig { entries } => {
1127 if !materialized {
1128 bail!("SET CONFIG is only supported for materialized views");
1129 }
1130 alter_streaming_config::handle_alter_streaming_set_config(
1131 handler_args,
1132 name,
1133 entries,
1134 statement_type,
1135 )
1136 .await
1137 }
1138 AlterViewOperation::ResetConfig { keys } => {
1139 if !materialized {
1140 bail!("RESET CONFIG is only supported for materialized views");
1141 }
1142 alter_streaming_config::handle_alter_streaming_reset_config(
1143 handler_args,
1144 name,
1145 keys,
1146 statement_type,
1147 )
1148 .await
1149 }
1150 }
1151 }
1152
1153 Statement::AlterSink { name, operation } => match operation {
1154 AlterSinkOperation::AlterConnectorProps {
1155 alter_props: changed_props,
1156 } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1157 AlterSinkOperation::RenameSink { sink_name } => {
1158 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1159 }
1160 AlterSinkOperation::ChangeOwner { new_owner_name } => {
1161 alter_owner::handle_alter_owner(
1162 handler_args,
1163 name,
1164 new_owner_name,
1165 StatementType::ALTER_SINK,
1166 None,
1167 )
1168 .await
1169 }
1170 AlterSinkOperation::SetSchema { new_schema_name } => {
1171 alter_set_schema::handle_alter_set_schema(
1172 handler_args,
1173 name,
1174 new_schema_name,
1175 StatementType::ALTER_SINK,
1176 None,
1177 )
1178 .await
1179 }
1180 AlterSinkOperation::SetParallelism {
1181 parallelism,
1182 deferred,
1183 } => {
1184 alter_parallelism::handle_alter_parallelism(
1185 handler_args,
1186 name,
1187 parallelism,
1188 StatementType::ALTER_SINK,
1189 deferred,
1190 )
1191 .await
1192 }
1193 AlterSinkOperation::SetBackfillParallelism {
1194 parallelism,
1195 deferred,
1196 } => {
1197 alter_parallelism::handle_alter_backfill_parallelism(
1198 handler_args,
1199 name,
1200 parallelism,
1201 StatementType::ALTER_SINK,
1202 deferred,
1203 )
1204 .await
1205 }
1206 AlterSinkOperation::SetConfig { entries } => {
1207 alter_streaming_config::handle_alter_streaming_set_config(
1208 handler_args,
1209 name,
1210 entries,
1211 StatementType::ALTER_SINK,
1212 )
1213 .await
1214 }
1215 AlterSinkOperation::ResetConfig { keys } => {
1216 alter_streaming_config::handle_alter_streaming_reset_config(
1217 handler_args,
1218 name,
1219 keys,
1220 StatementType::ALTER_SINK,
1221 )
1222 .await
1223 }
1224 AlterSinkOperation::SwapRenameSink { target_sink } => {
1225 alter_swap_rename::handle_swap_rename(
1226 handler_args,
1227 name,
1228 target_sink,
1229 StatementType::ALTER_SINK,
1230 )
1231 .await
1232 }
1233 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1234 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1235 handler_args,
1236 PbThrottleTarget::Sink,
1237 risingwave_pb::common::PbThrottleType::Sink,
1238 name,
1239 rate_limit,
1240 )
1241 .await
1242 }
1243 AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
1244 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1245 handler_args,
1246 PbThrottleTarget::Sink,
1247 risingwave_pb::common::PbThrottleType::Backfill,
1248 name,
1249 rate_limit,
1250 )
1251 .await
1252 }
1253 AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1254 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1255 handler_args,
1256 name,
1257 enable,
1258 )
1259 .await
1260 }
1261 },
1262 Statement::AlterSubscription { name, operation } => match operation {
1263 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1264 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1265 .await
1266 }
1267 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1268 alter_owner::handle_alter_owner(
1269 handler_args,
1270 name,
1271 new_owner_name,
1272 StatementType::ALTER_SUBSCRIPTION,
1273 None,
1274 )
1275 .await
1276 }
1277 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1278 alter_set_schema::handle_alter_set_schema(
1279 handler_args,
1280 name,
1281 new_schema_name,
1282 StatementType::ALTER_SUBSCRIPTION,
1283 None,
1284 )
1285 .await
1286 }
1287 AlterSubscriptionOperation::SetRetention { retention } => {
1288 alter_subscription_retention::handle_alter_subscription_retention(
1289 handler_args,
1290 name,
1291 retention,
1292 )
1293 .await
1294 }
1295 AlterSubscriptionOperation::SwapRenameSubscription {
1296 target_subscription,
1297 } => {
1298 alter_swap_rename::handle_swap_rename(
1299 handler_args,
1300 name,
1301 target_subscription,
1302 StatementType::ALTER_SUBSCRIPTION,
1303 )
1304 .await
1305 }
1306 },
1307 Statement::AlterSource { name, operation } => match operation {
1308 AlterSourceOperation::AlterConnectorProps { alter_props } => {
1309 alter_source_props::handle_alter_source_connector_props(
1310 handler_args,
1311 name,
1312 alter_props,
1313 )
1314 .await
1315 }
1316 AlterSourceOperation::RenameSource { source_name } => {
1317 alter_rename::handle_rename_source(handler_args, name, source_name).await
1318 }
1319 AlterSourceOperation::AddColumn { .. } => {
1320 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1321 }
1322 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1323 alter_owner::handle_alter_owner(
1324 handler_args,
1325 name,
1326 new_owner_name,
1327 StatementType::ALTER_SOURCE,
1328 None,
1329 )
1330 .await
1331 }
1332 AlterSourceOperation::SetSchema { new_schema_name } => {
1333 alter_set_schema::handle_alter_set_schema(
1334 handler_args,
1335 name,
1336 new_schema_name,
1337 StatementType::ALTER_SOURCE,
1338 None,
1339 )
1340 .await
1341 }
1342 AlterSourceOperation::FormatEncode { format_encode } => {
1343 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1344 .await
1345 }
1346 AlterSourceOperation::RefreshSchema => {
1347 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1348 }
1349 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1350 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1351 handler_args,
1352 PbThrottleTarget::Source,
1353 risingwave_pb::common::PbThrottleType::Source,
1354 name,
1355 rate_limit,
1356 )
1357 .await
1358 }
1359 AlterSourceOperation::SwapRenameSource { target_source } => {
1360 alter_swap_rename::handle_swap_rename(
1361 handler_args,
1362 name,
1363 target_source,
1364 StatementType::ALTER_SOURCE,
1365 )
1366 .await
1367 }
1368 AlterSourceOperation::SetParallelism {
1369 parallelism,
1370 deferred,
1371 } => {
1372 alter_parallelism::handle_alter_parallelism(
1373 handler_args,
1374 name,
1375 parallelism,
1376 StatementType::ALTER_SOURCE,
1377 deferred,
1378 )
1379 .await
1380 }
1381 AlterSourceOperation::SetBackfillParallelism {
1382 parallelism,
1383 deferred,
1384 } => {
1385 alter_parallelism::handle_alter_backfill_parallelism(
1386 handler_args,
1387 name,
1388 parallelism,
1389 StatementType::ALTER_SOURCE,
1390 deferred,
1391 )
1392 .await
1393 }
1394 AlterSourceOperation::SetConfig { entries } => {
1395 alter_streaming_config::handle_alter_streaming_set_config(
1396 handler_args,
1397 name,
1398 entries,
1399 StatementType::ALTER_SOURCE,
1400 )
1401 .await
1402 }
1403 AlterSourceOperation::ResetConfig { keys } => {
1404 alter_streaming_config::handle_alter_streaming_reset_config(
1405 handler_args,
1406 name,
1407 keys,
1408 StatementType::ALTER_SOURCE,
1409 )
1410 .await
1411 }
1412 AlterSourceOperation::ResetSource => {
1413 reset_source::handle_reset_source(handler_args, name).await
1414 }
1415 },
1416 Statement::AlterFunction {
1417 name,
1418 args,
1419 operation,
1420 } => match operation {
1421 AlterFunctionOperation::SetSchema { new_schema_name } => {
1422 alter_set_schema::handle_alter_set_schema(
1423 handler_args,
1424 name,
1425 new_schema_name,
1426 StatementType::ALTER_FUNCTION,
1427 args,
1428 )
1429 .await
1430 }
1431 AlterFunctionOperation::ChangeOwner { new_owner_name } => {
1432 alter_owner::handle_alter_owner(
1433 handler_args,
1434 name,
1435 new_owner_name,
1436 StatementType::ALTER_FUNCTION,
1437 args,
1438 )
1439 .await
1440 }
1441 },
1442 Statement::AlterConnection { name, operation } => match operation {
1443 AlterConnectionOperation::SetSchema { new_schema_name } => {
1444 alter_set_schema::handle_alter_set_schema(
1445 handler_args,
1446 name,
1447 new_schema_name,
1448 StatementType::ALTER_CONNECTION,
1449 None,
1450 )
1451 .await
1452 }
1453 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1454 alter_owner::handle_alter_owner(
1455 handler_args,
1456 name,
1457 new_owner_name,
1458 StatementType::ALTER_CONNECTION,
1459 None,
1460 )
1461 .await
1462 }
1463 AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1464 alter_connection_props::handle_alter_connection_connector_props(
1465 handler_args,
1466 name,
1467 alter_props,
1468 )
1469 .await
1470 }
1471 },
1472 Statement::AlterSystem { param, value } => {
1473 alter_system::handle_alter_system(handler_args, param, value).await
1474 }
1475 Statement::AlterSecret { name, operation } => match operation {
1476 AlterSecretOperation::ChangeCredential {
1477 with_options,
1478 new_credential,
1479 } => {
1480 alter_secret::handle_alter_secret(handler_args, name, with_options, new_credential)
1481 .await
1482 }
1483 AlterSecretOperation::ChangeOwner { new_owner_name } => {
1484 alter_owner::handle_alter_owner(
1485 handler_args,
1486 name,
1487 new_owner_name,
1488 StatementType::ALTER_SECRET,
1489 None,
1490 )
1491 .await
1492 }
1493 },
1494 Statement::AlterFragment {
1495 fragment_ids,
1496 operation,
1497 } => match operation {
1498 AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1499 let [fragment_id] = fragment_ids.as_slice() else {
1500 return Err(ErrorCode::InvalidInputSyntax(
1501 "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1502 .to_owned(),
1503 )
1504 .into());
1505 };
1506 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1507 &handler_args.session,
1508 PbThrottleTarget::Fragment,
1509 risingwave_pb::common::PbThrottleType::Backfill,
1510 *fragment_id,
1511 rate_limit,
1512 StatementType::SET_VARIABLE,
1513 )
1514 .await
1515 }
1516 AlterFragmentOperation::SetParallelism { parallelism } => {
1517 alter_parallelism::handle_alter_fragment_parallelism(
1518 handler_args,
1519 fragment_ids.into_iter().map_into().collect(),
1520 parallelism,
1521 )
1522 .await
1523 }
1524 },
1525 Statement::AlterDefaultPrivileges { .. } => {
1526 handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1527 }
1528 Statement::StartTransaction { modes } => {
1529 transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1530 }
1531 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1532 Statement::Commit { chain } => {
1533 transaction::handle_commit(handler_args, COMMIT, chain).await
1534 }
1535 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1536 Statement::Rollback { chain } => {
1537 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1538 }
1539 Statement::SetTransaction {
1540 modes,
1541 snapshot,
1542 session,
1543 } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1544 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1545 Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1546 Statement::Comment {
1547 object_type,
1548 object_name,
1549 comment,
1550 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1551 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1552 Statement::Prepare {
1553 name,
1554 data_types,
1555 statement,
1556 } => prepared_statement::handle_prepare(name, data_types, statement).await,
1557 Statement::Deallocate { name, prepare } => {
1558 prepared_statement::handle_deallocate(name, prepare).await
1559 }
1560 Statement::Vacuum { object_name, full } => {
1561 vacuum::handle_vacuum(handler_args, object_name, full).await
1562 }
1563 Statement::Refresh { table_name } => {
1564 refresh::handle_refresh(handler_args, table_name).await
1565 }
1566 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1567 }
1568}
1569
1570fn check_ban_ddl_for_iceberg_engine_table(
1571 session: Arc<SessionImpl>,
1572 stmt: &Statement,
1573) -> Result<()> {
1574 match stmt {
1575 Statement::AlterTable {
1576 name,
1577 operation:
1578 operation @ (AlterTableOperation::AddColumn { .. }
1579 | AlterTableOperation::DropColumn { .. }),
1580 } => {
1581 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1582 if table.is_iceberg_engine_table() {
1583 bail!(
1584 "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1585 operation,
1586 schema_name,
1587 name
1588 );
1589 }
1590 }
1591
1592 Statement::AlterTable {
1593 name,
1594 operation: AlterTableOperation::RenameTable { .. },
1595 } => {
1596 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1597 if table.is_iceberg_engine_table() {
1598 bail!(
1599 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1600 schema_name,
1601 name
1602 );
1603 }
1604 }
1605
1606 Statement::AlterTable {
1607 name,
1608 operation: AlterTableOperation::SetParallelism { .. },
1609 } => {
1610 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1611 if table.is_iceberg_engine_table() {
1612 bail!(
1613 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1614 schema_name,
1615 name
1616 );
1617 }
1618 }
1619 Statement::AlterTable {
1620 name,
1621 operation: AlterTableOperation::SetBackfillParallelism { .. },
1622 } => {
1623 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1624 if table.is_iceberg_engine_table() {
1625 bail!(
1626 "ALTER TABLE SET BACKFILL PARALLELISM is not supported for iceberg table: {}.{}",
1627 schema_name,
1628 name
1629 );
1630 }
1631 }
1632
1633 Statement::AlterTable {
1634 name,
1635 operation: AlterTableOperation::SetSchema { .. },
1636 } => {
1637 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1638 if table.is_iceberg_engine_table() {
1639 bail!(
1640 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1641 schema_name,
1642 name
1643 );
1644 }
1645 }
1646
1647 Statement::AlterTable {
1648 name,
1649 operation: AlterTableOperation::RefreshSchema,
1650 } => {
1651 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1652 if table.is_iceberg_engine_table() {
1653 bail!(
1654 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1655 schema_name,
1656 name
1657 );
1658 }
1659 }
1660
1661 Statement::AlterTable {
1662 name,
1663 operation: AlterTableOperation::SetSourceRateLimit { .. },
1664 } => {
1665 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1666 if table.is_iceberg_engine_table() {
1667 bail!(
1668 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1669 schema_name,
1670 name
1671 );
1672 }
1673 }
1674
1675 _ => {}
1676 }
1677
1678 Ok(())
1679}