risingwave_frontend/handler/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::{AlterDatabaseParam, ICEBERG_SINK_PREFIX};
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_connection_props;
46mod alter_database_param;
47mod alter_mv;
48mod alter_owner;
49mod alter_parallelism;
50mod alter_rename;
51mod alter_resource_group;
52mod alter_secret;
53mod alter_set_schema;
54mod alter_sink_props;
55mod alter_source_column;
56mod alter_source_props;
57mod alter_source_with_sr;
58mod alter_streaming_config;
59mod alter_streaming_enable_unaligned_join;
60mod alter_streaming_rate_limit;
61mod alter_subscription_retention;
62mod alter_swap_rename;
63mod alter_system;
64mod alter_table_column;
65pub mod alter_table_drop_connector;
66pub mod alter_table_props;
67mod alter_table_with_sr;
68pub mod alter_user;
69mod alter_utils;
70mod backup;
71pub mod cancel_job;
72pub mod close_cursor;
73mod comment;
74pub mod create_aggregate;
75pub mod create_connection;
76mod create_database;
77pub mod create_function;
78pub mod create_index;
79pub mod create_mv;
80pub mod create_schema;
81pub mod create_secret;
82pub mod create_sink;
83pub mod create_source;
84pub mod create_sql_function;
85pub mod create_subscription;
86pub mod create_table;
87pub mod create_table_as;
88pub mod create_user;
89pub mod create_view;
90pub mod declare_cursor;
91mod delete_meta_snapshot;
92pub mod describe;
93pub mod discard;
94mod drop_connection;
95mod drop_database;
96pub mod drop_function;
97mod drop_index;
98pub mod drop_mv;
99mod drop_schema;
100pub mod drop_secret;
101pub mod drop_sink;
102pub mod drop_source;
103pub mod drop_subscription;
104pub mod drop_table;
105pub mod drop_user;
106mod drop_view;
107pub mod explain;
108pub mod explain_analyze_stream_job;
109pub mod extended_handle;
110pub mod fetch_cursor;
111mod flush;
112pub mod handle_privilege;
113pub mod kill_process;
114mod prepared_statement;
115pub mod privilege;
116pub mod query;
117mod recover;
118mod refresh;
119mod reset_source;
120pub mod show;
121mod transaction;
122mod use_db;
123pub mod util;
124pub mod vacuum;
125pub mod variable;
126mod wait;
127
128pub use alter_table_column::{
129    fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
130};
131
/// The [`PgResponseBuilder`] used by RisingWave, specialized to [`PgResponseStream`].
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;

/// The [`PgResponse`] used by RisingWave, specialized to [`PgResponseStream`].
pub type RwPgResponse = PgResponse<PgResponseStream>;
137
138#[easy_ext::ext(RwPgResponseBuilderExt)]
139impl RwPgResponseBuilder {
140    /// Append rows to the response.
141    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
142        let fields = T::fields();
143        self.values(
144            rows.into_iter()
145                .map(|row| {
146                    Row::new(
147                        row.into_owned_row()
148                            .into_iter()
149                            .zip_eq_fast(&fields)
150                            .map(|(datum, (_, ty))| {
151                                datum.map(|scalar| {
152                                    scalar.as_scalar_ref_impl().text_format(ty).into()
153                                })
154                            })
155                            .collect(),
156                    )
157                })
158                .collect_vec()
159                .into(),
160            fields_to_descriptors(fields),
161        )
162    }
163}
164
165pub fn fields_to_descriptors(
166    fields: Vec<(&str, risingwave_common::types::DataType)>,
167) -> Vec<PgFieldDescriptor> {
168    fields
169        .iter()
170        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
171        .collect()
172}
173
/// The row stream carried by [`RwPgResponse`].
///
/// Whatever the variant, polling the stream yields `Result<Vec<Row>, BoxedError>` items.
pub enum PgResponseStream {
    /// A [`LocalQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// A [`DistributedQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// Any boxed stream of row sets; also what `From<Vec<Row>>` produces.
    Rows(BoxStream<'static, RowSetResult>),
}
179
180impl Stream for PgResponseStream {
181    type Item = std::result::Result<Vec<Row>, BoxedError>;
182
183    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
184        match &mut *self {
185            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
186            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
187            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
188        }
189    }
190}
191
192impl From<Vec<Row>> for PgResponseStream {
193    fn from(rows: Vec<Row>) -> Self {
194        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
195    }
196}
197
/// Arguments handed to the statement handlers, built once per statement by
/// [`HandlerArgs::new`].
#[derive(Clone)]
pub struct HandlerArgs {
    /// The session passed to [`HandlerArgs::new`].
    pub session: Arc<SessionImpl>,
    /// The SQL text passed to [`HandlerArgs::new`] alongside the parsed statement.
    pub sql: Arc<str>,
    /// The statement unparsed back to SQL by `normalize_sql`, which first clears
    /// `OR REPLACE` / `IF NOT EXISTS` on the `CREATE` statements it recognizes.
    pub normalized_sql: String,
    /// The options obtained from the statement via `WithOptions::try_from`.
    pub with_options: WithOptions,
}
205
206impl HandlerArgs {
207    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
208        Ok(Self {
209            session,
210            sql,
211            with_options: WithOptions::try_from(stmt)?,
212            normalized_sql: Self::normalize_sql(stmt),
213        })
214    }
215
216    /// Get normalized SQL from the statement.
217    ///
218    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
219    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
220    ///   make it suitable for the `SHOW CREATE` statements.
221    fn normalize_sql(stmt: &Statement) -> String {
222        let mut stmt = stmt.clone();
223        match &mut stmt {
224            Statement::CreateView {
225                or_replace,
226                if_not_exists,
227                ..
228            } => {
229                *or_replace = false;
230                *if_not_exists = false;
231            }
232            Statement::CreateTable {
233                or_replace,
234                if_not_exists,
235                ..
236            } => {
237                *or_replace = false;
238                *if_not_exists = false;
239            }
240            Statement::CreateIndex { if_not_exists, .. } => {
241                *if_not_exists = false;
242            }
243            Statement::CreateSource {
244                stmt: CreateSourceStatement { if_not_exists, .. },
245                ..
246            } => {
247                *if_not_exists = false;
248            }
249            Statement::CreateSink {
250                stmt: CreateSinkStatement { if_not_exists, .. },
251            } => {
252                *if_not_exists = false;
253            }
254            Statement::CreateSubscription {
255                stmt: CreateSubscriptionStatement { if_not_exists, .. },
256            } => {
257                *if_not_exists = false;
258            }
259            Statement::CreateConnection {
260                stmt: CreateConnectionStatement { if_not_exists, .. },
261            } => {
262                *if_not_exists = false;
263            }
264            _ => {}
265        }
266        stmt.to_string()
267    }
268}
269
270#[allow(clippy::large_stack_frames)]
271pub async fn handle(
272    session: Arc<SessionImpl>,
273    stmt: Statement,
274    sql: Arc<str>,
275    formats: Vec<Format>,
276) -> Result<RwPgResponse> {
277    session.clear_cancel_query_flag();
278    let _guard = session.txn_begin_implicit();
279    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
280
281    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
282
283    match stmt {
284        Statement::Explain {
285            statement,
286            analyze,
287            options,
288        } => {
289            Box::pin(explain::handle_explain(
290                handler_args,
291                *statement,
292                options,
293                analyze,
294            ))
295            .await
296        }
297        Statement::ExplainAnalyzeStreamJob {
298            target,
299            duration_secs,
300        } => {
301            explain_analyze_stream_job::handle_explain_analyze_stream_job(
302                handler_args,
303                target,
304                duration_secs,
305            )
306            .await
307        }
308        Statement::CreateSource { stmt } => {
309            create_source::handle_create_source(handler_args, stmt).await
310        }
311        Statement::CreateSink { stmt } => {
312            create_sink::handle_create_sink(handler_args, stmt, false).await
313        }
314        Statement::CreateSubscription { stmt } => {
315            create_subscription::handle_create_subscription(handler_args, stmt).await
316        }
317        Statement::CreateConnection { stmt } => {
318            create_connection::handle_create_connection(handler_args, stmt).await
319        }
320        Statement::CreateSecret { stmt } => {
321            create_secret::handle_create_secret(handler_args, stmt).await
322        }
323        Statement::CreateFunction {
324            or_replace,
325            temporary,
326            if_not_exists,
327            name,
328            args,
329            returns,
330            params,
331            with_options,
332        } => {
333            // For general udf, `language` clause could be ignored
334            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
335            if params.language.is_none()
336                || !params
337                    .language
338                    .as_ref()
339                    .unwrap()
340                    .real_value()
341                    .eq_ignore_ascii_case("sql")
342            {
343                create_function::handle_create_function(
344                    handler_args,
345                    or_replace,
346                    temporary,
347                    if_not_exists,
348                    name,
349                    args,
350                    returns,
351                    params,
352                    with_options,
353                )
354                .await
355            } else {
356                create_sql_function::handle_create_sql_function(
357                    handler_args,
358                    or_replace,
359                    temporary,
360                    if_not_exists,
361                    name,
362                    args,
363                    returns,
364                    params,
365                )
366                .await
367            }
368        }
369        Statement::CreateAggregate {
370            or_replace,
371            if_not_exists,
372            name,
373            args,
374            returns,
375            params,
376            ..
377        } => {
378            create_aggregate::handle_create_aggregate(
379                handler_args,
380                or_replace,
381                if_not_exists,
382                name,
383                args,
384                returns,
385                params,
386            )
387            .await
388        }
389        Statement::CreateTable {
390            name,
391            columns,
392            wildcard_idx,
393            constraints,
394            query,
395            with_options: _, // It is put in OptimizerContext
396            // Not supported things
397            or_replace,
398            temporary,
399            if_not_exists,
400            format_encode,
401            source_watermarks,
402            append_only,
403            on_conflict,
404            with_version_columns,
405            cdc_table_info,
406            include_column_options,
407            webhook_info,
408            engine,
409        } => {
410            if or_replace {
411                bail_not_implemented!("CREATE OR REPLACE TABLE");
412            }
413            if temporary {
414                bail_not_implemented!("CREATE TEMPORARY TABLE");
415            }
416            if let Some(query) = query {
417                return create_table_as::handle_create_as(
418                    handler_args,
419                    name,
420                    if_not_exists,
421                    query,
422                    columns,
423                    append_only,
424                    on_conflict,
425                    with_version_columns
426                        .iter()
427                        .map(|col| col.real_value())
428                        .collect(),
429                    engine,
430                )
431                .await;
432            }
433            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
434            Box::pin(create_table::handle_create_table(
435                handler_args,
436                name,
437                columns,
438                wildcard_idx,
439                constraints,
440                if_not_exists,
441                format_encode,
442                source_watermarks,
443                append_only,
444                on_conflict,
445                with_version_columns
446                    .iter()
447                    .map(|col| col.real_value())
448                    .collect(),
449                cdc_table_info,
450                include_column_options,
451                webhook_info,
452                engine,
453            ))
454            .await
455        }
456        Statement::CreateDatabase {
457            db_name,
458            if_not_exists,
459            owner,
460            resource_group,
461            barrier_interval_ms,
462            checkpoint_frequency,
463        } => {
464            create_database::handle_create_database(
465                handler_args,
466                db_name,
467                if_not_exists,
468                owner,
469                resource_group,
470                barrier_interval_ms,
471                checkpoint_frequency,
472            )
473            .await
474        }
475        Statement::CreateSchema {
476            schema_name,
477            if_not_exists,
478            owner,
479        } => {
480            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
481                .await
482        }
483        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
484        Statement::DeclareCursor { stmt } => {
485            declare_cursor::handle_declare_cursor(handler_args, stmt).await
486        }
487        Statement::FetchCursor { stmt } => {
488            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
489        }
490        Statement::CloseCursor { stmt } => {
491            close_cursor::handle_close_cursor(handler_args, stmt).await
492        }
493        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
494        Statement::Grant { .. } => {
495            handle_privilege::handle_grant_privilege(handler_args, stmt).await
496        }
497        Statement::Revoke { .. } => {
498            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
499        }
500        Statement::Describe { name, kind } => match kind {
501            DescribeKind::Fragments => {
502                describe::handle_describe_fragments(handler_args, name).await
503            }
504            DescribeKind::Plain => describe::handle_describe(handler_args, name),
505        },
506        Statement::DescribeFragment { fragment_id } => {
507            describe::handle_describe_fragment(handler_args, fragment_id.into()).await
508        }
509        Statement::Discard(..) => discard::handle_discard(handler_args),
510        Statement::ShowObjects {
511            object: show_object,
512            filter,
513        } => show::handle_show_object(handler_args, show_object, filter).await,
514        Statement::ShowCreateObject { create_type, name } => {
515            show::handle_show_create_object(handler_args, create_type, name)
516        }
517        Statement::ShowTransactionIsolationLevel => {
518            transaction::handle_show_isolation_level(handler_args)
519        }
520        Statement::Drop(DropStatement {
521            object_type,
522            object_name,
523            if_exists,
524            drop_mode,
525        }) => {
526            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
527                match object_type {
528                    ObjectType::MaterializedView
529                    | ObjectType::View
530                    | ObjectType::Sink
531                    | ObjectType::Source
532                    | ObjectType::Subscription
533                    | ObjectType::Index
534                    | ObjectType::Table
535                    | ObjectType::Schema
536                    | ObjectType::Connection
537                    | ObjectType::Secret => true,
538                    ObjectType::Database | ObjectType::User => {
539                        bail_not_implemented!("DROP CASCADE");
540                    }
541                }
542            } else {
543                false
544            };
545            match object_type {
546                ObjectType::Table => {
547                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
548                        .await
549                }
550                ObjectType::MaterializedView => {
551                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
552                }
553                ObjectType::Index => {
554                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
555                        .await
556                }
557                ObjectType::Source => {
558                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
559                        .await
560                }
561                ObjectType::Sink => {
562                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
563                }
564                ObjectType::Subscription => {
565                    drop_subscription::handle_drop_subscription(
566                        handler_args,
567                        object_name,
568                        if_exists,
569                        cascade,
570                    )
571                    .await
572                }
573                ObjectType::Database => {
574                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
575                }
576                ObjectType::Schema => {
577                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
578                        .await
579                }
580                ObjectType::User => {
581                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
582                }
583                ObjectType::View => {
584                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
585                }
586                ObjectType::Connection => {
587                    drop_connection::handle_drop_connection(
588                        handler_args,
589                        object_name,
590                        if_exists,
591                        cascade,
592                    )
593                    .await
594                }
595                ObjectType::Secret => {
596                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists, cascade)
597                        .await
598                }
599            }
600        }
601        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
602        Statement::DropFunction {
603            if_exists,
604            func_desc,
605            option,
606        } => {
607            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
608                .await
609        }
610        Statement::DropAggregate {
611            if_exists,
612            func_desc,
613            option,
614        } => {
615            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
616                .await
617        }
618        Statement::Query(_)
619        | Statement::Insert { .. }
620        | Statement::Delete { .. }
621        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
622        Statement::Copy {
623            entity: CopyEntity::Query(query),
624            target: CopyTarget::Stdout,
625        } => {
626            let response =
627                query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
628                    .await?;
629            Ok(response.into_copy_query_to_stdout())
630        }
631        Statement::CreateView {
632            materialized,
633            if_not_exists,
634            name,
635            columns,
636            query,
637            with_options: _, // It is put in OptimizerContext
638            or_replace,      // not supported
639            emit_mode,
640        } => {
641            if or_replace {
642                bail_not_implemented!("CREATE OR REPLACE VIEW");
643            }
644            if materialized {
645                create_mv::handle_create_mv(
646                    handler_args,
647                    if_not_exists,
648                    name,
649                    *query,
650                    columns,
651                    emit_mode,
652                )
653                .await
654            } else {
655                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
656                    .await
657            }
658        }
659        Statement::Flush => flush::handle_flush(handler_args).await,
660        Statement::Wait => wait::handle_wait(handler_args).await,
661        Statement::Backup => backup::handle_backup(handler_args).await,
662        Statement::DeleteMetaSnapshots { snapshot_ids } => {
663            delete_meta_snapshot::handle_delete_meta_snapshots(handler_args, snapshot_ids).await
664        }
665        Statement::Recover => recover::handle_recover(handler_args).await,
666        Statement::SetVariable {
667            local: _,
668            variable,
669            value,
670        } => {
671            // special handle for `use database`
672            if variable.real_value().eq_ignore_ascii_case("database") {
673                let x = variable::set_var_to_param_str(&value);
674                let res = use_db::handle_use_db(
675                    handler_args,
676                    ObjectName::from(vec![Ident::from_real_value(
677                        x.as_deref().unwrap_or("default"),
678                    )]),
679                )?;
680                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
681                for notice in res.notices() {
682                    builder = builder.notice(notice);
683                }
684                return Ok(builder.into());
685            }
686            variable::handle_set(handler_args, variable, value)
687        }
688        Statement::SetTimeZone { local: _, value } => {
689            variable::handle_set_time_zone(handler_args, value)
690        }
691        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
692        Statement::CreateIndex {
693            name,
694            table_name,
695            method,
696            columns,
697            include,
698            distributed_by,
699            unique,
700            if_not_exists,
701            with_properties: _,
702        } => {
703            if unique {
704                bail_not_implemented!("create unique index");
705            }
706
707            create_index::handle_create_index(
708                handler_args,
709                if_not_exists,
710                name,
711                table_name,
712                method,
713                columns.clone(),
714                include,
715                distributed_by,
716            )
717            .await
718        }
719        Statement::AlterDatabase { name, operation } => match operation {
720            AlterDatabaseOperation::RenameDatabase { database_name } => {
721                alter_rename::handle_rename_database(handler_args, name, database_name).await
722            }
723            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
724                alter_owner::handle_alter_owner(
725                    handler_args,
726                    name,
727                    new_owner_name,
728                    StatementType::ALTER_DATABASE,
729                    None,
730                )
731                .await
732            }
733            AlterDatabaseOperation::SetParam(config_param) => {
734                let ConfigParam { param, value } = config_param;
735
736                let database_param = match param.real_value().to_uppercase().as_str() {
737                    "BARRIER_INTERVAL_MS" => {
738                        let barrier_interval_ms = match value {
739                            SetVariableValue::Default => None,
740                            SetVariableValue::Single(SetVariableValueSingle::Literal(
741                                Value::Number(num),
742                            )) => {
743                                let num = num.parse::<u32>().map_err(|e| {
744                                    ErrorCode::InvalidInputSyntax(format!(
745                                        "barrier_interval_ms must be a u32 integer: {}",
746                                        e.as_report()
747                                    ))
748                                })?;
749                                Some(num)
750                            }
751                            _ => {
752                                return Err(ErrorCode::InvalidInputSyntax(
753                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
754                                        .to_owned(),
755                                )
756                                .into());
757                            }
758                        };
759                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
760                    }
761                    "CHECKPOINT_FREQUENCY" => {
762                        let checkpoint_frequency = match value {
763                            SetVariableValue::Default => None,
764                            SetVariableValue::Single(SetVariableValueSingle::Literal(
765                                Value::Number(num),
766                            )) => {
767                                let num = num.parse::<u64>().map_err(|e| {
768                                    ErrorCode::InvalidInputSyntax(format!(
769                                        "checkpoint_frequency must be a u64 integer: {}",
770                                        e.as_report()
771                                    ))
772                                })?;
773                                Some(num)
774                            }
775                            _ => {
776                                return Err(ErrorCode::InvalidInputSyntax(
777                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
778                                        .to_owned(),
779                                )
780                                .into());
781                            }
782                        };
783                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
784                    }
785                    _ => {
786                        return Err(ErrorCode::InvalidInputSyntax(format!(
787                            "Unsupported database config parameter: {}",
788                            param.real_value()
789                        ))
790                        .into());
791                    }
792                };
793
794                alter_database_param::handle_alter_database_param(
795                    handler_args,
796                    name,
797                    database_param,
798                )
799                .await
800            }
801        },
802        Statement::AlterSchema { name, operation } => match operation {
803            AlterSchemaOperation::RenameSchema { schema_name } => {
804                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
805            }
806            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
807                alter_owner::handle_alter_owner(
808                    handler_args,
809                    name,
810                    new_owner_name,
811                    StatementType::ALTER_SCHEMA,
812                    None,
813                )
814                .await
815            }
816            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
817                alter_swap_rename::handle_swap_rename(
818                    handler_args,
819                    name,
820                    target_schema,
821                    StatementType::ALTER_SCHEMA,
822                )
823                .await
824            }
825        },
826        Statement::AlterTable { name, operation } => match operation {
827            AlterTableOperation::AddColumn { .. }
828            | AlterTableOperation::DropColumn { .. }
829            | AlterTableOperation::AlterColumn { .. } => {
830                Box::pin(alter_table_column::handle_alter_table_column(
831                    handler_args,
832                    name,
833                    operation,
834                ))
835                .await
836            }
837            AlterTableOperation::RenameTable { table_name } => {
838                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
839                    .await
840            }
841            AlterTableOperation::ChangeOwner { new_owner_name } => {
842                alter_owner::handle_alter_owner(
843                    handler_args,
844                    name,
845                    new_owner_name,
846                    StatementType::ALTER_TABLE,
847                    None,
848                )
849                .await
850            }
851            AlterTableOperation::SetParallelism {
852                parallelism,
853                deferred,
854            } => {
855                alter_parallelism::handle_alter_parallelism(
856                    handler_args,
857                    name,
858                    parallelism,
859                    StatementType::ALTER_TABLE,
860                    deferred,
861                )
862                .await
863            }
864            AlterTableOperation::SetBackfillParallelism {
865                parallelism,
866                deferred,
867            } => {
868                alter_parallelism::handle_alter_backfill_parallelism(
869                    handler_args,
870                    name,
871                    parallelism,
872                    StatementType::ALTER_TABLE,
873                    deferred,
874                )
875                .await
876            }
877            AlterTableOperation::SetSchema { new_schema_name } => {
878                alter_set_schema::handle_alter_set_schema(
879                    handler_args,
880                    name,
881                    new_schema_name,
882                    StatementType::ALTER_TABLE,
883                    None,
884                )
885                .await
886            }
887            AlterTableOperation::RefreshSchema => {
888                Box::pin(alter_table_with_sr::handle_refresh_schema(
889                    handler_args,
890                    name,
891                ))
892                .await
893            }
894            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
895                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
896                    handler_args,
897                    PbThrottleTarget::Table,
898                    risingwave_pb::common::PbThrottleType::Source,
899                    name,
900                    rate_limit,
901                )
902                .await
903            }
904            AlterTableOperation::DropConnector => {
905                Box::pin(
906                    alter_table_drop_connector::handle_alter_table_drop_connector(
907                        handler_args,
908                        name,
909                    ),
910                )
911                .await
912            }
913            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
914                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
915                    handler_args,
916                    PbThrottleTarget::Table,
917                    risingwave_pb::common::PbThrottleType::Dml,
918                    name,
919                    rate_limit,
920                )
921                .await
922            }
923            AlterTableOperation::SetConfig { entries } => {
924                alter_streaming_config::handle_alter_streaming_set_config(
925                    handler_args,
926                    name,
927                    entries,
928                    StatementType::ALTER_TABLE,
929                )
930                .await
931            }
932            AlterTableOperation::ResetConfig { keys } => {
933                alter_streaming_config::handle_alter_streaming_reset_config(
934                    handler_args,
935                    name,
936                    keys,
937                    StatementType::ALTER_TABLE,
938                )
939                .await
940            }
941            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
942                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
943                    handler_args,
944                    PbThrottleTarget::Table,
945                    risingwave_pb::common::PbThrottleType::Backfill,
946                    name,
947                    rate_limit,
948                )
949                .await
950            }
951            AlterTableOperation::SwapRenameTable { target_table } => {
952                alter_swap_rename::handle_swap_rename(
953                    handler_args,
954                    name,
955                    target_table,
956                    StatementType::ALTER_TABLE,
957                )
958                .await
959            }
960            AlterTableOperation::AlterConnectorProps { alter_props } => {
961                alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
962            }
963            AlterTableOperation::AddConstraint { .. }
964            | AlterTableOperation::DropConstraint { .. }
965            | AlterTableOperation::RenameColumn { .. }
966            | AlterTableOperation::ChangeColumn { .. }
967            | AlterTableOperation::RenameConstraint { .. } => {
968                bail_not_implemented!(
969                    "Unhandled statement: {}",
970                    Statement::AlterTable { name, operation }
971                )
972            }
973        },
974        Statement::AlterIndex { name, operation } => match operation {
975            AlterIndexOperation::RenameIndex { index_name } => {
976                alter_rename::handle_rename_index(handler_args, name, index_name).await
977            }
978            AlterIndexOperation::SetParallelism {
979                parallelism,
980                deferred,
981            } => {
982                alter_parallelism::handle_alter_parallelism(
983                    handler_args,
984                    name,
985                    parallelism,
986                    StatementType::ALTER_INDEX,
987                    deferred,
988                )
989                .await
990            }
991            AlterIndexOperation::SetBackfillParallelism {
992                parallelism,
993                deferred,
994            } => {
995                alter_parallelism::handle_alter_backfill_parallelism(
996                    handler_args,
997                    name,
998                    parallelism,
999                    StatementType::ALTER_INDEX,
1000                    deferred,
1001                )
1002                .await
1003            }
1004            AlterIndexOperation::SetConfig { entries } => {
1005                alter_streaming_config::handle_alter_streaming_set_config(
1006                    handler_args,
1007                    name,
1008                    entries,
1009                    StatementType::ALTER_INDEX,
1010                )
1011                .await
1012            }
1013            AlterIndexOperation::ResetConfig { keys } => {
1014                alter_streaming_config::handle_alter_streaming_reset_config(
1015                    handler_args,
1016                    name,
1017                    keys,
1018                    StatementType::ALTER_INDEX,
1019                )
1020                .await
1021            }
1022        },
1023        Statement::AlterView {
1024            materialized,
1025            name,
1026            operation,
1027        } => {
1028            let statement_type = if materialized {
1029                StatementType::ALTER_MATERIALIZED_VIEW
1030            } else {
1031                StatementType::ALTER_VIEW
1032            };
1033            match operation {
1034                AlterViewOperation::RenameView { view_name } => {
1035                    if materialized {
1036                        alter_rename::handle_rename_table(
1037                            handler_args,
1038                            TableType::MaterializedView,
1039                            name,
1040                            view_name,
1041                        )
1042                        .await
1043                    } else {
1044                        alter_rename::handle_rename_view(handler_args, name, view_name).await
1045                    }
1046                }
1047                AlterViewOperation::SetParallelism {
1048                    parallelism,
1049                    deferred,
1050                } => {
1051                    if !materialized {
1052                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
1053                    }
1054                    alter_parallelism::handle_alter_parallelism(
1055                        handler_args,
1056                        name,
1057                        parallelism,
1058                        statement_type,
1059                        deferred,
1060                    )
1061                    .await
1062                }
1063                AlterViewOperation::SetBackfillParallelism {
1064                    parallelism,
1065                    deferred,
1066                } => {
1067                    if !materialized {
1068                        bail_not_implemented!("ALTER VIEW SET BACKFILL PARALLELISM");
1069                    }
1070                    alter_parallelism::handle_alter_backfill_parallelism(
1071                        handler_args,
1072                        name,
1073                        parallelism,
1074                        statement_type,
1075                        deferred,
1076                    )
1077                    .await
1078                }
1079                AlterViewOperation::SetResourceGroup {
1080                    resource_group,
1081                    deferred,
1082                } => {
1083                    if !materialized {
1084                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1085                    }
1086                    alter_resource_group::handle_alter_resource_group(
1087                        handler_args,
1088                        name,
1089                        resource_group,
1090                        statement_type,
1091                        deferred,
1092                    )
1093                    .await
1094                }
1095                AlterViewOperation::ChangeOwner { new_owner_name } => {
1096                    alter_owner::handle_alter_owner(
1097                        handler_args,
1098                        name,
1099                        new_owner_name,
1100                        statement_type,
1101                        None,
1102                    )
1103                    .await
1104                }
1105                AlterViewOperation::SetSchema { new_schema_name } => {
1106                    alter_set_schema::handle_alter_set_schema(
1107                        handler_args,
1108                        name,
1109                        new_schema_name,
1110                        statement_type,
1111                        None,
1112                    )
1113                    .await
1114                }
1115                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1116                    if !materialized {
1117                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1118                    }
1119                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1120                        handler_args,
1121                        PbThrottleTarget::Mv,
1122                        risingwave_pb::common::PbThrottleType::Backfill,
1123                        name,
1124                        rate_limit,
1125                    )
1126                    .await
1127                }
1128                AlterViewOperation::SwapRenameView { target_view } => {
1129                    alter_swap_rename::handle_swap_rename(
1130                        handler_args,
1131                        name,
1132                        target_view,
1133                        statement_type,
1134                    )
1135                    .await
1136                }
1137                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1138                    if !materialized {
1139                        bail!(
1140                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1141                        );
1142                    }
1143                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1144                }
1145                AlterViewOperation::AsQuery { query } => {
1146                    if !materialized {
1147                        bail_not_implemented!("ALTER VIEW AS QUERY");
1148                    }
1149                    // `ALTER MATERIALIZED VIEW AS QUERY` is only available in development build now.
1150                    if !cfg!(debug_assertions) {
1151                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1152                    }
1153                    alter_mv::handle_alter_mv(handler_args, name, query).await
1154                }
1155                AlterViewOperation::SetConfig { entries } => {
1156                    if !materialized {
1157                        bail!("SET CONFIG is only supported for materialized views");
1158                    }
1159                    alter_streaming_config::handle_alter_streaming_set_config(
1160                        handler_args,
1161                        name,
1162                        entries,
1163                        statement_type,
1164                    )
1165                    .await
1166                }
1167                AlterViewOperation::ResetConfig { keys } => {
1168                    if !materialized {
1169                        bail!("RESET CONFIG is only supported for materialized views");
1170                    }
1171                    alter_streaming_config::handle_alter_streaming_reset_config(
1172                        handler_args,
1173                        name,
1174                        keys,
1175                        statement_type,
1176                    )
1177                    .await
1178                }
1179            }
1180        }
1181
1182        Statement::AlterSink { name, operation } => match operation {
1183            AlterSinkOperation::AlterConnectorProps {
1184                alter_props: changed_props,
1185            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1186            AlterSinkOperation::RenameSink { sink_name } => {
1187                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1188            }
1189            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1190                alter_owner::handle_alter_owner(
1191                    handler_args,
1192                    name,
1193                    new_owner_name,
1194                    StatementType::ALTER_SINK,
1195                    None,
1196                )
1197                .await
1198            }
1199            AlterSinkOperation::SetSchema { new_schema_name } => {
1200                alter_set_schema::handle_alter_set_schema(
1201                    handler_args,
1202                    name,
1203                    new_schema_name,
1204                    StatementType::ALTER_SINK,
1205                    None,
1206                )
1207                .await
1208            }
1209            AlterSinkOperation::SetParallelism {
1210                parallelism,
1211                deferred,
1212            } => {
1213                alter_parallelism::handle_alter_parallelism(
1214                    handler_args,
1215                    name,
1216                    parallelism,
1217                    StatementType::ALTER_SINK,
1218                    deferred,
1219                )
1220                .await
1221            }
1222            AlterSinkOperation::SetBackfillParallelism {
1223                parallelism,
1224                deferred,
1225            } => {
1226                alter_parallelism::handle_alter_backfill_parallelism(
1227                    handler_args,
1228                    name,
1229                    parallelism,
1230                    StatementType::ALTER_SINK,
1231                    deferred,
1232                )
1233                .await
1234            }
1235            AlterSinkOperation::SetConfig { entries } => {
1236                alter_streaming_config::handle_alter_streaming_set_config(
1237                    handler_args,
1238                    name,
1239                    entries,
1240                    StatementType::ALTER_SINK,
1241                )
1242                .await
1243            }
1244            AlterSinkOperation::ResetConfig { keys } => {
1245                alter_streaming_config::handle_alter_streaming_reset_config(
1246                    handler_args,
1247                    name,
1248                    keys,
1249                    StatementType::ALTER_SINK,
1250                )
1251                .await
1252            }
1253            AlterSinkOperation::SwapRenameSink { target_sink } => {
1254                alter_swap_rename::handle_swap_rename(
1255                    handler_args,
1256                    name,
1257                    target_sink,
1258                    StatementType::ALTER_SINK,
1259                )
1260                .await
1261            }
1262            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1263                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1264                    handler_args,
1265                    PbThrottleTarget::Sink,
1266                    risingwave_pb::common::PbThrottleType::Sink,
1267                    name,
1268                    rate_limit,
1269                )
1270                .await
1271            }
1272            AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
1273                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1274                    handler_args,
1275                    PbThrottleTarget::Sink,
1276                    risingwave_pb::common::PbThrottleType::Backfill,
1277                    name,
1278                    rate_limit,
1279                )
1280                .await
1281            }
1282            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1283                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1284                    handler_args,
1285                    name,
1286                    enable,
1287                )
1288                .await
1289            }
1290        },
1291        Statement::AlterSubscription { name, operation } => match operation {
1292            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1293                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1294                    .await
1295            }
1296            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1297                alter_owner::handle_alter_owner(
1298                    handler_args,
1299                    name,
1300                    new_owner_name,
1301                    StatementType::ALTER_SUBSCRIPTION,
1302                    None,
1303                )
1304                .await
1305            }
1306            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1307                alter_set_schema::handle_alter_set_schema(
1308                    handler_args,
1309                    name,
1310                    new_schema_name,
1311                    StatementType::ALTER_SUBSCRIPTION,
1312                    None,
1313                )
1314                .await
1315            }
1316            AlterSubscriptionOperation::SetRetention { retention } => {
1317                alter_subscription_retention::handle_alter_subscription_retention(
1318                    handler_args,
1319                    name,
1320                    retention,
1321                )
1322                .await
1323            }
1324            AlterSubscriptionOperation::SwapRenameSubscription {
1325                target_subscription,
1326            } => {
1327                alter_swap_rename::handle_swap_rename(
1328                    handler_args,
1329                    name,
1330                    target_subscription,
1331                    StatementType::ALTER_SUBSCRIPTION,
1332                )
1333                .await
1334            }
1335        },
1336        Statement::AlterSource { name, operation } => match operation {
1337            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1338                alter_source_props::handle_alter_source_connector_props(
1339                    handler_args,
1340                    name,
1341                    alter_props,
1342                )
1343                .await
1344            }
1345            AlterSourceOperation::RenameSource { source_name } => {
1346                alter_rename::handle_rename_source(handler_args, name, source_name).await
1347            }
1348            AlterSourceOperation::AddColumn { .. } => {
1349                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1350            }
1351            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1352                alter_owner::handle_alter_owner(
1353                    handler_args,
1354                    name,
1355                    new_owner_name,
1356                    StatementType::ALTER_SOURCE,
1357                    None,
1358                )
1359                .await
1360            }
1361            AlterSourceOperation::SetSchema { new_schema_name } => {
1362                alter_set_schema::handle_alter_set_schema(
1363                    handler_args,
1364                    name,
1365                    new_schema_name,
1366                    StatementType::ALTER_SOURCE,
1367                    None,
1368                )
1369                .await
1370            }
1371            AlterSourceOperation::FormatEncode { format_encode } => {
1372                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1373                    .await
1374            }
1375            AlterSourceOperation::RefreshSchema => {
1376                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1377            }
1378            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1379                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1380                    handler_args,
1381                    PbThrottleTarget::Source,
1382                    risingwave_pb::common::PbThrottleType::Source,
1383                    name,
1384                    rate_limit,
1385                )
1386                .await
1387            }
1388            AlterSourceOperation::SwapRenameSource { target_source } => {
1389                alter_swap_rename::handle_swap_rename(
1390                    handler_args,
1391                    name,
1392                    target_source,
1393                    StatementType::ALTER_SOURCE,
1394                )
1395                .await
1396            }
1397            AlterSourceOperation::SetParallelism {
1398                parallelism,
1399                deferred,
1400            } => {
1401                alter_parallelism::handle_alter_parallelism(
1402                    handler_args,
1403                    name,
1404                    parallelism,
1405                    StatementType::ALTER_SOURCE,
1406                    deferred,
1407                )
1408                .await
1409            }
1410            AlterSourceOperation::SetBackfillParallelism {
1411                parallelism,
1412                deferred,
1413            } => {
1414                alter_parallelism::handle_alter_backfill_parallelism(
1415                    handler_args,
1416                    name,
1417                    parallelism,
1418                    StatementType::ALTER_SOURCE,
1419                    deferred,
1420                )
1421                .await
1422            }
1423            AlterSourceOperation::SetConfig { entries } => {
1424                alter_streaming_config::handle_alter_streaming_set_config(
1425                    handler_args,
1426                    name,
1427                    entries,
1428                    StatementType::ALTER_SOURCE,
1429                )
1430                .await
1431            }
1432            AlterSourceOperation::ResetConfig { keys } => {
1433                alter_streaming_config::handle_alter_streaming_reset_config(
1434                    handler_args,
1435                    name,
1436                    keys,
1437                    StatementType::ALTER_SOURCE,
1438                )
1439                .await
1440            }
1441            AlterSourceOperation::ResetSource => {
1442                reset_source::handle_reset_source(handler_args, name).await
1443            }
1444        },
1445        Statement::AlterFunction {
1446            name,
1447            args,
1448            operation,
1449        } => match operation {
1450            AlterFunctionOperation::SetSchema { new_schema_name } => {
1451                alter_set_schema::handle_alter_set_schema(
1452                    handler_args,
1453                    name,
1454                    new_schema_name,
1455                    StatementType::ALTER_FUNCTION,
1456                    args,
1457                )
1458                .await
1459            }
1460            AlterFunctionOperation::ChangeOwner { new_owner_name } => {
1461                alter_owner::handle_alter_owner(
1462                    handler_args,
1463                    name,
1464                    new_owner_name,
1465                    StatementType::ALTER_FUNCTION,
1466                    args,
1467                )
1468                .await
1469            }
1470        },
1471        Statement::AlterConnection { name, operation } => match operation {
1472            AlterConnectionOperation::SetSchema { new_schema_name } => {
1473                alter_set_schema::handle_alter_set_schema(
1474                    handler_args,
1475                    name,
1476                    new_schema_name,
1477                    StatementType::ALTER_CONNECTION,
1478                    None,
1479                )
1480                .await
1481            }
1482            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1483                alter_owner::handle_alter_owner(
1484                    handler_args,
1485                    name,
1486                    new_owner_name,
1487                    StatementType::ALTER_CONNECTION,
1488                    None,
1489                )
1490                .await
1491            }
1492            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1493                alter_connection_props::handle_alter_connection_connector_props(
1494                    handler_args,
1495                    name,
1496                    alter_props,
1497                )
1498                .await
1499            }
1500        },
1501        Statement::AlterSystem { param, value } => {
1502            alter_system::handle_alter_system(handler_args, param, value).await
1503        }
1504        Statement::AlterSecret { name, operation } => match operation {
1505            AlterSecretOperation::ChangeCredential {
1506                with_options,
1507                new_credential,
1508            } => {
1509                alter_secret::handle_alter_secret(handler_args, name, with_options, new_credential)
1510                    .await
1511            }
1512            AlterSecretOperation::ChangeOwner { new_owner_name } => {
1513                alter_owner::handle_alter_owner(
1514                    handler_args,
1515                    name,
1516                    new_owner_name,
1517                    StatementType::ALTER_SECRET,
1518                    None,
1519                )
1520                .await
1521            }
1522        },
1523        Statement::AlterFragment {
1524            fragment_ids,
1525            operation,
1526        } => match operation {
1527            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1528                let [fragment_id] = fragment_ids.as_slice() else {
1529                    return Err(ErrorCode::InvalidInputSyntax(
1530                        "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1531                            .to_owned(),
1532                    )
1533                    .into());
1534                };
1535                alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1536                    &handler_args.session,
1537                    PbThrottleTarget::Fragment,
1538                    risingwave_pb::common::PbThrottleType::Backfill,
1539                    *fragment_id,
1540                    rate_limit,
1541                    StatementType::SET_VARIABLE,
1542                )
1543                .await
1544            }
1545            AlterFragmentOperation::SetParallelism { parallelism } => {
1546                alter_parallelism::handle_alter_fragment_parallelism(
1547                    handler_args,
1548                    fragment_ids.into_iter().map_into().collect(),
1549                    parallelism,
1550                )
1551                .await
1552            }
1553        },
1554        Statement::AlterDefaultPrivileges { .. } => {
1555            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1556        }
1557        Statement::StartTransaction { modes } => {
1558            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1559        }
1560        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1561        Statement::Commit { chain } => {
1562            transaction::handle_commit(handler_args, COMMIT, chain).await
1563        }
1564        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1565        Statement::Rollback { chain } => {
1566            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1567        }
1568        Statement::SetTransaction {
1569            modes,
1570            snapshot,
1571            session,
1572        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1573        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1574        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1575        Statement::Comment {
1576            object_type,
1577            object_name,
1578            comment,
1579        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1580        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1581        Statement::Prepare {
1582            name,
1583            data_types,
1584            statement,
1585        } => prepared_statement::handle_prepare(name, data_types, statement).await,
1586        Statement::Deallocate { name, prepare } => {
1587            prepared_statement::handle_deallocate(name, prepare).await
1588        }
1589        Statement::Vacuum { object_name, full } => {
1590            vacuum::handle_vacuum(handler_args, object_name, full).await
1591        }
1592        Statement::Refresh { table_name } => {
1593            refresh::handle_refresh(handler_args, table_name).await
1594        }
1595        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1596    }
1597}
1598
1599fn check_ban_ddl_for_iceberg_engine_table(
1600    session: Arc<SessionImpl>,
1601    stmt: &Statement,
1602) -> Result<()> {
1603    if let Statement::AlterTable { name, operation } = stmt {
1604        let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1605        if table.is_iceberg_engine_table() {
1606            let has_auto_refresh_schema_sink =
1607                if matches!(operation, AlterTableOperation::AddColumn { .. }) {
1608                    let catalog_reader = session.env().catalog_reader().read_guard();
1609                    let db_name = session.database();
1610                    let sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name());
1611                    let sink = catalog_reader
1612                        .get_schema_by_name(&db_name, &schema_name)
1613                        .ok()
1614                        .and_then(|schema| schema.get_created_sink_by_name(&sink_name));
1615                    sink.and_then(|s| s.auto_refresh_schema_from_table)
1616                        .is_some()
1617                } else {
1618                    false
1619                };
1620
1621            check_ban_alter_table_operation_for_iceberg_engine_table(
1622                operation,
1623                &schema_name,
1624                name,
1625                has_auto_refresh_schema_sink,
1626            )?;
1627        }
1628    }
1629
1630    Ok(())
1631}
1632
1633fn check_ban_alter_table_operation_for_iceberg_engine_table(
1634    operation: &AlterTableOperation,
1635    schema_name: &str,
1636    table_name: &ObjectName,
1637    has_auto_refresh_schema_sink: bool,
1638) -> Result<()> {
1639    match operation {
1640        AlterTableOperation::AddColumn { .. } => {
1641            if !has_auto_refresh_schema_sink {
1642                bail!(
1643                    "ALTER TABLE {} is not supported for iceberg table without auto schema change sink: {}.{}",
1644                    operation,
1645                    schema_name,
1646                    table_name
1647                );
1648            }
1649        }
1650        AlterTableOperation::DropColumn { .. } => {
1651            // TODO: allow DROP COLUMN for iceberg table after iceberg sink schema change supports drop.
1652            bail!(
1653                "ALTER TABLE DROP COLUMN is not supported for iceberg table: {}.{}",
1654                schema_name,
1655                table_name
1656            );
1657        }
1658        AlterTableOperation::RenameColumn { .. }
1659        | AlterTableOperation::ChangeColumn { .. }
1660        | AlterTableOperation::AlterColumn { .. } => {
1661            bail!(
1662                "ALTER TABLE {} is not supported for iceberg table: {}.{}. Existing column schema mutation is not supported currently",
1663                operation,
1664                schema_name,
1665                table_name
1666            );
1667        }
1668        AlterTableOperation::RenameTable { .. } => {
1669            bail!(
1670                "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1671                schema_name,
1672                table_name
1673            );
1674        }
1675        AlterTableOperation::SetParallelism { .. } => {
1676            bail!(
1677                "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1678                schema_name,
1679                table_name
1680            );
1681        }
1682        AlterTableOperation::SetBackfillParallelism { .. } => {
1683            bail!(
1684                "ALTER TABLE SET BACKFILL PARALLELISM is not supported for iceberg table: {}.{}",
1685                schema_name,
1686                table_name
1687            );
1688        }
1689        AlterTableOperation::SetSchema { .. } => {
1690            bail!(
1691                "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1692                schema_name,
1693                table_name
1694            );
1695        }
1696        AlterTableOperation::RefreshSchema => {
1697            bail!(
1698                "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1699                schema_name,
1700                table_name
1701            );
1702        }
1703        AlterTableOperation::SetSourceRateLimit { .. } => {
1704            bail!(
1705                "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1706                schema_name,
1707                table_name
1708            );
1709        }
1710        _ => {}
1711    }
1712    Ok(())
1713}