risingwave_frontend/handler/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_database_param;
46mod alter_mv;
47mod alter_owner;
48mod alter_parallelism;
49mod alter_rename;
50mod alter_resource_group;
51mod alter_secret;
52mod alter_set_schema;
53mod alter_sink_props;
54mod alter_source_column;
55mod alter_source_props;
56mod alter_source_with_sr;
57mod alter_streaming_enable_unaligned_join;
58mod alter_streaming_rate_limit;
59mod alter_swap_rename;
60mod alter_system;
61mod alter_table_column;
62pub mod alter_table_drop_connector;
63pub mod alter_table_props;
64mod alter_table_with_sr;
65pub mod alter_user;
66pub mod cancel_job;
67pub mod close_cursor;
68mod comment;
69pub mod create_aggregate;
70pub mod create_connection;
71mod create_database;
72pub mod create_function;
73pub mod create_index;
74pub mod create_mv;
75pub mod create_schema;
76pub mod create_secret;
77pub mod create_sink;
78pub mod create_source;
79pub mod create_sql_function;
80pub mod create_subscription;
81pub mod create_table;
82pub mod create_table_as;
83pub mod create_user;
84pub mod create_view;
85pub mod declare_cursor;
86pub mod describe;
87pub mod discard;
88mod drop_connection;
89mod drop_database;
90pub mod drop_function;
91mod drop_index;
92pub mod drop_mv;
93mod drop_schema;
94pub mod drop_secret;
95pub mod drop_sink;
96pub mod drop_source;
97pub mod drop_subscription;
98pub mod drop_table;
99pub mod drop_user;
100mod drop_view;
101pub mod explain;
102pub mod explain_analyze_stream_job;
103pub mod extended_handle;
104pub mod fetch_cursor;
105mod flush;
106pub mod handle_privilege;
107pub mod kill_process;
108mod prepared_statement;
109pub mod privilege;
110pub mod query;
111mod recover;
112mod refresh;
113pub mod show;
114mod transaction;
115mod use_db;
116pub mod util;
117pub mod vacuum;
118pub mod variable;
119mod wait;
120
121pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
122
/// The [`PgResponseBuilder`] used by RisingWave.
///
/// This is the generic builder specialized with [`PgResponseStream`] as its type parameter.
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;

/// The [`PgResponse`] used by RisingWave.
///
/// This is the generic response specialized with [`PgResponseStream`] as its type parameter.
pub type RwPgResponse = PgResponse<PgResponseStream>;
128
129#[easy_ext::ext(RwPgResponseBuilderExt)]
130impl RwPgResponseBuilder {
131    /// Append rows to the response.
132    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
133        let fields = T::fields();
134        self.values(
135            rows.into_iter()
136                .map(|row| {
137                    Row::new(
138                        row.into_owned_row()
139                            .into_iter()
140                            .zip_eq_fast(&fields)
141                            .map(|(datum, (_, ty))| {
142                                datum.map(|scalar| {
143                                    scalar.as_scalar_ref_impl().text_format(ty).into()
144                                })
145                            })
146                            .collect(),
147                    )
148                })
149                .collect_vec()
150                .into(),
151            fields_to_descriptors(fields),
152        )
153    }
154}
155
156pub fn fields_to_descriptors(
157    fields: Vec<(&str, risingwave_common::types::DataType)>,
158) -> Vec<PgFieldDescriptor> {
159    fields
160        .iter()
161        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
162        .collect()
163}
164
/// The row stream carried by a RisingWave pgwire response.
///
/// Each variant wraps a different source of rows; all of them are polled uniformly
/// through the `Stream` implementation of this type.
pub enum PgResponseStream {
    /// Rows adapted from a [`LocalQueryStream`] via [`DataChunkToRowSetAdapter`].
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// Rows adapted from a [`DistributedQueryStream`] via [`DataChunkToRowSetAdapter`].
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// An arbitrary boxed stream of row sets, e.g. one built from an in-memory `Vec<Row>`.
    Rows(BoxStream<'static, RowSetResult>),
}
170
171impl Stream for PgResponseStream {
172    type Item = std::result::Result<Vec<Row>, BoxedError>;
173
174    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
175        match &mut *self {
176            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
177            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
178            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
179        }
180    }
181}
182
183impl From<Vec<Row>> for PgResponseStream {
184    fn from(rows: Vec<Row>) -> Self {
185        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
186    }
187}
188
/// Arguments shared by the statement handlers, built once per statement by
/// [`HandlerArgs::new`].
#[derive(Clone)]
pub struct HandlerArgs {
    /// The session on whose behalf the statement is handled.
    pub session: Arc<SessionImpl>,
    /// The SQL text given to [`HandlerArgs::new`], stored as-is.
    // NOTE(review): presumably the original text of the statement as sent by the client — confirm.
    pub sql: Arc<str>,
    /// The statement unparsed back to a string, with `OR REPLACE` / `IF NOT EXISTS`
    /// cleared for the `CREATE` statements handled by `normalize_sql`.
    pub normalized_sql: String,
    /// Options obtained from the statement via `WithOptions::try_from`.
    pub with_options: WithOptions,
}
196
197impl HandlerArgs {
198    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
199        Ok(Self {
200            session,
201            sql,
202            with_options: WithOptions::try_from(stmt)?,
203            normalized_sql: Self::normalize_sql(stmt),
204        })
205    }
206
207    /// Get normalized SQL from the statement.
208    ///
209    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
210    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
211    ///   make it suitable for the `SHOW CREATE` statements.
212    fn normalize_sql(stmt: &Statement) -> String {
213        let mut stmt = stmt.clone();
214        match &mut stmt {
215            Statement::CreateView {
216                or_replace,
217                if_not_exists,
218                ..
219            } => {
220                *or_replace = false;
221                *if_not_exists = false;
222            }
223            Statement::CreateTable {
224                or_replace,
225                if_not_exists,
226                ..
227            } => {
228                *or_replace = false;
229                *if_not_exists = false;
230            }
231            Statement::CreateIndex { if_not_exists, .. } => {
232                *if_not_exists = false;
233            }
234            Statement::CreateSource {
235                stmt: CreateSourceStatement { if_not_exists, .. },
236                ..
237            } => {
238                *if_not_exists = false;
239            }
240            Statement::CreateSink {
241                stmt: CreateSinkStatement { if_not_exists, .. },
242            } => {
243                *if_not_exists = false;
244            }
245            Statement::CreateSubscription {
246                stmt: CreateSubscriptionStatement { if_not_exists, .. },
247            } => {
248                *if_not_exists = false;
249            }
250            Statement::CreateConnection {
251                stmt: CreateConnectionStatement { if_not_exists, .. },
252            } => {
253                *if_not_exists = false;
254            }
255            _ => {}
256        }
257        stmt.to_string()
258    }
259}
260
261pub async fn handle(
262    session: Arc<SessionImpl>,
263    stmt: Statement,
264    sql: Arc<str>,
265    formats: Vec<Format>,
266) -> Result<RwPgResponse> {
267    session.clear_cancel_query_flag();
268    let _guard = session.txn_begin_implicit();
269    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
270
271    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
272
273    match stmt {
274        Statement::Explain {
275            statement,
276            analyze,
277            options,
278        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
279        Statement::ExplainAnalyzeStreamJob {
280            target,
281            duration_secs,
282        } => {
283            explain_analyze_stream_job::handle_explain_analyze_stream_job(
284                handler_args,
285                target,
286                duration_secs,
287            )
288            .await
289        }
290        Statement::CreateSource { stmt } => {
291            create_source::handle_create_source(handler_args, stmt).await
292        }
293        Statement::CreateSink { stmt } => {
294            create_sink::handle_create_sink(handler_args, stmt, false).await
295        }
296        Statement::CreateSubscription { stmt } => {
297            create_subscription::handle_create_subscription(handler_args, stmt).await
298        }
299        Statement::CreateConnection { stmt } => {
300            create_connection::handle_create_connection(handler_args, stmt).await
301        }
302        Statement::CreateSecret { stmt } => {
303            create_secret::handle_create_secret(handler_args, stmt).await
304        }
305        Statement::CreateFunction {
306            or_replace,
307            temporary,
308            if_not_exists,
309            name,
310            args,
311            returns,
312            params,
313            with_options,
314        } => {
315            // For general udf, `language` clause could be ignored
316            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
317            if params.language.is_none()
318                || !params
319                    .language
320                    .as_ref()
321                    .unwrap()
322                    .real_value()
323                    .eq_ignore_ascii_case("sql")
324            {
325                create_function::handle_create_function(
326                    handler_args,
327                    or_replace,
328                    temporary,
329                    if_not_exists,
330                    name,
331                    args,
332                    returns,
333                    params,
334                    with_options,
335                )
336                .await
337            } else {
338                create_sql_function::handle_create_sql_function(
339                    handler_args,
340                    or_replace,
341                    temporary,
342                    if_not_exists,
343                    name,
344                    args,
345                    returns,
346                    params,
347                )
348                .await
349            }
350        }
351        Statement::CreateAggregate {
352            or_replace,
353            if_not_exists,
354            name,
355            args,
356            returns,
357            params,
358            ..
359        } => {
360            create_aggregate::handle_create_aggregate(
361                handler_args,
362                or_replace,
363                if_not_exists,
364                name,
365                args,
366                returns,
367                params,
368            )
369            .await
370        }
371        Statement::CreateTable {
372            name,
373            columns,
374            wildcard_idx,
375            constraints,
376            query,
377            with_options: _, // It is put in OptimizerContext
378            // Not supported things
379            or_replace,
380            temporary,
381            if_not_exists,
382            format_encode,
383            source_watermarks,
384            append_only,
385            on_conflict,
386            with_version_columns,
387            cdc_table_info,
388            include_column_options,
389            webhook_info,
390            engine,
391        } => {
392            if or_replace {
393                bail_not_implemented!("CREATE OR REPLACE TABLE");
394            }
395            if temporary {
396                bail_not_implemented!("CREATE TEMPORARY TABLE");
397            }
398            if let Some(query) = query {
399                return create_table_as::handle_create_as(
400                    handler_args,
401                    name,
402                    if_not_exists,
403                    query,
404                    columns,
405                    append_only,
406                    on_conflict,
407                    with_version_columns
408                        .iter()
409                        .map(|col| col.real_value())
410                        .collect(),
411                    engine,
412                )
413                .await;
414            }
415            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
416            create_table::handle_create_table(
417                handler_args,
418                name,
419                columns,
420                wildcard_idx,
421                constraints,
422                if_not_exists,
423                format_encode,
424                source_watermarks,
425                append_only,
426                on_conflict,
427                with_version_columns
428                    .iter()
429                    .map(|col| col.real_value())
430                    .collect(),
431                cdc_table_info,
432                include_column_options,
433                webhook_info,
434                engine,
435            )
436            .await
437        }
438        Statement::CreateDatabase {
439            db_name,
440            if_not_exists,
441            owner,
442            resource_group,
443            barrier_interval_ms,
444            checkpoint_frequency,
445        } => {
446            create_database::handle_create_database(
447                handler_args,
448                db_name,
449                if_not_exists,
450                owner,
451                resource_group,
452                barrier_interval_ms,
453                checkpoint_frequency,
454            )
455            .await
456        }
457        Statement::CreateSchema {
458            schema_name,
459            if_not_exists,
460            owner,
461        } => {
462            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
463                .await
464        }
465        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
466        Statement::DeclareCursor { stmt } => {
467            declare_cursor::handle_declare_cursor(handler_args, stmt).await
468        }
469        Statement::FetchCursor { stmt } => {
470            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
471        }
472        Statement::CloseCursor { stmt } => {
473            close_cursor::handle_close_cursor(handler_args, stmt).await
474        }
475        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
476        Statement::Grant { .. } => {
477            handle_privilege::handle_grant_privilege(handler_args, stmt).await
478        }
479        Statement::Revoke { .. } => {
480            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
481        }
482        Statement::Describe { name, kind } => match kind {
483            DescribeKind::Fragments => {
484                describe::handle_describe_fragments(handler_args, name).await
485            }
486            DescribeKind::Plain => describe::handle_describe(handler_args, name),
487        },
488        Statement::DescribeFragment { fragment_id } => {
489            describe::handle_describe_fragment(handler_args, fragment_id).await
490        }
491        Statement::Discard(..) => discard::handle_discard(handler_args),
492        Statement::ShowObjects {
493            object: show_object,
494            filter,
495        } => show::handle_show_object(handler_args, show_object, filter).await,
496        Statement::ShowCreateObject { create_type, name } => {
497            show::handle_show_create_object(handler_args, create_type, name)
498        }
499        Statement::ShowTransactionIsolationLevel => {
500            transaction::handle_show_isolation_level(handler_args)
501        }
502        Statement::Drop(DropStatement {
503            object_type,
504            object_name,
505            if_exists,
506            drop_mode,
507        }) => {
508            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
509                match object_type {
510                    ObjectType::MaterializedView
511                    | ObjectType::View
512                    | ObjectType::Sink
513                    | ObjectType::Source
514                    | ObjectType::Subscription
515                    | ObjectType::Index
516                    | ObjectType::Table
517                    | ObjectType::Schema
518                    | ObjectType::Connection => true,
519                    ObjectType::Database | ObjectType::User | ObjectType::Secret => {
520                        bail_not_implemented!("DROP CASCADE");
521                    }
522                }
523            } else {
524                false
525            };
526            match object_type {
527                ObjectType::Table => {
528                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
529                        .await
530                }
531                ObjectType::MaterializedView => {
532                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
533                }
534                ObjectType::Index => {
535                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
536                        .await
537                }
538                ObjectType::Source => {
539                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
540                        .await
541                }
542                ObjectType::Sink => {
543                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
544                }
545                ObjectType::Subscription => {
546                    drop_subscription::handle_drop_subscription(
547                        handler_args,
548                        object_name,
549                        if_exists,
550                        cascade,
551                    )
552                    .await
553                }
554                ObjectType::Database => {
555                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
556                }
557                ObjectType::Schema => {
558                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
559                        .await
560                }
561                ObjectType::User => {
562                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
563                }
564                ObjectType::View => {
565                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
566                }
567                ObjectType::Connection => {
568                    drop_connection::handle_drop_connection(
569                        handler_args,
570                        object_name,
571                        if_exists,
572                        cascade,
573                    )
574                    .await
575                }
576                ObjectType::Secret => {
577                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
578                }
579            }
580        }
581        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
582        Statement::DropFunction {
583            if_exists,
584            func_desc,
585            option,
586        } => {
587            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
588                .await
589        }
590        Statement::DropAggregate {
591            if_exists,
592            func_desc,
593            option,
594        } => {
595            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
596                .await
597        }
598        Statement::Query(_)
599        | Statement::Insert { .. }
600        | Statement::Delete { .. }
601        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
602        Statement::CreateView {
603            materialized,
604            if_not_exists,
605            name,
606            columns,
607            query,
608            with_options: _, // It is put in OptimizerContext
609            or_replace,      // not supported
610            emit_mode,
611        } => {
612            if or_replace {
613                bail_not_implemented!("CREATE OR REPLACE VIEW");
614            }
615            if materialized {
616                create_mv::handle_create_mv(
617                    handler_args,
618                    if_not_exists,
619                    name,
620                    *query,
621                    columns,
622                    emit_mode,
623                )
624                .await
625            } else {
626                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
627                    .await
628            }
629        }
630        Statement::Flush => flush::handle_flush(handler_args).await,
631        Statement::Wait => wait::handle_wait(handler_args).await,
632        Statement::Recover => recover::handle_recover(handler_args).await,
633        Statement::SetVariable {
634            local: _,
635            variable,
636            value,
637        } => {
638            // special handle for `use database`
639            if variable.real_value().eq_ignore_ascii_case("database") {
640                let x = variable::set_var_to_param_str(&value);
641                let res = use_db::handle_use_db(
642                    handler_args,
643                    ObjectName::from(vec![Ident::from_real_value(
644                        x.as_deref().unwrap_or("default"),
645                    )]),
646                )?;
647                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
648                for notice in res.notices() {
649                    builder = builder.notice(notice);
650                }
651                return Ok(builder.into());
652            }
653            variable::handle_set(handler_args, variable, value)
654        }
655        Statement::SetTimeZone { local: _, value } => {
656            variable::handle_set_time_zone(handler_args, value)
657        }
658        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
659        Statement::CreateIndex {
660            name,
661            table_name,
662            method,
663            columns,
664            include,
665            distributed_by,
666            unique,
667            if_not_exists,
668            with_properties: _,
669        } => {
670            if unique {
671                bail_not_implemented!("create unique index");
672            }
673
674            create_index::handle_create_index(
675                handler_args,
676                if_not_exists,
677                name,
678                table_name,
679                method,
680                columns.to_vec(),
681                include,
682                distributed_by,
683            )
684            .await
685        }
686        Statement::AlterDatabase { name, operation } => match operation {
687            AlterDatabaseOperation::RenameDatabase { database_name } => {
688                alter_rename::handle_rename_database(handler_args, name, database_name).await
689            }
690            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
691                alter_owner::handle_alter_owner(
692                    handler_args,
693                    name,
694                    new_owner_name,
695                    StatementType::ALTER_DATABASE,
696                )
697                .await
698            }
699            AlterDatabaseOperation::SetParam(config_param) => {
700                let ConfigParam { param, value } = config_param;
701
702                let database_param = match param.real_value().to_uppercase().as_str() {
703                    "BARRIER_INTERVAL_MS" => {
704                        let barrier_interval_ms = match value {
705                            SetVariableValue::Default => None,
706                            SetVariableValue::Single(SetVariableValueSingle::Literal(
707                                Value::Number(num),
708                            )) => {
709                                let num = num.parse::<u32>().map_err(|e| {
710                                    ErrorCode::InvalidInputSyntax(format!(
711                                        "barrier_interval_ms must be a u32 integer: {}",
712                                        e.as_report()
713                                    ))
714                                })?;
715                                Some(num)
716                            }
717                            _ => {
718                                return Err(ErrorCode::InvalidInputSyntax(
719                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
720                                        .to_owned(),
721                                )
722                                .into());
723                            }
724                        };
725                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
726                    }
727                    "CHECKPOINT_FREQUENCY" => {
728                        let checkpoint_frequency = match value {
729                            SetVariableValue::Default => None,
730                            SetVariableValue::Single(SetVariableValueSingle::Literal(
731                                Value::Number(num),
732                            )) => {
733                                let num = num.parse::<u64>().map_err(|e| {
734                                    ErrorCode::InvalidInputSyntax(format!(
735                                        "checkpoint_frequency must be a u64 integer: {}",
736                                        e.as_report()
737                                    ))
738                                })?;
739                                Some(num)
740                            }
741                            _ => {
742                                return Err(ErrorCode::InvalidInputSyntax(
743                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
744                                        .to_owned(),
745                                )
746                                .into());
747                            }
748                        };
749                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
750                    }
751                    _ => {
752                        return Err(ErrorCode::InvalidInputSyntax(format!(
753                            "Unsupported database config parameter: {}",
754                            param.real_value()
755                        ))
756                        .into());
757                    }
758                };
759
760                alter_database_param::handle_alter_database_param(
761                    handler_args,
762                    name,
763                    database_param,
764                )
765                .await
766            }
767        },
768        Statement::AlterSchema { name, operation } => match operation {
769            AlterSchemaOperation::RenameSchema { schema_name } => {
770                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
771            }
772            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
773                alter_owner::handle_alter_owner(
774                    handler_args,
775                    name,
776                    new_owner_name,
777                    StatementType::ALTER_SCHEMA,
778                )
779                .await
780            }
781            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
782                alter_swap_rename::handle_swap_rename(
783                    handler_args,
784                    name,
785                    target_schema,
786                    StatementType::ALTER_SCHEMA,
787                )
788                .await
789            }
790        },
791        Statement::AlterTable { name, operation } => match operation {
792            AlterTableOperation::AddColumn { .. }
793            | AlterTableOperation::DropColumn { .. }
794            | AlterTableOperation::AlterColumn { .. } => {
795                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
796            }
797            AlterTableOperation::RenameTable { table_name } => {
798                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
799                    .await
800            }
801            AlterTableOperation::ChangeOwner { new_owner_name } => {
802                alter_owner::handle_alter_owner(
803                    handler_args,
804                    name,
805                    new_owner_name,
806                    StatementType::ALTER_TABLE,
807                )
808                .await
809            }
810            AlterTableOperation::SetParallelism {
811                parallelism,
812                deferred,
813            } => {
814                alter_parallelism::handle_alter_parallelism(
815                    handler_args,
816                    name,
817                    parallelism,
818                    StatementType::ALTER_TABLE,
819                    deferred,
820                )
821                .await
822            }
823            AlterTableOperation::SetSchema { new_schema_name } => {
824                alter_set_schema::handle_alter_set_schema(
825                    handler_args,
826                    name,
827                    new_schema_name,
828                    StatementType::ALTER_TABLE,
829                    None,
830                )
831                .await
832            }
833            AlterTableOperation::RefreshSchema => {
834                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
835            }
836            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
837                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
838                    handler_args,
839                    PbThrottleTarget::TableWithSource,
840                    name,
841                    rate_limit,
842                )
843                .await
844            }
845            AlterTableOperation::DropConnector => {
846                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
847                    .await
848            }
849            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
850                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
851                    handler_args,
852                    PbThrottleTarget::TableDml,
853                    name,
854                    rate_limit,
855                )
856                .await
857            }
858            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
859                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
860                    handler_args,
861                    PbThrottleTarget::CdcTable,
862                    name,
863                    rate_limit,
864                )
865                .await
866            }
867            AlterTableOperation::SwapRenameTable { target_table } => {
868                alter_swap_rename::handle_swap_rename(
869                    handler_args,
870                    name,
871                    target_table,
872                    StatementType::ALTER_TABLE,
873                )
874                .await
875            }
876            AlterTableOperation::AlterConnectorProps { alter_props } => {
877                alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
878            }
879            AlterTableOperation::AddConstraint { .. }
880            | AlterTableOperation::DropConstraint { .. }
881            | AlterTableOperation::RenameColumn { .. }
882            | AlterTableOperation::ChangeColumn { .. }
883            | AlterTableOperation::RenameConstraint { .. } => {
884                bail_not_implemented!(
885                    "Unhandled statement: {}",
886                    Statement::AlterTable { name, operation }
887                )
888            }
889        },
890        Statement::AlterIndex { name, operation } => match operation {
891            AlterIndexOperation::RenameIndex { index_name } => {
892                alter_rename::handle_rename_index(handler_args, name, index_name).await
893            }
894            AlterIndexOperation::SetParallelism {
895                parallelism,
896                deferred,
897            } => {
898                alter_parallelism::handle_alter_parallelism(
899                    handler_args,
900                    name,
901                    parallelism,
902                    StatementType::ALTER_INDEX,
903                    deferred,
904                )
905                .await
906            }
907        },
908        Statement::AlterView {
909            materialized,
910            name,
911            operation,
912        } => {
913            let statement_type = if materialized {
914                StatementType::ALTER_MATERIALIZED_VIEW
915            } else {
916                StatementType::ALTER_VIEW
917            };
918            match operation {
919                AlterViewOperation::RenameView { view_name } => {
920                    if materialized {
921                        alter_rename::handle_rename_table(
922                            handler_args,
923                            TableType::MaterializedView,
924                            name,
925                            view_name,
926                        )
927                        .await
928                    } else {
929                        alter_rename::handle_rename_view(handler_args, name, view_name).await
930                    }
931                }
932                AlterViewOperation::SetParallelism {
933                    parallelism,
934                    deferred,
935                } => {
936                    if !materialized {
937                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
938                    }
939                    alter_parallelism::handle_alter_parallelism(
940                        handler_args,
941                        name,
942                        parallelism,
943                        statement_type,
944                        deferred,
945                    )
946                    .await
947                }
948                AlterViewOperation::SetResourceGroup {
949                    resource_group,
950                    deferred,
951                } => {
952                    if !materialized {
953                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
954                    }
955                    alter_resource_group::handle_alter_resource_group(
956                        handler_args,
957                        name,
958                        resource_group,
959                        statement_type,
960                        deferred,
961                    )
962                    .await
963                }
964                AlterViewOperation::ChangeOwner { new_owner_name } => {
965                    alter_owner::handle_alter_owner(
966                        handler_args,
967                        name,
968                        new_owner_name,
969                        statement_type,
970                    )
971                    .await
972                }
973                AlterViewOperation::SetSchema { new_schema_name } => {
974                    alter_set_schema::handle_alter_set_schema(
975                        handler_args,
976                        name,
977                        new_schema_name,
978                        statement_type,
979                        None,
980                    )
981                    .await
982                }
983                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
984                    if !materialized {
985                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
986                    }
987                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
988                        handler_args,
989                        PbThrottleTarget::Mv,
990                        name,
991                        rate_limit,
992                    )
993                    .await
994                }
995                AlterViewOperation::SwapRenameView { target_view } => {
996                    alter_swap_rename::handle_swap_rename(
997                        handler_args,
998                        name,
999                        target_view,
1000                        statement_type,
1001                    )
1002                    .await
1003                }
1004                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1005                    if !materialized {
1006                        bail!(
1007                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1008                        );
1009                    }
1010                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1011                }
1012                AlterViewOperation::AsQuery { query } => {
1013                    if !materialized {
1014                        bail_not_implemented!("ALTER VIEW AS QUERY");
1015                    }
1016                    // `ALTER MATERIALIZED VIEW AS QUERY` is only available in development build now.
1017                    if !cfg!(debug_assertions) {
1018                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1019                    }
1020                    alter_mv::handle_alter_mv(handler_args, name, query).await
1021                }
1022            }
1023        }
1024
1025        Statement::AlterSink { name, operation } => match operation {
1026            AlterSinkOperation::AlterConnectorProps {
1027                alter_props: changed_props,
1028            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1029            AlterSinkOperation::RenameSink { sink_name } => {
1030                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1031            }
1032            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1033                alter_owner::handle_alter_owner(
1034                    handler_args,
1035                    name,
1036                    new_owner_name,
1037                    StatementType::ALTER_SINK,
1038                )
1039                .await
1040            }
1041            AlterSinkOperation::SetSchema { new_schema_name } => {
1042                alter_set_schema::handle_alter_set_schema(
1043                    handler_args,
1044                    name,
1045                    new_schema_name,
1046                    StatementType::ALTER_SINK,
1047                    None,
1048                )
1049                .await
1050            }
1051            AlterSinkOperation::SetParallelism {
1052                parallelism,
1053                deferred,
1054            } => {
1055                alter_parallelism::handle_alter_parallelism(
1056                    handler_args,
1057                    name,
1058                    parallelism,
1059                    StatementType::ALTER_SINK,
1060                    deferred,
1061                )
1062                .await
1063            }
1064            AlterSinkOperation::SwapRenameSink { target_sink } => {
1065                alter_swap_rename::handle_swap_rename(
1066                    handler_args,
1067                    name,
1068                    target_sink,
1069                    StatementType::ALTER_SINK,
1070                )
1071                .await
1072            }
1073            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1074                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1075                    handler_args,
1076                    PbThrottleTarget::Sink,
1077                    name,
1078                    rate_limit,
1079                )
1080                .await
1081            }
1082            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1083                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1084                    handler_args,
1085                    name,
1086                    enable,
1087                )
1088                .await
1089            }
1090        },
1091        Statement::AlterSubscription { name, operation } => match operation {
1092            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1093                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1094                    .await
1095            }
1096            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1097                alter_owner::handle_alter_owner(
1098                    handler_args,
1099                    name,
1100                    new_owner_name,
1101                    StatementType::ALTER_SUBSCRIPTION,
1102                )
1103                .await
1104            }
1105            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1106                alter_set_schema::handle_alter_set_schema(
1107                    handler_args,
1108                    name,
1109                    new_schema_name,
1110                    StatementType::ALTER_SUBSCRIPTION,
1111                    None,
1112                )
1113                .await
1114            }
1115            AlterSubscriptionOperation::SwapRenameSubscription {
1116                target_subscription,
1117            } => {
1118                alter_swap_rename::handle_swap_rename(
1119                    handler_args,
1120                    name,
1121                    target_subscription,
1122                    StatementType::ALTER_SUBSCRIPTION,
1123                )
1124                .await
1125            }
1126        },
1127        Statement::AlterSource { name, operation } => match operation {
1128            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1129                alter_source_props::handle_alter_source_connector_props(
1130                    handler_args,
1131                    name,
1132                    alter_props,
1133                )
1134                .await
1135            }
1136            AlterSourceOperation::RenameSource { source_name } => {
1137                alter_rename::handle_rename_source(handler_args, name, source_name).await
1138            }
1139            AlterSourceOperation::AddColumn { .. } => {
1140                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1141            }
1142            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1143                alter_owner::handle_alter_owner(
1144                    handler_args,
1145                    name,
1146                    new_owner_name,
1147                    StatementType::ALTER_SOURCE,
1148                )
1149                .await
1150            }
1151            AlterSourceOperation::SetSchema { new_schema_name } => {
1152                alter_set_schema::handle_alter_set_schema(
1153                    handler_args,
1154                    name,
1155                    new_schema_name,
1156                    StatementType::ALTER_SOURCE,
1157                    None,
1158                )
1159                .await
1160            }
1161            AlterSourceOperation::FormatEncode { format_encode } => {
1162                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1163                    .await
1164            }
1165            AlterSourceOperation::RefreshSchema => {
1166                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1167            }
1168            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1169                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1170                    handler_args,
1171                    PbThrottleTarget::Source,
1172                    name,
1173                    rate_limit,
1174                )
1175                .await
1176            }
1177            AlterSourceOperation::SwapRenameSource { target_source } => {
1178                alter_swap_rename::handle_swap_rename(
1179                    handler_args,
1180                    name,
1181                    target_source,
1182                    StatementType::ALTER_SOURCE,
1183                )
1184                .await
1185            }
1186            AlterSourceOperation::SetParallelism {
1187                parallelism,
1188                deferred,
1189            } => {
1190                alter_parallelism::handle_alter_parallelism(
1191                    handler_args,
1192                    name,
1193                    parallelism,
1194                    StatementType::ALTER_SOURCE,
1195                    deferred,
1196                )
1197                .await
1198            }
1199        },
1200        Statement::AlterFunction {
1201            name,
1202            args,
1203            operation,
1204        } => match operation {
1205            AlterFunctionOperation::SetSchema { new_schema_name } => {
1206                alter_set_schema::handle_alter_set_schema(
1207                    handler_args,
1208                    name,
1209                    new_schema_name,
1210                    StatementType::ALTER_FUNCTION,
1211                    args,
1212                )
1213                .await
1214            }
1215        },
1216        Statement::AlterConnection { name, operation } => match operation {
1217            AlterConnectionOperation::SetSchema { new_schema_name } => {
1218                alter_set_schema::handle_alter_set_schema(
1219                    handler_args,
1220                    name,
1221                    new_schema_name,
1222                    StatementType::ALTER_CONNECTION,
1223                    None,
1224                )
1225                .await
1226            }
1227            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1228                alter_owner::handle_alter_owner(
1229                    handler_args,
1230                    name,
1231                    new_owner_name,
1232                    StatementType::ALTER_CONNECTION,
1233                )
1234                .await
1235            }
1236        },
1237        Statement::AlterSystem { param, value } => {
1238            alter_system::handle_alter_system(handler_args, param, value).await
1239        }
1240        Statement::AlterSecret {
1241            name,
1242            with_options,
1243            operation,
1244        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1245        Statement::AlterFragment {
1246            fragment_id,
1247            operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1248        } => {
1249            alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1250                &handler_args.session,
1251                PbThrottleTarget::Fragment,
1252                fragment_id,
1253                rate_limit,
1254                StatementType::SET_VARIABLE,
1255            )
1256            .await
1257        }
1258        Statement::AlterDefaultPrivileges { .. } => {
1259            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1260        }
1261        Statement::StartTransaction { modes } => {
1262            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1263        }
1264        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1265        Statement::Commit { chain } => {
1266            transaction::handle_commit(handler_args, COMMIT, chain).await
1267        }
1268        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1269        Statement::Rollback { chain } => {
1270            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1271        }
1272        Statement::SetTransaction {
1273            modes,
1274            snapshot,
1275            session,
1276        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1277        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1278        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1279        Statement::Comment {
1280            object_type,
1281            object_name,
1282            comment,
1283        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1284        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1285        Statement::Prepare {
1286            name,
1287            data_types,
1288            statement,
1289        } => prepared_statement::handle_prepare(name, data_types, statement).await,
1290        Statement::Deallocate { name, prepare } => {
1291            prepared_statement::handle_deallocate(name, prepare).await
1292        }
1293        Statement::Vacuum { object_name, full } => {
1294            vacuum::handle_vacuum(handler_args, object_name, full).await
1295        }
1296        Statement::Refresh { table_name } => {
1297            refresh::handle_refresh(handler_args, table_name).await
1298        }
1299        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1300    }
1301}
1302
1303fn check_ban_ddl_for_iceberg_engine_table(
1304    session: Arc<SessionImpl>,
1305    stmt: &Statement,
1306) -> Result<()> {
1307    match stmt {
1308        Statement::AlterTable {
1309            name,
1310            operation:
1311                operation @ (AlterTableOperation::AddColumn { .. }
1312                | AlterTableOperation::DropColumn { .. }),
1313        } => {
1314            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1315            if table.is_iceberg_engine_table() {
1316                bail!(
1317                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1318                    operation,
1319                    schema_name,
1320                    name
1321                );
1322            }
1323        }
1324
1325        Statement::AlterTable {
1326            name,
1327            operation: AlterTableOperation::RenameTable { .. },
1328        } => {
1329            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1330            if table.is_iceberg_engine_table() {
1331                bail!(
1332                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1333                    schema_name,
1334                    name
1335                );
1336            }
1337        }
1338
1339        Statement::AlterTable {
1340            name,
1341            operation: AlterTableOperation::ChangeOwner { .. },
1342        } => {
1343            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1344            if table.is_iceberg_engine_table() {
1345                bail!(
1346                    "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1347                    schema_name,
1348                    name
1349                );
1350            }
1351        }
1352
1353        Statement::AlterTable {
1354            name,
1355            operation: AlterTableOperation::SetParallelism { .. },
1356        } => {
1357            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1358            if table.is_iceberg_engine_table() {
1359                bail!(
1360                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1361                    schema_name,
1362                    name
1363                );
1364            }
1365        }
1366
1367        Statement::AlterTable {
1368            name,
1369            operation: AlterTableOperation::SetSchema { .. },
1370        } => {
1371            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1372            if table.is_iceberg_engine_table() {
1373                bail!(
1374                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1375                    schema_name,
1376                    name
1377                );
1378            }
1379        }
1380
1381        Statement::AlterTable {
1382            name,
1383            operation: AlterTableOperation::RefreshSchema,
1384        } => {
1385            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1386            if table.is_iceberg_engine_table() {
1387                bail!(
1388                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1389                    schema_name,
1390                    name
1391                );
1392            }
1393        }
1394
1395        Statement::AlterTable {
1396            name,
1397            operation: AlterTableOperation::SetSourceRateLimit { .. },
1398        } => {
1399            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1400            if table.is_iceberg_engine_table() {
1401                bail!(
1402                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1403                    schema_name,
1404                    name
1405                );
1406            }
1407        }
1408
1409        _ => {}
1410    }
1411
1412    Ok(())
1413}