risingwave_frontend/handler/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::{AlterDatabaseParam, ICEBERG_SINK_PREFIX};
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_compaction_group;
46mod alter_connection_props;
47mod alter_database_param;
48mod alter_mv;
49mod alter_owner;
50mod alter_parallelism;
51mod alter_rename;
52mod alter_resource_group;
53mod alter_secret;
54mod alter_set_schema;
55mod alter_sink_props;
56mod alter_source_column;
57mod alter_source_props;
58mod alter_source_with_sr;
59mod alter_streaming_config;
60mod alter_streaming_enable_unaligned_join;
61mod alter_streaming_rate_limit;
62mod alter_subscription_retention;
63mod alter_swap_rename;
64mod alter_system;
65mod alter_table_column;
66pub mod alter_table_drop_connector;
67pub mod alter_table_props;
68mod alter_table_with_sr;
69pub mod alter_user;
70mod alter_utils;
71mod alter_watermark;
72mod backup;
73pub mod cancel_job;
74pub mod close_cursor;
75mod comment;
76pub mod create_aggregate;
77pub mod create_connection;
78mod create_database;
79pub mod create_function;
80pub mod create_index;
81pub mod create_mv;
82pub mod create_schema;
83pub mod create_secret;
84pub mod create_sink;
85pub mod create_source;
86pub mod create_sql_function;
87pub mod create_subscription;
88pub mod create_table;
89pub mod create_table_as;
90pub mod create_user;
91pub mod create_view;
92pub mod declare_cursor;
93mod delete_meta_snapshot;
94pub mod describe;
95pub mod discard;
96mod drop_connection;
97mod drop_database;
98pub mod drop_function;
99mod drop_index;
100pub mod drop_mv;
101mod drop_schema;
102pub mod drop_secret;
103pub mod drop_sink;
104pub mod drop_source;
105pub mod drop_subscription;
106pub mod drop_table;
107pub mod drop_user;
108mod drop_view;
109pub mod explain;
110pub mod explain_analyze_stream_job;
111pub mod extended_handle;
112pub mod fetch_cursor;
113mod flush;
114pub mod handle_privilege;
115pub mod kill_process;
116mod prepared_statement;
117pub mod privilege;
118pub mod query;
119mod recover;
120mod refresh;
121mod reset_source;
122pub mod show;
123mod transaction;
124mod use_db;
125pub mod util;
126pub mod vacuum;
127pub mod variable;
128mod wait;
129
130pub use alter_table_column::{
131    fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
132};
133
134/// The [`PgResponseBuilder`] used by RisingWave.
135pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
136
137/// The [`PgResponse`] used by RisingWave.
138pub type RwPgResponse = PgResponse<PgResponseStream>;
139
140#[easy_ext::ext(RwPgResponseBuilderExt)]
141impl RwPgResponseBuilder {
142    /// Append rows to the response.
143    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
144        let fields = T::fields();
145        self.values(
146            rows.into_iter()
147                .map(|row| {
148                    Row::new(
149                        row.into_owned_row()
150                            .into_iter()
151                            .zip_eq_fast(&fields)
152                            .map(|(datum, (_, ty))| {
153                                datum.map(|scalar| {
154                                    scalar.as_scalar_ref_impl().text_format(ty).into()
155                                })
156                            })
157                            .collect(),
158                    )
159                })
160                .collect_vec()
161                .into(),
162            fields_to_descriptors(fields),
163        )
164    }
165}
166
167pub fn fields_to_descriptors(
168    fields: Vec<(&str, risingwave_common::types::DataType)>,
169) -> Vec<PgFieldDescriptor> {
170    fields
171        .iter()
172        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
173        .collect()
174}
175
/// The row stream type plugged into [`RwPgResponse`] and [`RwPgResponseBuilder`].
///
/// Each variant wraps a different underlying stream; the `Stream` implementation
/// of this enum simply forwards polling to the wrapped one.
pub enum PgResponseStream {
    /// A [`LocalQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// A [`DistributedQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// An arbitrary boxed stream of row-set results, e.g. built from a `Vec<Row>`.
    Rows(BoxStream<'static, RowSetResult>),
}
181
182impl Stream for PgResponseStream {
183    type Item = std::result::Result<Vec<Row>, BoxedError>;
184
185    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
186        match &mut *self {
187            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
188            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
189            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
190        }
191    }
192}
193
194impl From<Vec<Row>> for PgResponseStream {
195    fn from(rows: Vec<Row>) -> Self {
196        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
197    }
198}
199
/// Arguments handed to the individual statement handlers.
///
/// Built once per statement by [`HandlerArgs::new`].
#[derive(Clone)]
pub struct HandlerArgs {
    /// The session the statement is being handled for.
    pub session: Arc<SessionImpl>,
    /// The SQL text as it was passed to [`HandlerArgs::new`].
    pub sql: Arc<str>,
    /// The statement unparsed back to a string after normalization
    /// (`OR REPLACE` / `IF NOT EXISTS` flags cleared for the `CREATE` variants
    /// that normalization covers).
    pub normalized_sql: String,
    /// Options obtained from the statement through `WithOptions::try_from`.
    pub with_options: WithOptions,
}
207
208impl HandlerArgs {
209    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
210        Ok(Self {
211            session,
212            sql,
213            with_options: WithOptions::try_from(stmt)?,
214            normalized_sql: Self::normalize_sql(stmt),
215        })
216    }
217
218    /// Get normalized SQL from the statement.
219    ///
220    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
221    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
222    ///   make it suitable for the `SHOW CREATE` statements.
223    fn normalize_sql(stmt: &Statement) -> String {
224        let mut stmt = stmt.clone();
225        match &mut stmt {
226            Statement::CreateView {
227                or_replace,
228                if_not_exists,
229                ..
230            } => {
231                *or_replace = false;
232                *if_not_exists = false;
233            }
234            Statement::CreateTable {
235                or_replace,
236                if_not_exists,
237                ..
238            } => {
239                *or_replace = false;
240                *if_not_exists = false;
241            }
242            Statement::CreateIndex { if_not_exists, .. } => {
243                *if_not_exists = false;
244            }
245            Statement::CreateSource {
246                stmt: CreateSourceStatement { if_not_exists, .. },
247                ..
248            } => {
249                *if_not_exists = false;
250            }
251            Statement::CreateSink {
252                stmt: CreateSinkStatement { if_not_exists, .. },
253            } => {
254                *if_not_exists = false;
255            }
256            Statement::CreateSubscription {
257                stmt: CreateSubscriptionStatement { if_not_exists, .. },
258            } => {
259                *if_not_exists = false;
260            }
261            Statement::CreateConnection {
262                stmt: CreateConnectionStatement { if_not_exists, .. },
263            } => {
264                *if_not_exists = false;
265            }
266            _ => {}
267        }
268        stmt.to_string()
269    }
270}
271
272#[expect(clippy::large_stack_frames)]
273pub async fn handle(
274    session: Arc<SessionImpl>,
275    stmt: Statement,
276    sql: Arc<str>,
277    formats: Vec<Format>,
278) -> Result<RwPgResponse> {
279    session.clear_cancel_query_flag();
280    let _guard = session.txn_begin_implicit();
281    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
282
283    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
284
285    match stmt {
286        Statement::Explain {
287            statement,
288            analyze,
289            options,
290        } => {
291            Box::pin(explain::handle_explain(
292                handler_args,
293                *statement,
294                options,
295                analyze,
296            ))
297            .await
298        }
299        Statement::ExplainAnalyzeStreamJob {
300            target,
301            duration_secs,
302        } => {
303            explain_analyze_stream_job::handle_explain_analyze_stream_job(
304                handler_args,
305                target,
306                duration_secs,
307            )
308            .await
309        }
310        Statement::CreateSource { stmt } => {
311            create_source::handle_create_source(handler_args, stmt).await
312        }
313        Statement::CreateSink { stmt } => {
314            create_sink::handle_create_sink(handler_args, stmt, false).await
315        }
316        Statement::CreateSubscription { stmt } => {
317            create_subscription::handle_create_subscription(handler_args, stmt).await
318        }
319        Statement::CreateConnection { stmt } => {
320            create_connection::handle_create_connection(handler_args, stmt).await
321        }
322        Statement::CreateSecret { stmt } => {
323            create_secret::handle_create_secret(handler_args, stmt).await
324        }
325        Statement::CreateFunction {
326            or_replace,
327            temporary,
328            if_not_exists,
329            name,
330            args,
331            returns,
332            params,
333            with_options,
334        } => {
335            // For general udf, `language` clause could be ignored
336            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
337            if params.language.is_none()
338                || !params
339                    .language
340                    .as_ref()
341                    .unwrap()
342                    .real_value()
343                    .eq_ignore_ascii_case("sql")
344            {
345                create_function::handle_create_function(
346                    handler_args,
347                    or_replace,
348                    temporary,
349                    if_not_exists,
350                    name,
351                    args,
352                    returns,
353                    params,
354                    with_options,
355                )
356                .await
357            } else {
358                create_sql_function::handle_create_sql_function(
359                    handler_args,
360                    or_replace,
361                    temporary,
362                    if_not_exists,
363                    name,
364                    args,
365                    returns,
366                    params,
367                )
368                .await
369            }
370        }
371        Statement::CreateAggregate {
372            or_replace,
373            if_not_exists,
374            name,
375            args,
376            returns,
377            params,
378            ..
379        } => {
380            create_aggregate::handle_create_aggregate(
381                handler_args,
382                or_replace,
383                if_not_exists,
384                name,
385                args,
386                returns,
387                params,
388            )
389            .await
390        }
391        Statement::CreateTable {
392            name,
393            columns,
394            wildcard_idx,
395            constraints,
396            query,
397            with_options: _, // It is put in OptimizerContext
398            // Not supported things
399            or_replace,
400            temporary,
401            if_not_exists,
402            format_encode,
403            source_watermarks,
404            append_only,
405            on_conflict,
406            with_version_columns,
407            cdc_table_info,
408            include_column_options,
409            webhook_info,
410            engine,
411        } => {
412            if or_replace {
413                bail_not_implemented!("CREATE OR REPLACE TABLE");
414            }
415            if temporary {
416                bail_not_implemented!("CREATE TEMPORARY TABLE");
417            }
418            if let Some(query) = query {
419                return create_table_as::handle_create_as(
420                    handler_args,
421                    name,
422                    if_not_exists,
423                    query,
424                    columns,
425                    append_only,
426                    on_conflict,
427                    with_version_columns
428                        .iter()
429                        .map(|col| col.real_value())
430                        .collect(),
431                    engine,
432                )
433                .await;
434            }
435            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
436            Box::pin(create_table::handle_create_table(
437                handler_args,
438                name,
439                columns,
440                wildcard_idx,
441                constraints,
442                if_not_exists,
443                format_encode,
444                source_watermarks,
445                append_only,
446                on_conflict,
447                with_version_columns
448                    .iter()
449                    .map(|col| col.real_value())
450                    .collect(),
451                cdc_table_info,
452                include_column_options,
453                webhook_info,
454                engine,
455            ))
456            .await
457        }
458        Statement::CreateDatabase {
459            db_name,
460            if_not_exists,
461            owner,
462            resource_group,
463            barrier_interval_ms,
464            checkpoint_frequency,
465        } => {
466            create_database::handle_create_database(
467                handler_args,
468                db_name,
469                if_not_exists,
470                owner,
471                resource_group,
472                barrier_interval_ms,
473                checkpoint_frequency,
474            )
475            .await
476        }
477        Statement::CreateSchema {
478            schema_name,
479            if_not_exists,
480            owner,
481        } => {
482            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
483                .await
484        }
485        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
486        Statement::DeclareCursor { stmt } => {
487            declare_cursor::handle_declare_cursor(handler_args, stmt).await
488        }
489        Statement::FetchCursor { stmt } => {
490            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
491        }
492        Statement::CloseCursor { stmt } => {
493            close_cursor::handle_close_cursor(handler_args, stmt).await
494        }
495        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
496        Statement::Grant { .. } => {
497            handle_privilege::handle_grant_privilege(handler_args, stmt).await
498        }
499        Statement::Revoke { .. } => {
500            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
501        }
502        Statement::Describe { name, kind } => match kind {
503            DescribeKind::Fragments => {
504                describe::handle_describe_fragments(handler_args, name).await
505            }
506            DescribeKind::Plain => describe::handle_describe(handler_args, name),
507        },
508        Statement::DescribeFragment { fragment_id } => {
509            describe::handle_describe_fragment(handler_args, fragment_id.into()).await
510        }
511        Statement::Discard(..) => discard::handle_discard(handler_args),
512        Statement::ShowObjects {
513            object: show_object,
514            filter,
515        } => show::handle_show_object(handler_args, show_object, filter).await,
516        Statement::ShowCreateObject { create_type, name } => {
517            show::handle_show_create_object(handler_args, create_type, name)
518        }
519        Statement::ShowTransactionIsolationLevel => {
520            transaction::handle_show_isolation_level(handler_args)
521        }
522        Statement::Drop(DropStatement {
523            object_type,
524            object_name,
525            if_exists,
526            drop_mode,
527        }) => {
528            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
529                match object_type {
530                    ObjectType::MaterializedView
531                    | ObjectType::View
532                    | ObjectType::Sink
533                    | ObjectType::Source
534                    | ObjectType::Subscription
535                    | ObjectType::Index
536                    | ObjectType::Table
537                    | ObjectType::Schema
538                    | ObjectType::Connection
539                    | ObjectType::Secret => true,
540                    ObjectType::Database | ObjectType::User => {
541                        bail_not_implemented!("DROP CASCADE");
542                    }
543                }
544            } else {
545                false
546            };
547            match object_type {
548                ObjectType::Table => {
549                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
550                        .await
551                }
552                ObjectType::MaterializedView => {
553                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
554                }
555                ObjectType::Index => {
556                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
557                        .await
558                }
559                ObjectType::Source => {
560                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
561                        .await
562                }
563                ObjectType::Sink => {
564                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
565                }
566                ObjectType::Subscription => {
567                    drop_subscription::handle_drop_subscription(
568                        handler_args,
569                        object_name,
570                        if_exists,
571                        cascade,
572                    )
573                    .await
574                }
575                ObjectType::Database => {
576                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
577                }
578                ObjectType::Schema => {
579                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
580                        .await
581                }
582                ObjectType::User => {
583                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
584                }
585                ObjectType::View => {
586                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
587                }
588                ObjectType::Connection => {
589                    drop_connection::handle_drop_connection(
590                        handler_args,
591                        object_name,
592                        if_exists,
593                        cascade,
594                    )
595                    .await
596                }
597                ObjectType::Secret => {
598                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists, cascade)
599                        .await
600                }
601            }
602        }
603        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
604        Statement::DropFunction {
605            if_exists,
606            func_desc,
607            option,
608        } => {
609            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
610                .await
611        }
612        Statement::DropAggregate {
613            if_exists,
614            func_desc,
615            option,
616        } => {
617            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
618                .await
619        }
620        Statement::Query(_)
621        | Statement::Insert { .. }
622        | Statement::Delete { .. }
623        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
624        Statement::Copy {
625            entity: CopyEntity::Query(query),
626            target: CopyTarget::Stdout,
627        } => {
628            let response =
629                query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
630                    .await?;
631            Ok(response.into_copy_query_to_stdout())
632        }
633        Statement::CreateView {
634            materialized,
635            if_not_exists,
636            name,
637            columns,
638            query,
639            with_options: _, // It is put in OptimizerContext
640            or_replace,      // not supported
641            emit_mode,
642        } => {
643            if or_replace {
644                bail_not_implemented!("CREATE OR REPLACE VIEW");
645            }
646            if materialized {
647                create_mv::handle_create_mv(
648                    handler_args,
649                    if_not_exists,
650                    name,
651                    *query,
652                    columns,
653                    emit_mode,
654                )
655                .await
656            } else {
657                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
658                    .await
659            }
660        }
661        Statement::Flush => flush::handle_flush(handler_args).await,
662        Statement::Wait(target) => wait::handle_wait(handler_args, target).await,
663        Statement::Backup => backup::handle_backup(handler_args).await,
664        Statement::DeleteMetaSnapshots { snapshot_ids } => {
665            delete_meta_snapshot::handle_delete_meta_snapshots(handler_args, snapshot_ids).await
666        }
667        Statement::Recover => recover::handle_recover(handler_args).await,
668        Statement::SetVariable {
669            local: _,
670            variable,
671            value,
672        } => {
673            // special handle for `use database`
674            if variable.real_value().eq_ignore_ascii_case("database") {
675                let x = variable::set_var_to_param_str(&value);
676                let res = use_db::handle_use_db(
677                    handler_args,
678                    ObjectName::from(vec![Ident::from_real_value(
679                        x.as_deref().unwrap_or("default"),
680                    )]),
681                )?;
682                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
683                for notice in res.notices() {
684                    builder = builder.notice(notice);
685                }
686                return Ok(builder.into());
687            }
688            variable::handle_set(handler_args, variable, value)
689        }
690        Statement::SetTimeZone { local: _, value } => {
691            variable::handle_set_time_zone(handler_args, value)
692        }
693        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
694        Statement::CreateIndex {
695            name,
696            table_name,
697            method,
698            columns,
699            include,
700            distributed_by,
701            unique,
702            if_not_exists,
703            with_properties: _,
704        } => {
705            if unique {
706                bail_not_implemented!("create unique index");
707            }
708
709            create_index::handle_create_index(
710                handler_args,
711                if_not_exists,
712                name,
713                table_name,
714                method,
715                columns.clone(),
716                include,
717                distributed_by,
718            )
719            .await
720        }
721        Statement::AlterDatabase { name, operation } => match operation {
722            AlterDatabaseOperation::RenameDatabase { database_name } => {
723                alter_rename::handle_rename_database(handler_args, name, database_name).await
724            }
725            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
726                alter_owner::handle_alter_owner(
727                    handler_args,
728                    name,
729                    new_owner_name,
730                    StatementType::ALTER_DATABASE,
731                    None,
732                )
733                .await
734            }
735            AlterDatabaseOperation::SetParam(config_param) => {
736                let ConfigParam { param, value } = config_param;
737
738                let database_param = match param.real_value().to_uppercase().as_str() {
739                    "BARRIER_INTERVAL_MS" => {
740                        let barrier_interval_ms = match value {
741                            SetVariableValue::Default => None,
742                            SetVariableValue::Single(SetVariableValueSingle::Literal(
743                                Value::Number(num),
744                            )) => {
745                                let num = num.parse::<u32>().map_err(|e| {
746                                    ErrorCode::InvalidInputSyntax(format!(
747                                        "barrier_interval_ms must be a u32 integer: {}",
748                                        e.as_report()
749                                    ))
750                                })?;
751                                Some(num)
752                            }
753                            _ => {
754                                return Err(ErrorCode::InvalidInputSyntax(
755                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
756                                        .to_owned(),
757                                )
758                                .into());
759                            }
760                        };
761                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
762                    }
763                    "CHECKPOINT_FREQUENCY" => {
764                        let checkpoint_frequency = match value {
765                            SetVariableValue::Default => None,
766                            SetVariableValue::Single(SetVariableValueSingle::Literal(
767                                Value::Number(num),
768                            )) => {
769                                let num = num.parse::<u64>().map_err(|e| {
770                                    ErrorCode::InvalidInputSyntax(format!(
771                                        "checkpoint_frequency must be a u64 integer: {}",
772                                        e.as_report()
773                                    ))
774                                })?;
775                                Some(num)
776                            }
777                            _ => {
778                                return Err(ErrorCode::InvalidInputSyntax(
779                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
780                                        .to_owned(),
781                                )
782                                .into());
783                            }
784                        };
785                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
786                    }
787                    _ => {
788                        return Err(ErrorCode::InvalidInputSyntax(format!(
789                            "Unsupported database config parameter: {}",
790                            param.real_value()
791                        ))
792                        .into());
793                    }
794                };
795
796                alter_database_param::handle_alter_database_param(
797                    handler_args,
798                    name,
799                    database_param,
800                )
801                .await
802            }
803            AlterDatabaseOperation::SetResourceGroup {
804                resource_group,
805                deferred,
806            } => {
807                alter_database_param::handle_alter_database_resource_group(
808                    handler_args,
809                    name,
810                    resource_group,
811                    deferred,
812                )
813                .await
814            }
815        },
816        Statement::AlterSchema { name, operation } => match operation {
817            AlterSchemaOperation::RenameSchema { schema_name } => {
818                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
819            }
820            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
821                alter_owner::handle_alter_owner(
822                    handler_args,
823                    name,
824                    new_owner_name,
825                    StatementType::ALTER_SCHEMA,
826                    None,
827                )
828                .await
829            }
830            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
831                alter_swap_rename::handle_swap_rename(
832                    handler_args,
833                    name,
834                    target_schema,
835                    StatementType::ALTER_SCHEMA,
836                )
837                .await
838            }
839        },
840        Statement::AlterTable { name, operation } => match operation {
841            AlterTableOperation::AddColumn { .. }
842            | AlterTableOperation::DropColumn { .. }
843            | AlterTableOperation::AlterColumn { .. } => {
844                Box::pin(alter_table_column::handle_alter_table_column(
845                    handler_args,
846                    name,
847                    operation,
848                ))
849                .await
850            }
851            AlterTableOperation::AlterWatermark {
852                column_name,
853                expr,
854                with_ttl,
855            } => {
856                Box::pin(alter_watermark::handle_alter_watermark(
857                    handler_args,
858                    name,
859                    column_name,
860                    expr,
861                    with_ttl,
862                ))
863                .await
864            }
865            AlterTableOperation::RenameTable { table_name } => {
866                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
867                    .await
868            }
869            AlterTableOperation::ChangeOwner { new_owner_name } => {
870                alter_owner::handle_alter_owner(
871                    handler_args,
872                    name,
873                    new_owner_name,
874                    StatementType::ALTER_TABLE,
875                    None,
876                )
877                .await
878            }
879            AlterTableOperation::SetParallelism {
880                parallelism,
881                deferred,
882            } => {
883                alter_parallelism::handle_alter_parallelism(
884                    handler_args,
885                    name,
886                    parallelism,
887                    StatementType::ALTER_TABLE,
888                    deferred,
889                )
890                .await
891            }
892            AlterTableOperation::SetBackfillParallelism {
893                parallelism,
894                deferred,
895            } => {
896                alter_parallelism::handle_alter_backfill_parallelism(
897                    handler_args,
898                    name,
899                    parallelism,
900                    StatementType::ALTER_TABLE,
901                    deferred,
902                )
903                .await
904            }
905            AlterTableOperation::SetSchema { new_schema_name } => {
906                alter_set_schema::handle_alter_set_schema(
907                    handler_args,
908                    name,
909                    new_schema_name,
910                    StatementType::ALTER_TABLE,
911                    None,
912                )
913                .await
914            }
915            AlterTableOperation::RefreshSchema => {
916                Box::pin(alter_table_with_sr::handle_refresh_schema(
917                    handler_args,
918                    name,
919                ))
920                .await
921            }
922            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
923                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
924                    handler_args,
925                    PbThrottleTarget::Table,
926                    risingwave_pb::common::PbThrottleType::Source,
927                    name,
928                    rate_limit,
929                )
930                .await
931            }
932            AlterTableOperation::DropConnector => {
933                Box::pin(
934                    alter_table_drop_connector::handle_alter_table_drop_connector(
935                        handler_args,
936                        name,
937                    ),
938                )
939                .await
940            }
941            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
942                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
943                    handler_args,
944                    PbThrottleTarget::Table,
945                    risingwave_pb::common::PbThrottleType::Dml,
946                    name,
947                    rate_limit,
948                )
949                .await
950            }
951            AlterTableOperation::SetConfig { entries } => {
952                alter_streaming_config::handle_alter_streaming_set_config(
953                    handler_args,
954                    name,
955                    entries,
956                    StatementType::ALTER_TABLE,
957                )
958                .await
959            }
960            AlterTableOperation::ResetConfig { keys } => {
961                alter_streaming_config::handle_alter_streaming_reset_config(
962                    handler_args,
963                    name,
964                    keys,
965                    StatementType::ALTER_TABLE,
966                )
967                .await
968            }
969            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
970                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
971                    handler_args,
972                    PbThrottleTarget::Table,
973                    risingwave_pb::common::PbThrottleType::Backfill,
974                    name,
975                    rate_limit,
976                )
977                .await
978            }
979            AlterTableOperation::SwapRenameTable { target_table } => {
980                alter_swap_rename::handle_swap_rename(
981                    handler_args,
982                    name,
983                    target_table,
984                    StatementType::ALTER_TABLE,
985                )
986                .await
987            }
988            AlterTableOperation::AlterConnectorProps { alter_props } => {
989                alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
990            }
991            AlterTableOperation::AddConstraint { .. }
992            | AlterTableOperation::DropConstraint { .. }
993            | AlterTableOperation::RenameColumn { .. }
994            | AlterTableOperation::ChangeColumn { .. }
995            | AlterTableOperation::RenameConstraint { .. } => {
996                bail_not_implemented!(
997                    "Unhandled statement: {}",
998                    Statement::AlterTable { name, operation }
999                )
1000            }
1001        },
1002        Statement::AlterIndex { name, operation } => match operation {
1003            AlterIndexOperation::RenameIndex { index_name } => {
1004                alter_rename::handle_rename_index(handler_args, name, index_name).await
1005            }
1006            AlterIndexOperation::SetParallelism {
1007                parallelism,
1008                deferred,
1009            } => {
1010                alter_parallelism::handle_alter_parallelism(
1011                    handler_args,
1012                    name,
1013                    parallelism,
1014                    StatementType::ALTER_INDEX,
1015                    deferred,
1016                )
1017                .await
1018            }
1019            AlterIndexOperation::SetBackfillParallelism {
1020                parallelism,
1021                deferred,
1022            } => {
1023                alter_parallelism::handle_alter_backfill_parallelism(
1024                    handler_args,
1025                    name,
1026                    parallelism,
1027                    StatementType::ALTER_INDEX,
1028                    deferred,
1029                )
1030                .await
1031            }
1032            AlterIndexOperation::SetResourceGroup {
1033                resource_group,
1034                deferred,
1035            } => {
1036                alter_resource_group::handle_alter_resource_group(
1037                    handler_args,
1038                    name,
1039                    resource_group,
1040                    StatementType::ALTER_INDEX,
1041                    deferred,
1042                )
1043                .await
1044            }
1045            AlterIndexOperation::SetConfig { entries } => {
1046                alter_streaming_config::handle_alter_streaming_set_config(
1047                    handler_args,
1048                    name,
1049                    entries,
1050                    StatementType::ALTER_INDEX,
1051                )
1052                .await
1053            }
1054            AlterIndexOperation::ResetConfig { keys } => {
1055                alter_streaming_config::handle_alter_streaming_reset_config(
1056                    handler_args,
1057                    name,
1058                    keys,
1059                    StatementType::ALTER_INDEX,
1060                )
1061                .await
1062            }
1063        },
1064        Statement::AlterView {
1065            materialized,
1066            name,
1067            operation,
1068        } => {
1069            let statement_type = if materialized {
1070                StatementType::ALTER_MATERIALIZED_VIEW
1071            } else {
1072                StatementType::ALTER_VIEW
1073            };
1074            match operation {
1075                AlterViewOperation::RenameView { view_name } => {
1076                    if materialized {
1077                        alter_rename::handle_rename_table(
1078                            handler_args,
1079                            TableType::MaterializedView,
1080                            name,
1081                            view_name,
1082                        )
1083                        .await
1084                    } else {
1085                        alter_rename::handle_rename_view(handler_args, name, view_name).await
1086                    }
1087                }
1088                AlterViewOperation::SetParallelism {
1089                    parallelism,
1090                    deferred,
1091                } => {
1092                    if !materialized {
1093                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
1094                    }
1095                    alter_parallelism::handle_alter_parallelism(
1096                        handler_args,
1097                        name,
1098                        parallelism,
1099                        statement_type,
1100                        deferred,
1101                    )
1102                    .await
1103                }
1104                AlterViewOperation::SetBackfillParallelism {
1105                    parallelism,
1106                    deferred,
1107                } => {
1108                    if !materialized {
1109                        bail_not_implemented!("ALTER VIEW SET BACKFILL PARALLELISM");
1110                    }
1111                    alter_parallelism::handle_alter_backfill_parallelism(
1112                        handler_args,
1113                        name,
1114                        parallelism,
1115                        statement_type,
1116                        deferred,
1117                    )
1118                    .await
1119                }
1120                AlterViewOperation::SetResourceGroup {
1121                    resource_group,
1122                    deferred,
1123                } => {
1124                    if !materialized {
1125                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1126                    }
1127                    alter_resource_group::handle_alter_resource_group(
1128                        handler_args,
1129                        name,
1130                        resource_group,
1131                        statement_type,
1132                        deferred,
1133                    )
1134                    .await
1135                }
1136                AlterViewOperation::ChangeOwner { new_owner_name } => {
1137                    alter_owner::handle_alter_owner(
1138                        handler_args,
1139                        name,
1140                        new_owner_name,
1141                        statement_type,
1142                        None,
1143                    )
1144                    .await
1145                }
1146                AlterViewOperation::SetSchema { new_schema_name } => {
1147                    alter_set_schema::handle_alter_set_schema(
1148                        handler_args,
1149                        name,
1150                        new_schema_name,
1151                        statement_type,
1152                        None,
1153                    )
1154                    .await
1155                }
1156                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1157                    if !materialized {
1158                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1159                    }
1160                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1161                        handler_args,
1162                        PbThrottleTarget::Mv,
1163                        risingwave_pb::common::PbThrottleType::Backfill,
1164                        name,
1165                        rate_limit,
1166                    )
1167                    .await
1168                }
1169                AlterViewOperation::SwapRenameView { target_view } => {
1170                    alter_swap_rename::handle_swap_rename(
1171                        handler_args,
1172                        name,
1173                        target_view,
1174                        statement_type,
1175                    )
1176                    .await
1177                }
1178                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1179                    if !materialized {
1180                        bail!(
1181                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1182                        );
1183                    }
1184                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1185                }
1186                AlterViewOperation::AsQuery { query } => {
1187                    if !materialized {
1188                        bail_not_implemented!("ALTER VIEW AS QUERY");
1189                    }
1190                    // `ALTER MATERIALIZED VIEW AS QUERY` is currently only available in development builds.
1191                    if !cfg!(debug_assertions) {
1192                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1193                    }
1194                    alter_mv::handle_alter_mv(handler_args, name, query).await
1195                }
1196                AlterViewOperation::SetConfig { entries } => {
1197                    if !materialized {
1198                        bail!("SET CONFIG is only supported for materialized views");
1199                    }
1200                    alter_streaming_config::handle_alter_streaming_set_config(
1201                        handler_args,
1202                        name,
1203                        entries,
1204                        statement_type,
1205                    )
1206                    .await
1207                }
1208                AlterViewOperation::ResetConfig { keys } => {
1209                    if !materialized {
1210                        bail!("RESET CONFIG is only supported for materialized views");
1211                    }
1212                    alter_streaming_config::handle_alter_streaming_reset_config(
1213                        handler_args,
1214                        name,
1215                        keys,
1216                        statement_type,
1217                    )
1218                    .await
1219                }
1220            }
1221        }
1222
1223        Statement::AlterSink { name, operation } => match operation {
1224            AlterSinkOperation::AlterConnectorProps {
1225                alter_props: changed_props,
1226            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1227            AlterSinkOperation::RenameSink { sink_name } => {
1228                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1229            }
1230            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1231                alter_owner::handle_alter_owner(
1232                    handler_args,
1233                    name,
1234                    new_owner_name,
1235                    StatementType::ALTER_SINK,
1236                    None,
1237                )
1238                .await
1239            }
1240            AlterSinkOperation::SetSchema { new_schema_name } => {
1241                alter_set_schema::handle_alter_set_schema(
1242                    handler_args,
1243                    name,
1244                    new_schema_name,
1245                    StatementType::ALTER_SINK,
1246                    None,
1247                )
1248                .await
1249            }
1250            AlterSinkOperation::SetParallelism {
1251                parallelism,
1252                deferred,
1253            } => {
1254                alter_parallelism::handle_alter_parallelism(
1255                    handler_args,
1256                    name,
1257                    parallelism,
1258                    StatementType::ALTER_SINK,
1259                    deferred,
1260                )
1261                .await
1262            }
1263            AlterSinkOperation::SetBackfillParallelism {
1264                parallelism,
1265                deferred,
1266            } => {
1267                alter_parallelism::handle_alter_backfill_parallelism(
1268                    handler_args,
1269                    name,
1270                    parallelism,
1271                    StatementType::ALTER_SINK,
1272                    deferred,
1273                )
1274                .await
1275            }
1276            AlterSinkOperation::SetResourceGroup {
1277                resource_group,
1278                deferred,
1279            } => {
1280                alter_resource_group::handle_alter_resource_group(
1281                    handler_args,
1282                    name,
1283                    resource_group,
1284                    StatementType::ALTER_SINK,
1285                    deferred,
1286                )
1287                .await
1288            }
1289            AlterSinkOperation::SetConfig { entries } => {
1290                alter_streaming_config::handle_alter_streaming_set_config(
1291                    handler_args,
1292                    name,
1293                    entries,
1294                    StatementType::ALTER_SINK,
1295                )
1296                .await
1297            }
1298            AlterSinkOperation::ResetConfig { keys } => {
1299                alter_streaming_config::handle_alter_streaming_reset_config(
1300                    handler_args,
1301                    name,
1302                    keys,
1303                    StatementType::ALTER_SINK,
1304                )
1305                .await
1306            }
1307            AlterSinkOperation::SwapRenameSink { target_sink } => {
1308                alter_swap_rename::handle_swap_rename(
1309                    handler_args,
1310                    name,
1311                    target_sink,
1312                    StatementType::ALTER_SINK,
1313                )
1314                .await
1315            }
1316            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1317                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1318                    handler_args,
1319                    PbThrottleTarget::Sink,
1320                    risingwave_pb::common::PbThrottleType::Sink,
1321                    name,
1322                    rate_limit,
1323                )
1324                .await
1325            }
1326            AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
1327                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1328                    handler_args,
1329                    PbThrottleTarget::Sink,
1330                    risingwave_pb::common::PbThrottleType::Backfill,
1331                    name,
1332                    rate_limit,
1333                )
1334                .await
1335            }
1336            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1337                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1338                    handler_args,
1339                    name,
1340                    enable,
1341                )
1342                .await
1343            }
1344        },
1345        Statement::AlterSubscription { name, operation } => match operation {
1346            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1347                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1348                    .await
1349            }
1350            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1351                alter_owner::handle_alter_owner(
1352                    handler_args,
1353                    name,
1354                    new_owner_name,
1355                    StatementType::ALTER_SUBSCRIPTION,
1356                    None,
1357                )
1358                .await
1359            }
1360            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1361                alter_set_schema::handle_alter_set_schema(
1362                    handler_args,
1363                    name,
1364                    new_schema_name,
1365                    StatementType::ALTER_SUBSCRIPTION,
1366                    None,
1367                )
1368                .await
1369            }
1370            AlterSubscriptionOperation::SetRetention { retention } => {
1371                alter_subscription_retention::handle_alter_subscription_retention(
1372                    handler_args,
1373                    name,
1374                    retention,
1375                )
1376                .await
1377            }
1378            AlterSubscriptionOperation::SwapRenameSubscription {
1379                target_subscription,
1380            } => {
1381                alter_swap_rename::handle_swap_rename(
1382                    handler_args,
1383                    name,
1384                    target_subscription,
1385                    StatementType::ALTER_SUBSCRIPTION,
1386                )
1387                .await
1388            }
1389        },
1390        Statement::AlterSource { name, operation } => match operation {
1391            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1392                alter_source_props::handle_alter_source_connector_props(
1393                    handler_args,
1394                    name,
1395                    alter_props,
1396                )
1397                .await
1398            }
1399            AlterSourceOperation::RenameSource { source_name } => {
1400                alter_rename::handle_rename_source(handler_args, name, source_name).await
1401            }
1402            AlterSourceOperation::AddColumn { .. } => {
1403                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1404            }
1405            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1406                alter_owner::handle_alter_owner(
1407                    handler_args,
1408                    name,
1409                    new_owner_name,
1410                    StatementType::ALTER_SOURCE,
1411                    None,
1412                )
1413                .await
1414            }
1415            AlterSourceOperation::SetSchema { new_schema_name } => {
1416                alter_set_schema::handle_alter_set_schema(
1417                    handler_args,
1418                    name,
1419                    new_schema_name,
1420                    StatementType::ALTER_SOURCE,
1421                    None,
1422                )
1423                .await
1424            }
1425            AlterSourceOperation::FormatEncode { format_encode } => {
1426                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1427                    .await
1428            }
1429            AlterSourceOperation::RefreshSchema => {
1430                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1431            }
1432            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1433                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1434                    handler_args,
1435                    PbThrottleTarget::Source,
1436                    risingwave_pb::common::PbThrottleType::Source,
1437                    name,
1438                    rate_limit,
1439                )
1440                .await
1441            }
1442            AlterSourceOperation::SwapRenameSource { target_source } => {
1443                alter_swap_rename::handle_swap_rename(
1444                    handler_args,
1445                    name,
1446                    target_source,
1447                    StatementType::ALTER_SOURCE,
1448                )
1449                .await
1450            }
1451            AlterSourceOperation::SetParallelism {
1452                parallelism,
1453                deferred,
1454            } => {
1455                alter_parallelism::handle_alter_parallelism(
1456                    handler_args,
1457                    name,
1458                    parallelism,
1459                    StatementType::ALTER_SOURCE,
1460                    deferred,
1461                )
1462                .await
1463            }
1464            AlterSourceOperation::SetBackfillParallelism {
1465                parallelism,
1466                deferred,
1467            } => {
1468                alter_parallelism::handle_alter_backfill_parallelism(
1469                    handler_args,
1470                    name,
1471                    parallelism,
1472                    StatementType::ALTER_SOURCE,
1473                    deferred,
1474                )
1475                .await
1476            }
1477            AlterSourceOperation::SetConfig { entries } => {
1478                alter_streaming_config::handle_alter_streaming_set_config(
1479                    handler_args,
1480                    name,
1481                    entries,
1482                    StatementType::ALTER_SOURCE,
1483                )
1484                .await
1485            }
1486            AlterSourceOperation::ResetConfig { keys } => {
1487                alter_streaming_config::handle_alter_streaming_reset_config(
1488                    handler_args,
1489                    name,
1490                    keys,
1491                    StatementType::ALTER_SOURCE,
1492                )
1493                .await
1494            }
1495            AlterSourceOperation::ResetSource => {
1496                reset_source::handle_reset_source(handler_args, name).await
1497            }
1498        },
1499        Statement::AlterFunction {
1500            name,
1501            args,
1502            operation,
1503        } => match operation {
1504            AlterFunctionOperation::SetSchema { new_schema_name } => {
1505                alter_set_schema::handle_alter_set_schema(
1506                    handler_args,
1507                    name,
1508                    new_schema_name,
1509                    StatementType::ALTER_FUNCTION,
1510                    args,
1511                )
1512                .await
1513            }
1514            AlterFunctionOperation::ChangeOwner { new_owner_name } => {
1515                alter_owner::handle_alter_owner(
1516                    handler_args,
1517                    name,
1518                    new_owner_name,
1519                    StatementType::ALTER_FUNCTION,
1520                    args,
1521                )
1522                .await
1523            }
1524        },
1525        Statement::AlterConnection { name, operation } => match operation {
1526            AlterConnectionOperation::SetSchema { new_schema_name } => {
1527                alter_set_schema::handle_alter_set_schema(
1528                    handler_args,
1529                    name,
1530                    new_schema_name,
1531                    StatementType::ALTER_CONNECTION,
1532                    None,
1533                )
1534                .await
1535            }
1536            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1537                alter_owner::handle_alter_owner(
1538                    handler_args,
1539                    name,
1540                    new_owner_name,
1541                    StatementType::ALTER_CONNECTION,
1542                    None,
1543                )
1544                .await
1545            }
1546            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1547                alter_connection_props::handle_alter_connection_connector_props(
1548                    handler_args,
1549                    name,
1550                    alter_props,
1551                )
1552                .await
1553            }
1554        },
1555        Statement::AlterSystem { param, value } => {
1556            alter_system::handle_alter_system(handler_args, param, value).await
1557        }
1558        Statement::AlterSecret { name, operation } => match operation {
1559            AlterSecretOperation::ChangeCredential {
1560                with_options,
1561                new_credential,
1562            } => {
1563                alter_secret::handle_alter_secret(handler_args, name, with_options, new_credential)
1564                    .await
1565            }
1566            AlterSecretOperation::ChangeOwner { new_owner_name } => {
1567                alter_owner::handle_alter_owner(
1568                    handler_args,
1569                    name,
1570                    new_owner_name,
1571                    StatementType::ALTER_SECRET,
1572                    None,
1573                )
1574                .await
1575            }
1576        },
1577        Statement::AlterFragment {
1578            fragment_ids,
1579            operation,
1580        } => match operation {
1581            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1582                let [fragment_id] = fragment_ids.as_slice() else {
1583                    return Err(ErrorCode::InvalidInputSyntax(
1584                        "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1585                            .to_owned(),
1586                    )
1587                    .into());
1588                };
1589                alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1590                    &handler_args.session,
1591                    PbThrottleTarget::Fragment,
1592                    risingwave_pb::common::PbThrottleType::Backfill,
1593                    *fragment_id,
1594                    rate_limit,
1595                    StatementType::SET_VARIABLE,
1596                )
1597                .await
1598            }
1599            AlterFragmentOperation::SetParallelism { parallelism } => {
1600                alter_parallelism::handle_alter_fragment_parallelism(
1601                    handler_args,
1602                    fragment_ids.into_iter().map_into().collect(),
1603                    parallelism,
1604                )
1605                .await
1606            }
1607        },
1608        Statement::AlterDefaultPrivileges { .. } => {
1609            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1610        }
1611        Statement::AlterCompactionGroup {
1612            group_ids,
1613            operation,
1614        } => {
1615            alter_compaction_group::handle_alter_compaction_group(
1616                handler_args,
1617                group_ids,
1618                operation,
1619            )
1620            .await
1621        }
1622        Statement::StartTransaction { modes } => {
1623            transaction::handle_begin(handler_args, START_TRANSACTION, modes)
1624        }
1625        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes),
1626        Statement::Commit { chain } => {
1627            transaction::handle_commit(handler_args, COMMIT, chain).await
1628        }
1629        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1630        Statement::Rollback { chain } => {
1631            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1632        }
1633        Statement::SetTransaction {
1634            modes,
1635            snapshot,
1636            session,
1637        } => transaction::handle_set(handler_args, modes, snapshot, session),
1638        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1639        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1640        Statement::Comment {
1641            object_type,
1642            object_name,
1643            comment,
1644        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1645        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1646        Statement::Prepare {
1647            name,
1648            data_types,
1649            statement,
1650        } => prepared_statement::handle_prepare(name, data_types, statement),
1651        Statement::Deallocate { name, prepare } => {
1652            prepared_statement::handle_deallocate(name, prepare)
1653        }
1654        Statement::Vacuum { object_name, full } => {
1655            vacuum::handle_vacuum(handler_args, object_name, full).await
1656        }
1657        Statement::Refresh { table_name } => {
1658            refresh::handle_refresh(handler_args, table_name).await
1659        }
1660        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1661    }
1662}
1663
1664fn check_ban_ddl_for_iceberg_engine_table(
1665    session: Arc<SessionImpl>,
1666    stmt: &Statement,
1667) -> Result<()> {
1668    if let Statement::AlterTable { name, operation } = stmt {
1669        let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1670        if table.is_iceberg_engine_table() {
1671            let has_auto_refresh_schema_sink = if matches!(
1672                operation,
1673                AlterTableOperation::AddColumn { .. } | AlterTableOperation::DropColumn { .. }
1674            ) {
1675                let catalog_reader = session.env().catalog_reader().read_guard();
1676                let db_name = session.database();
1677                let sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name());
1678                let sink = catalog_reader
1679                    .get_schema_by_name(&db_name, &schema_name)
1680                    .ok()
1681                    .and_then(|schema| schema.get_created_sink_by_name(&sink_name));
1682                sink.and_then(|s| s.auto_refresh_schema_from_table)
1683                    .is_some()
1684            } else {
1685                false
1686            };
1687
1688            check_ban_alter_table_operation_for_iceberg_engine_table(
1689                operation,
1690                &schema_name,
1691                name,
1692                has_auto_refresh_schema_sink,
1693            )?;
1694        }
1695    }
1696
1697    Ok(())
1698}
1699
1700fn check_ban_alter_table_operation_for_iceberg_engine_table(
1701    operation: &AlterTableOperation,
1702    schema_name: &str,
1703    table_name: &ObjectName,
1704    has_auto_refresh_schema_sink: bool,
1705) -> Result<()> {
1706    match operation {
1707        AlterTableOperation::AddColumn { .. } => {
1708            if !has_auto_refresh_schema_sink {
1709                bail!(
1710                    "ALTER TABLE {} is not supported for iceberg table without auto schema change sink: {}.{}",
1711                    operation,
1712                    schema_name,
1713                    table_name
1714                );
1715            }
1716        }
1717        AlterTableOperation::DropColumn { .. } => {
1718            if !has_auto_refresh_schema_sink {
1719                bail!(
1720                    "ALTER TABLE {} is not supported for iceberg table without auto schema change sink: {}.{}",
1721                    operation,
1722                    schema_name,
1723                    table_name
1724                );
1725            }
1726        }
1727        AlterTableOperation::RenameColumn { .. }
1728        | AlterTableOperation::ChangeColumn { .. }
1729        | AlterTableOperation::AlterColumn { .. } => {
1730            bail!(
1731                "ALTER TABLE {} is not supported for iceberg table: {}.{}. Existing column schema mutation is not supported currently",
1732                operation,
1733                schema_name,
1734                table_name
1735            );
1736        }
1737        AlterTableOperation::RenameTable { .. } => {
1738            bail!(
1739                "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1740                schema_name,
1741                table_name
1742            );
1743        }
1744        AlterTableOperation::SetParallelism { .. } => {
1745            bail!(
1746                "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1747                schema_name,
1748                table_name
1749            );
1750        }
1751        AlterTableOperation::SetBackfillParallelism { .. } => {
1752            bail!(
1753                "ALTER TABLE SET BACKFILL PARALLELISM is not supported for iceberg table: {}.{}",
1754                schema_name,
1755                table_name
1756            );
1757        }
1758        AlterTableOperation::SetSchema { .. } => {
1759            bail!(
1760                "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1761                schema_name,
1762                table_name
1763            );
1764        }
1765        AlterTableOperation::RefreshSchema => {
1766            bail!(
1767                "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1768                schema_name,
1769                table_name
1770            );
1771        }
1772        AlterTableOperation::SetSourceRateLimit { .. } => {
1773            bail!(
1774                "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1775                schema_name,
1776                table_name
1777            );
1778        }
1779        AlterTableOperation::AlterWatermark { .. } => {
1780            bail!(
1781                "ALTER TABLE ALTER WATERMARK is not supported for iceberg table: {}.{}",
1782                schema_name,
1783                table_name
1784            );
1785        }
1786        _ => {}
1787    }
1788    Ok(())
1789}