risingwave_frontend/handler/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_connection_props;
46mod alter_database_param;
47mod alter_mv;
48mod alter_owner;
49mod alter_parallelism;
50mod alter_rename;
51mod alter_resource_group;
52mod alter_secret;
53mod alter_set_schema;
54mod alter_sink_props;
55mod alter_source_column;
56mod alter_source_props;
57mod alter_source_with_sr;
58mod alter_streaming_config;
59mod alter_streaming_enable_unaligned_join;
60mod alter_streaming_rate_limit;
61mod alter_swap_rename;
62mod alter_system;
63mod alter_table_column;
64pub mod alter_table_drop_connector;
65pub mod alter_table_props;
66mod alter_table_with_sr;
67pub mod alter_user;
68mod alter_utils;
69pub mod cancel_job;
70pub mod close_cursor;
71mod comment;
72pub mod create_aggregate;
73pub mod create_connection;
74mod create_database;
75pub mod create_function;
76pub mod create_index;
77pub mod create_mv;
78pub mod create_schema;
79pub mod create_secret;
80pub mod create_sink;
81pub mod create_source;
82pub mod create_sql_function;
83pub mod create_subscription;
84pub mod create_table;
85pub mod create_table_as;
86pub mod create_user;
87pub mod create_view;
88pub mod declare_cursor;
89pub mod describe;
90pub mod discard;
91mod drop_connection;
92mod drop_database;
93pub mod drop_function;
94mod drop_index;
95pub mod drop_mv;
96mod drop_schema;
97pub mod drop_secret;
98pub mod drop_sink;
99pub mod drop_source;
100pub mod drop_subscription;
101pub mod drop_table;
102pub mod drop_user;
103mod drop_view;
104pub mod explain;
105pub mod explain_analyze_stream_job;
106pub mod extended_handle;
107pub mod fetch_cursor;
108mod flush;
109pub mod handle_privilege;
110pub mod kill_process;
111mod prepared_statement;
112pub mod privilege;
113pub mod query;
114mod recover;
115mod refresh;
116mod reset_source;
117pub mod show;
118mod transaction;
119mod use_db;
120pub mod util;
121pub mod vacuum;
122pub mod variable;
123mod wait;
124
125pub use alter_table_column::{
126    fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
127};
128
/// The [`PgResponseBuilder`] used by RisingWave: a [`PgResponseBuilder`] whose row stream type
/// is fixed to [`PgResponseStream`].
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;

/// The [`PgResponse`] used by RisingWave: a [`PgResponse`] whose row stream type is fixed to
/// [`PgResponseStream`].
pub type RwPgResponse = PgResponse<PgResponseStream>;
134
135#[easy_ext::ext(RwPgResponseBuilderExt)]
136impl RwPgResponseBuilder {
137    /// Append rows to the response.
138    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
139        let fields = T::fields();
140        self.values(
141            rows.into_iter()
142                .map(|row| {
143                    Row::new(
144                        row.into_owned_row()
145                            .into_iter()
146                            .zip_eq_fast(&fields)
147                            .map(|(datum, (_, ty))| {
148                                datum.map(|scalar| {
149                                    scalar.as_scalar_ref_impl().text_format(ty).into()
150                                })
151                            })
152                            .collect(),
153                    )
154                })
155                .collect_vec()
156                .into(),
157            fields_to_descriptors(fields),
158        )
159    }
160}
161
162pub fn fields_to_descriptors(
163    fields: Vec<(&str, risingwave_common::types::DataType)>,
164) -> Vec<PgFieldDescriptor> {
165    fields
166        .iter()
167        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
168        .collect()
169}
170
171pub enum PgResponseStream {
172    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
173    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
174    Rows(BoxStream<'static, RowSetResult>),
175}
176
177impl Stream for PgResponseStream {
178    type Item = std::result::Result<Vec<Row>, BoxedError>;
179
180    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
181        match &mut *self {
182            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
183            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
184            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
185        }
186    }
187}
188
189impl From<Vec<Row>> for PgResponseStream {
190    fn from(rows: Vec<Row>) -> Self {
191        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
192    }
193}
194
195#[derive(Clone)]
196pub struct HandlerArgs {
197    pub session: Arc<SessionImpl>,
198    pub sql: Arc<str>,
199    pub normalized_sql: String,
200    pub with_options: WithOptions,
201}
202
203impl HandlerArgs {
204    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
205        Ok(Self {
206            session,
207            sql,
208            with_options: WithOptions::try_from(stmt)?,
209            normalized_sql: Self::normalize_sql(stmt),
210        })
211    }
212
213    /// Get normalized SQL from the statement.
214    ///
215    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
216    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
217    ///   make it suitable for the `SHOW CREATE` statements.
218    fn normalize_sql(stmt: &Statement) -> String {
219        let mut stmt = stmt.clone();
220        match &mut stmt {
221            Statement::CreateView {
222                or_replace,
223                if_not_exists,
224                ..
225            } => {
226                *or_replace = false;
227                *if_not_exists = false;
228            }
229            Statement::CreateTable {
230                or_replace,
231                if_not_exists,
232                ..
233            } => {
234                *or_replace = false;
235                *if_not_exists = false;
236            }
237            Statement::CreateIndex { if_not_exists, .. } => {
238                *if_not_exists = false;
239            }
240            Statement::CreateSource {
241                stmt: CreateSourceStatement { if_not_exists, .. },
242                ..
243            } => {
244                *if_not_exists = false;
245            }
246            Statement::CreateSink {
247                stmt: CreateSinkStatement { if_not_exists, .. },
248            } => {
249                *if_not_exists = false;
250            }
251            Statement::CreateSubscription {
252                stmt: CreateSubscriptionStatement { if_not_exists, .. },
253            } => {
254                *if_not_exists = false;
255            }
256            Statement::CreateConnection {
257                stmt: CreateConnectionStatement { if_not_exists, .. },
258            } => {
259                *if_not_exists = false;
260            }
261            _ => {}
262        }
263        stmt.to_string()
264    }
265}
266
267pub async fn handle(
268    session: Arc<SessionImpl>,
269    stmt: Statement,
270    sql: Arc<str>,
271    formats: Vec<Format>,
272) -> Result<RwPgResponse> {
273    session.clear_cancel_query_flag();
274    let _guard = session.txn_begin_implicit();
275    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
276
277    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
278
279    match stmt {
280        Statement::Explain {
281            statement,
282            analyze,
283            options,
284        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
285        Statement::ExplainAnalyzeStreamJob {
286            target,
287            duration_secs,
288        } => {
289            explain_analyze_stream_job::handle_explain_analyze_stream_job(
290                handler_args,
291                target,
292                duration_secs,
293            )
294            .await
295        }
296        Statement::CreateSource { stmt } => {
297            create_source::handle_create_source(handler_args, stmt).await
298        }
299        Statement::CreateSink { stmt } => {
300            create_sink::handle_create_sink(handler_args, stmt, false).await
301        }
302        Statement::CreateSubscription { stmt } => {
303            create_subscription::handle_create_subscription(handler_args, stmt).await
304        }
305        Statement::CreateConnection { stmt } => {
306            create_connection::handle_create_connection(handler_args, stmt).await
307        }
308        Statement::CreateSecret { stmt } => {
309            create_secret::handle_create_secret(handler_args, stmt).await
310        }
311        Statement::CreateFunction {
312            or_replace,
313            temporary,
314            if_not_exists,
315            name,
316            args,
317            returns,
318            params,
319            with_options,
320        } => {
321            // For general udf, `language` clause could be ignored
322            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
323            if params.language.is_none()
324                || !params
325                    .language
326                    .as_ref()
327                    .unwrap()
328                    .real_value()
329                    .eq_ignore_ascii_case("sql")
330            {
331                create_function::handle_create_function(
332                    handler_args,
333                    or_replace,
334                    temporary,
335                    if_not_exists,
336                    name,
337                    args,
338                    returns,
339                    params,
340                    with_options,
341                )
342                .await
343            } else {
344                create_sql_function::handle_create_sql_function(
345                    handler_args,
346                    or_replace,
347                    temporary,
348                    if_not_exists,
349                    name,
350                    args,
351                    returns,
352                    params,
353                )
354                .await
355            }
356        }
357        Statement::CreateAggregate {
358            or_replace,
359            if_not_exists,
360            name,
361            args,
362            returns,
363            params,
364            ..
365        } => {
366            create_aggregate::handle_create_aggregate(
367                handler_args,
368                or_replace,
369                if_not_exists,
370                name,
371                args,
372                returns,
373                params,
374            )
375            .await
376        }
377        Statement::CreateTable {
378            name,
379            columns,
380            wildcard_idx,
381            constraints,
382            query,
383            with_options: _, // It is put in OptimizerContext
384            // Not supported things
385            or_replace,
386            temporary,
387            if_not_exists,
388            format_encode,
389            source_watermarks,
390            append_only,
391            on_conflict,
392            with_version_columns,
393            cdc_table_info,
394            include_column_options,
395            webhook_info,
396            engine,
397        } => {
398            if or_replace {
399                bail_not_implemented!("CREATE OR REPLACE TABLE");
400            }
401            if temporary {
402                bail_not_implemented!("CREATE TEMPORARY TABLE");
403            }
404            if let Some(query) = query {
405                return create_table_as::handle_create_as(
406                    handler_args,
407                    name,
408                    if_not_exists,
409                    query,
410                    columns,
411                    append_only,
412                    on_conflict,
413                    with_version_columns
414                        .iter()
415                        .map(|col| col.real_value())
416                        .collect(),
417                    engine,
418                )
419                .await;
420            }
421            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
422            create_table::handle_create_table(
423                handler_args,
424                name,
425                columns,
426                wildcard_idx,
427                constraints,
428                if_not_exists,
429                format_encode,
430                source_watermarks,
431                append_only,
432                on_conflict,
433                with_version_columns
434                    .iter()
435                    .map(|col| col.real_value())
436                    .collect(),
437                cdc_table_info,
438                include_column_options,
439                webhook_info,
440                engine,
441            )
442            .await
443        }
444        Statement::CreateDatabase {
445            db_name,
446            if_not_exists,
447            owner,
448            resource_group,
449            barrier_interval_ms,
450            checkpoint_frequency,
451        } => {
452            create_database::handle_create_database(
453                handler_args,
454                db_name,
455                if_not_exists,
456                owner,
457                resource_group,
458                barrier_interval_ms,
459                checkpoint_frequency,
460            )
461            .await
462        }
463        Statement::CreateSchema {
464            schema_name,
465            if_not_exists,
466            owner,
467        } => {
468            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
469                .await
470        }
471        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
472        Statement::DeclareCursor { stmt } => {
473            declare_cursor::handle_declare_cursor(handler_args, stmt).await
474        }
475        Statement::FetchCursor { stmt } => {
476            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
477        }
478        Statement::CloseCursor { stmt } => {
479            close_cursor::handle_close_cursor(handler_args, stmt).await
480        }
481        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
482        Statement::Grant { .. } => {
483            handle_privilege::handle_grant_privilege(handler_args, stmt).await
484        }
485        Statement::Revoke { .. } => {
486            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
487        }
488        Statement::Describe { name, kind } => match kind {
489            DescribeKind::Fragments => {
490                describe::handle_describe_fragments(handler_args, name).await
491            }
492            DescribeKind::Plain => describe::handle_describe(handler_args, name),
493        },
494        Statement::DescribeFragment { fragment_id } => {
495            describe::handle_describe_fragment(handler_args, fragment_id.into()).await
496        }
497        Statement::Discard(..) => discard::handle_discard(handler_args),
498        Statement::ShowObjects {
499            object: show_object,
500            filter,
501        } => show::handle_show_object(handler_args, show_object, filter).await,
502        Statement::ShowCreateObject { create_type, name } => {
503            show::handle_show_create_object(handler_args, create_type, name)
504        }
505        Statement::ShowTransactionIsolationLevel => {
506            transaction::handle_show_isolation_level(handler_args)
507        }
508        Statement::Drop(DropStatement {
509            object_type,
510            object_name,
511            if_exists,
512            drop_mode,
513        }) => {
514            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
515                match object_type {
516                    ObjectType::MaterializedView
517                    | ObjectType::View
518                    | ObjectType::Sink
519                    | ObjectType::Source
520                    | ObjectType::Subscription
521                    | ObjectType::Index
522                    | ObjectType::Table
523                    | ObjectType::Schema
524                    | ObjectType::Connection
525                    | ObjectType::Secret => true,
526                    ObjectType::Database | ObjectType::User => {
527                        bail_not_implemented!("DROP CASCADE");
528                    }
529                }
530            } else {
531                false
532            };
533            match object_type {
534                ObjectType::Table => {
535                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
536                        .await
537                }
538                ObjectType::MaterializedView => {
539                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
540                }
541                ObjectType::Index => {
542                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
543                        .await
544                }
545                ObjectType::Source => {
546                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
547                        .await
548                }
549                ObjectType::Sink => {
550                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
551                }
552                ObjectType::Subscription => {
553                    drop_subscription::handle_drop_subscription(
554                        handler_args,
555                        object_name,
556                        if_exists,
557                        cascade,
558                    )
559                    .await
560                }
561                ObjectType::Database => {
562                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
563                }
564                ObjectType::Schema => {
565                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
566                        .await
567                }
568                ObjectType::User => {
569                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
570                }
571                ObjectType::View => {
572                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
573                }
574                ObjectType::Connection => {
575                    drop_connection::handle_drop_connection(
576                        handler_args,
577                        object_name,
578                        if_exists,
579                        cascade,
580                    )
581                    .await
582                }
583                ObjectType::Secret => {
584                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists, cascade)
585                        .await
586                }
587            }
588        }
589        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
590        Statement::DropFunction {
591            if_exists,
592            func_desc,
593            option,
594        } => {
595            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
596                .await
597        }
598        Statement::DropAggregate {
599            if_exists,
600            func_desc,
601            option,
602        } => {
603            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
604                .await
605        }
606        Statement::Query(_)
607        | Statement::Insert { .. }
608        | Statement::Delete { .. }
609        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
610        Statement::Copy {
611            entity: CopyEntity::Query(query),
612            target: CopyTarget::Stdout,
613        } => {
614            let response =
615                query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
616                    .await?;
617            Ok(response.into_copy_query_to_stdout())
618        }
619        Statement::CreateView {
620            materialized,
621            if_not_exists,
622            name,
623            columns,
624            query,
625            with_options: _, // It is put in OptimizerContext
626            or_replace,      // not supported
627            emit_mode,
628        } => {
629            if or_replace {
630                bail_not_implemented!("CREATE OR REPLACE VIEW");
631            }
632            if materialized {
633                create_mv::handle_create_mv(
634                    handler_args,
635                    if_not_exists,
636                    name,
637                    *query,
638                    columns,
639                    emit_mode,
640                )
641                .await
642            } else {
643                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
644                    .await
645            }
646        }
647        Statement::Flush => flush::handle_flush(handler_args).await,
648        Statement::Wait => wait::handle_wait(handler_args).await,
649        Statement::Recover => recover::handle_recover(handler_args).await,
650        Statement::SetVariable {
651            local: _,
652            variable,
653            value,
654        } => {
655            // special handle for `use database`
656            if variable.real_value().eq_ignore_ascii_case("database") {
657                let x = variable::set_var_to_param_str(&value);
658                let res = use_db::handle_use_db(
659                    handler_args,
660                    ObjectName::from(vec![Ident::from_real_value(
661                        x.as_deref().unwrap_or("default"),
662                    )]),
663                )?;
664                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
665                for notice in res.notices() {
666                    builder = builder.notice(notice);
667                }
668                return Ok(builder.into());
669            }
670            variable::handle_set(handler_args, variable, value)
671        }
672        Statement::SetTimeZone { local: _, value } => {
673            variable::handle_set_time_zone(handler_args, value)
674        }
675        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
676        Statement::CreateIndex {
677            name,
678            table_name,
679            method,
680            columns,
681            include,
682            distributed_by,
683            unique,
684            if_not_exists,
685            with_properties: _,
686        } => {
687            if unique {
688                bail_not_implemented!("create unique index");
689            }
690
691            create_index::handle_create_index(
692                handler_args,
693                if_not_exists,
694                name,
695                table_name,
696                method,
697                columns.clone(),
698                include,
699                distributed_by,
700            )
701            .await
702        }
703        Statement::AlterDatabase { name, operation } => match operation {
704            AlterDatabaseOperation::RenameDatabase { database_name } => {
705                alter_rename::handle_rename_database(handler_args, name, database_name).await
706            }
707            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
708                alter_owner::handle_alter_owner(
709                    handler_args,
710                    name,
711                    new_owner_name,
712                    StatementType::ALTER_DATABASE,
713                    None,
714                )
715                .await
716            }
717            AlterDatabaseOperation::SetParam(config_param) => {
718                let ConfigParam { param, value } = config_param;
719
720                let database_param = match param.real_value().to_uppercase().as_str() {
721                    "BARRIER_INTERVAL_MS" => {
722                        let barrier_interval_ms = match value {
723                            SetVariableValue::Default => None,
724                            SetVariableValue::Single(SetVariableValueSingle::Literal(
725                                Value::Number(num),
726                            )) => {
727                                let num = num.parse::<u32>().map_err(|e| {
728                                    ErrorCode::InvalidInputSyntax(format!(
729                                        "barrier_interval_ms must be a u32 integer: {}",
730                                        e.as_report()
731                                    ))
732                                })?;
733                                Some(num)
734                            }
735                            _ => {
736                                return Err(ErrorCode::InvalidInputSyntax(
737                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
738                                        .to_owned(),
739                                )
740                                .into());
741                            }
742                        };
743                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
744                    }
745                    "CHECKPOINT_FREQUENCY" => {
746                        let checkpoint_frequency = match value {
747                            SetVariableValue::Default => None,
748                            SetVariableValue::Single(SetVariableValueSingle::Literal(
749                                Value::Number(num),
750                            )) => {
751                                let num = num.parse::<u64>().map_err(|e| {
752                                    ErrorCode::InvalidInputSyntax(format!(
753                                        "checkpoint_frequency must be a u64 integer: {}",
754                                        e.as_report()
755                                    ))
756                                })?;
757                                Some(num)
758                            }
759                            _ => {
760                                return Err(ErrorCode::InvalidInputSyntax(
761                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
762                                        .to_owned(),
763                                )
764                                .into());
765                            }
766                        };
767                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
768                    }
769                    _ => {
770                        return Err(ErrorCode::InvalidInputSyntax(format!(
771                            "Unsupported database config parameter: {}",
772                            param.real_value()
773                        ))
774                        .into());
775                    }
776                };
777
778                alter_database_param::handle_alter_database_param(
779                    handler_args,
780                    name,
781                    database_param,
782                )
783                .await
784            }
785        },
786        Statement::AlterSchema { name, operation } => match operation {
787            AlterSchemaOperation::RenameSchema { schema_name } => {
788                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
789            }
790            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
791                alter_owner::handle_alter_owner(
792                    handler_args,
793                    name,
794                    new_owner_name,
795                    StatementType::ALTER_SCHEMA,
796                    None,
797                )
798                .await
799            }
800            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
801                alter_swap_rename::handle_swap_rename(
802                    handler_args,
803                    name,
804                    target_schema,
805                    StatementType::ALTER_SCHEMA,
806                )
807                .await
808            }
809        },
810        Statement::AlterTable { name, operation } => match operation {
811            AlterTableOperation::AddColumn { .. }
812            | AlterTableOperation::DropColumn { .. }
813            | AlterTableOperation::AlterColumn { .. } => {
814                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
815            }
816            AlterTableOperation::RenameTable { table_name } => {
817                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
818                    .await
819            }
820            AlterTableOperation::ChangeOwner { new_owner_name } => {
821                alter_owner::handle_alter_owner(
822                    handler_args,
823                    name,
824                    new_owner_name,
825                    StatementType::ALTER_TABLE,
826                    None,
827                )
828                .await
829            }
830            AlterTableOperation::SetParallelism {
831                parallelism,
832                deferred,
833            } => {
834                alter_parallelism::handle_alter_parallelism(
835                    handler_args,
836                    name,
837                    parallelism,
838                    StatementType::ALTER_TABLE,
839                    deferred,
840                )
841                .await
842            }
843            AlterTableOperation::SetSchema { new_schema_name } => {
844                alter_set_schema::handle_alter_set_schema(
845                    handler_args,
846                    name,
847                    new_schema_name,
848                    StatementType::ALTER_TABLE,
849                    None,
850                )
851                .await
852            }
853            AlterTableOperation::RefreshSchema => {
854                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
855            }
856            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
857                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
858                    handler_args,
859                    PbThrottleTarget::Table,
860                    risingwave_pb::common::PbThrottleType::Source,
861                    name,
862                    rate_limit,
863                )
864                .await
865            }
866            AlterTableOperation::DropConnector => {
867                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
868                    .await
869            }
870            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
871                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
872                    handler_args,
873                    PbThrottleTarget::Table,
874                    risingwave_pb::common::PbThrottleType::Dml,
875                    name,
876                    rate_limit,
877                )
878                .await
879            }
880            AlterTableOperation::SetConfig { entries } => {
881                alter_streaming_config::handle_alter_streaming_set_config(
882                    handler_args,
883                    name,
884                    entries,
885                    StatementType::ALTER_TABLE,
886                )
887                .await
888            }
889            AlterTableOperation::ResetConfig { keys } => {
890                alter_streaming_config::handle_alter_streaming_reset_config(
891                    handler_args,
892                    name,
893                    keys,
894                    StatementType::ALTER_TABLE,
895                )
896                .await
897            }
898            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
899                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
900                    handler_args,
901                    PbThrottleTarget::Table,
902                    risingwave_pb::common::PbThrottleType::Backfill,
903                    name,
904                    rate_limit,
905                )
906                .await
907            }
908            AlterTableOperation::SwapRenameTable { target_table } => {
909                alter_swap_rename::handle_swap_rename(
910                    handler_args,
911                    name,
912                    target_table,
913                    StatementType::ALTER_TABLE,
914                )
915                .await
916            }
917            AlterTableOperation::AlterConnectorProps { alter_props } => {
918                alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
919            }
920            AlterTableOperation::AddConstraint { .. }
921            | AlterTableOperation::DropConstraint { .. }
922            | AlterTableOperation::RenameColumn { .. }
923            | AlterTableOperation::ChangeColumn { .. }
924            | AlterTableOperation::RenameConstraint { .. } => {
925                bail_not_implemented!(
926                    "Unhandled statement: {}",
927                    Statement::AlterTable { name, operation }
928                )
929            }
930        },
931        Statement::AlterIndex { name, operation } => match operation {
932            AlterIndexOperation::RenameIndex { index_name } => {
933                alter_rename::handle_rename_index(handler_args, name, index_name).await
934            }
935            AlterIndexOperation::SetParallelism {
936                parallelism,
937                deferred,
938            } => {
939                alter_parallelism::handle_alter_parallelism(
940                    handler_args,
941                    name,
942                    parallelism,
943                    StatementType::ALTER_INDEX,
944                    deferred,
945                )
946                .await
947            }
948            AlterIndexOperation::SetConfig { entries } => {
949                alter_streaming_config::handle_alter_streaming_set_config(
950                    handler_args,
951                    name,
952                    entries,
953                    StatementType::ALTER_INDEX,
954                )
955                .await
956            }
957            AlterIndexOperation::ResetConfig { keys } => {
958                alter_streaming_config::handle_alter_streaming_reset_config(
959                    handler_args,
960                    name,
961                    keys,
962                    StatementType::ALTER_INDEX,
963                )
964                .await
965            }
966        },
967        Statement::AlterView {
968            materialized,
969            name,
970            operation,
971        } => {
972            let statement_type = if materialized {
973                StatementType::ALTER_MATERIALIZED_VIEW
974            } else {
975                StatementType::ALTER_VIEW
976            };
977            match operation {
978                AlterViewOperation::RenameView { view_name } => {
979                    if materialized {
980                        alter_rename::handle_rename_table(
981                            handler_args,
982                            TableType::MaterializedView,
983                            name,
984                            view_name,
985                        )
986                        .await
987                    } else {
988                        alter_rename::handle_rename_view(handler_args, name, view_name).await
989                    }
990                }
991                AlterViewOperation::SetParallelism {
992                    parallelism,
993                    deferred,
994                } => {
995                    if !materialized {
996                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
997                    }
998                    alter_parallelism::handle_alter_parallelism(
999                        handler_args,
1000                        name,
1001                        parallelism,
1002                        statement_type,
1003                        deferred,
1004                    )
1005                    .await
1006                }
1007                AlterViewOperation::SetResourceGroup {
1008                    resource_group,
1009                    deferred,
1010                } => {
1011                    if !materialized {
1012                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1013                    }
1014                    alter_resource_group::handle_alter_resource_group(
1015                        handler_args,
1016                        name,
1017                        resource_group,
1018                        statement_type,
1019                        deferred,
1020                    )
1021                    .await
1022                }
1023                AlterViewOperation::ChangeOwner { new_owner_name } => {
1024                    alter_owner::handle_alter_owner(
1025                        handler_args,
1026                        name,
1027                        new_owner_name,
1028                        statement_type,
1029                        None,
1030                    )
1031                    .await
1032                }
1033                AlterViewOperation::SetSchema { new_schema_name } => {
1034                    alter_set_schema::handle_alter_set_schema(
1035                        handler_args,
1036                        name,
1037                        new_schema_name,
1038                        statement_type,
1039                        None,
1040                    )
1041                    .await
1042                }
1043                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1044                    if !materialized {
1045                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1046                    }
1047                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1048                        handler_args,
1049                        PbThrottleTarget::Mv,
1050                        risingwave_pb::common::PbThrottleType::Backfill,
1051                        name,
1052                        rate_limit,
1053                    )
1054                    .await
1055                }
1056                AlterViewOperation::SwapRenameView { target_view } => {
1057                    alter_swap_rename::handle_swap_rename(
1058                        handler_args,
1059                        name,
1060                        target_view,
1061                        statement_type,
1062                    )
1063                    .await
1064                }
1065                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1066                    if !materialized {
1067                        bail!(
1068                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1069                        );
1070                    }
1071                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1072                }
1073                AlterViewOperation::AsQuery { query } => {
1074                    if !materialized {
1075                        bail_not_implemented!("ALTER VIEW AS QUERY");
1076                    }
1077                    // `ALTER MATERIALIZED VIEW AS QUERY` is only available in development build now.
1078                    if !cfg!(debug_assertions) {
1079                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1080                    }
1081                    alter_mv::handle_alter_mv(handler_args, name, query).await
1082                }
1083                AlterViewOperation::SetConfig { entries } => {
1084                    if !materialized {
1085                        bail!("SET CONFIG is only supported for materialized views");
1086                    }
1087                    alter_streaming_config::handle_alter_streaming_set_config(
1088                        handler_args,
1089                        name,
1090                        entries,
1091                        statement_type,
1092                    )
1093                    .await
1094                }
1095                AlterViewOperation::ResetConfig { keys } => {
1096                    if !materialized {
1097                        bail!("RESET CONFIG is only supported for materialized views");
1098                    }
1099                    alter_streaming_config::handle_alter_streaming_reset_config(
1100                        handler_args,
1101                        name,
1102                        keys,
1103                        statement_type,
1104                    )
1105                    .await
1106                }
1107            }
1108        }
1109
1110        Statement::AlterSink { name, operation } => match operation {
1111            AlterSinkOperation::AlterConnectorProps {
1112                alter_props: changed_props,
1113            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1114            AlterSinkOperation::RenameSink { sink_name } => {
1115                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1116            }
1117            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1118                alter_owner::handle_alter_owner(
1119                    handler_args,
1120                    name,
1121                    new_owner_name,
1122                    StatementType::ALTER_SINK,
1123                    None,
1124                )
1125                .await
1126            }
1127            AlterSinkOperation::SetSchema { new_schema_name } => {
1128                alter_set_schema::handle_alter_set_schema(
1129                    handler_args,
1130                    name,
1131                    new_schema_name,
1132                    StatementType::ALTER_SINK,
1133                    None,
1134                )
1135                .await
1136            }
1137            AlterSinkOperation::SetParallelism {
1138                parallelism,
1139                deferred,
1140            } => {
1141                alter_parallelism::handle_alter_parallelism(
1142                    handler_args,
1143                    name,
1144                    parallelism,
1145                    StatementType::ALTER_SINK,
1146                    deferred,
1147                )
1148                .await
1149            }
1150            AlterSinkOperation::SetConfig { entries } => {
1151                alter_streaming_config::handle_alter_streaming_set_config(
1152                    handler_args,
1153                    name,
1154                    entries,
1155                    StatementType::ALTER_SINK,
1156                )
1157                .await
1158            }
1159            AlterSinkOperation::ResetConfig { keys } => {
1160                alter_streaming_config::handle_alter_streaming_reset_config(
1161                    handler_args,
1162                    name,
1163                    keys,
1164                    StatementType::ALTER_SINK,
1165                )
1166                .await
1167            }
1168            AlterSinkOperation::SwapRenameSink { target_sink } => {
1169                alter_swap_rename::handle_swap_rename(
1170                    handler_args,
1171                    name,
1172                    target_sink,
1173                    StatementType::ALTER_SINK,
1174                )
1175                .await
1176            }
1177            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1178                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1179                    handler_args,
1180                    PbThrottleTarget::Sink,
1181                    risingwave_pb::common::PbThrottleType::Sink,
1182                    name,
1183                    rate_limit,
1184                )
1185                .await
1186            }
1187            AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
1188                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1189                    handler_args,
1190                    PbThrottleTarget::Sink,
1191                    risingwave_pb::common::PbThrottleType::Backfill,
1192                    name,
1193                    rate_limit,
1194                )
1195                .await
1196            }
1197            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1198                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1199                    handler_args,
1200                    name,
1201                    enable,
1202                )
1203                .await
1204            }
1205        },
1206        Statement::AlterSubscription { name, operation } => match operation {
1207            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1208                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1209                    .await
1210            }
1211            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1212                alter_owner::handle_alter_owner(
1213                    handler_args,
1214                    name,
1215                    new_owner_name,
1216                    StatementType::ALTER_SUBSCRIPTION,
1217                    None,
1218                )
1219                .await
1220            }
1221            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1222                alter_set_schema::handle_alter_set_schema(
1223                    handler_args,
1224                    name,
1225                    new_schema_name,
1226                    StatementType::ALTER_SUBSCRIPTION,
1227                    None,
1228                )
1229                .await
1230            }
1231            AlterSubscriptionOperation::SwapRenameSubscription {
1232                target_subscription,
1233            } => {
1234                alter_swap_rename::handle_swap_rename(
1235                    handler_args,
1236                    name,
1237                    target_subscription,
1238                    StatementType::ALTER_SUBSCRIPTION,
1239                )
1240                .await
1241            }
1242        },
1243        Statement::AlterSource { name, operation } => match operation {
1244            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1245                alter_source_props::handle_alter_source_connector_props(
1246                    handler_args,
1247                    name,
1248                    alter_props,
1249                )
1250                .await
1251            }
1252            AlterSourceOperation::RenameSource { source_name } => {
1253                alter_rename::handle_rename_source(handler_args, name, source_name).await
1254            }
1255            AlterSourceOperation::AddColumn { .. } => {
1256                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1257            }
1258            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1259                alter_owner::handle_alter_owner(
1260                    handler_args,
1261                    name,
1262                    new_owner_name,
1263                    StatementType::ALTER_SOURCE,
1264                    None,
1265                )
1266                .await
1267            }
1268            AlterSourceOperation::SetSchema { new_schema_name } => {
1269                alter_set_schema::handle_alter_set_schema(
1270                    handler_args,
1271                    name,
1272                    new_schema_name,
1273                    StatementType::ALTER_SOURCE,
1274                    None,
1275                )
1276                .await
1277            }
1278            AlterSourceOperation::FormatEncode { format_encode } => {
1279                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1280                    .await
1281            }
1282            AlterSourceOperation::RefreshSchema => {
1283                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1284            }
1285            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1286                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1287                    handler_args,
1288                    PbThrottleTarget::Source,
1289                    risingwave_pb::common::PbThrottleType::Source,
1290                    name,
1291                    rate_limit,
1292                )
1293                .await
1294            }
1295            AlterSourceOperation::SwapRenameSource { target_source } => {
1296                alter_swap_rename::handle_swap_rename(
1297                    handler_args,
1298                    name,
1299                    target_source,
1300                    StatementType::ALTER_SOURCE,
1301                )
1302                .await
1303            }
1304            AlterSourceOperation::SetParallelism {
1305                parallelism,
1306                deferred,
1307            } => {
1308                alter_parallelism::handle_alter_parallelism(
1309                    handler_args,
1310                    name,
1311                    parallelism,
1312                    StatementType::ALTER_SOURCE,
1313                    deferred,
1314                )
1315                .await
1316            }
1317            AlterSourceOperation::SetConfig { entries } => {
1318                alter_streaming_config::handle_alter_streaming_set_config(
1319                    handler_args,
1320                    name,
1321                    entries,
1322                    StatementType::ALTER_SOURCE,
1323                )
1324                .await
1325            }
1326            AlterSourceOperation::ResetConfig { keys } => {
1327                alter_streaming_config::handle_alter_streaming_reset_config(
1328                    handler_args,
1329                    name,
1330                    keys,
1331                    StatementType::ALTER_SOURCE,
1332                )
1333                .await
1334            }
1335            AlterSourceOperation::ResetSource => {
1336                reset_source::handle_reset_source(handler_args, name).await
1337            }
1338        },
1339        Statement::AlterFunction {
1340            name,
1341            args,
1342            operation,
1343        } => match operation {
1344            AlterFunctionOperation::SetSchema { new_schema_name } => {
1345                alter_set_schema::handle_alter_set_schema(
1346                    handler_args,
1347                    name,
1348                    new_schema_name,
1349                    StatementType::ALTER_FUNCTION,
1350                    args,
1351                )
1352                .await
1353            }
1354            AlterFunctionOperation::ChangeOwner { new_owner_name } => {
1355                alter_owner::handle_alter_owner(
1356                    handler_args,
1357                    name,
1358                    new_owner_name,
1359                    StatementType::ALTER_FUNCTION,
1360                    args,
1361                )
1362                .await
1363            }
1364        },
1365        Statement::AlterConnection { name, operation } => match operation {
1366            AlterConnectionOperation::SetSchema { new_schema_name } => {
1367                alter_set_schema::handle_alter_set_schema(
1368                    handler_args,
1369                    name,
1370                    new_schema_name,
1371                    StatementType::ALTER_CONNECTION,
1372                    None,
1373                )
1374                .await
1375            }
1376            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1377                alter_owner::handle_alter_owner(
1378                    handler_args,
1379                    name,
1380                    new_owner_name,
1381                    StatementType::ALTER_CONNECTION,
1382                    None,
1383                )
1384                .await
1385            }
1386            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1387                alter_connection_props::handle_alter_connection_connector_props(
1388                    handler_args,
1389                    name,
1390                    alter_props,
1391                )
1392                .await
1393            }
1394        },
1395        Statement::AlterSystem { param, value } => {
1396            alter_system::handle_alter_system(handler_args, param, value).await
1397        }
1398        Statement::AlterSecret { name, operation } => match operation {
1399            AlterSecretOperation::ChangeCredential {
1400                with_options,
1401                new_credential,
1402            } => {
1403                alter_secret::handle_alter_secret(handler_args, name, with_options, new_credential)
1404                    .await
1405            }
1406            AlterSecretOperation::ChangeOwner { new_owner_name } => {
1407                alter_owner::handle_alter_owner(
1408                    handler_args,
1409                    name,
1410                    new_owner_name,
1411                    StatementType::ALTER_SECRET,
1412                    None,
1413                )
1414                .await
1415            }
1416        },
1417        Statement::AlterFragment {
1418            fragment_ids,
1419            operation,
1420        } => match operation {
1421            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1422                let [fragment_id] = fragment_ids.as_slice() else {
1423                    return Err(ErrorCode::InvalidInputSyntax(
1424                        "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1425                            .to_owned(),
1426                    )
1427                    .into());
1428                };
1429                alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1430                    &handler_args.session,
1431                    PbThrottleTarget::Fragment,
1432                    risingwave_pb::common::PbThrottleType::Backfill,
1433                    *fragment_id,
1434                    rate_limit,
1435                    StatementType::SET_VARIABLE,
1436                )
1437                .await
1438            }
1439            AlterFragmentOperation::SetParallelism { parallelism } => {
1440                alter_parallelism::handle_alter_fragment_parallelism(
1441                    handler_args,
1442                    fragment_ids.into_iter().map_into().collect(),
1443                    parallelism,
1444                )
1445                .await
1446            }
1447        },
1448        Statement::AlterDefaultPrivileges { .. } => {
1449            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1450        }
1451        Statement::StartTransaction { modes } => {
1452            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1453        }
1454        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1455        Statement::Commit { chain } => {
1456            transaction::handle_commit(handler_args, COMMIT, chain).await
1457        }
1458        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1459        Statement::Rollback { chain } => {
1460            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1461        }
1462        Statement::SetTransaction {
1463            modes,
1464            snapshot,
1465            session,
1466        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1467        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1468        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1469        Statement::Comment {
1470            object_type,
1471            object_name,
1472            comment,
1473        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1474        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1475        Statement::Prepare {
1476            name,
1477            data_types,
1478            statement,
1479        } => prepared_statement::handle_prepare(name, data_types, statement).await,
1480        Statement::Deallocate { name, prepare } => {
1481            prepared_statement::handle_deallocate(name, prepare).await
1482        }
1483        Statement::Vacuum { object_name, full } => {
1484            vacuum::handle_vacuum(handler_args, object_name, full).await
1485        }
1486        Statement::Refresh { table_name } => {
1487            refresh::handle_refresh(handler_args, table_name).await
1488        }
1489        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1490    }
1491}
1492
1493fn check_ban_ddl_for_iceberg_engine_table(
1494    session: Arc<SessionImpl>,
1495    stmt: &Statement,
1496) -> Result<()> {
1497    match stmt {
1498        Statement::AlterTable {
1499            name,
1500            operation:
1501                operation @ (AlterTableOperation::AddColumn { .. }
1502                | AlterTableOperation::DropColumn { .. }),
1503        } => {
1504            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1505            if table.is_iceberg_engine_table() {
1506                bail!(
1507                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1508                    operation,
1509                    schema_name,
1510                    name
1511                );
1512            }
1513        }
1514
1515        Statement::AlterTable {
1516            name,
1517            operation: AlterTableOperation::RenameTable { .. },
1518        } => {
1519            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1520            if table.is_iceberg_engine_table() {
1521                bail!(
1522                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1523                    schema_name,
1524                    name
1525                );
1526            }
1527        }
1528
1529        Statement::AlterTable {
1530            name,
1531            operation: AlterTableOperation::SetParallelism { .. },
1532        } => {
1533            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1534            if table.is_iceberg_engine_table() {
1535                bail!(
1536                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1537                    schema_name,
1538                    name
1539                );
1540            }
1541        }
1542
1543        Statement::AlterTable {
1544            name,
1545            operation: AlterTableOperation::SetSchema { .. },
1546        } => {
1547            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1548            if table.is_iceberg_engine_table() {
1549                bail!(
1550                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1551                    schema_name,
1552                    name
1553                );
1554            }
1555        }
1556
1557        Statement::AlterTable {
1558            name,
1559            operation: AlterTableOperation::RefreshSchema,
1560        } => {
1561            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1562            if table.is_iceberg_engine_table() {
1563                bail!(
1564                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1565                    schema_name,
1566                    name
1567                );
1568            }
1569        }
1570
1571        Statement::AlterTable {
1572            name,
1573            operation: AlterTableOperation::SetSourceRateLimit { .. },
1574        } => {
1575            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1576            if table.is_iceberg_engine_table() {
1577                bail!(
1578                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1579                    schema_name,
1580                    name
1581                );
1582            }
1583        }
1584
1585        _ => {}
1586    }
1587
1588    Ok(())
1589}