risingwave_frontend/handler/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::{AlterDatabaseParam, ICEBERG_SINK_PREFIX};
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_compaction_group;
46mod alter_connection_props;
47mod alter_database_param;
48mod alter_mv;
49mod alter_owner;
50mod alter_parallelism;
51mod alter_rename;
52mod alter_resource_group;
53mod alter_secret;
54mod alter_set_schema;
55mod alter_sink_props;
56mod alter_source_column;
57mod alter_source_props;
58mod alter_source_with_sr;
59mod alter_streaming_config;
60mod alter_streaming_enable_unaligned_join;
61mod alter_streaming_rate_limit;
62mod alter_subscription_retention;
63mod alter_swap_rename;
64mod alter_system;
65mod alter_table_column;
66pub mod alter_table_drop_connector;
67pub mod alter_table_props;
68mod alter_table_with_sr;
69pub mod alter_user;
70mod alter_utils;
71mod alter_watermark;
72mod backup;
73pub mod cancel_job;
74pub mod close_cursor;
75mod comment;
76pub mod create_aggregate;
77pub mod create_connection;
78mod create_database;
79pub mod create_function;
80pub mod create_index;
81pub mod create_mv;
82pub mod create_schema;
83pub mod create_secret;
84pub mod create_sink;
85pub mod create_source;
86pub mod create_sql_function;
87pub mod create_subscription;
88pub mod create_table;
89pub mod create_table_as;
90pub mod create_user;
91pub mod create_view;
92pub mod declare_cursor;
93mod delete_meta_snapshot;
94pub mod describe;
95pub mod discard;
96mod drop_connection;
97mod drop_database;
98pub mod drop_function;
99mod drop_index;
100pub mod drop_mv;
101mod drop_schema;
102pub mod drop_secret;
103pub mod drop_sink;
104pub mod drop_source;
105pub mod drop_subscription;
106pub mod drop_table;
107pub mod drop_user;
108mod drop_view;
109pub mod explain;
110pub mod explain_analyze_stream_job;
111pub mod extended_handle;
112pub mod fetch_cursor;
113mod flush;
114pub mod handle_privilege;
115pub mod kill_process;
116mod prepared_statement;
117pub mod privilege;
118pub mod query;
119mod recover;
120mod refresh;
121mod reset_source;
122pub mod show;
123mod transaction;
124mod use_db;
125pub mod util;
126pub mod vacuum;
127pub mod variable;
128mod wait;
129
130pub use alter_table_column::{
131    fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
132};
133
134/// The [`PgResponseBuilder`] used by RisingWave.
135pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
136
137/// The [`PgResponse`] used by RisingWave.
138pub type RwPgResponse = PgResponse<PgResponseStream>;
139
140#[easy_ext::ext(RwPgResponseBuilderExt)]
141impl RwPgResponseBuilder {
142    /// Append rows to the response.
143    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
144        let fields = T::fields();
145        self.values(
146            rows.into_iter()
147                .map(|row| {
148                    Row::new(
149                        row.into_owned_row()
150                            .into_iter()
151                            .zip_eq_fast(&fields)
152                            .map(|(datum, (_, ty))| {
153                                datum.map(|scalar| {
154                                    scalar.as_scalar_ref_impl().text_format(ty).into()
155                                })
156                            })
157                            .collect(),
158                    )
159                })
160                .collect_vec()
161                .into(),
162            fields_to_descriptors(fields),
163        )
164    }
165}
166
167pub fn fields_to_descriptors(
168    fields: Vec<(&str, risingwave_common::types::DataType)>,
169) -> Vec<PgFieldDescriptor> {
170    fields
171        .iter()
172        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
173        .collect()
174}
175
176pub enum PgResponseStream {
177    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
178    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
179    Rows(BoxStream<'static, RowSetResult>),
180}
181
182impl Stream for PgResponseStream {
183    type Item = std::result::Result<Vec<Row>, BoxedError>;
184
185    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
186        match &mut *self {
187            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
188            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
189            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
190        }
191    }
192}
193
194impl From<Vec<Row>> for PgResponseStream {
195    fn from(rows: Vec<Row>) -> Self {
196        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
197    }
198}
199
200#[derive(Clone)]
201pub struct HandlerArgs {
202    pub session: Arc<SessionImpl>,
203    pub sql: Arc<str>,
204    pub normalized_sql: String,
205    pub with_options: WithOptions,
206}
207
208impl HandlerArgs {
209    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
210        Ok(Self {
211            session,
212            sql,
213            with_options: WithOptions::try_from(stmt)?,
214            normalized_sql: Self::normalize_sql(stmt),
215        })
216    }
217
218    /// Get normalized SQL from the statement.
219    ///
220    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
221    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
222    ///   make it suitable for the `SHOW CREATE` statements.
223    fn normalize_sql(stmt: &Statement) -> String {
224        let mut stmt = stmt.clone();
225        match &mut stmt {
226            Statement::CreateView {
227                or_replace,
228                if_not_exists,
229                ..
230            } => {
231                *or_replace = false;
232                *if_not_exists = false;
233            }
234            Statement::CreateTable {
235                or_replace,
236                if_not_exists,
237                ..
238            } => {
239                *or_replace = false;
240                *if_not_exists = false;
241            }
242            Statement::CreateIndex { if_not_exists, .. } => {
243                *if_not_exists = false;
244            }
245            Statement::CreateSource {
246                stmt: CreateSourceStatement { if_not_exists, .. },
247                ..
248            } => {
249                *if_not_exists = false;
250            }
251            Statement::CreateSink {
252                stmt: CreateSinkStatement { if_not_exists, .. },
253            } => {
254                *if_not_exists = false;
255            }
256            Statement::CreateSubscription {
257                stmt: CreateSubscriptionStatement { if_not_exists, .. },
258            } => {
259                *if_not_exists = false;
260            }
261            Statement::CreateConnection {
262                stmt: CreateConnectionStatement { if_not_exists, .. },
263            } => {
264                *if_not_exists = false;
265            }
266            _ => {}
267        }
268        stmt.to_string()
269    }
270}
271
272#[expect(clippy::large_stack_frames)]
273pub async fn handle(
274    session: Arc<SessionImpl>,
275    stmt: Statement,
276    sql: Arc<str>,
277    formats: Vec<Format>,
278) -> Result<RwPgResponse> {
279    session.clear_cancel_query_flag();
280    let _guard = session.txn_begin_implicit();
281    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
282
283    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
284
285    match stmt {
286        Statement::Explain {
287            statement,
288            analyze,
289            options,
290        } => {
291            Box::pin(explain::handle_explain(
292                handler_args,
293                *statement,
294                options,
295                analyze,
296            ))
297            .await
298        }
299        Statement::ExplainAnalyzeStreamJob {
300            target,
301            duration_secs,
302        } => {
303            explain_analyze_stream_job::handle_explain_analyze_stream_job(
304                handler_args,
305                target,
306                duration_secs,
307            )
308            .await
309        }
310        Statement::CreateSource { stmt } => {
311            create_source::handle_create_source(handler_args, stmt).await
312        }
313        Statement::CreateSink { stmt } => {
314            create_sink::handle_create_sink(handler_args, stmt, false).await
315        }
316        Statement::CreateSubscription { stmt } => {
317            create_subscription::handle_create_subscription(handler_args, stmt).await
318        }
319        Statement::CreateConnection { stmt } => {
320            create_connection::handle_create_connection(handler_args, stmt).await
321        }
322        Statement::CreateSecret { stmt } => {
323            create_secret::handle_create_secret(handler_args, stmt).await
324        }
325        Statement::CreateFunction {
326            or_replace,
327            temporary,
328            if_not_exists,
329            name,
330            args,
331            returns,
332            params,
333            with_options,
334        } => {
335            // For general udf, `language` clause could be ignored
336            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
337            if params.language.is_none()
338                || !params
339                    .language
340                    .as_ref()
341                    .unwrap()
342                    .real_value()
343                    .eq_ignore_ascii_case("sql")
344            {
345                create_function::handle_create_function(
346                    handler_args,
347                    or_replace,
348                    temporary,
349                    if_not_exists,
350                    name,
351                    args,
352                    returns,
353                    params,
354                    with_options,
355                )
356                .await
357            } else {
358                create_sql_function::handle_create_sql_function(
359                    handler_args,
360                    or_replace,
361                    temporary,
362                    if_not_exists,
363                    name,
364                    args,
365                    returns,
366                    params,
367                )
368                .await
369            }
370        }
371        Statement::CreateAggregate {
372            or_replace,
373            if_not_exists,
374            name,
375            args,
376            returns,
377            params,
378            ..
379        } => {
380            create_aggregate::handle_create_aggregate(
381                handler_args,
382                or_replace,
383                if_not_exists,
384                name,
385                args,
386                returns,
387                params,
388            )
389            .await
390        }
391        Statement::CreateTable {
392            name,
393            columns,
394            wildcard_idx,
395            constraints,
396            query,
397            with_options: _, // It is put in OptimizerContext
398            // Not supported things
399            or_replace,
400            temporary,
401            if_not_exists,
402            format_encode,
403            source_watermarks,
404            append_only,
405            on_conflict,
406            with_version_columns,
407            cdc_table_info,
408            include_column_options,
409            webhook_info,
410            engine,
411        } => {
412            if or_replace {
413                bail_not_implemented!("CREATE OR REPLACE TABLE");
414            }
415            if temporary {
416                bail_not_implemented!("CREATE TEMPORARY TABLE");
417            }
418            if let Some(query) = query {
419                return create_table_as::handle_create_as(
420                    handler_args,
421                    name,
422                    if_not_exists,
423                    query,
424                    columns,
425                    append_only,
426                    on_conflict,
427                    with_version_columns
428                        .iter()
429                        .map(|col| col.real_value())
430                        .collect(),
431                    engine,
432                )
433                .await;
434            }
435            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
436            Box::pin(create_table::handle_create_table(
437                handler_args,
438                name,
439                columns,
440                wildcard_idx,
441                constraints,
442                if_not_exists,
443                format_encode,
444                source_watermarks,
445                append_only,
446                on_conflict,
447                with_version_columns
448                    .iter()
449                    .map(|col| col.real_value())
450                    .collect(),
451                cdc_table_info,
452                include_column_options,
453                webhook_info,
454                engine,
455            ))
456            .await
457        }
458        Statement::CreateDatabase {
459            db_name,
460            if_not_exists,
461            owner,
462            resource_group,
463            barrier_interval_ms,
464            checkpoint_frequency,
465        } => {
466            create_database::handle_create_database(
467                handler_args,
468                db_name,
469                if_not_exists,
470                owner,
471                resource_group,
472                barrier_interval_ms,
473                checkpoint_frequency,
474            )
475            .await
476        }
477        Statement::CreateSchema {
478            schema_name,
479            if_not_exists,
480            owner,
481        } => {
482            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
483                .await
484        }
485        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
486        Statement::DeclareCursor { stmt } => {
487            declare_cursor::handle_declare_cursor(handler_args, stmt).await
488        }
489        Statement::FetchCursor { stmt } => {
490            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
491        }
492        Statement::CloseCursor { stmt } => {
493            close_cursor::handle_close_cursor(handler_args, stmt).await
494        }
495        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
496        Statement::Grant { .. } => {
497            handle_privilege::handle_grant_privilege(handler_args, stmt).await
498        }
499        Statement::Revoke { .. } => {
500            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
501        }
502        Statement::Describe { name, kind } => match kind {
503            DescribeKind::Fragments => {
504                describe::handle_describe_fragments(handler_args, name).await
505            }
506            DescribeKind::Plain => describe::handle_describe(handler_args, name),
507        },
508        Statement::DescribeFragment { fragment_id } => {
509            describe::handle_describe_fragment(handler_args, fragment_id.into()).await
510        }
511        Statement::Discard(..) => discard::handle_discard(handler_args),
512        Statement::ShowObjects {
513            object: show_object,
514            filter,
515        } => show::handle_show_object(handler_args, show_object, filter).await,
516        Statement::ShowCreateObject { create_type, name } => {
517            show::handle_show_create_object(handler_args, create_type, name)
518        }
519        Statement::ShowTransactionIsolationLevel => {
520            transaction::handle_show_isolation_level(handler_args)
521        }
522        Statement::Drop(DropStatement {
523            object_type,
524            object_name,
525            if_exists,
526            drop_mode,
527        }) => {
528            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
529                match object_type {
530                    ObjectType::MaterializedView
531                    | ObjectType::View
532                    | ObjectType::Sink
533                    | ObjectType::Source
534                    | ObjectType::Subscription
535                    | ObjectType::Index
536                    | ObjectType::Table
537                    | ObjectType::Schema
538                    | ObjectType::Connection
539                    | ObjectType::Secret => true,
540                    ObjectType::Database | ObjectType::User => {
541                        bail_not_implemented!("DROP CASCADE");
542                    }
543                }
544            } else {
545                false
546            };
547            match object_type {
548                ObjectType::Table => {
549                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
550                        .await
551                }
552                ObjectType::MaterializedView => {
553                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
554                }
555                ObjectType::Index => {
556                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
557                        .await
558                }
559                ObjectType::Source => {
560                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
561                        .await
562                }
563                ObjectType::Sink => {
564                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
565                }
566                ObjectType::Subscription => {
567                    drop_subscription::handle_drop_subscription(
568                        handler_args,
569                        object_name,
570                        if_exists,
571                        cascade,
572                    )
573                    .await
574                }
575                ObjectType::Database => {
576                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
577                }
578                ObjectType::Schema => {
579                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
580                        .await
581                }
582                ObjectType::User => {
583                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
584                }
585                ObjectType::View => {
586                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
587                }
588                ObjectType::Connection => {
589                    drop_connection::handle_drop_connection(
590                        handler_args,
591                        object_name,
592                        if_exists,
593                        cascade,
594                    )
595                    .await
596                }
597                ObjectType::Secret => {
598                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists, cascade)
599                        .await
600                }
601            }
602        }
603        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
604        Statement::DropFunction {
605            if_exists,
606            func_desc,
607            option,
608        } => {
609            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
610                .await
611        }
612        Statement::DropAggregate {
613            if_exists,
614            func_desc,
615            option,
616        } => {
617            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
618                .await
619        }
620        Statement::Query(_)
621        | Statement::Insert { .. }
622        | Statement::Delete { .. }
623        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
624        Statement::Copy {
625            entity: CopyEntity::Query(query),
626            target: CopyTarget::Stdout,
627        } => {
628            let response =
629                query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
630                    .await?;
631            Ok(response.into_copy_query_to_stdout())
632        }
633        Statement::CreateView {
634            materialized,
635            if_not_exists,
636            name,
637            columns,
638            query,
639            with_options: _, // It is put in OptimizerContext
640            or_replace,      // not supported
641            emit_mode,
642        } => {
643            if or_replace {
644                bail_not_implemented!("CREATE OR REPLACE VIEW");
645            }
646            if materialized {
647                create_mv::handle_create_mv(
648                    handler_args,
649                    if_not_exists,
650                    name,
651                    *query,
652                    columns,
653                    emit_mode,
654                )
655                .await
656            } else {
657                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
658                    .await
659            }
660        }
661        Statement::Flush => flush::handle_flush(handler_args).await,
662        Statement::Wait(target) => wait::handle_wait(handler_args, target).await,
663        Statement::Backup => backup::handle_backup(handler_args).await,
664        Statement::DeleteMetaSnapshots { snapshot_ids } => {
665            delete_meta_snapshot::handle_delete_meta_snapshots(handler_args, snapshot_ids).await
666        }
667        Statement::Recover => recover::handle_recover(handler_args).await,
668        Statement::SetVariable {
669            local: _,
670            variable,
671            value,
672        } => {
673            // special handle for `use database`
674            if variable.real_value().eq_ignore_ascii_case("database") {
675                let x = variable::set_var_to_param_str(&value);
676                let res = use_db::handle_use_db(
677                    handler_args,
678                    ObjectName::from(vec![Ident::from_real_value(
679                        x.as_deref().unwrap_or("default"),
680                    )]),
681                )?;
682                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
683                for notice in res.notices() {
684                    builder = builder.notice(notice);
685                }
686                return Ok(builder.into());
687            }
688            variable::handle_set(handler_args, variable, value)
689        }
690        Statement::SetTimeZone { local: _, value } => {
691            variable::handle_set_time_zone(handler_args, value)
692        }
693        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
694        Statement::CreateIndex {
695            name,
696            table_name,
697            method,
698            columns,
699            include,
700            distributed_by,
701            unique,
702            if_not_exists,
703            with_properties: _,
704        } => {
705            if unique {
706                bail_not_implemented!("create unique index");
707            }
708
709            create_index::handle_create_index(
710                handler_args,
711                if_not_exists,
712                name,
713                table_name,
714                method,
715                columns.clone(),
716                include,
717                distributed_by,
718            )
719            .await
720        }
721        Statement::AlterDatabase { name, operation } => match operation {
722            AlterDatabaseOperation::RenameDatabase { database_name } => {
723                alter_rename::handle_rename_database(handler_args, name, database_name).await
724            }
725            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
726                alter_owner::handle_alter_owner(
727                    handler_args,
728                    name,
729                    new_owner_name,
730                    StatementType::ALTER_DATABASE,
731                    None,
732                )
733                .await
734            }
735            AlterDatabaseOperation::SetParam(config_param) => {
736                let ConfigParam { param, value } = config_param;
737
738                let database_param = match param.real_value().to_uppercase().as_str() {
739                    "BARRIER_INTERVAL_MS" => {
740                        let barrier_interval_ms = match value {
741                            SetVariableValue::Default => None,
742                            SetVariableValue::Single(SetVariableValueSingle::Literal(
743                                Value::Number(num),
744                            )) => {
745                                let num = num.parse::<u32>().map_err(|e| {
746                                    ErrorCode::InvalidInputSyntax(format!(
747                                        "barrier_interval_ms must be a u32 integer: {}",
748                                        e.as_report()
749                                    ))
750                                })?;
751                                Some(num)
752                            }
753                            _ => {
754                                return Err(ErrorCode::InvalidInputSyntax(
755                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
756                                        .to_owned(),
757                                )
758                                .into());
759                            }
760                        };
761                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
762                    }
763                    "CHECKPOINT_FREQUENCY" => {
764                        let checkpoint_frequency = match value {
765                            SetVariableValue::Default => None,
766                            SetVariableValue::Single(SetVariableValueSingle::Literal(
767                                Value::Number(num),
768                            )) => {
769                                let num = num.parse::<u64>().map_err(|e| {
770                                    ErrorCode::InvalidInputSyntax(format!(
771                                        "checkpoint_frequency must be a u64 integer: {}",
772                                        e.as_report()
773                                    ))
774                                })?;
775                                Some(num)
776                            }
777                            _ => {
778                                return Err(ErrorCode::InvalidInputSyntax(
779                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
780                                        .to_owned(),
781                                )
782                                .into());
783                            }
784                        };
785                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
786                    }
787                    _ => {
788                        return Err(ErrorCode::InvalidInputSyntax(format!(
789                            "Unsupported database config parameter: {}",
790                            param.real_value()
791                        ))
792                        .into());
793                    }
794                };
795
796                alter_database_param::handle_alter_database_param(
797                    handler_args,
798                    name,
799                    database_param,
800                )
801                .await
802            }
803        },
804        Statement::AlterSchema { name, operation } => match operation {
805            AlterSchemaOperation::RenameSchema { schema_name } => {
806                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
807            }
808            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
809                alter_owner::handle_alter_owner(
810                    handler_args,
811                    name,
812                    new_owner_name,
813                    StatementType::ALTER_SCHEMA,
814                    None,
815                )
816                .await
817            }
818            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
819                alter_swap_rename::handle_swap_rename(
820                    handler_args,
821                    name,
822                    target_schema,
823                    StatementType::ALTER_SCHEMA,
824                )
825                .await
826            }
827        },
828        Statement::AlterTable { name, operation } => match operation {
829            AlterTableOperation::AddColumn { .. }
830            | AlterTableOperation::DropColumn { .. }
831            | AlterTableOperation::AlterColumn { .. } => {
832                Box::pin(alter_table_column::handle_alter_table_column(
833                    handler_args,
834                    name,
835                    operation,
836                ))
837                .await
838            }
839            AlterTableOperation::AlterWatermark {
840                column_name,
841                expr,
842                with_ttl,
843            } => {
844                Box::pin(alter_watermark::handle_alter_watermark(
845                    handler_args,
846                    name,
847                    column_name,
848                    expr,
849                    with_ttl,
850                ))
851                .await
852            }
853            AlterTableOperation::RenameTable { table_name } => {
854                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
855                    .await
856            }
857            AlterTableOperation::ChangeOwner { new_owner_name } => {
858                alter_owner::handle_alter_owner(
859                    handler_args,
860                    name,
861                    new_owner_name,
862                    StatementType::ALTER_TABLE,
863                    None,
864                )
865                .await
866            }
867            AlterTableOperation::SetParallelism {
868                parallelism,
869                deferred,
870            } => {
871                alter_parallelism::handle_alter_parallelism(
872                    handler_args,
873                    name,
874                    parallelism,
875                    StatementType::ALTER_TABLE,
876                    deferred,
877                )
878                .await
879            }
880            AlterTableOperation::SetBackfillParallelism {
881                parallelism,
882                deferred,
883            } => {
884                alter_parallelism::handle_alter_backfill_parallelism(
885                    handler_args,
886                    name,
887                    parallelism,
888                    StatementType::ALTER_TABLE,
889                    deferred,
890                )
891                .await
892            }
893            AlterTableOperation::SetSchema { new_schema_name } => {
894                alter_set_schema::handle_alter_set_schema(
895                    handler_args,
896                    name,
897                    new_schema_name,
898                    StatementType::ALTER_TABLE,
899                    None,
900                )
901                .await
902            }
903            AlterTableOperation::RefreshSchema => {
904                Box::pin(alter_table_with_sr::handle_refresh_schema(
905                    handler_args,
906                    name,
907                ))
908                .await
909            }
910            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
911                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
912                    handler_args,
913                    PbThrottleTarget::Table,
914                    risingwave_pb::common::PbThrottleType::Source,
915                    name,
916                    rate_limit,
917                )
918                .await
919            }
920            AlterTableOperation::DropConnector => {
921                Box::pin(
922                    alter_table_drop_connector::handle_alter_table_drop_connector(
923                        handler_args,
924                        name,
925                    ),
926                )
927                .await
928            }
929            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
930                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
931                    handler_args,
932                    PbThrottleTarget::Table,
933                    risingwave_pb::common::PbThrottleType::Dml,
934                    name,
935                    rate_limit,
936                )
937                .await
938            }
939            AlterTableOperation::SetConfig { entries } => {
940                alter_streaming_config::handle_alter_streaming_set_config(
941                    handler_args,
942                    name,
943                    entries,
944                    StatementType::ALTER_TABLE,
945                )
946                .await
947            }
948            AlterTableOperation::ResetConfig { keys } => {
949                alter_streaming_config::handle_alter_streaming_reset_config(
950                    handler_args,
951                    name,
952                    keys,
953                    StatementType::ALTER_TABLE,
954                )
955                .await
956            }
957            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
958                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
959                    handler_args,
960                    PbThrottleTarget::Table,
961                    risingwave_pb::common::PbThrottleType::Backfill,
962                    name,
963                    rate_limit,
964                )
965                .await
966            }
967            AlterTableOperation::SwapRenameTable { target_table } => {
968                alter_swap_rename::handle_swap_rename(
969                    handler_args,
970                    name,
971                    target_table,
972                    StatementType::ALTER_TABLE,
973                )
974                .await
975            }
976            AlterTableOperation::AlterConnectorProps { alter_props } => {
977                alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
978            }
979            AlterTableOperation::AddConstraint { .. }
980            | AlterTableOperation::DropConstraint { .. }
981            | AlterTableOperation::RenameColumn { .. }
982            | AlterTableOperation::ChangeColumn { .. }
983            | AlterTableOperation::RenameConstraint { .. } => {
984                bail_not_implemented!(
985                    "Unhandled statement: {}",
986                    Statement::AlterTable { name, operation }
987                )
988            }
989        },
990        Statement::AlterIndex { name, operation } => match operation {
991            AlterIndexOperation::RenameIndex { index_name } => {
992                alter_rename::handle_rename_index(handler_args, name, index_name).await
993            }
994            AlterIndexOperation::SetParallelism {
995                parallelism,
996                deferred,
997            } => {
998                alter_parallelism::handle_alter_parallelism(
999                    handler_args,
1000                    name,
1001                    parallelism,
1002                    StatementType::ALTER_INDEX,
1003                    deferred,
1004                )
1005                .await
1006            }
1007            AlterIndexOperation::SetBackfillParallelism {
1008                parallelism,
1009                deferred,
1010            } => {
1011                alter_parallelism::handle_alter_backfill_parallelism(
1012                    handler_args,
1013                    name,
1014                    parallelism,
1015                    StatementType::ALTER_INDEX,
1016                    deferred,
1017                )
1018                .await
1019            }
1020            AlterIndexOperation::SetConfig { entries } => {
1021                alter_streaming_config::handle_alter_streaming_set_config(
1022                    handler_args,
1023                    name,
1024                    entries,
1025                    StatementType::ALTER_INDEX,
1026                )
1027                .await
1028            }
1029            AlterIndexOperation::ResetConfig { keys } => {
1030                alter_streaming_config::handle_alter_streaming_reset_config(
1031                    handler_args,
1032                    name,
1033                    keys,
1034                    StatementType::ALTER_INDEX,
1035                )
1036                .await
1037            }
1038        },
1039        Statement::AlterView {
1040            materialized,
1041            name,
1042            operation,
1043        } => {
1044            let statement_type = if materialized {
1045                StatementType::ALTER_MATERIALIZED_VIEW
1046            } else {
1047                StatementType::ALTER_VIEW
1048            };
1049            match operation {
1050                AlterViewOperation::RenameView { view_name } => {
1051                    if materialized {
1052                        alter_rename::handle_rename_table(
1053                            handler_args,
1054                            TableType::MaterializedView,
1055                            name,
1056                            view_name,
1057                        )
1058                        .await
1059                    } else {
1060                        alter_rename::handle_rename_view(handler_args, name, view_name).await
1061                    }
1062                }
1063                AlterViewOperation::SetParallelism {
1064                    parallelism,
1065                    deferred,
1066                } => {
1067                    if !materialized {
1068                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
1069                    }
1070                    alter_parallelism::handle_alter_parallelism(
1071                        handler_args,
1072                        name,
1073                        parallelism,
1074                        statement_type,
1075                        deferred,
1076                    )
1077                    .await
1078                }
1079                AlterViewOperation::SetBackfillParallelism {
1080                    parallelism,
1081                    deferred,
1082                } => {
1083                    if !materialized {
1084                        bail_not_implemented!("ALTER VIEW SET BACKFILL PARALLELISM");
1085                    }
1086                    alter_parallelism::handle_alter_backfill_parallelism(
1087                        handler_args,
1088                        name,
1089                        parallelism,
1090                        statement_type,
1091                        deferred,
1092                    )
1093                    .await
1094                }
1095                AlterViewOperation::SetResourceGroup {
1096                    resource_group,
1097                    deferred,
1098                } => {
1099                    if !materialized {
1100                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1101                    }
1102                    alter_resource_group::handle_alter_resource_group(
1103                        handler_args,
1104                        name,
1105                        resource_group,
1106                        statement_type,
1107                        deferred,
1108                    )
1109                    .await
1110                }
1111                AlterViewOperation::ChangeOwner { new_owner_name } => {
1112                    alter_owner::handle_alter_owner(
1113                        handler_args,
1114                        name,
1115                        new_owner_name,
1116                        statement_type,
1117                        None,
1118                    )
1119                    .await
1120                }
1121                AlterViewOperation::SetSchema { new_schema_name } => {
1122                    alter_set_schema::handle_alter_set_schema(
1123                        handler_args,
1124                        name,
1125                        new_schema_name,
1126                        statement_type,
1127                        None,
1128                    )
1129                    .await
1130                }
1131                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1132                    if !materialized {
1133                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1134                    }
1135                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1136                        handler_args,
1137                        PbThrottleTarget::Mv,
1138                        risingwave_pb::common::PbThrottleType::Backfill,
1139                        name,
1140                        rate_limit,
1141                    )
1142                    .await
1143                }
1144                AlterViewOperation::SwapRenameView { target_view } => {
1145                    alter_swap_rename::handle_swap_rename(
1146                        handler_args,
1147                        name,
1148                        target_view,
1149                        statement_type,
1150                    )
1151                    .await
1152                }
1153                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1154                    if !materialized {
1155                        bail!(
1156                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1157                        );
1158                    }
1159                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1160                }
1161                AlterViewOperation::AsQuery { query } => {
1162                    if !materialized {
1163                        bail_not_implemented!("ALTER VIEW AS QUERY");
1164                    }
1165                    // `ALTER MATERIALIZED VIEW AS QUERY` is only available in development build now.
1166                    if !cfg!(debug_assertions) {
1167                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1168                    }
1169                    alter_mv::handle_alter_mv(handler_args, name, query).await
1170                }
1171                AlterViewOperation::SetConfig { entries } => {
1172                    if !materialized {
1173                        bail!("SET CONFIG is only supported for materialized views");
1174                    }
1175                    alter_streaming_config::handle_alter_streaming_set_config(
1176                        handler_args,
1177                        name,
1178                        entries,
1179                        statement_type,
1180                    )
1181                    .await
1182                }
1183                AlterViewOperation::ResetConfig { keys } => {
1184                    if !materialized {
1185                        bail!("RESET CONFIG is only supported for materialized views");
1186                    }
1187                    alter_streaming_config::handle_alter_streaming_reset_config(
1188                        handler_args,
1189                        name,
1190                        keys,
1191                        statement_type,
1192                    )
1193                    .await
1194                }
1195            }
1196        }
1197
1198        Statement::AlterSink { name, operation } => match operation {
1199            AlterSinkOperation::AlterConnectorProps {
1200                alter_props: changed_props,
1201            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1202            AlterSinkOperation::RenameSink { sink_name } => {
1203                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1204            }
1205            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1206                alter_owner::handle_alter_owner(
1207                    handler_args,
1208                    name,
1209                    new_owner_name,
1210                    StatementType::ALTER_SINK,
1211                    None,
1212                )
1213                .await
1214            }
1215            AlterSinkOperation::SetSchema { new_schema_name } => {
1216                alter_set_schema::handle_alter_set_schema(
1217                    handler_args,
1218                    name,
1219                    new_schema_name,
1220                    StatementType::ALTER_SINK,
1221                    None,
1222                )
1223                .await
1224            }
1225            AlterSinkOperation::SetParallelism {
1226                parallelism,
1227                deferred,
1228            } => {
1229                alter_parallelism::handle_alter_parallelism(
1230                    handler_args,
1231                    name,
1232                    parallelism,
1233                    StatementType::ALTER_SINK,
1234                    deferred,
1235                )
1236                .await
1237            }
1238            AlterSinkOperation::SetBackfillParallelism {
1239                parallelism,
1240                deferred,
1241            } => {
1242                alter_parallelism::handle_alter_backfill_parallelism(
1243                    handler_args,
1244                    name,
1245                    parallelism,
1246                    StatementType::ALTER_SINK,
1247                    deferred,
1248                )
1249                .await
1250            }
1251            AlterSinkOperation::SetConfig { entries } => {
1252                alter_streaming_config::handle_alter_streaming_set_config(
1253                    handler_args,
1254                    name,
1255                    entries,
1256                    StatementType::ALTER_SINK,
1257                )
1258                .await
1259            }
1260            AlterSinkOperation::ResetConfig { keys } => {
1261                alter_streaming_config::handle_alter_streaming_reset_config(
1262                    handler_args,
1263                    name,
1264                    keys,
1265                    StatementType::ALTER_SINK,
1266                )
1267                .await
1268            }
1269            AlterSinkOperation::SwapRenameSink { target_sink } => {
1270                alter_swap_rename::handle_swap_rename(
1271                    handler_args,
1272                    name,
1273                    target_sink,
1274                    StatementType::ALTER_SINK,
1275                )
1276                .await
1277            }
1278            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1279                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1280                    handler_args,
1281                    PbThrottleTarget::Sink,
1282                    risingwave_pb::common::PbThrottleType::Sink,
1283                    name,
1284                    rate_limit,
1285                )
1286                .await
1287            }
1288            AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
1289                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1290                    handler_args,
1291                    PbThrottleTarget::Sink,
1292                    risingwave_pb::common::PbThrottleType::Backfill,
1293                    name,
1294                    rate_limit,
1295                )
1296                .await
1297            }
1298            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1299                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1300                    handler_args,
1301                    name,
1302                    enable,
1303                )
1304                .await
1305            }
1306        },
1307        Statement::AlterSubscription { name, operation } => match operation {
1308            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1309                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1310                    .await
1311            }
1312            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1313                alter_owner::handle_alter_owner(
1314                    handler_args,
1315                    name,
1316                    new_owner_name,
1317                    StatementType::ALTER_SUBSCRIPTION,
1318                    None,
1319                )
1320                .await
1321            }
1322            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1323                alter_set_schema::handle_alter_set_schema(
1324                    handler_args,
1325                    name,
1326                    new_schema_name,
1327                    StatementType::ALTER_SUBSCRIPTION,
1328                    None,
1329                )
1330                .await
1331            }
1332            AlterSubscriptionOperation::SetRetention { retention } => {
1333                alter_subscription_retention::handle_alter_subscription_retention(
1334                    handler_args,
1335                    name,
1336                    retention,
1337                )
1338                .await
1339            }
1340            AlterSubscriptionOperation::SwapRenameSubscription {
1341                target_subscription,
1342            } => {
1343                alter_swap_rename::handle_swap_rename(
1344                    handler_args,
1345                    name,
1346                    target_subscription,
1347                    StatementType::ALTER_SUBSCRIPTION,
1348                )
1349                .await
1350            }
1351        },
1352        Statement::AlterSource { name, operation } => match operation {
1353            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1354                alter_source_props::handle_alter_source_connector_props(
1355                    handler_args,
1356                    name,
1357                    alter_props,
1358                )
1359                .await
1360            }
1361            AlterSourceOperation::RenameSource { source_name } => {
1362                alter_rename::handle_rename_source(handler_args, name, source_name).await
1363            }
1364            AlterSourceOperation::AddColumn { .. } => {
1365                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1366            }
1367            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1368                alter_owner::handle_alter_owner(
1369                    handler_args,
1370                    name,
1371                    new_owner_name,
1372                    StatementType::ALTER_SOURCE,
1373                    None,
1374                )
1375                .await
1376            }
1377            AlterSourceOperation::SetSchema { new_schema_name } => {
1378                alter_set_schema::handle_alter_set_schema(
1379                    handler_args,
1380                    name,
1381                    new_schema_name,
1382                    StatementType::ALTER_SOURCE,
1383                    None,
1384                )
1385                .await
1386            }
1387            AlterSourceOperation::FormatEncode { format_encode } => {
1388                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1389                    .await
1390            }
1391            AlterSourceOperation::RefreshSchema => {
1392                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1393            }
1394            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1395                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1396                    handler_args,
1397                    PbThrottleTarget::Source,
1398                    risingwave_pb::common::PbThrottleType::Source,
1399                    name,
1400                    rate_limit,
1401                )
1402                .await
1403            }
1404            AlterSourceOperation::SwapRenameSource { target_source } => {
1405                alter_swap_rename::handle_swap_rename(
1406                    handler_args,
1407                    name,
1408                    target_source,
1409                    StatementType::ALTER_SOURCE,
1410                )
1411                .await
1412            }
1413            AlterSourceOperation::SetParallelism {
1414                parallelism,
1415                deferred,
1416            } => {
1417                alter_parallelism::handle_alter_parallelism(
1418                    handler_args,
1419                    name,
1420                    parallelism,
1421                    StatementType::ALTER_SOURCE,
1422                    deferred,
1423                )
1424                .await
1425            }
1426            AlterSourceOperation::SetBackfillParallelism {
1427                parallelism,
1428                deferred,
1429            } => {
1430                alter_parallelism::handle_alter_backfill_parallelism(
1431                    handler_args,
1432                    name,
1433                    parallelism,
1434                    StatementType::ALTER_SOURCE,
1435                    deferred,
1436                )
1437                .await
1438            }
1439            AlterSourceOperation::SetConfig { entries } => {
1440                alter_streaming_config::handle_alter_streaming_set_config(
1441                    handler_args,
1442                    name,
1443                    entries,
1444                    StatementType::ALTER_SOURCE,
1445                )
1446                .await
1447            }
1448            AlterSourceOperation::ResetConfig { keys } => {
1449                alter_streaming_config::handle_alter_streaming_reset_config(
1450                    handler_args,
1451                    name,
1452                    keys,
1453                    StatementType::ALTER_SOURCE,
1454                )
1455                .await
1456            }
1457            AlterSourceOperation::ResetSource => {
1458                reset_source::handle_reset_source(handler_args, name).await
1459            }
1460        },
1461        Statement::AlterFunction {
1462            name,
1463            args,
1464            operation,
1465        } => match operation {
1466            AlterFunctionOperation::SetSchema { new_schema_name } => {
1467                alter_set_schema::handle_alter_set_schema(
1468                    handler_args,
1469                    name,
1470                    new_schema_name,
1471                    StatementType::ALTER_FUNCTION,
1472                    args,
1473                )
1474                .await
1475            }
1476            AlterFunctionOperation::ChangeOwner { new_owner_name } => {
1477                alter_owner::handle_alter_owner(
1478                    handler_args,
1479                    name,
1480                    new_owner_name,
1481                    StatementType::ALTER_FUNCTION,
1482                    args,
1483                )
1484                .await
1485            }
1486        },
1487        Statement::AlterConnection { name, operation } => match operation {
1488            AlterConnectionOperation::SetSchema { new_schema_name } => {
1489                alter_set_schema::handle_alter_set_schema(
1490                    handler_args,
1491                    name,
1492                    new_schema_name,
1493                    StatementType::ALTER_CONNECTION,
1494                    None,
1495                )
1496                .await
1497            }
1498            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1499                alter_owner::handle_alter_owner(
1500                    handler_args,
1501                    name,
1502                    new_owner_name,
1503                    StatementType::ALTER_CONNECTION,
1504                    None,
1505                )
1506                .await
1507            }
1508            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1509                alter_connection_props::handle_alter_connection_connector_props(
1510                    handler_args,
1511                    name,
1512                    alter_props,
1513                )
1514                .await
1515            }
1516        },
1517        Statement::AlterSystem { param, value } => {
1518            alter_system::handle_alter_system(handler_args, param, value).await
1519        }
1520        Statement::AlterSecret { name, operation } => match operation {
1521            AlterSecretOperation::ChangeCredential {
1522                with_options,
1523                new_credential,
1524            } => {
1525                alter_secret::handle_alter_secret(handler_args, name, with_options, new_credential)
1526                    .await
1527            }
1528            AlterSecretOperation::ChangeOwner { new_owner_name } => {
1529                alter_owner::handle_alter_owner(
1530                    handler_args,
1531                    name,
1532                    new_owner_name,
1533                    StatementType::ALTER_SECRET,
1534                    None,
1535                )
1536                .await
1537            }
1538        },
1539        Statement::AlterFragment {
1540            fragment_ids,
1541            operation,
1542        } => match operation {
1543            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1544                let [fragment_id] = fragment_ids.as_slice() else {
1545                    return Err(ErrorCode::InvalidInputSyntax(
1546                        "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1547                            .to_owned(),
1548                    )
1549                    .into());
1550                };
1551                alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1552                    &handler_args.session,
1553                    PbThrottleTarget::Fragment,
1554                    risingwave_pb::common::PbThrottleType::Backfill,
1555                    *fragment_id,
1556                    rate_limit,
1557                    StatementType::SET_VARIABLE,
1558                )
1559                .await
1560            }
1561            AlterFragmentOperation::SetParallelism { parallelism } => {
1562                alter_parallelism::handle_alter_fragment_parallelism(
1563                    handler_args,
1564                    fragment_ids.into_iter().map_into().collect(),
1565                    parallelism,
1566                )
1567                .await
1568            }
1569        },
1570        Statement::AlterDefaultPrivileges { .. } => {
1571            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1572        }
1573        Statement::AlterCompactionGroup {
1574            group_ids,
1575            operation,
1576        } => {
1577            alter_compaction_group::handle_alter_compaction_group(
1578                handler_args,
1579                group_ids,
1580                operation,
1581            )
1582            .await
1583        }
1584        Statement::StartTransaction { modes } => {
1585            transaction::handle_begin(handler_args, START_TRANSACTION, modes)
1586        }
1587        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes),
1588        Statement::Commit { chain } => {
1589            transaction::handle_commit(handler_args, COMMIT, chain).await
1590        }
1591        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1592        Statement::Rollback { chain } => {
1593            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1594        }
1595        Statement::SetTransaction {
1596            modes,
1597            snapshot,
1598            session,
1599        } => transaction::handle_set(handler_args, modes, snapshot, session),
1600        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1601        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1602        Statement::Comment {
1603            object_type,
1604            object_name,
1605            comment,
1606        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1607        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1608        Statement::Prepare {
1609            name,
1610            data_types,
1611            statement,
1612        } => prepared_statement::handle_prepare(name, data_types, statement),
1613        Statement::Deallocate { name, prepare } => {
1614            prepared_statement::handle_deallocate(name, prepare)
1615        }
1616        Statement::Vacuum { object_name, full } => {
1617            vacuum::handle_vacuum(handler_args, object_name, full).await
1618        }
1619        Statement::Refresh { table_name } => {
1620            refresh::handle_refresh(handler_args, table_name).await
1621        }
1622        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1623    }
1624}
1625
1626fn check_ban_ddl_for_iceberg_engine_table(
1627    session: Arc<SessionImpl>,
1628    stmt: &Statement,
1629) -> Result<()> {
1630    if let Statement::AlterTable { name, operation } = stmt {
1631        let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1632        if table.is_iceberg_engine_table() {
1633            let has_auto_refresh_schema_sink =
1634                if matches!(operation, AlterTableOperation::AddColumn { .. }) {
1635                    let catalog_reader = session.env().catalog_reader().read_guard();
1636                    let db_name = session.database();
1637                    let sink_name = format!("{}{}", ICEBERG_SINK_PREFIX, table.name());
1638                    let sink = catalog_reader
1639                        .get_schema_by_name(&db_name, &schema_name)
1640                        .ok()
1641                        .and_then(|schema| schema.get_created_sink_by_name(&sink_name));
1642                    sink.and_then(|s| s.auto_refresh_schema_from_table)
1643                        .is_some()
1644                } else {
1645                    false
1646                };
1647
1648            check_ban_alter_table_operation_for_iceberg_engine_table(
1649                operation,
1650                &schema_name,
1651                name,
1652                has_auto_refresh_schema_sink,
1653            )?;
1654        }
1655    }
1656
1657    Ok(())
1658}
1659
1660fn check_ban_alter_table_operation_for_iceberg_engine_table(
1661    operation: &AlterTableOperation,
1662    schema_name: &str,
1663    table_name: &ObjectName,
1664    has_auto_refresh_schema_sink: bool,
1665) -> Result<()> {
1666    match operation {
1667        AlterTableOperation::AddColumn { .. } => {
1668            if !has_auto_refresh_schema_sink {
1669                bail!(
1670                    "ALTER TABLE {} is not supported for iceberg table without auto schema change sink: {}.{}",
1671                    operation,
1672                    schema_name,
1673                    table_name
1674                );
1675            }
1676        }
1677        AlterTableOperation::DropColumn { .. } => {
1678            // TODO: allow DROP COLUMN for iceberg table after iceberg sink schema change supports drop.
1679            bail!(
1680                "ALTER TABLE DROP COLUMN is not supported for iceberg table: {}.{}",
1681                schema_name,
1682                table_name
1683            );
1684        }
1685        AlterTableOperation::RenameColumn { .. }
1686        | AlterTableOperation::ChangeColumn { .. }
1687        | AlterTableOperation::AlterColumn { .. } => {
1688            bail!(
1689                "ALTER TABLE {} is not supported for iceberg table: {}.{}. Existing column schema mutation is not supported currently",
1690                operation,
1691                schema_name,
1692                table_name
1693            );
1694        }
1695        AlterTableOperation::RenameTable { .. } => {
1696            bail!(
1697                "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1698                schema_name,
1699                table_name
1700            );
1701        }
1702        AlterTableOperation::SetParallelism { .. } => {
1703            bail!(
1704                "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1705                schema_name,
1706                table_name
1707            );
1708        }
1709        AlterTableOperation::SetBackfillParallelism { .. } => {
1710            bail!(
1711                "ALTER TABLE SET BACKFILL PARALLELISM is not supported for iceberg table: {}.{}",
1712                schema_name,
1713                table_name
1714            );
1715        }
1716        AlterTableOperation::SetSchema { .. } => {
1717            bail!(
1718                "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1719                schema_name,
1720                table_name
1721            );
1722        }
1723        AlterTableOperation::RefreshSchema => {
1724            bail!(
1725                "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1726                schema_name,
1727                table_name
1728            );
1729        }
1730        AlterTableOperation::SetSourceRateLimit { .. } => {
1731            bail!(
1732                "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1733                schema_name,
1734                table_name
1735            );
1736        }
1737        AlterTableOperation::AlterWatermark { .. } => {
1738            bail!(
1739                "ALTER TABLE ALTER WATERMARK is not supported for iceberg table: {}.{}",
1740                schema_name,
1741                table_name
1742            );
1743        }
1744        _ => {}
1745    }
1746    Ok(())
1747}