risingwave_frontend/handler/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_connection_props;
46mod alter_database_param;
47mod alter_mv;
48mod alter_owner;
49mod alter_parallelism;
50mod alter_rename;
51mod alter_resource_group;
52mod alter_secret;
53mod alter_set_schema;
54mod alter_sink_props;
55mod alter_source_column;
56mod alter_source_props;
57mod alter_source_with_sr;
58mod alter_streaming_config;
59mod alter_streaming_enable_unaligned_join;
60mod alter_streaming_rate_limit;
61mod alter_swap_rename;
62mod alter_system;
63mod alter_table_column;
64pub mod alter_table_drop_connector;
65pub mod alter_table_props;
66mod alter_table_with_sr;
67pub mod alter_user;
68mod alter_utils;
69pub mod cancel_job;
70pub mod close_cursor;
71mod comment;
72pub mod create_aggregate;
73pub mod create_connection;
74mod create_database;
75pub mod create_function;
76pub mod create_index;
77pub mod create_mv;
78pub mod create_schema;
79pub mod create_secret;
80pub mod create_sink;
81pub mod create_source;
82pub mod create_sql_function;
83pub mod create_subscription;
84pub mod create_table;
85pub mod create_table_as;
86pub mod create_user;
87pub mod create_view;
88pub mod declare_cursor;
89pub mod describe;
90pub mod discard;
91mod drop_connection;
92mod drop_database;
93pub mod drop_function;
94mod drop_index;
95pub mod drop_mv;
96mod drop_schema;
97pub mod drop_secret;
98pub mod drop_sink;
99pub mod drop_source;
100pub mod drop_subscription;
101pub mod drop_table;
102pub mod drop_user;
103mod drop_view;
104pub mod explain;
105pub mod explain_analyze_stream_job;
106pub mod extended_handle;
107pub mod fetch_cursor;
108mod flush;
109pub mod handle_privilege;
110pub mod kill_process;
111mod prepared_statement;
112pub mod privilege;
113pub mod query;
114mod recover;
115mod refresh;
116mod reset_source;
117pub mod show;
118mod transaction;
119mod use_db;
120pub mod util;
121pub mod vacuum;
122pub mod variable;
123mod wait;
124
125pub use alter_table_column::{
126    fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
127};
128
129/// The [`PgResponseBuilder`] used by RisingWave.
130pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
131
132/// The [`PgResponse`] used by RisingWave.
133pub type RwPgResponse = PgResponse<PgResponseStream>;
134
135#[easy_ext::ext(RwPgResponseBuilderExt)]
136impl RwPgResponseBuilder {
137    /// Append rows to the response.
138    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
139        let fields = T::fields();
140        self.values(
141            rows.into_iter()
142                .map(|row| {
143                    Row::new(
144                        row.into_owned_row()
145                            .into_iter()
146                            .zip_eq_fast(&fields)
147                            .map(|(datum, (_, ty))| {
148                                datum.map(|scalar| {
149                                    scalar.as_scalar_ref_impl().text_format(ty).into()
150                                })
151                            })
152                            .collect(),
153                    )
154                })
155                .collect_vec()
156                .into(),
157            fields_to_descriptors(fields),
158        )
159    }
160}
161
162pub fn fields_to_descriptors(
163    fields: Vec<(&str, risingwave_common::types::DataType)>,
164) -> Vec<PgFieldDescriptor> {
165    fields
166        .iter()
167        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
168        .collect()
169}
170
171pub enum PgResponseStream {
172    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
173    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
174    Rows(BoxStream<'static, RowSetResult>),
175}
176
177impl Stream for PgResponseStream {
178    type Item = std::result::Result<Vec<Row>, BoxedError>;
179
180    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
181        match &mut *self {
182            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
183            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
184            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
185        }
186    }
187}
188
189impl From<Vec<Row>> for PgResponseStream {
190    fn from(rows: Vec<Row>) -> Self {
191        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
192    }
193}
194
195#[derive(Clone)]
196pub struct HandlerArgs {
197    pub session: Arc<SessionImpl>,
198    pub sql: Arc<str>,
199    pub normalized_sql: String,
200    pub with_options: WithOptions,
201}
202
203impl HandlerArgs {
204    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
205        Ok(Self {
206            session,
207            sql,
208            with_options: WithOptions::try_from(stmt)?,
209            normalized_sql: Self::normalize_sql(stmt),
210        })
211    }
212
213    /// Get normalized SQL from the statement.
214    ///
215    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
216    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
217    ///   make it suitable for the `SHOW CREATE` statements.
218    fn normalize_sql(stmt: &Statement) -> String {
219        let mut stmt = stmt.clone();
220        match &mut stmt {
221            Statement::CreateView {
222                or_replace,
223                if_not_exists,
224                ..
225            } => {
226                *or_replace = false;
227                *if_not_exists = false;
228            }
229            Statement::CreateTable {
230                or_replace,
231                if_not_exists,
232                ..
233            } => {
234                *or_replace = false;
235                *if_not_exists = false;
236            }
237            Statement::CreateIndex { if_not_exists, .. } => {
238                *if_not_exists = false;
239            }
240            Statement::CreateSource {
241                stmt: CreateSourceStatement { if_not_exists, .. },
242                ..
243            } => {
244                *if_not_exists = false;
245            }
246            Statement::CreateSink {
247                stmt: CreateSinkStatement { if_not_exists, .. },
248            } => {
249                *if_not_exists = false;
250            }
251            Statement::CreateSubscription {
252                stmt: CreateSubscriptionStatement { if_not_exists, .. },
253            } => {
254                *if_not_exists = false;
255            }
256            Statement::CreateConnection {
257                stmt: CreateConnectionStatement { if_not_exists, .. },
258            } => {
259                *if_not_exists = false;
260            }
261            _ => {}
262        }
263        stmt.to_string()
264    }
265}
266
267pub async fn handle(
268    session: Arc<SessionImpl>,
269    stmt: Statement,
270    sql: Arc<str>,
271    formats: Vec<Format>,
272) -> Result<RwPgResponse> {
273    session.clear_cancel_query_flag();
274    let _guard = session.txn_begin_implicit();
275    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
276
277    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
278
279    match stmt {
280        Statement::Explain {
281            statement,
282            analyze,
283            options,
284        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
285        Statement::ExplainAnalyzeStreamJob {
286            target,
287            duration_secs,
288        } => {
289            explain_analyze_stream_job::handle_explain_analyze_stream_job(
290                handler_args,
291                target,
292                duration_secs,
293            )
294            .await
295        }
296        Statement::CreateSource { stmt } => {
297            create_source::handle_create_source(handler_args, stmt).await
298        }
299        Statement::CreateSink { stmt } => {
300            create_sink::handle_create_sink(handler_args, stmt, false).await
301        }
302        Statement::CreateSubscription { stmt } => {
303            create_subscription::handle_create_subscription(handler_args, stmt).await
304        }
305        Statement::CreateConnection { stmt } => {
306            create_connection::handle_create_connection(handler_args, stmt).await
307        }
308        Statement::CreateSecret { stmt } => {
309            create_secret::handle_create_secret(handler_args, stmt).await
310        }
311        Statement::CreateFunction {
312            or_replace,
313            temporary,
314            if_not_exists,
315            name,
316            args,
317            returns,
318            params,
319            with_options,
320        } => {
321            // For general udf, `language` clause could be ignored
322            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
323            if params.language.is_none()
324                || !params
325                    .language
326                    .as_ref()
327                    .unwrap()
328                    .real_value()
329                    .eq_ignore_ascii_case("sql")
330            {
331                create_function::handle_create_function(
332                    handler_args,
333                    or_replace,
334                    temporary,
335                    if_not_exists,
336                    name,
337                    args,
338                    returns,
339                    params,
340                    with_options,
341                )
342                .await
343            } else {
344                create_sql_function::handle_create_sql_function(
345                    handler_args,
346                    or_replace,
347                    temporary,
348                    if_not_exists,
349                    name,
350                    args,
351                    returns,
352                    params,
353                )
354                .await
355            }
356        }
357        Statement::CreateAggregate {
358            or_replace,
359            if_not_exists,
360            name,
361            args,
362            returns,
363            params,
364            ..
365        } => {
366            create_aggregate::handle_create_aggregate(
367                handler_args,
368                or_replace,
369                if_not_exists,
370                name,
371                args,
372                returns,
373                params,
374            )
375            .await
376        }
377        Statement::CreateTable {
378            name,
379            columns,
380            wildcard_idx,
381            constraints,
382            query,
383            with_options: _, // It is put in OptimizerContext
384            // Not supported things
385            or_replace,
386            temporary,
387            if_not_exists,
388            format_encode,
389            source_watermarks,
390            append_only,
391            on_conflict,
392            with_version_columns,
393            cdc_table_info,
394            include_column_options,
395            webhook_info,
396            engine,
397        } => {
398            if or_replace {
399                bail_not_implemented!("CREATE OR REPLACE TABLE");
400            }
401            if temporary {
402                bail_not_implemented!("CREATE TEMPORARY TABLE");
403            }
404            if let Some(query) = query {
405                return create_table_as::handle_create_as(
406                    handler_args,
407                    name,
408                    if_not_exists,
409                    query,
410                    columns,
411                    append_only,
412                    on_conflict,
413                    with_version_columns
414                        .iter()
415                        .map(|col| col.real_value())
416                        .collect(),
417                    engine,
418                )
419                .await;
420            }
421            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
422            create_table::handle_create_table(
423                handler_args,
424                name,
425                columns,
426                wildcard_idx,
427                constraints,
428                if_not_exists,
429                format_encode,
430                source_watermarks,
431                append_only,
432                on_conflict,
433                with_version_columns
434                    .iter()
435                    .map(|col| col.real_value())
436                    .collect(),
437                cdc_table_info,
438                include_column_options,
439                webhook_info,
440                engine,
441            )
442            .await
443        }
444        Statement::CreateDatabase {
445            db_name,
446            if_not_exists,
447            owner,
448            resource_group,
449            barrier_interval_ms,
450            checkpoint_frequency,
451        } => {
452            create_database::handle_create_database(
453                handler_args,
454                db_name,
455                if_not_exists,
456                owner,
457                resource_group,
458                barrier_interval_ms,
459                checkpoint_frequency,
460            )
461            .await
462        }
463        Statement::CreateSchema {
464            schema_name,
465            if_not_exists,
466            owner,
467        } => {
468            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
469                .await
470        }
471        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
472        Statement::DeclareCursor { stmt } => {
473            declare_cursor::handle_declare_cursor(handler_args, stmt).await
474        }
475        Statement::FetchCursor { stmt } => {
476            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
477        }
478        Statement::CloseCursor { stmt } => {
479            close_cursor::handle_close_cursor(handler_args, stmt).await
480        }
481        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
482        Statement::Grant { .. } => {
483            handle_privilege::handle_grant_privilege(handler_args, stmt).await
484        }
485        Statement::Revoke { .. } => {
486            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
487        }
488        Statement::Describe { name, kind } => match kind {
489            DescribeKind::Fragments => {
490                describe::handle_describe_fragments(handler_args, name).await
491            }
492            DescribeKind::Plain => describe::handle_describe(handler_args, name),
493        },
494        Statement::DescribeFragment { fragment_id } => {
495            describe::handle_describe_fragment(handler_args, fragment_id.into()).await
496        }
497        Statement::Discard(..) => discard::handle_discard(handler_args),
498        Statement::ShowObjects {
499            object: show_object,
500            filter,
501        } => show::handle_show_object(handler_args, show_object, filter).await,
502        Statement::ShowCreateObject { create_type, name } => {
503            show::handle_show_create_object(handler_args, create_type, name)
504        }
505        Statement::ShowTransactionIsolationLevel => {
506            transaction::handle_show_isolation_level(handler_args)
507        }
508        Statement::Drop(DropStatement {
509            object_type,
510            object_name,
511            if_exists,
512            drop_mode,
513        }) => {
514            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
515                match object_type {
516                    ObjectType::MaterializedView
517                    | ObjectType::View
518                    | ObjectType::Sink
519                    | ObjectType::Source
520                    | ObjectType::Subscription
521                    | ObjectType::Index
522                    | ObjectType::Table
523                    | ObjectType::Schema
524                    | ObjectType::Connection => true,
525                    ObjectType::Database | ObjectType::User | ObjectType::Secret => {
526                        bail_not_implemented!("DROP CASCADE");
527                    }
528                }
529            } else {
530                false
531            };
532            match object_type {
533                ObjectType::Table => {
534                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
535                        .await
536                }
537                ObjectType::MaterializedView => {
538                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
539                }
540                ObjectType::Index => {
541                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
542                        .await
543                }
544                ObjectType::Source => {
545                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
546                        .await
547                }
548                ObjectType::Sink => {
549                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
550                }
551                ObjectType::Subscription => {
552                    drop_subscription::handle_drop_subscription(
553                        handler_args,
554                        object_name,
555                        if_exists,
556                        cascade,
557                    )
558                    .await
559                }
560                ObjectType::Database => {
561                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
562                }
563                ObjectType::Schema => {
564                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
565                        .await
566                }
567                ObjectType::User => {
568                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
569                }
570                ObjectType::View => {
571                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
572                }
573                ObjectType::Connection => {
574                    drop_connection::handle_drop_connection(
575                        handler_args,
576                        object_name,
577                        if_exists,
578                        cascade,
579                    )
580                    .await
581                }
582                ObjectType::Secret => {
583                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
584                }
585            }
586        }
587        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
588        Statement::DropFunction {
589            if_exists,
590            func_desc,
591            option,
592        } => {
593            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
594                .await
595        }
596        Statement::DropAggregate {
597            if_exists,
598            func_desc,
599            option,
600        } => {
601            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
602                .await
603        }
604        Statement::Query(_)
605        | Statement::Insert { .. }
606        | Statement::Delete { .. }
607        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
608        Statement::Copy {
609            entity: CopyEntity::Query(query),
610            target: CopyTarget::Stdout,
611        } => {
612            let response =
613                query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
614                    .await?;
615            Ok(response.into_copy_query_to_stdout())
616        }
617        Statement::CreateView {
618            materialized,
619            if_not_exists,
620            name,
621            columns,
622            query,
623            with_options: _, // It is put in OptimizerContext
624            or_replace,      // not supported
625            emit_mode,
626        } => {
627            if or_replace {
628                bail_not_implemented!("CREATE OR REPLACE VIEW");
629            }
630            if materialized {
631                create_mv::handle_create_mv(
632                    handler_args,
633                    if_not_exists,
634                    name,
635                    *query,
636                    columns,
637                    emit_mode,
638                )
639                .await
640            } else {
641                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
642                    .await
643            }
644        }
645        Statement::Flush => flush::handle_flush(handler_args).await,
646        Statement::Wait => wait::handle_wait(handler_args).await,
647        Statement::Recover => recover::handle_recover(handler_args).await,
648        Statement::SetVariable {
649            local: _,
650            variable,
651            value,
652        } => {
653            // special handle for `use database`
654            if variable.real_value().eq_ignore_ascii_case("database") {
655                let x = variable::set_var_to_param_str(&value);
656                let res = use_db::handle_use_db(
657                    handler_args,
658                    ObjectName::from(vec![Ident::from_real_value(
659                        x.as_deref().unwrap_or("default"),
660                    )]),
661                )?;
662                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
663                for notice in res.notices() {
664                    builder = builder.notice(notice);
665                }
666                return Ok(builder.into());
667            }
668            variable::handle_set(handler_args, variable, value)
669        }
670        Statement::SetTimeZone { local: _, value } => {
671            variable::handle_set_time_zone(handler_args, value)
672        }
673        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
674        Statement::CreateIndex {
675            name,
676            table_name,
677            method,
678            columns,
679            include,
680            distributed_by,
681            unique,
682            if_not_exists,
683            with_properties: _,
684        } => {
685            if unique {
686                bail_not_implemented!("create unique index");
687            }
688
689            create_index::handle_create_index(
690                handler_args,
691                if_not_exists,
692                name,
693                table_name,
694                method,
695                columns.clone(),
696                include,
697                distributed_by,
698            )
699            .await
700        }
701        Statement::AlterDatabase { name, operation } => match operation {
702            AlterDatabaseOperation::RenameDatabase { database_name } => {
703                alter_rename::handle_rename_database(handler_args, name, database_name).await
704            }
705            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
706                alter_owner::handle_alter_owner(
707                    handler_args,
708                    name,
709                    new_owner_name,
710                    StatementType::ALTER_DATABASE,
711                    None,
712                )
713                .await
714            }
715            AlterDatabaseOperation::SetParam(config_param) => {
716                let ConfigParam { param, value } = config_param;
717
718                let database_param = match param.real_value().to_uppercase().as_str() {
719                    "BARRIER_INTERVAL_MS" => {
720                        let barrier_interval_ms = match value {
721                            SetVariableValue::Default => None,
722                            SetVariableValue::Single(SetVariableValueSingle::Literal(
723                                Value::Number(num),
724                            )) => {
725                                let num = num.parse::<u32>().map_err(|e| {
726                                    ErrorCode::InvalidInputSyntax(format!(
727                                        "barrier_interval_ms must be a u32 integer: {}",
728                                        e.as_report()
729                                    ))
730                                })?;
731                                Some(num)
732                            }
733                            _ => {
734                                return Err(ErrorCode::InvalidInputSyntax(
735                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
736                                        .to_owned(),
737                                )
738                                .into());
739                            }
740                        };
741                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
742                    }
743                    "CHECKPOINT_FREQUENCY" => {
744                        let checkpoint_frequency = match value {
745                            SetVariableValue::Default => None,
746                            SetVariableValue::Single(SetVariableValueSingle::Literal(
747                                Value::Number(num),
748                            )) => {
749                                let num = num.parse::<u64>().map_err(|e| {
750                                    ErrorCode::InvalidInputSyntax(format!(
751                                        "checkpoint_frequency must be a u64 integer: {}",
752                                        e.as_report()
753                                    ))
754                                })?;
755                                Some(num)
756                            }
757                            _ => {
758                                return Err(ErrorCode::InvalidInputSyntax(
759                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
760                                        .to_owned(),
761                                )
762                                .into());
763                            }
764                        };
765                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
766                    }
767                    _ => {
768                        return Err(ErrorCode::InvalidInputSyntax(format!(
769                            "Unsupported database config parameter: {}",
770                            param.real_value()
771                        ))
772                        .into());
773                    }
774                };
775
776                alter_database_param::handle_alter_database_param(
777                    handler_args,
778                    name,
779                    database_param,
780                )
781                .await
782            }
783        },
784        Statement::AlterSchema { name, operation } => match operation {
785            AlterSchemaOperation::RenameSchema { schema_name } => {
786                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
787            }
788            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
789                alter_owner::handle_alter_owner(
790                    handler_args,
791                    name,
792                    new_owner_name,
793                    StatementType::ALTER_SCHEMA,
794                    None,
795                )
796                .await
797            }
798            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
799                alter_swap_rename::handle_swap_rename(
800                    handler_args,
801                    name,
802                    target_schema,
803                    StatementType::ALTER_SCHEMA,
804                )
805                .await
806            }
807        },
808        Statement::AlterTable { name, operation } => match operation {
809            AlterTableOperation::AddColumn { .. }
810            | AlterTableOperation::DropColumn { .. }
811            | AlterTableOperation::AlterColumn { .. } => {
812                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
813            }
814            AlterTableOperation::RenameTable { table_name } => {
815                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
816                    .await
817            }
818            AlterTableOperation::ChangeOwner { new_owner_name } => {
819                alter_owner::handle_alter_owner(
820                    handler_args,
821                    name,
822                    new_owner_name,
823                    StatementType::ALTER_TABLE,
824                    None,
825                )
826                .await
827            }
828            AlterTableOperation::SetParallelism {
829                parallelism,
830                deferred,
831            } => {
832                alter_parallelism::handle_alter_parallelism(
833                    handler_args,
834                    name,
835                    parallelism,
836                    StatementType::ALTER_TABLE,
837                    deferred,
838                )
839                .await
840            }
841            AlterTableOperation::SetSchema { new_schema_name } => {
842                alter_set_schema::handle_alter_set_schema(
843                    handler_args,
844                    name,
845                    new_schema_name,
846                    StatementType::ALTER_TABLE,
847                    None,
848                )
849                .await
850            }
851            AlterTableOperation::RefreshSchema => {
852                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
853            }
854            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
855                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
856                    handler_args,
857                    PbThrottleTarget::Table,
858                    risingwave_pb::common::PbThrottleType::Source,
859                    name,
860                    rate_limit,
861                )
862                .await
863            }
864            AlterTableOperation::DropConnector => {
865                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
866                    .await
867            }
868            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
869                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
870                    handler_args,
871                    PbThrottleTarget::Table,
872                    risingwave_pb::common::PbThrottleType::Dml,
873                    name,
874                    rate_limit,
875                )
876                .await
877            }
878            AlterTableOperation::SetConfig { entries } => {
879                alter_streaming_config::handle_alter_streaming_set_config(
880                    handler_args,
881                    name,
882                    entries,
883                    StatementType::ALTER_TABLE,
884                )
885                .await
886            }
887            AlterTableOperation::ResetConfig { keys } => {
888                alter_streaming_config::handle_alter_streaming_reset_config(
889                    handler_args,
890                    name,
891                    keys,
892                    StatementType::ALTER_TABLE,
893                )
894                .await
895            }
896            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
897                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
898                    handler_args,
899                    PbThrottleTarget::Table,
900                    risingwave_pb::common::PbThrottleType::Backfill,
901                    name,
902                    rate_limit,
903                )
904                .await
905            }
906            AlterTableOperation::SwapRenameTable { target_table } => {
907                alter_swap_rename::handle_swap_rename(
908                    handler_args,
909                    name,
910                    target_table,
911                    StatementType::ALTER_TABLE,
912                )
913                .await
914            }
915            AlterTableOperation::AlterConnectorProps { alter_props } => {
916                alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
917            }
918            AlterTableOperation::AddConstraint { .. }
919            | AlterTableOperation::DropConstraint { .. }
920            | AlterTableOperation::RenameColumn { .. }
921            | AlterTableOperation::ChangeColumn { .. }
922            | AlterTableOperation::RenameConstraint { .. } => {
923                bail_not_implemented!(
924                    "Unhandled statement: {}",
925                    Statement::AlterTable { name, operation }
926                )
927            }
928        },
929        Statement::AlterIndex { name, operation } => match operation {
930            AlterIndexOperation::RenameIndex { index_name } => {
931                alter_rename::handle_rename_index(handler_args, name, index_name).await
932            }
933            AlterIndexOperation::SetParallelism {
934                parallelism,
935                deferred,
936            } => {
937                alter_parallelism::handle_alter_parallelism(
938                    handler_args,
939                    name,
940                    parallelism,
941                    StatementType::ALTER_INDEX,
942                    deferred,
943                )
944                .await
945            }
946            AlterIndexOperation::SetConfig { entries } => {
947                alter_streaming_config::handle_alter_streaming_set_config(
948                    handler_args,
949                    name,
950                    entries,
951                    StatementType::ALTER_INDEX,
952                )
953                .await
954            }
955            AlterIndexOperation::ResetConfig { keys } => {
956                alter_streaming_config::handle_alter_streaming_reset_config(
957                    handler_args,
958                    name,
959                    keys,
960                    StatementType::ALTER_INDEX,
961                )
962                .await
963            }
964        },
965        Statement::AlterView {
966            materialized,
967            name,
968            operation,
969        } => {
970            let statement_type = if materialized {
971                StatementType::ALTER_MATERIALIZED_VIEW
972            } else {
973                StatementType::ALTER_VIEW
974            };
975            match operation {
976                AlterViewOperation::RenameView { view_name } => {
977                    if materialized {
978                        alter_rename::handle_rename_table(
979                            handler_args,
980                            TableType::MaterializedView,
981                            name,
982                            view_name,
983                        )
984                        .await
985                    } else {
986                        alter_rename::handle_rename_view(handler_args, name, view_name).await
987                    }
988                }
989                AlterViewOperation::SetParallelism {
990                    parallelism,
991                    deferred,
992                } => {
993                    if !materialized {
994                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
995                    }
996                    alter_parallelism::handle_alter_parallelism(
997                        handler_args,
998                        name,
999                        parallelism,
1000                        statement_type,
1001                        deferred,
1002                    )
1003                    .await
1004                }
1005                AlterViewOperation::SetResourceGroup {
1006                    resource_group,
1007                    deferred,
1008                } => {
1009                    if !materialized {
1010                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1011                    }
1012                    alter_resource_group::handle_alter_resource_group(
1013                        handler_args,
1014                        name,
1015                        resource_group,
1016                        statement_type,
1017                        deferred,
1018                    )
1019                    .await
1020                }
1021                AlterViewOperation::ChangeOwner { new_owner_name } => {
1022                    alter_owner::handle_alter_owner(
1023                        handler_args,
1024                        name,
1025                        new_owner_name,
1026                        statement_type,
1027                        None,
1028                    )
1029                    .await
1030                }
1031                AlterViewOperation::SetSchema { new_schema_name } => {
1032                    alter_set_schema::handle_alter_set_schema(
1033                        handler_args,
1034                        name,
1035                        new_schema_name,
1036                        statement_type,
1037                        None,
1038                    )
1039                    .await
1040                }
1041                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1042                    if !materialized {
1043                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1044                    }
1045                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1046                        handler_args,
1047                        PbThrottleTarget::Mv,
1048                        risingwave_pb::common::PbThrottleType::Backfill,
1049                        name,
1050                        rate_limit,
1051                    )
1052                    .await
1053                }
1054                AlterViewOperation::SwapRenameView { target_view } => {
1055                    alter_swap_rename::handle_swap_rename(
1056                        handler_args,
1057                        name,
1058                        target_view,
1059                        statement_type,
1060                    )
1061                    .await
1062                }
1063                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1064                    if !materialized {
1065                        bail!(
1066                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1067                        );
1068                    }
1069                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1070                }
1071                AlterViewOperation::AsQuery { query } => {
1072                    if !materialized {
1073                        bail_not_implemented!("ALTER VIEW AS QUERY");
1074                    }
1075                    // `ALTER MATERIALIZED VIEW AS QUERY` is only available in development build now.
1076                    if !cfg!(debug_assertions) {
1077                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1078                    }
1079                    alter_mv::handle_alter_mv(handler_args, name, query).await
1080                }
1081                AlterViewOperation::SetConfig { entries } => {
1082                    if !materialized {
1083                        bail!("SET CONFIG is only supported for materialized views");
1084                    }
1085                    alter_streaming_config::handle_alter_streaming_set_config(
1086                        handler_args,
1087                        name,
1088                        entries,
1089                        statement_type,
1090                    )
1091                    .await
1092                }
1093                AlterViewOperation::ResetConfig { keys } => {
1094                    if !materialized {
1095                        bail!("RESET CONFIG is only supported for materialized views");
1096                    }
1097                    alter_streaming_config::handle_alter_streaming_reset_config(
1098                        handler_args,
1099                        name,
1100                        keys,
1101                        statement_type,
1102                    )
1103                    .await
1104                }
1105            }
1106        }
1107
1108        Statement::AlterSink { name, operation } => match operation {
1109            AlterSinkOperation::AlterConnectorProps {
1110                alter_props: changed_props,
1111            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1112            AlterSinkOperation::RenameSink { sink_name } => {
1113                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1114            }
1115            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1116                alter_owner::handle_alter_owner(
1117                    handler_args,
1118                    name,
1119                    new_owner_name,
1120                    StatementType::ALTER_SINK,
1121                    None,
1122                )
1123                .await
1124            }
1125            AlterSinkOperation::SetSchema { new_schema_name } => {
1126                alter_set_schema::handle_alter_set_schema(
1127                    handler_args,
1128                    name,
1129                    new_schema_name,
1130                    StatementType::ALTER_SINK,
1131                    None,
1132                )
1133                .await
1134            }
1135            AlterSinkOperation::SetParallelism {
1136                parallelism,
1137                deferred,
1138            } => {
1139                alter_parallelism::handle_alter_parallelism(
1140                    handler_args,
1141                    name,
1142                    parallelism,
1143                    StatementType::ALTER_SINK,
1144                    deferred,
1145                )
1146                .await
1147            }
1148            AlterSinkOperation::SetConfig { entries } => {
1149                alter_streaming_config::handle_alter_streaming_set_config(
1150                    handler_args,
1151                    name,
1152                    entries,
1153                    StatementType::ALTER_SINK,
1154                )
1155                .await
1156            }
1157            AlterSinkOperation::ResetConfig { keys } => {
1158                alter_streaming_config::handle_alter_streaming_reset_config(
1159                    handler_args,
1160                    name,
1161                    keys,
1162                    StatementType::ALTER_SINK,
1163                )
1164                .await
1165            }
1166            AlterSinkOperation::SwapRenameSink { target_sink } => {
1167                alter_swap_rename::handle_swap_rename(
1168                    handler_args,
1169                    name,
1170                    target_sink,
1171                    StatementType::ALTER_SINK,
1172                )
1173                .await
1174            }
1175            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1176                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1177                    handler_args,
1178                    PbThrottleTarget::Sink,
1179                    risingwave_pb::common::PbThrottleType::Sink,
1180                    name,
1181                    rate_limit,
1182                )
1183                .await
1184            }
1185            AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
1186                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1187                    handler_args,
1188                    PbThrottleTarget::Sink,
1189                    risingwave_pb::common::PbThrottleType::Backfill,
1190                    name,
1191                    rate_limit,
1192                )
1193                .await
1194            }
1195            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1196                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1197                    handler_args,
1198                    name,
1199                    enable,
1200                )
1201                .await
1202            }
1203        },
1204        Statement::AlterSubscription { name, operation } => match operation {
1205            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1206                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1207                    .await
1208            }
1209            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1210                alter_owner::handle_alter_owner(
1211                    handler_args,
1212                    name,
1213                    new_owner_name,
1214                    StatementType::ALTER_SUBSCRIPTION,
1215                    None,
1216                )
1217                .await
1218            }
1219            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1220                alter_set_schema::handle_alter_set_schema(
1221                    handler_args,
1222                    name,
1223                    new_schema_name,
1224                    StatementType::ALTER_SUBSCRIPTION,
1225                    None,
1226                )
1227                .await
1228            }
1229            AlterSubscriptionOperation::SwapRenameSubscription {
1230                target_subscription,
1231            } => {
1232                alter_swap_rename::handle_swap_rename(
1233                    handler_args,
1234                    name,
1235                    target_subscription,
1236                    StatementType::ALTER_SUBSCRIPTION,
1237                )
1238                .await
1239            }
1240        },
1241        Statement::AlterSource { name, operation } => match operation {
1242            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1243                alter_source_props::handle_alter_source_connector_props(
1244                    handler_args,
1245                    name,
1246                    alter_props,
1247                )
1248                .await
1249            }
1250            AlterSourceOperation::RenameSource { source_name } => {
1251                alter_rename::handle_rename_source(handler_args, name, source_name).await
1252            }
1253            AlterSourceOperation::AddColumn { .. } => {
1254                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1255            }
1256            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1257                alter_owner::handle_alter_owner(
1258                    handler_args,
1259                    name,
1260                    new_owner_name,
1261                    StatementType::ALTER_SOURCE,
1262                    None,
1263                )
1264                .await
1265            }
1266            AlterSourceOperation::SetSchema { new_schema_name } => {
1267                alter_set_schema::handle_alter_set_schema(
1268                    handler_args,
1269                    name,
1270                    new_schema_name,
1271                    StatementType::ALTER_SOURCE,
1272                    None,
1273                )
1274                .await
1275            }
1276            AlterSourceOperation::FormatEncode { format_encode } => {
1277                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1278                    .await
1279            }
1280            AlterSourceOperation::RefreshSchema => {
1281                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1282            }
1283            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1284                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1285                    handler_args,
1286                    PbThrottleTarget::Source,
1287                    risingwave_pb::common::PbThrottleType::Source,
1288                    name,
1289                    rate_limit,
1290                )
1291                .await
1292            }
1293            AlterSourceOperation::SwapRenameSource { target_source } => {
1294                alter_swap_rename::handle_swap_rename(
1295                    handler_args,
1296                    name,
1297                    target_source,
1298                    StatementType::ALTER_SOURCE,
1299                )
1300                .await
1301            }
1302            AlterSourceOperation::SetParallelism {
1303                parallelism,
1304                deferred,
1305            } => {
1306                alter_parallelism::handle_alter_parallelism(
1307                    handler_args,
1308                    name,
1309                    parallelism,
1310                    StatementType::ALTER_SOURCE,
1311                    deferred,
1312                )
1313                .await
1314            }
1315            AlterSourceOperation::SetConfig { entries } => {
1316                alter_streaming_config::handle_alter_streaming_set_config(
1317                    handler_args,
1318                    name,
1319                    entries,
1320                    StatementType::ALTER_SOURCE,
1321                )
1322                .await
1323            }
1324            AlterSourceOperation::ResetConfig { keys } => {
1325                alter_streaming_config::handle_alter_streaming_reset_config(
1326                    handler_args,
1327                    name,
1328                    keys,
1329                    StatementType::ALTER_SOURCE,
1330                )
1331                .await
1332            }
1333            AlterSourceOperation::ResetSource => {
1334                reset_source::handle_reset_source(handler_args, name).await
1335            }
1336        },
1337        Statement::AlterFunction {
1338            name,
1339            args,
1340            operation,
1341        } => match operation {
1342            AlterFunctionOperation::SetSchema { new_schema_name } => {
1343                alter_set_schema::handle_alter_set_schema(
1344                    handler_args,
1345                    name,
1346                    new_schema_name,
1347                    StatementType::ALTER_FUNCTION,
1348                    args,
1349                )
1350                .await
1351            }
1352            AlterFunctionOperation::ChangeOwner { new_owner_name } => {
1353                alter_owner::handle_alter_owner(
1354                    handler_args,
1355                    name,
1356                    new_owner_name,
1357                    StatementType::ALTER_FUNCTION,
1358                    args,
1359                )
1360                .await
1361            }
1362        },
1363        Statement::AlterConnection { name, operation } => match operation {
1364            AlterConnectionOperation::SetSchema { new_schema_name } => {
1365                alter_set_schema::handle_alter_set_schema(
1366                    handler_args,
1367                    name,
1368                    new_schema_name,
1369                    StatementType::ALTER_CONNECTION,
1370                    None,
1371                )
1372                .await
1373            }
1374            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1375                alter_owner::handle_alter_owner(
1376                    handler_args,
1377                    name,
1378                    new_owner_name,
1379                    StatementType::ALTER_CONNECTION,
1380                    None,
1381                )
1382                .await
1383            }
1384            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1385                alter_connection_props::handle_alter_connection_connector_props(
1386                    handler_args,
1387                    name,
1388                    alter_props,
1389                )
1390                .await
1391            }
1392        },
1393        Statement::AlterSystem { param, value } => {
1394            alter_system::handle_alter_system(handler_args, param, value).await
1395        }
1396        Statement::AlterSecret { name, operation } => match operation {
1397            AlterSecretOperation::ChangeCredential {
1398                with_options,
1399                new_credential,
1400            } => {
1401                alter_secret::handle_alter_secret(handler_args, name, with_options, new_credential)
1402                    .await
1403            }
1404            AlterSecretOperation::ChangeOwner { new_owner_name } => {
1405                alter_owner::handle_alter_owner(
1406                    handler_args,
1407                    name,
1408                    new_owner_name,
1409                    StatementType::ALTER_SECRET,
1410                    None,
1411                )
1412                .await
1413            }
1414        },
1415        Statement::AlterFragment {
1416            fragment_ids,
1417            operation,
1418        } => match operation {
1419            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1420                let [fragment_id] = fragment_ids.as_slice() else {
1421                    return Err(ErrorCode::InvalidInputSyntax(
1422                        "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1423                            .to_owned(),
1424                    )
1425                    .into());
1426                };
1427                alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1428                    &handler_args.session,
1429                    PbThrottleTarget::Fragment,
1430                    risingwave_pb::common::PbThrottleType::Backfill,
1431                    *fragment_id,
1432                    rate_limit,
1433                    StatementType::SET_VARIABLE,
1434                )
1435                .await
1436            }
1437            AlterFragmentOperation::SetParallelism { parallelism } => {
1438                alter_parallelism::handle_alter_fragment_parallelism(
1439                    handler_args,
1440                    fragment_ids.into_iter().map_into().collect(),
1441                    parallelism,
1442                )
1443                .await
1444            }
1445        },
1446        Statement::AlterDefaultPrivileges { .. } => {
1447            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1448        }
1449        Statement::StartTransaction { modes } => {
1450            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1451        }
1452        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1453        Statement::Commit { chain } => {
1454            transaction::handle_commit(handler_args, COMMIT, chain).await
1455        }
1456        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1457        Statement::Rollback { chain } => {
1458            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1459        }
1460        Statement::SetTransaction {
1461            modes,
1462            snapshot,
1463            session,
1464        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1465        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1466        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1467        Statement::Comment {
1468            object_type,
1469            object_name,
1470            comment,
1471        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1472        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1473        Statement::Prepare {
1474            name,
1475            data_types,
1476            statement,
1477        } => prepared_statement::handle_prepare(name, data_types, statement).await,
1478        Statement::Deallocate { name, prepare } => {
1479            prepared_statement::handle_deallocate(name, prepare).await
1480        }
1481        Statement::Vacuum { object_name, full } => {
1482            vacuum::handle_vacuum(handler_args, object_name, full).await
1483        }
1484        Statement::Refresh { table_name } => {
1485            refresh::handle_refresh(handler_args, table_name).await
1486        }
1487        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1488    }
1489}
1490
1491fn check_ban_ddl_for_iceberg_engine_table(
1492    session: Arc<SessionImpl>,
1493    stmt: &Statement,
1494) -> Result<()> {
1495    match stmt {
1496        Statement::AlterTable {
1497            name,
1498            operation:
1499                operation @ (AlterTableOperation::AddColumn { .. }
1500                | AlterTableOperation::DropColumn { .. }),
1501        } => {
1502            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1503            if table.is_iceberg_engine_table() {
1504                bail!(
1505                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1506                    operation,
1507                    schema_name,
1508                    name
1509                );
1510            }
1511        }
1512
1513        Statement::AlterTable {
1514            name,
1515            operation: AlterTableOperation::RenameTable { .. },
1516        } => {
1517            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1518            if table.is_iceberg_engine_table() {
1519                bail!(
1520                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1521                    schema_name,
1522                    name
1523                );
1524            }
1525        }
1526
1527        Statement::AlterTable {
1528            name,
1529            operation: AlterTableOperation::SetParallelism { .. },
1530        } => {
1531            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1532            if table.is_iceberg_engine_table() {
1533                bail!(
1534                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1535                    schema_name,
1536                    name
1537                );
1538            }
1539        }
1540
1541        Statement::AlterTable {
1542            name,
1543            operation: AlterTableOperation::SetSchema { .. },
1544        } => {
1545            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1546            if table.is_iceberg_engine_table() {
1547                bail!(
1548                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1549                    schema_name,
1550                    name
1551                );
1552            }
1553        }
1554
1555        Statement::AlterTable {
1556            name,
1557            operation: AlterTableOperation::RefreshSchema,
1558        } => {
1559            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1560            if table.is_iceberg_engine_table() {
1561                bail!(
1562                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1563                    schema_name,
1564                    name
1565                );
1566            }
1567        }
1568
1569        Statement::AlterTable {
1570            name,
1571            operation: AlterTableOperation::SetSourceRateLimit { .. },
1572        } => {
1573            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1574            if table.is_iceberg_engine_table() {
1575                bail!(
1576                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1577                    schema_name,
1578                    name
1579                );
1580            }
1581        }
1582
1583        _ => {}
1584    }
1585
1586    Ok(())
1587}