risingwave_frontend/handler/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_connection_props;
46mod alter_database_param;
47mod alter_mv;
48mod alter_owner;
49mod alter_parallelism;
50mod alter_rename;
51mod alter_resource_group;
52mod alter_secret;
53mod alter_set_schema;
54mod alter_sink_props;
55mod alter_source_column;
56mod alter_source_props;
57mod alter_source_with_sr;
58mod alter_streaming_config;
59mod alter_streaming_enable_unaligned_join;
60mod alter_streaming_rate_limit;
61mod alter_subscription_retention;
62mod alter_swap_rename;
63mod alter_system;
64mod alter_table_column;
65pub mod alter_table_drop_connector;
66pub mod alter_table_props;
67mod alter_table_with_sr;
68pub mod alter_user;
69mod alter_utils;
70pub mod cancel_job;
71pub mod close_cursor;
72mod comment;
73pub mod create_aggregate;
74pub mod create_connection;
75mod create_database;
76pub mod create_function;
77pub mod create_index;
78pub mod create_mv;
79pub mod create_schema;
80pub mod create_secret;
81pub mod create_sink;
82pub mod create_source;
83pub mod create_sql_function;
84pub mod create_subscription;
85pub mod create_table;
86pub mod create_table_as;
87pub mod create_user;
88pub mod create_view;
89pub mod declare_cursor;
90pub mod describe;
91pub mod discard;
92mod drop_connection;
93mod drop_database;
94pub mod drop_function;
95mod drop_index;
96pub mod drop_mv;
97mod drop_schema;
98pub mod drop_secret;
99pub mod drop_sink;
100pub mod drop_source;
101pub mod drop_subscription;
102pub mod drop_table;
103pub mod drop_user;
104mod drop_view;
105pub mod explain;
106pub mod explain_analyze_stream_job;
107pub mod extended_handle;
108pub mod fetch_cursor;
109mod flush;
110pub mod handle_privilege;
111pub mod kill_process;
112mod prepared_statement;
113pub mod privilege;
114pub mod query;
115mod recover;
116mod refresh;
117mod reset_source;
118pub mod show;
119mod transaction;
120mod use_db;
121pub mod util;
122pub mod vacuum;
123pub mod variable;
124mod wait;
125
126pub use alter_table_column::{
127    fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
128};
129
130/// The [`PgResponseBuilder`] used by RisingWave.
131pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
132
133/// The [`PgResponse`] used by RisingWave.
134pub type RwPgResponse = PgResponse<PgResponseStream>;
135
136#[easy_ext::ext(RwPgResponseBuilderExt)]
137impl RwPgResponseBuilder {
138    /// Append rows to the response.
139    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
140        let fields = T::fields();
141        self.values(
142            rows.into_iter()
143                .map(|row| {
144                    Row::new(
145                        row.into_owned_row()
146                            .into_iter()
147                            .zip_eq_fast(&fields)
148                            .map(|(datum, (_, ty))| {
149                                datum.map(|scalar| {
150                                    scalar.as_scalar_ref_impl().text_format(ty).into()
151                                })
152                            })
153                            .collect(),
154                    )
155                })
156                .collect_vec()
157                .into(),
158            fields_to_descriptors(fields),
159        )
160    }
161}
162
163pub fn fields_to_descriptors(
164    fields: Vec<(&str, risingwave_common::types::DataType)>,
165) -> Vec<PgFieldDescriptor> {
166    fields
167        .iter()
168        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
169        .collect()
170}
171
172pub enum PgResponseStream {
173    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
174    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
175    Rows(BoxStream<'static, RowSetResult>),
176}
177
178impl Stream for PgResponseStream {
179    type Item = std::result::Result<Vec<Row>, BoxedError>;
180
181    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
182        match &mut *self {
183            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
184            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
185            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
186        }
187    }
188}
189
190impl From<Vec<Row>> for PgResponseStream {
191    fn from(rows: Vec<Row>) -> Self {
192        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
193    }
194}
195
196#[derive(Clone)]
197pub struct HandlerArgs {
198    pub session: Arc<SessionImpl>,
199    pub sql: Arc<str>,
200    pub normalized_sql: String,
201    pub with_options: WithOptions,
202}
203
204impl HandlerArgs {
205    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
206        Ok(Self {
207            session,
208            sql,
209            with_options: WithOptions::try_from(stmt)?,
210            normalized_sql: Self::normalize_sql(stmt),
211        })
212    }
213
214    /// Get normalized SQL from the statement.
215    ///
216    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
217    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
218    ///   make it suitable for the `SHOW CREATE` statements.
219    fn normalize_sql(stmt: &Statement) -> String {
220        let mut stmt = stmt.clone();
221        match &mut stmt {
222            Statement::CreateView {
223                or_replace,
224                if_not_exists,
225                ..
226            } => {
227                *or_replace = false;
228                *if_not_exists = false;
229            }
230            Statement::CreateTable {
231                or_replace,
232                if_not_exists,
233                ..
234            } => {
235                *or_replace = false;
236                *if_not_exists = false;
237            }
238            Statement::CreateIndex { if_not_exists, .. } => {
239                *if_not_exists = false;
240            }
241            Statement::CreateSource {
242                stmt: CreateSourceStatement { if_not_exists, .. },
243                ..
244            } => {
245                *if_not_exists = false;
246            }
247            Statement::CreateSink {
248                stmt: CreateSinkStatement { if_not_exists, .. },
249            } => {
250                *if_not_exists = false;
251            }
252            Statement::CreateSubscription {
253                stmt: CreateSubscriptionStatement { if_not_exists, .. },
254            } => {
255                *if_not_exists = false;
256            }
257            Statement::CreateConnection {
258                stmt: CreateConnectionStatement { if_not_exists, .. },
259            } => {
260                *if_not_exists = false;
261            }
262            _ => {}
263        }
264        stmt.to_string()
265    }
266}
267
268pub async fn handle(
269    session: Arc<SessionImpl>,
270    stmt: Statement,
271    sql: Arc<str>,
272    formats: Vec<Format>,
273) -> Result<RwPgResponse> {
274    session.clear_cancel_query_flag();
275    let _guard = session.txn_begin_implicit();
276    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
277
278    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
279
280    match stmt {
281        Statement::Explain {
282            statement,
283            analyze,
284            options,
285        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
286        Statement::ExplainAnalyzeStreamJob {
287            target,
288            duration_secs,
289        } => {
290            explain_analyze_stream_job::handle_explain_analyze_stream_job(
291                handler_args,
292                target,
293                duration_secs,
294            )
295            .await
296        }
297        Statement::CreateSource { stmt } => {
298            create_source::handle_create_source(handler_args, stmt).await
299        }
300        Statement::CreateSink { stmt } => {
301            create_sink::handle_create_sink(handler_args, stmt, false).await
302        }
303        Statement::CreateSubscription { stmt } => {
304            create_subscription::handle_create_subscription(handler_args, stmt).await
305        }
306        Statement::CreateConnection { stmt } => {
307            create_connection::handle_create_connection(handler_args, stmt).await
308        }
309        Statement::CreateSecret { stmt } => {
310            create_secret::handle_create_secret(handler_args, stmt).await
311        }
312        Statement::CreateFunction {
313            or_replace,
314            temporary,
315            if_not_exists,
316            name,
317            args,
318            returns,
319            params,
320            with_options,
321        } => {
322            // For general udf, `language` clause could be ignored
323            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
324            if params.language.is_none()
325                || !params
326                    .language
327                    .as_ref()
328                    .unwrap()
329                    .real_value()
330                    .eq_ignore_ascii_case("sql")
331            {
332                create_function::handle_create_function(
333                    handler_args,
334                    or_replace,
335                    temporary,
336                    if_not_exists,
337                    name,
338                    args,
339                    returns,
340                    params,
341                    with_options,
342                )
343                .await
344            } else {
345                create_sql_function::handle_create_sql_function(
346                    handler_args,
347                    or_replace,
348                    temporary,
349                    if_not_exists,
350                    name,
351                    args,
352                    returns,
353                    params,
354                )
355                .await
356            }
357        }
358        Statement::CreateAggregate {
359            or_replace,
360            if_not_exists,
361            name,
362            args,
363            returns,
364            params,
365            ..
366        } => {
367            create_aggregate::handle_create_aggregate(
368                handler_args,
369                or_replace,
370                if_not_exists,
371                name,
372                args,
373                returns,
374                params,
375            )
376            .await
377        }
378        Statement::CreateTable {
379            name,
380            columns,
381            wildcard_idx,
382            constraints,
383            query,
384            with_options: _, // It is put in OptimizerContext
385            // Not supported things
386            or_replace,
387            temporary,
388            if_not_exists,
389            format_encode,
390            source_watermarks,
391            append_only,
392            on_conflict,
393            with_version_columns,
394            cdc_table_info,
395            include_column_options,
396            webhook_info,
397            engine,
398        } => {
399            if or_replace {
400                bail_not_implemented!("CREATE OR REPLACE TABLE");
401            }
402            if temporary {
403                bail_not_implemented!("CREATE TEMPORARY TABLE");
404            }
405            if let Some(query) = query {
406                return create_table_as::handle_create_as(
407                    handler_args,
408                    name,
409                    if_not_exists,
410                    query,
411                    columns,
412                    append_only,
413                    on_conflict,
414                    with_version_columns
415                        .iter()
416                        .map(|col| col.real_value())
417                        .collect(),
418                    engine,
419                )
420                .await;
421            }
422            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
423            create_table::handle_create_table(
424                handler_args,
425                name,
426                columns,
427                wildcard_idx,
428                constraints,
429                if_not_exists,
430                format_encode,
431                source_watermarks,
432                append_only,
433                on_conflict,
434                with_version_columns
435                    .iter()
436                    .map(|col| col.real_value())
437                    .collect(),
438                cdc_table_info,
439                include_column_options,
440                webhook_info,
441                engine,
442            )
443            .await
444        }
445        Statement::CreateDatabase {
446            db_name,
447            if_not_exists,
448            owner,
449            resource_group,
450            barrier_interval_ms,
451            checkpoint_frequency,
452        } => {
453            create_database::handle_create_database(
454                handler_args,
455                db_name,
456                if_not_exists,
457                owner,
458                resource_group,
459                barrier_interval_ms,
460                checkpoint_frequency,
461            )
462            .await
463        }
464        Statement::CreateSchema {
465            schema_name,
466            if_not_exists,
467            owner,
468        } => {
469            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
470                .await
471        }
472        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
473        Statement::DeclareCursor { stmt } => {
474            declare_cursor::handle_declare_cursor(handler_args, stmt).await
475        }
476        Statement::FetchCursor { stmt } => {
477            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
478        }
479        Statement::CloseCursor { stmt } => {
480            close_cursor::handle_close_cursor(handler_args, stmt).await
481        }
482        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
483        Statement::Grant { .. } => {
484            handle_privilege::handle_grant_privilege(handler_args, stmt).await
485        }
486        Statement::Revoke { .. } => {
487            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
488        }
489        Statement::Describe { name, kind } => match kind {
490            DescribeKind::Fragments => {
491                describe::handle_describe_fragments(handler_args, name).await
492            }
493            DescribeKind::Plain => describe::handle_describe(handler_args, name),
494        },
495        Statement::DescribeFragment { fragment_id } => {
496            describe::handle_describe_fragment(handler_args, fragment_id.into()).await
497        }
498        Statement::Discard(..) => discard::handle_discard(handler_args),
499        Statement::ShowObjects {
500            object: show_object,
501            filter,
502        } => show::handle_show_object(handler_args, show_object, filter).await,
503        Statement::ShowCreateObject { create_type, name } => {
504            show::handle_show_create_object(handler_args, create_type, name)
505        }
506        Statement::ShowTransactionIsolationLevel => {
507            transaction::handle_show_isolation_level(handler_args)
508        }
509        Statement::Drop(DropStatement {
510            object_type,
511            object_name,
512            if_exists,
513            drop_mode,
514        }) => {
515            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
516                match object_type {
517                    ObjectType::MaterializedView
518                    | ObjectType::View
519                    | ObjectType::Sink
520                    | ObjectType::Source
521                    | ObjectType::Subscription
522                    | ObjectType::Index
523                    | ObjectType::Table
524                    | ObjectType::Schema
525                    | ObjectType::Connection
526                    | ObjectType::Secret => true,
527                    ObjectType::Database | ObjectType::User => {
528                        bail_not_implemented!("DROP CASCADE");
529                    }
530                }
531            } else {
532                false
533            };
534            match object_type {
535                ObjectType::Table => {
536                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
537                        .await
538                }
539                ObjectType::MaterializedView => {
540                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
541                }
542                ObjectType::Index => {
543                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
544                        .await
545                }
546                ObjectType::Source => {
547                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
548                        .await
549                }
550                ObjectType::Sink => {
551                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
552                }
553                ObjectType::Subscription => {
554                    drop_subscription::handle_drop_subscription(
555                        handler_args,
556                        object_name,
557                        if_exists,
558                        cascade,
559                    )
560                    .await
561                }
562                ObjectType::Database => {
563                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
564                }
565                ObjectType::Schema => {
566                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
567                        .await
568                }
569                ObjectType::User => {
570                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
571                }
572                ObjectType::View => {
573                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
574                }
575                ObjectType::Connection => {
576                    drop_connection::handle_drop_connection(
577                        handler_args,
578                        object_name,
579                        if_exists,
580                        cascade,
581                    )
582                    .await
583                }
584                ObjectType::Secret => {
585                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists, cascade)
586                        .await
587                }
588            }
589        }
590        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
591        Statement::DropFunction {
592            if_exists,
593            func_desc,
594            option,
595        } => {
596            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
597                .await
598        }
599        Statement::DropAggregate {
600            if_exists,
601            func_desc,
602            option,
603        } => {
604            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
605                .await
606        }
607        Statement::Query(_)
608        | Statement::Insert { .. }
609        | Statement::Delete { .. }
610        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
611        Statement::Copy {
612            entity: CopyEntity::Query(query),
613            target: CopyTarget::Stdout,
614        } => {
615            let response =
616                query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
617                    .await?;
618            Ok(response.into_copy_query_to_stdout())
619        }
620        Statement::CreateView {
621            materialized,
622            if_not_exists,
623            name,
624            columns,
625            query,
626            with_options: _, // It is put in OptimizerContext
627            or_replace,      // not supported
628            emit_mode,
629        } => {
630            if or_replace {
631                bail_not_implemented!("CREATE OR REPLACE VIEW");
632            }
633            if materialized {
634                create_mv::handle_create_mv(
635                    handler_args,
636                    if_not_exists,
637                    name,
638                    *query,
639                    columns,
640                    emit_mode,
641                )
642                .await
643            } else {
644                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
645                    .await
646            }
647        }
648        Statement::Flush => flush::handle_flush(handler_args).await,
649        Statement::Wait => wait::handle_wait(handler_args).await,
650        Statement::Recover => recover::handle_recover(handler_args).await,
651        Statement::SetVariable {
652            local: _,
653            variable,
654            value,
655        } => {
656            // special handle for `use database`
657            if variable.real_value().eq_ignore_ascii_case("database") {
658                let x = variable::set_var_to_param_str(&value);
659                let res = use_db::handle_use_db(
660                    handler_args,
661                    ObjectName::from(vec![Ident::from_real_value(
662                        x.as_deref().unwrap_or("default"),
663                    )]),
664                )?;
665                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
666                for notice in res.notices() {
667                    builder = builder.notice(notice);
668                }
669                return Ok(builder.into());
670            }
671            variable::handle_set(handler_args, variable, value)
672        }
673        Statement::SetTimeZone { local: _, value } => {
674            variable::handle_set_time_zone(handler_args, value)
675        }
676        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
677        Statement::CreateIndex {
678            name,
679            table_name,
680            method,
681            columns,
682            include,
683            distributed_by,
684            unique,
685            if_not_exists,
686            with_properties: _,
687        } => {
688            if unique {
689                bail_not_implemented!("create unique index");
690            }
691
692            create_index::handle_create_index(
693                handler_args,
694                if_not_exists,
695                name,
696                table_name,
697                method,
698                columns.clone(),
699                include,
700                distributed_by,
701            )
702            .await
703        }
704        Statement::AlterDatabase { name, operation } => match operation {
705            AlterDatabaseOperation::RenameDatabase { database_name } => {
706                alter_rename::handle_rename_database(handler_args, name, database_name).await
707            }
708            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
709                alter_owner::handle_alter_owner(
710                    handler_args,
711                    name,
712                    new_owner_name,
713                    StatementType::ALTER_DATABASE,
714                    None,
715                )
716                .await
717            }
718            AlterDatabaseOperation::SetParam(config_param) => {
719                let ConfigParam { param, value } = config_param;
720
721                let database_param = match param.real_value().to_uppercase().as_str() {
722                    "BARRIER_INTERVAL_MS" => {
723                        let barrier_interval_ms = match value {
724                            SetVariableValue::Default => None,
725                            SetVariableValue::Single(SetVariableValueSingle::Literal(
726                                Value::Number(num),
727                            )) => {
728                                let num = num.parse::<u32>().map_err(|e| {
729                                    ErrorCode::InvalidInputSyntax(format!(
730                                        "barrier_interval_ms must be a u32 integer: {}",
731                                        e.as_report()
732                                    ))
733                                })?;
734                                Some(num)
735                            }
736                            _ => {
737                                return Err(ErrorCode::InvalidInputSyntax(
738                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
739                                        .to_owned(),
740                                )
741                                .into());
742                            }
743                        };
744                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
745                    }
746                    "CHECKPOINT_FREQUENCY" => {
747                        let checkpoint_frequency = match value {
748                            SetVariableValue::Default => None,
749                            SetVariableValue::Single(SetVariableValueSingle::Literal(
750                                Value::Number(num),
751                            )) => {
752                                let num = num.parse::<u64>().map_err(|e| {
753                                    ErrorCode::InvalidInputSyntax(format!(
754                                        "checkpoint_frequency must be a u64 integer: {}",
755                                        e.as_report()
756                                    ))
757                                })?;
758                                Some(num)
759                            }
760                            _ => {
761                                return Err(ErrorCode::InvalidInputSyntax(
762                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
763                                        .to_owned(),
764                                )
765                                .into());
766                            }
767                        };
768                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
769                    }
770                    _ => {
771                        return Err(ErrorCode::InvalidInputSyntax(format!(
772                            "Unsupported database config parameter: {}",
773                            param.real_value()
774                        ))
775                        .into());
776                    }
777                };
778
779                alter_database_param::handle_alter_database_param(
780                    handler_args,
781                    name,
782                    database_param,
783                )
784                .await
785            }
786        },
787        Statement::AlterSchema { name, operation } => match operation {
788            AlterSchemaOperation::RenameSchema { schema_name } => {
789                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
790            }
791            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
792                alter_owner::handle_alter_owner(
793                    handler_args,
794                    name,
795                    new_owner_name,
796                    StatementType::ALTER_SCHEMA,
797                    None,
798                )
799                .await
800            }
801            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
802                alter_swap_rename::handle_swap_rename(
803                    handler_args,
804                    name,
805                    target_schema,
806                    StatementType::ALTER_SCHEMA,
807                )
808                .await
809            }
810        },
811        Statement::AlterTable { name, operation } => match operation {
812            AlterTableOperation::AddColumn { .. }
813            | AlterTableOperation::DropColumn { .. }
814            | AlterTableOperation::AlterColumn { .. } => {
815                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
816            }
817            AlterTableOperation::RenameTable { table_name } => {
818                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
819                    .await
820            }
821            AlterTableOperation::ChangeOwner { new_owner_name } => {
822                alter_owner::handle_alter_owner(
823                    handler_args,
824                    name,
825                    new_owner_name,
826                    StatementType::ALTER_TABLE,
827                    None,
828                )
829                .await
830            }
831            AlterTableOperation::SetParallelism {
832                parallelism,
833                deferred,
834            } => {
835                alter_parallelism::handle_alter_parallelism(
836                    handler_args,
837                    name,
838                    parallelism,
839                    StatementType::ALTER_TABLE,
840                    deferred,
841                )
842                .await
843            }
844            AlterTableOperation::SetBackfillParallelism {
845                parallelism,
846                deferred,
847            } => {
848                alter_parallelism::handle_alter_backfill_parallelism(
849                    handler_args,
850                    name,
851                    parallelism,
852                    StatementType::ALTER_TABLE,
853                    deferred,
854                )
855                .await
856            }
857            AlterTableOperation::SetSchema { new_schema_name } => {
858                alter_set_schema::handle_alter_set_schema(
859                    handler_args,
860                    name,
861                    new_schema_name,
862                    StatementType::ALTER_TABLE,
863                    None,
864                )
865                .await
866            }
867            AlterTableOperation::RefreshSchema => {
868                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
869            }
870            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
871                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
872                    handler_args,
873                    PbThrottleTarget::Table,
874                    risingwave_pb::common::PbThrottleType::Source,
875                    name,
876                    rate_limit,
877                )
878                .await
879            }
880            AlterTableOperation::DropConnector => {
881                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
882                    .await
883            }
884            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
885                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
886                    handler_args,
887                    PbThrottleTarget::Table,
888                    risingwave_pb::common::PbThrottleType::Dml,
889                    name,
890                    rate_limit,
891                )
892                .await
893            }
894            AlterTableOperation::SetConfig { entries } => {
895                alter_streaming_config::handle_alter_streaming_set_config(
896                    handler_args,
897                    name,
898                    entries,
899                    StatementType::ALTER_TABLE,
900                )
901                .await
902            }
903            AlterTableOperation::ResetConfig { keys } => {
904                alter_streaming_config::handle_alter_streaming_reset_config(
905                    handler_args,
906                    name,
907                    keys,
908                    StatementType::ALTER_TABLE,
909                )
910                .await
911            }
912            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
913                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
914                    handler_args,
915                    PbThrottleTarget::Table,
916                    risingwave_pb::common::PbThrottleType::Backfill,
917                    name,
918                    rate_limit,
919                )
920                .await
921            }
922            AlterTableOperation::SwapRenameTable { target_table } => {
923                alter_swap_rename::handle_swap_rename(
924                    handler_args,
925                    name,
926                    target_table,
927                    StatementType::ALTER_TABLE,
928                )
929                .await
930            }
931            AlterTableOperation::AlterConnectorProps { alter_props } => {
932                alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
933            }
934            AlterTableOperation::AddConstraint { .. }
935            | AlterTableOperation::DropConstraint { .. }
936            | AlterTableOperation::RenameColumn { .. }
937            | AlterTableOperation::ChangeColumn { .. }
938            | AlterTableOperation::RenameConstraint { .. } => {
939                bail_not_implemented!(
940                    "Unhandled statement: {}",
941                    Statement::AlterTable { name, operation }
942                )
943            }
944        },
945        Statement::AlterIndex { name, operation } => match operation {
946            AlterIndexOperation::RenameIndex { index_name } => {
947                alter_rename::handle_rename_index(handler_args, name, index_name).await
948            }
949            AlterIndexOperation::SetParallelism {
950                parallelism,
951                deferred,
952            } => {
953                alter_parallelism::handle_alter_parallelism(
954                    handler_args,
955                    name,
956                    parallelism,
957                    StatementType::ALTER_INDEX,
958                    deferred,
959                )
960                .await
961            }
962            AlterIndexOperation::SetBackfillParallelism {
963                parallelism,
964                deferred,
965            } => {
966                alter_parallelism::handle_alter_backfill_parallelism(
967                    handler_args,
968                    name,
969                    parallelism,
970                    StatementType::ALTER_INDEX,
971                    deferred,
972                )
973                .await
974            }
975            AlterIndexOperation::SetConfig { entries } => {
976                alter_streaming_config::handle_alter_streaming_set_config(
977                    handler_args,
978                    name,
979                    entries,
980                    StatementType::ALTER_INDEX,
981                )
982                .await
983            }
984            AlterIndexOperation::ResetConfig { keys } => {
985                alter_streaming_config::handle_alter_streaming_reset_config(
986                    handler_args,
987                    name,
988                    keys,
989                    StatementType::ALTER_INDEX,
990                )
991                .await
992            }
993        },
994        Statement::AlterView {
995            materialized,
996            name,
997            operation,
998        } => {
999            let statement_type = if materialized {
1000                StatementType::ALTER_MATERIALIZED_VIEW
1001            } else {
1002                StatementType::ALTER_VIEW
1003            };
1004            match operation {
1005                AlterViewOperation::RenameView { view_name } => {
1006                    if materialized {
1007                        alter_rename::handle_rename_table(
1008                            handler_args,
1009                            TableType::MaterializedView,
1010                            name,
1011                            view_name,
1012                        )
1013                        .await
1014                    } else {
1015                        alter_rename::handle_rename_view(handler_args, name, view_name).await
1016                    }
1017                }
1018                AlterViewOperation::SetParallelism {
1019                    parallelism,
1020                    deferred,
1021                } => {
1022                    if !materialized {
1023                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
1024                    }
1025                    alter_parallelism::handle_alter_parallelism(
1026                        handler_args,
1027                        name,
1028                        parallelism,
1029                        statement_type,
1030                        deferred,
1031                    )
1032                    .await
1033                }
1034                AlterViewOperation::SetBackfillParallelism {
1035                    parallelism,
1036                    deferred,
1037                } => {
1038                    if !materialized {
1039                        bail_not_implemented!("ALTER VIEW SET BACKFILL PARALLELISM");
1040                    }
1041                    alter_parallelism::handle_alter_backfill_parallelism(
1042                        handler_args,
1043                        name,
1044                        parallelism,
1045                        statement_type,
1046                        deferred,
1047                    )
1048                    .await
1049                }
1050                AlterViewOperation::SetResourceGroup {
1051                    resource_group,
1052                    deferred,
1053                } => {
1054                    if !materialized {
1055                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1056                    }
1057                    alter_resource_group::handle_alter_resource_group(
1058                        handler_args,
1059                        name,
1060                        resource_group,
1061                        statement_type,
1062                        deferred,
1063                    )
1064                    .await
1065                }
1066                AlterViewOperation::ChangeOwner { new_owner_name } => {
1067                    alter_owner::handle_alter_owner(
1068                        handler_args,
1069                        name,
1070                        new_owner_name,
1071                        statement_type,
1072                        None,
1073                    )
1074                    .await
1075                }
1076                AlterViewOperation::SetSchema { new_schema_name } => {
1077                    alter_set_schema::handle_alter_set_schema(
1078                        handler_args,
1079                        name,
1080                        new_schema_name,
1081                        statement_type,
1082                        None,
1083                    )
1084                    .await
1085                }
1086                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1087                    if !materialized {
1088                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1089                    }
1090                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1091                        handler_args,
1092                        PbThrottleTarget::Mv,
1093                        risingwave_pb::common::PbThrottleType::Backfill,
1094                        name,
1095                        rate_limit,
1096                    )
1097                    .await
1098                }
1099                AlterViewOperation::SwapRenameView { target_view } => {
1100                    alter_swap_rename::handle_swap_rename(
1101                        handler_args,
1102                        name,
1103                        target_view,
1104                        statement_type,
1105                    )
1106                    .await
1107                }
1108                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1109                    if !materialized {
1110                        bail!(
1111                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1112                        );
1113                    }
1114                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1115                }
1116                AlterViewOperation::AsQuery { query } => {
1117                    if !materialized {
1118                        bail_not_implemented!("ALTER VIEW AS QUERY");
1119                    }
1120                    // `ALTER MATERIALIZED VIEW AS QUERY` is only available in development builds for now.
1121                    if !cfg!(debug_assertions) {
1122                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1123                    }
1124                    alter_mv::handle_alter_mv(handler_args, name, query).await
1125                }
1126                AlterViewOperation::SetConfig { entries } => {
1127                    if !materialized {
1128                        bail!("SET CONFIG is only supported for materialized views");
1129                    }
1130                    alter_streaming_config::handle_alter_streaming_set_config(
1131                        handler_args,
1132                        name,
1133                        entries,
1134                        statement_type,
1135                    )
1136                    .await
1137                }
1138                AlterViewOperation::ResetConfig { keys } => {
1139                    if !materialized {
1140                        bail!("RESET CONFIG is only supported for materialized views");
1141                    }
1142                    alter_streaming_config::handle_alter_streaming_reset_config(
1143                        handler_args,
1144                        name,
1145                        keys,
1146                        statement_type,
1147                    )
1148                    .await
1149                }
1150            }
1151        }
1152
1153        Statement::AlterSink { name, operation } => match operation {
1154            AlterSinkOperation::AlterConnectorProps {
1155                alter_props: changed_props,
1156            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1157            AlterSinkOperation::RenameSink { sink_name } => {
1158                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1159            }
1160            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1161                alter_owner::handle_alter_owner(
1162                    handler_args,
1163                    name,
1164                    new_owner_name,
1165                    StatementType::ALTER_SINK,
1166                    None,
1167                )
1168                .await
1169            }
1170            AlterSinkOperation::SetSchema { new_schema_name } => {
1171                alter_set_schema::handle_alter_set_schema(
1172                    handler_args,
1173                    name,
1174                    new_schema_name,
1175                    StatementType::ALTER_SINK,
1176                    None,
1177                )
1178                .await
1179            }
1180            AlterSinkOperation::SetParallelism {
1181                parallelism,
1182                deferred,
1183            } => {
1184                alter_parallelism::handle_alter_parallelism(
1185                    handler_args,
1186                    name,
1187                    parallelism,
1188                    StatementType::ALTER_SINK,
1189                    deferred,
1190                )
1191                .await
1192            }
1193            AlterSinkOperation::SetBackfillParallelism {
1194                parallelism,
1195                deferred,
1196            } => {
1197                alter_parallelism::handle_alter_backfill_parallelism(
1198                    handler_args,
1199                    name,
1200                    parallelism,
1201                    StatementType::ALTER_SINK,
1202                    deferred,
1203                )
1204                .await
1205            }
1206            AlterSinkOperation::SetConfig { entries } => {
1207                alter_streaming_config::handle_alter_streaming_set_config(
1208                    handler_args,
1209                    name,
1210                    entries,
1211                    StatementType::ALTER_SINK,
1212                )
1213                .await
1214            }
1215            AlterSinkOperation::ResetConfig { keys } => {
1216                alter_streaming_config::handle_alter_streaming_reset_config(
1217                    handler_args,
1218                    name,
1219                    keys,
1220                    StatementType::ALTER_SINK,
1221                )
1222                .await
1223            }
1224            AlterSinkOperation::SwapRenameSink { target_sink } => {
1225                alter_swap_rename::handle_swap_rename(
1226                    handler_args,
1227                    name,
1228                    target_sink,
1229                    StatementType::ALTER_SINK,
1230                )
1231                .await
1232            }
1233            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1234                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1235                    handler_args,
1236                    PbThrottleTarget::Sink,
1237                    risingwave_pb::common::PbThrottleType::Sink,
1238                    name,
1239                    rate_limit,
1240                )
1241                .await
1242            }
1243            AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
1244                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1245                    handler_args,
1246                    PbThrottleTarget::Sink,
1247                    risingwave_pb::common::PbThrottleType::Backfill,
1248                    name,
1249                    rate_limit,
1250                )
1251                .await
1252            }
1253            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1254                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1255                    handler_args,
1256                    name,
1257                    enable,
1258                )
1259                .await
1260            }
1261        },
1262        Statement::AlterSubscription { name, operation } => match operation {
1263            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1264                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1265                    .await
1266            }
1267            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1268                alter_owner::handle_alter_owner(
1269                    handler_args,
1270                    name,
1271                    new_owner_name,
1272                    StatementType::ALTER_SUBSCRIPTION,
1273                    None,
1274                )
1275                .await
1276            }
1277            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1278                alter_set_schema::handle_alter_set_schema(
1279                    handler_args,
1280                    name,
1281                    new_schema_name,
1282                    StatementType::ALTER_SUBSCRIPTION,
1283                    None,
1284                )
1285                .await
1286            }
1287            AlterSubscriptionOperation::SetRetention { retention } => {
1288                alter_subscription_retention::handle_alter_subscription_retention(
1289                    handler_args,
1290                    name,
1291                    retention,
1292                )
1293                .await
1294            }
1295            AlterSubscriptionOperation::SwapRenameSubscription {
1296                target_subscription,
1297            } => {
1298                alter_swap_rename::handle_swap_rename(
1299                    handler_args,
1300                    name,
1301                    target_subscription,
1302                    StatementType::ALTER_SUBSCRIPTION,
1303                )
1304                .await
1305            }
1306        },
1307        Statement::AlterSource { name, operation } => match operation {
1308            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1309                alter_source_props::handle_alter_source_connector_props(
1310                    handler_args,
1311                    name,
1312                    alter_props,
1313                )
1314                .await
1315            }
1316            AlterSourceOperation::RenameSource { source_name } => {
1317                alter_rename::handle_rename_source(handler_args, name, source_name).await
1318            }
1319            AlterSourceOperation::AddColumn { .. } => {
1320                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1321            }
1322            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1323                alter_owner::handle_alter_owner(
1324                    handler_args,
1325                    name,
1326                    new_owner_name,
1327                    StatementType::ALTER_SOURCE,
1328                    None,
1329                )
1330                .await
1331            }
1332            AlterSourceOperation::SetSchema { new_schema_name } => {
1333                alter_set_schema::handle_alter_set_schema(
1334                    handler_args,
1335                    name,
1336                    new_schema_name,
1337                    StatementType::ALTER_SOURCE,
1338                    None,
1339                )
1340                .await
1341            }
1342            AlterSourceOperation::FormatEncode { format_encode } => {
1343                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1344                    .await
1345            }
1346            AlterSourceOperation::RefreshSchema => {
1347                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1348            }
1349            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1350                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1351                    handler_args,
1352                    PbThrottleTarget::Source,
1353                    risingwave_pb::common::PbThrottleType::Source,
1354                    name,
1355                    rate_limit,
1356                )
1357                .await
1358            }
1359            AlterSourceOperation::SwapRenameSource { target_source } => {
1360                alter_swap_rename::handle_swap_rename(
1361                    handler_args,
1362                    name,
1363                    target_source,
1364                    StatementType::ALTER_SOURCE,
1365                )
1366                .await
1367            }
1368            AlterSourceOperation::SetParallelism {
1369                parallelism,
1370                deferred,
1371            } => {
1372                alter_parallelism::handle_alter_parallelism(
1373                    handler_args,
1374                    name,
1375                    parallelism,
1376                    StatementType::ALTER_SOURCE,
1377                    deferred,
1378                )
1379                .await
1380            }
1381            AlterSourceOperation::SetBackfillParallelism {
1382                parallelism,
1383                deferred,
1384            } => {
1385                alter_parallelism::handle_alter_backfill_parallelism(
1386                    handler_args,
1387                    name,
1388                    parallelism,
1389                    StatementType::ALTER_SOURCE,
1390                    deferred,
1391                )
1392                .await
1393            }
1394            AlterSourceOperation::SetConfig { entries } => {
1395                alter_streaming_config::handle_alter_streaming_set_config(
1396                    handler_args,
1397                    name,
1398                    entries,
1399                    StatementType::ALTER_SOURCE,
1400                )
1401                .await
1402            }
1403            AlterSourceOperation::ResetConfig { keys } => {
1404                alter_streaming_config::handle_alter_streaming_reset_config(
1405                    handler_args,
1406                    name,
1407                    keys,
1408                    StatementType::ALTER_SOURCE,
1409                )
1410                .await
1411            }
1412            AlterSourceOperation::ResetSource => {
1413                reset_source::handle_reset_source(handler_args, name).await
1414            }
1415        },
1416        Statement::AlterFunction {
1417            name,
1418            args,
1419            operation,
1420        } => match operation {
1421            AlterFunctionOperation::SetSchema { new_schema_name } => {
1422                alter_set_schema::handle_alter_set_schema(
1423                    handler_args,
1424                    name,
1425                    new_schema_name,
1426                    StatementType::ALTER_FUNCTION,
1427                    args,
1428                )
1429                .await
1430            }
1431            AlterFunctionOperation::ChangeOwner { new_owner_name } => {
1432                alter_owner::handle_alter_owner(
1433                    handler_args,
1434                    name,
1435                    new_owner_name,
1436                    StatementType::ALTER_FUNCTION,
1437                    args,
1438                )
1439                .await
1440            }
1441        },
1442        Statement::AlterConnection { name, operation } => match operation {
1443            AlterConnectionOperation::SetSchema { new_schema_name } => {
1444                alter_set_schema::handle_alter_set_schema(
1445                    handler_args,
1446                    name,
1447                    new_schema_name,
1448                    StatementType::ALTER_CONNECTION,
1449                    None,
1450                )
1451                .await
1452            }
1453            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1454                alter_owner::handle_alter_owner(
1455                    handler_args,
1456                    name,
1457                    new_owner_name,
1458                    StatementType::ALTER_CONNECTION,
1459                    None,
1460                )
1461                .await
1462            }
1463            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1464                alter_connection_props::handle_alter_connection_connector_props(
1465                    handler_args,
1466                    name,
1467                    alter_props,
1468                )
1469                .await
1470            }
1471        },
1472        Statement::AlterSystem { param, value } => {
1473            alter_system::handle_alter_system(handler_args, param, value).await
1474        }
1475        Statement::AlterSecret { name, operation } => match operation {
1476            AlterSecretOperation::ChangeCredential {
1477                with_options,
1478                new_credential,
1479            } => {
1480                alter_secret::handle_alter_secret(handler_args, name, with_options, new_credential)
1481                    .await
1482            }
1483            AlterSecretOperation::ChangeOwner { new_owner_name } => {
1484                alter_owner::handle_alter_owner(
1485                    handler_args,
1486                    name,
1487                    new_owner_name,
1488                    StatementType::ALTER_SECRET,
1489                    None,
1490                )
1491                .await
1492            }
1493        },
1494        Statement::AlterFragment {
1495            fragment_ids,
1496            operation,
1497        } => match operation {
1498            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1499                let [fragment_id] = fragment_ids.as_slice() else {
1500                    return Err(ErrorCode::InvalidInputSyntax(
1501                        "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1502                            .to_owned(),
1503                    )
1504                    .into());
1505                };
1506                alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1507                    &handler_args.session,
1508                    PbThrottleTarget::Fragment,
1509                    risingwave_pb::common::PbThrottleType::Backfill,
1510                    *fragment_id,
1511                    rate_limit,
1512                    StatementType::SET_VARIABLE,
1513                )
1514                .await
1515            }
1516            AlterFragmentOperation::SetParallelism { parallelism } => {
1517                alter_parallelism::handle_alter_fragment_parallelism(
1518                    handler_args,
1519                    fragment_ids.into_iter().map_into().collect(),
1520                    parallelism,
1521                )
1522                .await
1523            }
1524        },
1525        Statement::AlterDefaultPrivileges { .. } => {
1526            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1527        }
1528        Statement::StartTransaction { modes } => {
1529            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1530        }
1531        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1532        Statement::Commit { chain } => {
1533            transaction::handle_commit(handler_args, COMMIT, chain).await
1534        }
1535        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1536        Statement::Rollback { chain } => {
1537            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1538        }
1539        Statement::SetTransaction {
1540            modes,
1541            snapshot,
1542            session,
1543        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1544        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1545        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1546        Statement::Comment {
1547            object_type,
1548            object_name,
1549            comment,
1550        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1551        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1552        Statement::Prepare {
1553            name,
1554            data_types,
1555            statement,
1556        } => prepared_statement::handle_prepare(name, data_types, statement).await,
1557        Statement::Deallocate { name, prepare } => {
1558            prepared_statement::handle_deallocate(name, prepare).await
1559        }
1560        Statement::Vacuum { object_name, full } => {
1561            vacuum::handle_vacuum(handler_args, object_name, full).await
1562        }
1563        Statement::Refresh { table_name } => {
1564            refresh::handle_refresh(handler_args, table_name).await
1565        }
1566        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1567    }
1568}
1569
1570fn check_ban_ddl_for_iceberg_engine_table(
1571    session: Arc<SessionImpl>,
1572    stmt: &Statement,
1573) -> Result<()> {
1574    match stmt {
1575        Statement::AlterTable {
1576            name,
1577            operation:
1578                operation @ (AlterTableOperation::AddColumn { .. }
1579                | AlterTableOperation::DropColumn { .. }),
1580        } => {
1581            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1582            if table.is_iceberg_engine_table() {
1583                bail!(
1584                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1585                    operation,
1586                    schema_name,
1587                    name
1588                );
1589            }
1590        }
1591
1592        Statement::AlterTable {
1593            name,
1594            operation: AlterTableOperation::RenameTable { .. },
1595        } => {
1596            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1597            if table.is_iceberg_engine_table() {
1598                bail!(
1599                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1600                    schema_name,
1601                    name
1602                );
1603            }
1604        }
1605
1606        Statement::AlterTable {
1607            name,
1608            operation: AlterTableOperation::SetParallelism { .. },
1609        } => {
1610            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1611            if table.is_iceberg_engine_table() {
1612                bail!(
1613                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1614                    schema_name,
1615                    name
1616                );
1617            }
1618        }
1619        Statement::AlterTable {
1620            name,
1621            operation: AlterTableOperation::SetBackfillParallelism { .. },
1622        } => {
1623            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1624            if table.is_iceberg_engine_table() {
1625                bail!(
1626                    "ALTER TABLE SET BACKFILL PARALLELISM is not supported for iceberg table: {}.{}",
1627                    schema_name,
1628                    name
1629                );
1630            }
1631        }
1632
1633        Statement::AlterTable {
1634            name,
1635            operation: AlterTableOperation::SetSchema { .. },
1636        } => {
1637            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1638            if table.is_iceberg_engine_table() {
1639                bail!(
1640                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1641                    schema_name,
1642                    name
1643                );
1644            }
1645        }
1646
1647        Statement::AlterTable {
1648            name,
1649            operation: AlterTableOperation::RefreshSchema,
1650        } => {
1651            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1652            if table.is_iceberg_engine_table() {
1653                bail!(
1654                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1655                    schema_name,
1656                    name
1657                );
1658            }
1659        }
1660
1661        Statement::AlterTable {
1662            name,
1663            operation: AlterTableOperation::SetSourceRateLimit { .. },
1664        } => {
1665            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1666            if table.is_iceberg_engine_table() {
1667                bail!(
1668                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1669                    schema_name,
1670                    name
1671                );
1672            }
1673        }
1674
1675        _ => {}
1676    }
1677
1678    Ok(())
1679}