risingwave_frontend/handler/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_connection_props;
46mod alter_database_param;
47mod alter_mv;
48mod alter_owner;
49mod alter_parallelism;
50mod alter_rename;
51mod alter_resource_group;
52mod alter_secret;
53mod alter_set_schema;
54mod alter_sink_props;
55mod alter_source_column;
56mod alter_source_props;
57mod alter_source_with_sr;
58mod alter_streaming_config;
59mod alter_streaming_enable_unaligned_join;
60mod alter_streaming_rate_limit;
61mod alter_subscription_retention;
62mod alter_swap_rename;
63mod alter_system;
64mod alter_table_column;
65pub mod alter_table_drop_connector;
66pub mod alter_table_props;
67mod alter_table_with_sr;
68pub mod alter_user;
69mod alter_utils;
70pub mod cancel_job;
71pub mod close_cursor;
72mod comment;
73pub mod create_aggregate;
74pub mod create_connection;
75mod create_database;
76pub mod create_function;
77pub mod create_index;
78pub mod create_mv;
79pub mod create_schema;
80pub mod create_secret;
81pub mod create_sink;
82pub mod create_source;
83pub mod create_sql_function;
84pub mod create_subscription;
85pub mod create_table;
86pub mod create_table_as;
87pub mod create_user;
88pub mod create_view;
89pub mod declare_cursor;
90pub mod describe;
91pub mod discard;
92mod drop_connection;
93mod drop_database;
94pub mod drop_function;
95mod drop_index;
96pub mod drop_mv;
97mod drop_schema;
98pub mod drop_secret;
99pub mod drop_sink;
100pub mod drop_source;
101pub mod drop_subscription;
102pub mod drop_table;
103pub mod drop_user;
104mod drop_view;
105pub mod explain;
106pub mod explain_analyze_stream_job;
107pub mod extended_handle;
108pub mod fetch_cursor;
109mod flush;
110pub mod handle_privilege;
111pub mod kill_process;
112mod prepared_statement;
113pub mod privilege;
114pub mod query;
115mod recover;
116mod refresh;
117mod reset_source;
118pub mod show;
119mod transaction;
120mod use_db;
121pub mod util;
122pub mod vacuum;
123pub mod variable;
124mod wait;
125
126pub use alter_table_column::{
127    fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
128};
129
130/// The [`PgResponseBuilder`] used by RisingWave.
131pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
132
133/// The [`PgResponse`] used by RisingWave.
134pub type RwPgResponse = PgResponse<PgResponseStream>;
135
136#[easy_ext::ext(RwPgResponseBuilderExt)]
137impl RwPgResponseBuilder {
138    /// Append rows to the response.
139    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
140        let fields = T::fields();
141        self.values(
142            rows.into_iter()
143                .map(|row| {
144                    Row::new(
145                        row.into_owned_row()
146                            .into_iter()
147                            .zip_eq_fast(&fields)
148                            .map(|(datum, (_, ty))| {
149                                datum.map(|scalar| {
150                                    scalar.as_scalar_ref_impl().text_format(ty).into()
151                                })
152                            })
153                            .collect(),
154                    )
155                })
156                .collect_vec()
157                .into(),
158            fields_to_descriptors(fields),
159        )
160    }
161}
162
163pub fn fields_to_descriptors(
164    fields: Vec<(&str, risingwave_common::types::DataType)>,
165) -> Vec<PgFieldDescriptor> {
166    fields
167        .iter()
168        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
169        .collect()
170}
171
172pub enum PgResponseStream {
173    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
174    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
175    Rows(BoxStream<'static, RowSetResult>),
176}
177
178impl Stream for PgResponseStream {
179    type Item = std::result::Result<Vec<Row>, BoxedError>;
180
181    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
182        match &mut *self {
183            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
184            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
185            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
186        }
187    }
188}
189
190impl From<Vec<Row>> for PgResponseStream {
191    fn from(rows: Vec<Row>) -> Self {
192        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
193    }
194}
195
196#[derive(Clone)]
197pub struct HandlerArgs {
198    pub session: Arc<SessionImpl>,
199    pub sql: Arc<str>,
200    pub normalized_sql: String,
201    pub with_options: WithOptions,
202}
203
204impl HandlerArgs {
205    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
206        Ok(Self {
207            session,
208            sql,
209            with_options: WithOptions::try_from(stmt)?,
210            normalized_sql: Self::normalize_sql(stmt),
211        })
212    }
213
214    /// Get normalized SQL from the statement.
215    ///
216    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
217    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
218    ///   make it suitable for the `SHOW CREATE` statements.
219    fn normalize_sql(stmt: &Statement) -> String {
220        let mut stmt = stmt.clone();
221        match &mut stmt {
222            Statement::CreateView {
223                or_replace,
224                if_not_exists,
225                ..
226            } => {
227                *or_replace = false;
228                *if_not_exists = false;
229            }
230            Statement::CreateTable {
231                or_replace,
232                if_not_exists,
233                ..
234            } => {
235                *or_replace = false;
236                *if_not_exists = false;
237            }
238            Statement::CreateIndex { if_not_exists, .. } => {
239                *if_not_exists = false;
240            }
241            Statement::CreateSource {
242                stmt: CreateSourceStatement { if_not_exists, .. },
243                ..
244            } => {
245                *if_not_exists = false;
246            }
247            Statement::CreateSink {
248                stmt: CreateSinkStatement { if_not_exists, .. },
249            } => {
250                *if_not_exists = false;
251            }
252            Statement::CreateSubscription {
253                stmt: CreateSubscriptionStatement { if_not_exists, .. },
254            } => {
255                *if_not_exists = false;
256            }
257            Statement::CreateConnection {
258                stmt: CreateConnectionStatement { if_not_exists, .. },
259            } => {
260                *if_not_exists = false;
261            }
262            _ => {}
263        }
264        stmt.to_string()
265    }
266}
267
268#[allow(clippy::large_stack_frames)]
269pub async fn handle(
270    session: Arc<SessionImpl>,
271    stmt: Statement,
272    sql: Arc<str>,
273    formats: Vec<Format>,
274) -> Result<RwPgResponse> {
275    session.clear_cancel_query_flag();
276    let _guard = session.txn_begin_implicit();
277    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
278
279    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
280
281    match stmt {
282        Statement::Explain {
283            statement,
284            analyze,
285            options,
286        } => {
287            Box::pin(explain::handle_explain(
288                handler_args,
289                *statement,
290                options,
291                analyze,
292            ))
293            .await
294        }
295        Statement::ExplainAnalyzeStreamJob {
296            target,
297            duration_secs,
298        } => {
299            explain_analyze_stream_job::handle_explain_analyze_stream_job(
300                handler_args,
301                target,
302                duration_secs,
303            )
304            .await
305        }
306        Statement::CreateSource { stmt } => {
307            create_source::handle_create_source(handler_args, stmt).await
308        }
309        Statement::CreateSink { stmt } => {
310            create_sink::handle_create_sink(handler_args, stmt, false).await
311        }
312        Statement::CreateSubscription { stmt } => {
313            create_subscription::handle_create_subscription(handler_args, stmt).await
314        }
315        Statement::CreateConnection { stmt } => {
316            create_connection::handle_create_connection(handler_args, stmt).await
317        }
318        Statement::CreateSecret { stmt } => {
319            create_secret::handle_create_secret(handler_args, stmt).await
320        }
321        Statement::CreateFunction {
322            or_replace,
323            temporary,
324            if_not_exists,
325            name,
326            args,
327            returns,
328            params,
329            with_options,
330        } => {
331            // For general udf, `language` clause could be ignored
332            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
333            if params.language.is_none()
334                || !params
335                    .language
336                    .as_ref()
337                    .unwrap()
338                    .real_value()
339                    .eq_ignore_ascii_case("sql")
340            {
341                create_function::handle_create_function(
342                    handler_args,
343                    or_replace,
344                    temporary,
345                    if_not_exists,
346                    name,
347                    args,
348                    returns,
349                    params,
350                    with_options,
351                )
352                .await
353            } else {
354                create_sql_function::handle_create_sql_function(
355                    handler_args,
356                    or_replace,
357                    temporary,
358                    if_not_exists,
359                    name,
360                    args,
361                    returns,
362                    params,
363                )
364                .await
365            }
366        }
367        Statement::CreateAggregate {
368            or_replace,
369            if_not_exists,
370            name,
371            args,
372            returns,
373            params,
374            ..
375        } => {
376            create_aggregate::handle_create_aggregate(
377                handler_args,
378                or_replace,
379                if_not_exists,
380                name,
381                args,
382                returns,
383                params,
384            )
385            .await
386        }
387        Statement::CreateTable {
388            name,
389            columns,
390            wildcard_idx,
391            constraints,
392            query,
393            with_options: _, // It is put in OptimizerContext
394            // Not supported things
395            or_replace,
396            temporary,
397            if_not_exists,
398            format_encode,
399            source_watermarks,
400            append_only,
401            on_conflict,
402            with_version_columns,
403            cdc_table_info,
404            include_column_options,
405            webhook_info,
406            engine,
407        } => {
408            if or_replace {
409                bail_not_implemented!("CREATE OR REPLACE TABLE");
410            }
411            if temporary {
412                bail_not_implemented!("CREATE TEMPORARY TABLE");
413            }
414            if let Some(query) = query {
415                return create_table_as::handle_create_as(
416                    handler_args,
417                    name,
418                    if_not_exists,
419                    query,
420                    columns,
421                    append_only,
422                    on_conflict,
423                    with_version_columns
424                        .iter()
425                        .map(|col| col.real_value())
426                        .collect(),
427                    engine,
428                )
429                .await;
430            }
431            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
432            Box::pin(create_table::handle_create_table(
433                handler_args,
434                name,
435                columns,
436                wildcard_idx,
437                constraints,
438                if_not_exists,
439                format_encode,
440                source_watermarks,
441                append_only,
442                on_conflict,
443                with_version_columns
444                    .iter()
445                    .map(|col| col.real_value())
446                    .collect(),
447                cdc_table_info,
448                include_column_options,
449                webhook_info,
450                engine,
451            ))
452            .await
453        }
454        Statement::CreateDatabase {
455            db_name,
456            if_not_exists,
457            owner,
458            resource_group,
459            barrier_interval_ms,
460            checkpoint_frequency,
461        } => {
462            create_database::handle_create_database(
463                handler_args,
464                db_name,
465                if_not_exists,
466                owner,
467                resource_group,
468                barrier_interval_ms,
469                checkpoint_frequency,
470            )
471            .await
472        }
473        Statement::CreateSchema {
474            schema_name,
475            if_not_exists,
476            owner,
477        } => {
478            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
479                .await
480        }
481        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
482        Statement::DeclareCursor { stmt } => {
483            declare_cursor::handle_declare_cursor(handler_args, stmt).await
484        }
485        Statement::FetchCursor { stmt } => {
486            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
487        }
488        Statement::CloseCursor { stmt } => {
489            close_cursor::handle_close_cursor(handler_args, stmt).await
490        }
491        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
492        Statement::Grant { .. } => {
493            handle_privilege::handle_grant_privilege(handler_args, stmt).await
494        }
495        Statement::Revoke { .. } => {
496            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
497        }
498        Statement::Describe { name, kind } => match kind {
499            DescribeKind::Fragments => {
500                describe::handle_describe_fragments(handler_args, name).await
501            }
502            DescribeKind::Plain => describe::handle_describe(handler_args, name),
503        },
504        Statement::DescribeFragment { fragment_id } => {
505            describe::handle_describe_fragment(handler_args, fragment_id.into()).await
506        }
507        Statement::Discard(..) => discard::handle_discard(handler_args),
508        Statement::ShowObjects {
509            object: show_object,
510            filter,
511        } => show::handle_show_object(handler_args, show_object, filter).await,
512        Statement::ShowCreateObject { create_type, name } => {
513            show::handle_show_create_object(handler_args, create_type, name)
514        }
515        Statement::ShowTransactionIsolationLevel => {
516            transaction::handle_show_isolation_level(handler_args)
517        }
518        Statement::Drop(DropStatement {
519            object_type,
520            object_name,
521            if_exists,
522            drop_mode,
523        }) => {
524            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
525                match object_type {
526                    ObjectType::MaterializedView
527                    | ObjectType::View
528                    | ObjectType::Sink
529                    | ObjectType::Source
530                    | ObjectType::Subscription
531                    | ObjectType::Index
532                    | ObjectType::Table
533                    | ObjectType::Schema
534                    | ObjectType::Connection
535                    | ObjectType::Secret => true,
536                    ObjectType::Database | ObjectType::User => {
537                        bail_not_implemented!("DROP CASCADE");
538                    }
539                }
540            } else {
541                false
542            };
543            match object_type {
544                ObjectType::Table => {
545                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
546                        .await
547                }
548                ObjectType::MaterializedView => {
549                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
550                }
551                ObjectType::Index => {
552                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
553                        .await
554                }
555                ObjectType::Source => {
556                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
557                        .await
558                }
559                ObjectType::Sink => {
560                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
561                }
562                ObjectType::Subscription => {
563                    drop_subscription::handle_drop_subscription(
564                        handler_args,
565                        object_name,
566                        if_exists,
567                        cascade,
568                    )
569                    .await
570                }
571                ObjectType::Database => {
572                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
573                }
574                ObjectType::Schema => {
575                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
576                        .await
577                }
578                ObjectType::User => {
579                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
580                }
581                ObjectType::View => {
582                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
583                }
584                ObjectType::Connection => {
585                    drop_connection::handle_drop_connection(
586                        handler_args,
587                        object_name,
588                        if_exists,
589                        cascade,
590                    )
591                    .await
592                }
593                ObjectType::Secret => {
594                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists, cascade)
595                        .await
596                }
597            }
598        }
599        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
600        Statement::DropFunction {
601            if_exists,
602            func_desc,
603            option,
604        } => {
605            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
606                .await
607        }
608        Statement::DropAggregate {
609            if_exists,
610            func_desc,
611            option,
612        } => {
613            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
614                .await
615        }
616        Statement::Query(_)
617        | Statement::Insert { .. }
618        | Statement::Delete { .. }
619        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
620        Statement::Copy {
621            entity: CopyEntity::Query(query),
622            target: CopyTarget::Stdout,
623        } => {
624            let response =
625                query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
626                    .await?;
627            Ok(response.into_copy_query_to_stdout())
628        }
629        Statement::CreateView {
630            materialized,
631            if_not_exists,
632            name,
633            columns,
634            query,
635            with_options: _, // It is put in OptimizerContext
636            or_replace,      // not supported
637            emit_mode,
638        } => {
639            if or_replace {
640                bail_not_implemented!("CREATE OR REPLACE VIEW");
641            }
642            if materialized {
643                create_mv::handle_create_mv(
644                    handler_args,
645                    if_not_exists,
646                    name,
647                    *query,
648                    columns,
649                    emit_mode,
650                )
651                .await
652            } else {
653                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
654                    .await
655            }
656        }
657        Statement::Flush => flush::handle_flush(handler_args).await,
658        Statement::Wait => wait::handle_wait(handler_args).await,
659        Statement::Recover => recover::handle_recover(handler_args).await,
660        Statement::SetVariable {
661            local: _,
662            variable,
663            value,
664        } => {
665            // special handle for `use database`
666            if variable.real_value().eq_ignore_ascii_case("database") {
667                let x = variable::set_var_to_param_str(&value);
668                let res = use_db::handle_use_db(
669                    handler_args,
670                    ObjectName::from(vec![Ident::from_real_value(
671                        x.as_deref().unwrap_or("default"),
672                    )]),
673                )?;
674                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
675                for notice in res.notices() {
676                    builder = builder.notice(notice);
677                }
678                return Ok(builder.into());
679            }
680            variable::handle_set(handler_args, variable, value)
681        }
682        Statement::SetTimeZone { local: _, value } => {
683            variable::handle_set_time_zone(handler_args, value)
684        }
685        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
686        Statement::CreateIndex {
687            name,
688            table_name,
689            method,
690            columns,
691            include,
692            distributed_by,
693            unique,
694            if_not_exists,
695            with_properties: _,
696        } => {
697            if unique {
698                bail_not_implemented!("create unique index");
699            }
700
701            create_index::handle_create_index(
702                handler_args,
703                if_not_exists,
704                name,
705                table_name,
706                method,
707                columns.clone(),
708                include,
709                distributed_by,
710            )
711            .await
712        }
713        Statement::AlterDatabase { name, operation } => match operation {
714            AlterDatabaseOperation::RenameDatabase { database_name } => {
715                alter_rename::handle_rename_database(handler_args, name, database_name).await
716            }
717            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
718                alter_owner::handle_alter_owner(
719                    handler_args,
720                    name,
721                    new_owner_name,
722                    StatementType::ALTER_DATABASE,
723                    None,
724                )
725                .await
726            }
727            AlterDatabaseOperation::SetParam(config_param) => {
728                let ConfigParam { param, value } = config_param;
729
730                let database_param = match param.real_value().to_uppercase().as_str() {
731                    "BARRIER_INTERVAL_MS" => {
732                        let barrier_interval_ms = match value {
733                            SetVariableValue::Default => None,
734                            SetVariableValue::Single(SetVariableValueSingle::Literal(
735                                Value::Number(num),
736                            )) => {
737                                let num = num.parse::<u32>().map_err(|e| {
738                                    ErrorCode::InvalidInputSyntax(format!(
739                                        "barrier_interval_ms must be a u32 integer: {}",
740                                        e.as_report()
741                                    ))
742                                })?;
743                                Some(num)
744                            }
745                            _ => {
746                                return Err(ErrorCode::InvalidInputSyntax(
747                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
748                                        .to_owned(),
749                                )
750                                .into());
751                            }
752                        };
753                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
754                    }
755                    "CHECKPOINT_FREQUENCY" => {
756                        let checkpoint_frequency = match value {
757                            SetVariableValue::Default => None,
758                            SetVariableValue::Single(SetVariableValueSingle::Literal(
759                                Value::Number(num),
760                            )) => {
761                                let num = num.parse::<u64>().map_err(|e| {
762                                    ErrorCode::InvalidInputSyntax(format!(
763                                        "checkpoint_frequency must be a u64 integer: {}",
764                                        e.as_report()
765                                    ))
766                                })?;
767                                Some(num)
768                            }
769                            _ => {
770                                return Err(ErrorCode::InvalidInputSyntax(
771                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
772                                        .to_owned(),
773                                )
774                                .into());
775                            }
776                        };
777                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
778                    }
779                    _ => {
780                        return Err(ErrorCode::InvalidInputSyntax(format!(
781                            "Unsupported database config parameter: {}",
782                            param.real_value()
783                        ))
784                        .into());
785                    }
786                };
787
788                alter_database_param::handle_alter_database_param(
789                    handler_args,
790                    name,
791                    database_param,
792                )
793                .await
794            }
795        },
796        Statement::AlterSchema { name, operation } => match operation {
797            AlterSchemaOperation::RenameSchema { schema_name } => {
798                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
799            }
800            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
801                alter_owner::handle_alter_owner(
802                    handler_args,
803                    name,
804                    new_owner_name,
805                    StatementType::ALTER_SCHEMA,
806                    None,
807                )
808                .await
809            }
810            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
811                alter_swap_rename::handle_swap_rename(
812                    handler_args,
813                    name,
814                    target_schema,
815                    StatementType::ALTER_SCHEMA,
816                )
817                .await
818            }
819        },
820        Statement::AlterTable { name, operation } => match operation {
821            AlterTableOperation::AddColumn { .. }
822            | AlterTableOperation::DropColumn { .. }
823            | AlterTableOperation::AlterColumn { .. } => {
824                Box::pin(alter_table_column::handle_alter_table_column(
825                    handler_args,
826                    name,
827                    operation,
828                ))
829                .await
830            }
831            AlterTableOperation::RenameTable { table_name } => {
832                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
833                    .await
834            }
835            AlterTableOperation::ChangeOwner { new_owner_name } => {
836                alter_owner::handle_alter_owner(
837                    handler_args,
838                    name,
839                    new_owner_name,
840                    StatementType::ALTER_TABLE,
841                    None,
842                )
843                .await
844            }
845            AlterTableOperation::SetParallelism {
846                parallelism,
847                deferred,
848            } => {
849                alter_parallelism::handle_alter_parallelism(
850                    handler_args,
851                    name,
852                    parallelism,
853                    StatementType::ALTER_TABLE,
854                    deferred,
855                )
856                .await
857            }
858            AlterTableOperation::SetBackfillParallelism {
859                parallelism,
860                deferred,
861            } => {
862                alter_parallelism::handle_alter_backfill_parallelism(
863                    handler_args,
864                    name,
865                    parallelism,
866                    StatementType::ALTER_TABLE,
867                    deferred,
868                )
869                .await
870            }
871            AlterTableOperation::SetSchema { new_schema_name } => {
872                alter_set_schema::handle_alter_set_schema(
873                    handler_args,
874                    name,
875                    new_schema_name,
876                    StatementType::ALTER_TABLE,
877                    None,
878                )
879                .await
880            }
881            AlterTableOperation::RefreshSchema => {
882                Box::pin(alter_table_with_sr::handle_refresh_schema(
883                    handler_args,
884                    name,
885                ))
886                .await
887            }
888            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
889                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
890                    handler_args,
891                    PbThrottleTarget::Table,
892                    risingwave_pb::common::PbThrottleType::Source,
893                    name,
894                    rate_limit,
895                )
896                .await
897            }
898            AlterTableOperation::DropConnector => {
899                Box::pin(
900                    alter_table_drop_connector::handle_alter_table_drop_connector(
901                        handler_args,
902                        name,
903                    ),
904                )
905                .await
906            }
907            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
908                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
909                    handler_args,
910                    PbThrottleTarget::Table,
911                    risingwave_pb::common::PbThrottleType::Dml,
912                    name,
913                    rate_limit,
914                )
915                .await
916            }
917            AlterTableOperation::SetConfig { entries } => {
918                alter_streaming_config::handle_alter_streaming_set_config(
919                    handler_args,
920                    name,
921                    entries,
922                    StatementType::ALTER_TABLE,
923                )
924                .await
925            }
926            AlterTableOperation::ResetConfig { keys } => {
927                alter_streaming_config::handle_alter_streaming_reset_config(
928                    handler_args,
929                    name,
930                    keys,
931                    StatementType::ALTER_TABLE,
932                )
933                .await
934            }
935            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
936                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
937                    handler_args,
938                    PbThrottleTarget::Table,
939                    risingwave_pb::common::PbThrottleType::Backfill,
940                    name,
941                    rate_limit,
942                )
943                .await
944            }
945            AlterTableOperation::SwapRenameTable { target_table } => {
946                alter_swap_rename::handle_swap_rename(
947                    handler_args,
948                    name,
949                    target_table,
950                    StatementType::ALTER_TABLE,
951                )
952                .await
953            }
954            AlterTableOperation::AlterConnectorProps { alter_props } => {
955                alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
956            }
957            AlterTableOperation::AddConstraint { .. }
958            | AlterTableOperation::DropConstraint { .. }
959            | AlterTableOperation::RenameColumn { .. }
960            | AlterTableOperation::ChangeColumn { .. }
961            | AlterTableOperation::RenameConstraint { .. } => {
962                bail_not_implemented!(
963                    "Unhandled statement: {}",
964                    Statement::AlterTable { name, operation }
965                )
966            }
967        },
968        Statement::AlterIndex { name, operation } => match operation {
969            AlterIndexOperation::RenameIndex { index_name } => {
970                alter_rename::handle_rename_index(handler_args, name, index_name).await
971            }
972            AlterIndexOperation::SetParallelism {
973                parallelism,
974                deferred,
975            } => {
976                alter_parallelism::handle_alter_parallelism(
977                    handler_args,
978                    name,
979                    parallelism,
980                    StatementType::ALTER_INDEX,
981                    deferred,
982                )
983                .await
984            }
985            AlterIndexOperation::SetBackfillParallelism {
986                parallelism,
987                deferred,
988            } => {
989                alter_parallelism::handle_alter_backfill_parallelism(
990                    handler_args,
991                    name,
992                    parallelism,
993                    StatementType::ALTER_INDEX,
994                    deferred,
995                )
996                .await
997            }
998            AlterIndexOperation::SetConfig { entries } => {
999                alter_streaming_config::handle_alter_streaming_set_config(
1000                    handler_args,
1001                    name,
1002                    entries,
1003                    StatementType::ALTER_INDEX,
1004                )
1005                .await
1006            }
1007            AlterIndexOperation::ResetConfig { keys } => {
1008                alter_streaming_config::handle_alter_streaming_reset_config(
1009                    handler_args,
1010                    name,
1011                    keys,
1012                    StatementType::ALTER_INDEX,
1013                )
1014                .await
1015            }
1016        },
1017        Statement::AlterView {
1018            materialized,
1019            name,
1020            operation,
1021        } => {
1022            let statement_type = if materialized {
1023                StatementType::ALTER_MATERIALIZED_VIEW
1024            } else {
1025                StatementType::ALTER_VIEW
1026            };
1027            match operation {
1028                AlterViewOperation::RenameView { view_name } => {
1029                    if materialized {
1030                        alter_rename::handle_rename_table(
1031                            handler_args,
1032                            TableType::MaterializedView,
1033                            name,
1034                            view_name,
1035                        )
1036                        .await
1037                    } else {
1038                        alter_rename::handle_rename_view(handler_args, name, view_name).await
1039                    }
1040                }
1041                AlterViewOperation::SetParallelism {
1042                    parallelism,
1043                    deferred,
1044                } => {
1045                    if !materialized {
1046                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
1047                    }
1048                    alter_parallelism::handle_alter_parallelism(
1049                        handler_args,
1050                        name,
1051                        parallelism,
1052                        statement_type,
1053                        deferred,
1054                    )
1055                    .await
1056                }
1057                AlterViewOperation::SetBackfillParallelism {
1058                    parallelism,
1059                    deferred,
1060                } => {
1061                    if !materialized {
1062                        bail_not_implemented!("ALTER VIEW SET BACKFILL PARALLELISM");
1063                    }
1064                    alter_parallelism::handle_alter_backfill_parallelism(
1065                        handler_args,
1066                        name,
1067                        parallelism,
1068                        statement_type,
1069                        deferred,
1070                    )
1071                    .await
1072                }
1073                AlterViewOperation::SetResourceGroup {
1074                    resource_group,
1075                    deferred,
1076                } => {
1077                    if !materialized {
1078                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1079                    }
1080                    alter_resource_group::handle_alter_resource_group(
1081                        handler_args,
1082                        name,
1083                        resource_group,
1084                        statement_type,
1085                        deferred,
1086                    )
1087                    .await
1088                }
1089                AlterViewOperation::ChangeOwner { new_owner_name } => {
1090                    alter_owner::handle_alter_owner(
1091                        handler_args,
1092                        name,
1093                        new_owner_name,
1094                        statement_type,
1095                        None,
1096                    )
1097                    .await
1098                }
1099                AlterViewOperation::SetSchema { new_schema_name } => {
1100                    alter_set_schema::handle_alter_set_schema(
1101                        handler_args,
1102                        name,
1103                        new_schema_name,
1104                        statement_type,
1105                        None,
1106                    )
1107                    .await
1108                }
1109                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1110                    if !materialized {
1111                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1112                    }
1113                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1114                        handler_args,
1115                        PbThrottleTarget::Mv,
1116                        risingwave_pb::common::PbThrottleType::Backfill,
1117                        name,
1118                        rate_limit,
1119                    )
1120                    .await
1121                }
1122                AlterViewOperation::SwapRenameView { target_view } => {
1123                    alter_swap_rename::handle_swap_rename(
1124                        handler_args,
1125                        name,
1126                        target_view,
1127                        statement_type,
1128                    )
1129                    .await
1130                }
1131                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1132                    if !materialized {
1133                        bail!(
1134                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1135                        );
1136                    }
1137                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1138                }
1139                AlterViewOperation::AsQuery { query } => {
1140                    if !materialized {
1141                        bail_not_implemented!("ALTER VIEW AS QUERY");
1142                    }
1143                    // `ALTER MATERIALIZED VIEW AS QUERY` is currently only available in development builds (i.e. when `debug_assertions` is enabled).
1144                    if !cfg!(debug_assertions) {
1145                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1146                    }
1147                    alter_mv::handle_alter_mv(handler_args, name, query).await
1148                }
1149                AlterViewOperation::SetConfig { entries } => {
1150                    if !materialized {
1151                        bail!("SET CONFIG is only supported for materialized views");
1152                    }
1153                    alter_streaming_config::handle_alter_streaming_set_config(
1154                        handler_args,
1155                        name,
1156                        entries,
1157                        statement_type,
1158                    )
1159                    .await
1160                }
1161                AlterViewOperation::ResetConfig { keys } => {
1162                    if !materialized {
1163                        bail!("RESET CONFIG is only supported for materialized views");
1164                    }
1165                    alter_streaming_config::handle_alter_streaming_reset_config(
1166                        handler_args,
1167                        name,
1168                        keys,
1169                        statement_type,
1170                    )
1171                    .await
1172                }
1173            }
1174        }
1175
1176        Statement::AlterSink { name, operation } => match operation {
1177            AlterSinkOperation::AlterConnectorProps {
1178                alter_props: changed_props,
1179            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1180            AlterSinkOperation::RenameSink { sink_name } => {
1181                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1182            }
1183            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1184                alter_owner::handle_alter_owner(
1185                    handler_args,
1186                    name,
1187                    new_owner_name,
1188                    StatementType::ALTER_SINK,
1189                    None,
1190                )
1191                .await
1192            }
1193            AlterSinkOperation::SetSchema { new_schema_name } => {
1194                alter_set_schema::handle_alter_set_schema(
1195                    handler_args,
1196                    name,
1197                    new_schema_name,
1198                    StatementType::ALTER_SINK,
1199                    None,
1200                )
1201                .await
1202            }
1203            AlterSinkOperation::SetParallelism {
1204                parallelism,
1205                deferred,
1206            } => {
1207                alter_parallelism::handle_alter_parallelism(
1208                    handler_args,
1209                    name,
1210                    parallelism,
1211                    StatementType::ALTER_SINK,
1212                    deferred,
1213                )
1214                .await
1215            }
1216            AlterSinkOperation::SetBackfillParallelism {
1217                parallelism,
1218                deferred,
1219            } => {
1220                alter_parallelism::handle_alter_backfill_parallelism(
1221                    handler_args,
1222                    name,
1223                    parallelism,
1224                    StatementType::ALTER_SINK,
1225                    deferred,
1226                )
1227                .await
1228            }
1229            AlterSinkOperation::SetConfig { entries } => {
1230                alter_streaming_config::handle_alter_streaming_set_config(
1231                    handler_args,
1232                    name,
1233                    entries,
1234                    StatementType::ALTER_SINK,
1235                )
1236                .await
1237            }
1238            AlterSinkOperation::ResetConfig { keys } => {
1239                alter_streaming_config::handle_alter_streaming_reset_config(
1240                    handler_args,
1241                    name,
1242                    keys,
1243                    StatementType::ALTER_SINK,
1244                )
1245                .await
1246            }
1247            AlterSinkOperation::SwapRenameSink { target_sink } => {
1248                alter_swap_rename::handle_swap_rename(
1249                    handler_args,
1250                    name,
1251                    target_sink,
1252                    StatementType::ALTER_SINK,
1253                )
1254                .await
1255            }
1256            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1257                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1258                    handler_args,
1259                    PbThrottleTarget::Sink,
1260                    risingwave_pb::common::PbThrottleType::Sink,
1261                    name,
1262                    rate_limit,
1263                )
1264                .await
1265            }
1266            AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
1267                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1268                    handler_args,
1269                    PbThrottleTarget::Sink,
1270                    risingwave_pb::common::PbThrottleType::Backfill,
1271                    name,
1272                    rate_limit,
1273                )
1274                .await
1275            }
1276            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1277                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1278                    handler_args,
1279                    name,
1280                    enable,
1281                )
1282                .await
1283            }
1284        },
1285        Statement::AlterSubscription { name, operation } => match operation {
1286            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1287                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1288                    .await
1289            }
1290            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1291                alter_owner::handle_alter_owner(
1292                    handler_args,
1293                    name,
1294                    new_owner_name,
1295                    StatementType::ALTER_SUBSCRIPTION,
1296                    None,
1297                )
1298                .await
1299            }
1300            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1301                alter_set_schema::handle_alter_set_schema(
1302                    handler_args,
1303                    name,
1304                    new_schema_name,
1305                    StatementType::ALTER_SUBSCRIPTION,
1306                    None,
1307                )
1308                .await
1309            }
1310            AlterSubscriptionOperation::SetRetention { retention } => {
1311                alter_subscription_retention::handle_alter_subscription_retention(
1312                    handler_args,
1313                    name,
1314                    retention,
1315                )
1316                .await
1317            }
1318            AlterSubscriptionOperation::SwapRenameSubscription {
1319                target_subscription,
1320            } => {
1321                alter_swap_rename::handle_swap_rename(
1322                    handler_args,
1323                    name,
1324                    target_subscription,
1325                    StatementType::ALTER_SUBSCRIPTION,
1326                )
1327                .await
1328            }
1329        },
1330        Statement::AlterSource { name, operation } => match operation {
1331            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1332                alter_source_props::handle_alter_source_connector_props(
1333                    handler_args,
1334                    name,
1335                    alter_props,
1336                )
1337                .await
1338            }
1339            AlterSourceOperation::RenameSource { source_name } => {
1340                alter_rename::handle_rename_source(handler_args, name, source_name).await
1341            }
1342            AlterSourceOperation::AddColumn { .. } => {
1343                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1344            }
1345            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1346                alter_owner::handle_alter_owner(
1347                    handler_args,
1348                    name,
1349                    new_owner_name,
1350                    StatementType::ALTER_SOURCE,
1351                    None,
1352                )
1353                .await
1354            }
1355            AlterSourceOperation::SetSchema { new_schema_name } => {
1356                alter_set_schema::handle_alter_set_schema(
1357                    handler_args,
1358                    name,
1359                    new_schema_name,
1360                    StatementType::ALTER_SOURCE,
1361                    None,
1362                )
1363                .await
1364            }
1365            AlterSourceOperation::FormatEncode { format_encode } => {
1366                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1367                    .await
1368            }
1369            AlterSourceOperation::RefreshSchema => {
1370                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1371            }
1372            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1373                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1374                    handler_args,
1375                    PbThrottleTarget::Source,
1376                    risingwave_pb::common::PbThrottleType::Source,
1377                    name,
1378                    rate_limit,
1379                )
1380                .await
1381            }
1382            AlterSourceOperation::SwapRenameSource { target_source } => {
1383                alter_swap_rename::handle_swap_rename(
1384                    handler_args,
1385                    name,
1386                    target_source,
1387                    StatementType::ALTER_SOURCE,
1388                )
1389                .await
1390            }
1391            AlterSourceOperation::SetParallelism {
1392                parallelism,
1393                deferred,
1394            } => {
1395                alter_parallelism::handle_alter_parallelism(
1396                    handler_args,
1397                    name,
1398                    parallelism,
1399                    StatementType::ALTER_SOURCE,
1400                    deferred,
1401                )
1402                .await
1403            }
1404            AlterSourceOperation::SetBackfillParallelism {
1405                parallelism,
1406                deferred,
1407            } => {
1408                alter_parallelism::handle_alter_backfill_parallelism(
1409                    handler_args,
1410                    name,
1411                    parallelism,
1412                    StatementType::ALTER_SOURCE,
1413                    deferred,
1414                )
1415                .await
1416            }
1417            AlterSourceOperation::SetConfig { entries } => {
1418                alter_streaming_config::handle_alter_streaming_set_config(
1419                    handler_args,
1420                    name,
1421                    entries,
1422                    StatementType::ALTER_SOURCE,
1423                )
1424                .await
1425            }
1426            AlterSourceOperation::ResetConfig { keys } => {
1427                alter_streaming_config::handle_alter_streaming_reset_config(
1428                    handler_args,
1429                    name,
1430                    keys,
1431                    StatementType::ALTER_SOURCE,
1432                )
1433                .await
1434            }
1435            AlterSourceOperation::ResetSource => {
1436                reset_source::handle_reset_source(handler_args, name).await
1437            }
1438        },
1439        Statement::AlterFunction {
1440            name,
1441            args,
1442            operation,
1443        } => match operation {
1444            AlterFunctionOperation::SetSchema { new_schema_name } => {
1445                alter_set_schema::handle_alter_set_schema(
1446                    handler_args,
1447                    name,
1448                    new_schema_name,
1449                    StatementType::ALTER_FUNCTION,
1450                    args,
1451                )
1452                .await
1453            }
1454            AlterFunctionOperation::ChangeOwner { new_owner_name } => {
1455                alter_owner::handle_alter_owner(
1456                    handler_args,
1457                    name,
1458                    new_owner_name,
1459                    StatementType::ALTER_FUNCTION,
1460                    args,
1461                )
1462                .await
1463            }
1464        },
1465        Statement::AlterConnection { name, operation } => match operation {
1466            AlterConnectionOperation::SetSchema { new_schema_name } => {
1467                alter_set_schema::handle_alter_set_schema(
1468                    handler_args,
1469                    name,
1470                    new_schema_name,
1471                    StatementType::ALTER_CONNECTION,
1472                    None,
1473                )
1474                .await
1475            }
1476            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1477                alter_owner::handle_alter_owner(
1478                    handler_args,
1479                    name,
1480                    new_owner_name,
1481                    StatementType::ALTER_CONNECTION,
1482                    None,
1483                )
1484                .await
1485            }
1486            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1487                alter_connection_props::handle_alter_connection_connector_props(
1488                    handler_args,
1489                    name,
1490                    alter_props,
1491                )
1492                .await
1493            }
1494        },
1495        Statement::AlterSystem { param, value } => {
1496            alter_system::handle_alter_system(handler_args, param, value).await
1497        }
1498        Statement::AlterSecret { name, operation } => match operation {
1499            AlterSecretOperation::ChangeCredential {
1500                with_options,
1501                new_credential,
1502            } => {
1503                alter_secret::handle_alter_secret(handler_args, name, with_options, new_credential)
1504                    .await
1505            }
1506            AlterSecretOperation::ChangeOwner { new_owner_name } => {
1507                alter_owner::handle_alter_owner(
1508                    handler_args,
1509                    name,
1510                    new_owner_name,
1511                    StatementType::ALTER_SECRET,
1512                    None,
1513                )
1514                .await
1515            }
1516        },
1517        Statement::AlterFragment {
1518            fragment_ids,
1519            operation,
1520        } => match operation {
1521            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1522                let [fragment_id] = fragment_ids.as_slice() else {
1523                    return Err(ErrorCode::InvalidInputSyntax(
1524                        "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1525                            .to_owned(),
1526                    )
1527                    .into());
1528                };
1529                alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1530                    &handler_args.session,
1531                    PbThrottleTarget::Fragment,
1532                    risingwave_pb::common::PbThrottleType::Backfill,
1533                    *fragment_id,
1534                    rate_limit,
1535                    StatementType::SET_VARIABLE,
1536                )
1537                .await
1538            }
1539            AlterFragmentOperation::SetParallelism { parallelism } => {
1540                alter_parallelism::handle_alter_fragment_parallelism(
1541                    handler_args,
1542                    fragment_ids.into_iter().map_into().collect(),
1543                    parallelism,
1544                )
1545                .await
1546            }
1547        },
1548        Statement::AlterDefaultPrivileges { .. } => {
1549            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1550        }
1551        Statement::StartTransaction { modes } => {
1552            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1553        }
1554        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1555        Statement::Commit { chain } => {
1556            transaction::handle_commit(handler_args, COMMIT, chain).await
1557        }
1558        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1559        Statement::Rollback { chain } => {
1560            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1561        }
1562        Statement::SetTransaction {
1563            modes,
1564            snapshot,
1565            session,
1566        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1567        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1568        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1569        Statement::Comment {
1570            object_type,
1571            object_name,
1572            comment,
1573        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1574        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1575        Statement::Prepare {
1576            name,
1577            data_types,
1578            statement,
1579        } => prepared_statement::handle_prepare(name, data_types, statement).await,
1580        Statement::Deallocate { name, prepare } => {
1581            prepared_statement::handle_deallocate(name, prepare).await
1582        }
1583        Statement::Vacuum { object_name, full } => {
1584            vacuum::handle_vacuum(handler_args, object_name, full).await
1585        }
1586        Statement::Refresh { table_name } => {
1587            refresh::handle_refresh(handler_args, table_name).await
1588        }
1589        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1590    }
1591}
1592
1593fn check_ban_ddl_for_iceberg_engine_table(
1594    session: Arc<SessionImpl>,
1595    stmt: &Statement,
1596) -> Result<()> {
1597    match stmt {
1598        Statement::AlterTable {
1599            name,
1600            operation:
1601                operation @ (AlterTableOperation::AddColumn { .. }
1602                | AlterTableOperation::DropColumn { .. }),
1603        } => {
1604            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1605            if table.is_iceberg_engine_table() {
1606                bail!(
1607                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1608                    operation,
1609                    schema_name,
1610                    name
1611                );
1612            }
1613        }
1614
1615        Statement::AlterTable {
1616            name,
1617            operation: AlterTableOperation::RenameTable { .. },
1618        } => {
1619            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1620            if table.is_iceberg_engine_table() {
1621                bail!(
1622                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1623                    schema_name,
1624                    name
1625                );
1626            }
1627        }
1628
1629        Statement::AlterTable {
1630            name,
1631            operation: AlterTableOperation::SetParallelism { .. },
1632        } => {
1633            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1634            if table.is_iceberg_engine_table() {
1635                bail!(
1636                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1637                    schema_name,
1638                    name
1639                );
1640            }
1641        }
1642        Statement::AlterTable {
1643            name,
1644            operation: AlterTableOperation::SetBackfillParallelism { .. },
1645        } => {
1646            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1647            if table.is_iceberg_engine_table() {
1648                bail!(
1649                    "ALTER TABLE SET BACKFILL PARALLELISM is not supported for iceberg table: {}.{}",
1650                    schema_name,
1651                    name
1652                );
1653            }
1654        }
1655
1656        Statement::AlterTable {
1657            name,
1658            operation: AlterTableOperation::SetSchema { .. },
1659        } => {
1660            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1661            if table.is_iceberg_engine_table() {
1662                bail!(
1663                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1664                    schema_name,
1665                    name
1666                );
1667            }
1668        }
1669
1670        Statement::AlterTable {
1671            name,
1672            operation: AlterTableOperation::RefreshSchema,
1673        } => {
1674            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1675            if table.is_iceberg_engine_table() {
1676                bail!(
1677                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1678                    schema_name,
1679                    name
1680                );
1681            }
1682        }
1683
1684        Statement::AlterTable {
1685            name,
1686            operation: AlterTableOperation::SetSourceRateLimit { .. },
1687        } => {
1688            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1689            if table.is_iceberg_engine_table() {
1690                bail!(
1691                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1692                    schema_name,
1693                    name
1694                );
1695            }
1696        }
1697
1698        _ => {}
1699    }
1700
1701    Ok(())
1702}