risingwave_frontend/handler/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::types::Fields;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_common::{bail, bail_not_implemented};
30use risingwave_pb::meta::PbThrottleTarget;
31use risingwave_sqlparser::ast::*;
32use util::get_table_catalog_by_table_name;
33
34use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
35use crate::catalog::table_catalog::TableType;
36use crate::error::{ErrorCode, Result};
37use crate::handler::cancel_job::handle_cancel;
38use crate::handler::kill_process::handle_kill;
39use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
40use crate::session::SessionImpl;
41use crate::utils::WithOptions;
42
43mod alter_owner;
44mod alter_parallelism;
45mod alter_rename;
46mod alter_resource_group;
47mod alter_secret;
48mod alter_set_schema;
49mod alter_source_column;
50mod alter_source_with_sr;
51mod alter_streaming_rate_limit;
52mod alter_swap_rename;
53mod alter_system;
54mod alter_table_column;
55pub mod alter_table_drop_connector;
56mod alter_table_with_sr;
57pub mod alter_user;
58pub mod cancel_job;
59pub mod close_cursor;
60mod comment;
61pub mod create_aggregate;
62pub mod create_connection;
63mod create_database;
64pub mod create_function;
65pub mod create_index;
66pub mod create_mv;
67pub mod create_schema;
68pub mod create_secret;
69pub mod create_sink;
70pub mod create_source;
71pub mod create_sql_function;
72pub mod create_subscription;
73pub mod create_table;
74pub mod create_table_as;
75pub mod create_user;
76pub mod create_view;
77pub mod declare_cursor;
78pub mod describe;
79pub mod discard;
80mod drop_connection;
81mod drop_database;
82pub mod drop_function;
83mod drop_index;
84pub mod drop_mv;
85mod drop_schema;
86pub mod drop_secret;
87pub mod drop_sink;
88pub mod drop_source;
89pub mod drop_subscription;
90pub mod drop_table;
91pub mod drop_user;
92mod drop_view;
93pub mod explain;
94pub mod explain_analyze_stream_job;
95pub mod extended_handle;
96pub mod fetch_cursor;
97mod flush;
98pub mod handle_privilege;
99mod kill_process;
100pub mod privilege;
101pub mod query;
102mod recover;
103pub mod show;
104mod transaction;
105mod use_db;
106pub mod util;
107pub mod variable;
108mod wait;
109
110pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
111
112/// The [`PgResponseBuilder`] used by RisingWave.
113pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
114
115/// The [`PgResponse`] used by RisingWave.
116pub type RwPgResponse = PgResponse<PgResponseStream>;
117
118#[easy_ext::ext(RwPgResponseBuilderExt)]
119impl RwPgResponseBuilder {
120    /// Append rows to the response.
121    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
122        let fields = T::fields();
123        self.values(
124            rows.into_iter()
125                .map(|row| {
126                    Row::new(
127                        row.into_owned_row()
128                            .into_iter()
129                            .zip_eq_fast(&fields)
130                            .map(|(datum, (_, ty))| {
131                                datum.map(|scalar| {
132                                    scalar.as_scalar_ref_impl().text_format(ty).into()
133                                })
134                            })
135                            .collect(),
136                    )
137                })
138                .collect_vec()
139                .into(),
140            fields_to_descriptors(fields),
141        )
142    }
143}
144
145pub fn fields_to_descriptors(
146    fields: Vec<(&str, risingwave_common::types::DataType)>,
147) -> Vec<PgFieldDescriptor> {
148    fields
149        .iter()
150        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
151        .collect()
152}
153
154pub enum PgResponseStream {
155    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
156    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
157    Rows(BoxStream<'static, RowSetResult>),
158}
159
160impl Stream for PgResponseStream {
161    type Item = std::result::Result<Vec<Row>, BoxedError>;
162
163    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
164        match &mut *self {
165            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
166            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
167            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
168        }
169    }
170}
171
172impl From<Vec<Row>> for PgResponseStream {
173    fn from(rows: Vec<Row>) -> Self {
174        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
175    }
176}
177
178#[derive(Clone)]
179pub struct HandlerArgs {
180    pub session: Arc<SessionImpl>,
181    pub sql: Arc<str>,
182    pub normalized_sql: String,
183    pub with_options: WithOptions,
184}
185
186impl HandlerArgs {
187    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
188        Ok(Self {
189            session,
190            sql,
191            with_options: WithOptions::try_from(stmt)?,
192            normalized_sql: Self::normalize_sql(stmt),
193        })
194    }
195
196    /// Get normalized SQL from the statement.
197    ///
198    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
199    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
200    ///   make it suitable for the `SHOW CREATE` statements.
201    fn normalize_sql(stmt: &Statement) -> String {
202        let mut stmt = stmt.clone();
203        match &mut stmt {
204            Statement::CreateView {
205                or_replace,
206                if_not_exists,
207                ..
208            } => {
209                *or_replace = false;
210                *if_not_exists = false;
211            }
212            Statement::CreateTable {
213                or_replace,
214                if_not_exists,
215                ..
216            } => {
217                *or_replace = false;
218                *if_not_exists = false;
219            }
220            Statement::CreateIndex { if_not_exists, .. } => {
221                *if_not_exists = false;
222            }
223            Statement::CreateSource {
224                stmt: CreateSourceStatement { if_not_exists, .. },
225                ..
226            } => {
227                *if_not_exists = false;
228            }
229            Statement::CreateSink {
230                stmt: CreateSinkStatement { if_not_exists, .. },
231            } => {
232                *if_not_exists = false;
233            }
234            Statement::CreateSubscription {
235                stmt: CreateSubscriptionStatement { if_not_exists, .. },
236            } => {
237                *if_not_exists = false;
238            }
239            Statement::CreateConnection {
240                stmt: CreateConnectionStatement { if_not_exists, .. },
241            } => {
242                *if_not_exists = false;
243            }
244            _ => {}
245        }
246        stmt.to_string()
247    }
248}
249
250pub async fn handle(
251    session: Arc<SessionImpl>,
252    stmt: Statement,
253    sql: Arc<str>,
254    formats: Vec<Format>,
255) -> Result<RwPgResponse> {
256    session.clear_cancel_query_flag();
257    let _guard = session.txn_begin_implicit();
258    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
259
260    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
261
262    match stmt {
263        Statement::Explain {
264            statement,
265            analyze,
266            options,
267        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
268        Statement::ExplainAnalyzeStreamJob {
269            target,
270            duration_secs,
271        } => {
272            explain_analyze_stream_job::handle_explain_analyze_stream_job(
273                handler_args,
274                target,
275                duration_secs,
276            )
277            .await
278        }
279        Statement::CreateSource { stmt } => {
280            create_source::handle_create_source(handler_args, stmt).await
281        }
282        Statement::CreateSink { stmt } => create_sink::handle_create_sink(handler_args, stmt).await,
283        Statement::CreateSubscription { stmt } => {
284            create_subscription::handle_create_subscription(handler_args, stmt).await
285        }
286        Statement::CreateConnection { stmt } => {
287            create_connection::handle_create_connection(handler_args, stmt).await
288        }
289        Statement::CreateSecret { stmt } => {
290            create_secret::handle_create_secret(handler_args, stmt).await
291        }
292        Statement::CreateFunction {
293            or_replace,
294            temporary,
295            if_not_exists,
296            name,
297            args,
298            returns,
299            params,
300            with_options,
301        } => {
302            // For general udf, `language` clause could be ignored
303            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
304            if params.language.is_none()
305                || !params
306                    .language
307                    .as_ref()
308                    .unwrap()
309                    .real_value()
310                    .eq_ignore_ascii_case("sql")
311            {
312                create_function::handle_create_function(
313                    handler_args,
314                    or_replace,
315                    temporary,
316                    if_not_exists,
317                    name,
318                    args,
319                    returns,
320                    params,
321                    with_options,
322                )
323                .await
324            } else {
325                create_sql_function::handle_create_sql_function(
326                    handler_args,
327                    or_replace,
328                    temporary,
329                    if_not_exists,
330                    name,
331                    args,
332                    returns,
333                    params,
334                )
335                .await
336            }
337        }
338        Statement::CreateAggregate {
339            or_replace,
340            if_not_exists,
341            name,
342            args,
343            returns,
344            params,
345            ..
346        } => {
347            create_aggregate::handle_create_aggregate(
348                handler_args,
349                or_replace,
350                if_not_exists,
351                name,
352                args,
353                returns,
354                params,
355            )
356            .await
357        }
358        Statement::CreateTable {
359            name,
360            columns,
361            wildcard_idx,
362            constraints,
363            query,
364            with_options: _, // It is put in OptimizerContext
365            // Not supported things
366            or_replace,
367            temporary,
368            if_not_exists,
369            format_encode,
370            source_watermarks,
371            append_only,
372            on_conflict,
373            with_version_column,
374            cdc_table_info,
375            include_column_options,
376            webhook_info,
377            engine,
378        } => {
379            if or_replace {
380                bail_not_implemented!("CREATE OR REPLACE TABLE");
381            }
382            if temporary {
383                bail_not_implemented!("CREATE TEMPORARY TABLE");
384            }
385            if let Some(query) = query {
386                return create_table_as::handle_create_as(
387                    handler_args,
388                    name,
389                    if_not_exists,
390                    query,
391                    columns,
392                    append_only,
393                    on_conflict,
394                    with_version_column.map(|x| x.real_value()),
395                    engine,
396                )
397                .await;
398            }
399            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
400            create_table::handle_create_table(
401                handler_args,
402                name,
403                columns,
404                wildcard_idx,
405                constraints,
406                if_not_exists,
407                format_encode,
408                source_watermarks,
409                append_only,
410                on_conflict,
411                with_version_column.map(|x| x.real_value()),
412                cdc_table_info,
413                include_column_options,
414                webhook_info,
415                engine,
416            )
417            .await
418        }
419        Statement::CreateDatabase {
420            db_name,
421            if_not_exists,
422            owner,
423            resource_group,
424        } => {
425            create_database::handle_create_database(
426                handler_args,
427                db_name,
428                if_not_exists,
429                owner,
430                resource_group,
431            )
432            .await
433        }
434        Statement::CreateSchema {
435            schema_name,
436            if_not_exists,
437            owner,
438        } => {
439            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
440                .await
441        }
442        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
443        Statement::DeclareCursor { stmt } => {
444            declare_cursor::handle_declare_cursor(handler_args, stmt).await
445        }
446        Statement::FetchCursor { stmt } => {
447            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
448        }
449        Statement::CloseCursor { stmt } => {
450            close_cursor::handle_close_cursor(handler_args, stmt).await
451        }
452        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
453        Statement::Grant { .. } => {
454            handle_privilege::handle_grant_privilege(handler_args, stmt).await
455        }
456        Statement::Revoke { .. } => {
457            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
458        }
459        Statement::Describe { name } => describe::handle_describe(handler_args, name),
460        Statement::Discard(..) => discard::handle_discard(handler_args),
461        Statement::ShowObjects {
462            object: show_object,
463            filter,
464        } => show::handle_show_object(handler_args, show_object, filter).await,
465        Statement::ShowCreateObject { create_type, name } => {
466            show::handle_show_create_object(handler_args, create_type, name)
467        }
468        Statement::ShowTransactionIsolationLevel => {
469            transaction::handle_show_isolation_level(handler_args)
470        }
471        Statement::Drop(DropStatement {
472            object_type,
473            object_name,
474            if_exists,
475            drop_mode,
476        }) => {
477            let mut cascade = false;
478            if let AstOption::Some(DropMode::Cascade) = drop_mode {
479                match object_type {
480                    ObjectType::MaterializedView
481                    | ObjectType::View
482                    | ObjectType::Sink
483                    | ObjectType::Source
484                    | ObjectType::Subscription
485                    | ObjectType::Index
486                    | ObjectType::Table
487                    | ObjectType::Schema => {
488                        cascade = true;
489                    }
490                    ObjectType::Database
491                    | ObjectType::User
492                    | ObjectType::Connection
493                    | ObjectType::Secret => {
494                        bail_not_implemented!("DROP CASCADE");
495                    }
496                };
497            };
498            match object_type {
499                ObjectType::Table => {
500                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
501                        .await
502                }
503                ObjectType::MaterializedView => {
504                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
505                }
506                ObjectType::Index => {
507                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
508                        .await
509                }
510                ObjectType::Source => {
511                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
512                        .await
513                }
514                ObjectType::Sink => {
515                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
516                }
517                ObjectType::Subscription => {
518                    drop_subscription::handle_drop_subscription(
519                        handler_args,
520                        object_name,
521                        if_exists,
522                        cascade,
523                    )
524                    .await
525                }
526                ObjectType::Database => {
527                    drop_database::handle_drop_database(
528                        handler_args,
529                        object_name,
530                        if_exists,
531                        drop_mode.into(),
532                    )
533                    .await
534                }
535                ObjectType::Schema => {
536                    drop_schema::handle_drop_schema(
537                        handler_args,
538                        object_name,
539                        if_exists,
540                        drop_mode.into(),
541                    )
542                    .await
543                }
544                ObjectType::User => {
545                    drop_user::handle_drop_user(
546                        handler_args,
547                        object_name,
548                        if_exists,
549                        drop_mode.into(),
550                    )
551                    .await
552                }
553                ObjectType::View => {
554                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
555                }
556                ObjectType::Connection => {
557                    drop_connection::handle_drop_connection(handler_args, object_name, if_exists)
558                        .await
559                }
560                ObjectType::Secret => {
561                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
562                }
563            }
564        }
565        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
566        Statement::DropFunction {
567            if_exists,
568            func_desc,
569            option,
570        } => {
571            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
572                .await
573        }
574        Statement::DropAggregate {
575            if_exists,
576            func_desc,
577            option,
578        } => {
579            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
580                .await
581        }
582        Statement::Query(_)
583        | Statement::Insert { .. }
584        | Statement::Delete { .. }
585        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
586        Statement::CreateView {
587            materialized,
588            if_not_exists,
589            name,
590            columns,
591            query,
592            with_options: _, // It is put in OptimizerContext
593            or_replace,      // not supported
594            emit_mode,
595        } => {
596            if or_replace {
597                bail_not_implemented!("CREATE OR REPLACE VIEW");
598            }
599            if materialized {
600                create_mv::handle_create_mv(
601                    handler_args,
602                    if_not_exists,
603                    name,
604                    *query,
605                    columns,
606                    emit_mode,
607                )
608                .await
609            } else {
610                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
611                    .await
612            }
613        }
614        Statement::Flush => flush::handle_flush(handler_args).await,
615        Statement::Wait => wait::handle_wait(handler_args).await,
616        Statement::Recover => recover::handle_recover(handler_args).await,
617        Statement::SetVariable {
618            local: _,
619            variable,
620            value,
621        } => {
622            // special handle for `use database`
623            if variable.real_value().eq_ignore_ascii_case("database") {
624                let x = variable::set_var_to_param_str(&value);
625                let res = use_db::handle_use_db(
626                    handler_args,
627                    ObjectName::from(vec![Ident::new_unchecked(
628                        x.unwrap_or("default".to_owned()),
629                    )]),
630                )?;
631                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
632                for notice in res.notices() {
633                    builder = builder.notice(notice);
634                }
635                return Ok(builder.into());
636            }
637            variable::handle_set(handler_args, variable, value)
638        }
639        Statement::SetTimeZone { local: _, value } => {
640            variable::handle_set_time_zone(handler_args, value)
641        }
642        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable).await,
643        Statement::CreateIndex {
644            name,
645            table_name,
646            columns,
647            include,
648            distributed_by,
649            unique,
650            if_not_exists,
651        } => {
652            if unique {
653                bail_not_implemented!("create unique index");
654            }
655
656            create_index::handle_create_index(
657                handler_args,
658                if_not_exists,
659                name,
660                table_name,
661                columns.to_vec(),
662                include,
663                distributed_by,
664            )
665            .await
666        }
667        Statement::AlterDatabase { name, operation } => match operation {
668            AlterDatabaseOperation::RenameDatabase { database_name } => {
669                alter_rename::handle_rename_database(handler_args, name, database_name).await
670            }
671            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
672                alter_owner::handle_alter_owner(
673                    handler_args,
674                    name,
675                    new_owner_name,
676                    StatementType::ALTER_DATABASE,
677                )
678                .await
679            }
680        },
681        Statement::AlterSchema { name, operation } => match operation {
682            AlterSchemaOperation::RenameSchema { schema_name } => {
683                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
684            }
685            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
686                alter_owner::handle_alter_owner(
687                    handler_args,
688                    name,
689                    new_owner_name,
690                    StatementType::ALTER_SCHEMA,
691                )
692                .await
693            }
694            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
695                alter_swap_rename::handle_swap_rename(
696                    handler_args,
697                    name,
698                    target_schema,
699                    StatementType::ALTER_SCHEMA,
700                )
701                .await
702            }
703        },
704        Statement::AlterTable { name, operation } => match operation {
705            AlterTableOperation::AddColumn { .. }
706            | AlterTableOperation::DropColumn { .. }
707            | AlterTableOperation::AlterColumn { .. } => {
708                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
709            }
710            AlterTableOperation::RenameTable { table_name } => {
711                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
712                    .await
713            }
714            AlterTableOperation::ChangeOwner { new_owner_name } => {
715                alter_owner::handle_alter_owner(
716                    handler_args,
717                    name,
718                    new_owner_name,
719                    StatementType::ALTER_TABLE,
720                )
721                .await
722            }
723            AlterTableOperation::SetParallelism {
724                parallelism,
725                deferred,
726            } => {
727                alter_parallelism::handle_alter_parallelism(
728                    handler_args,
729                    name,
730                    parallelism,
731                    StatementType::ALTER_TABLE,
732                    deferred,
733                )
734                .await
735            }
736            AlterTableOperation::SetSchema { new_schema_name } => {
737                alter_set_schema::handle_alter_set_schema(
738                    handler_args,
739                    name,
740                    new_schema_name,
741                    StatementType::ALTER_TABLE,
742                    None,
743                )
744                .await
745            }
746            AlterTableOperation::RefreshSchema => {
747                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
748            }
749            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
750                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
751                    handler_args,
752                    PbThrottleTarget::TableWithSource,
753                    name,
754                    rate_limit,
755                )
756                .await
757            }
758            AlterTableOperation::DropConnector => {
759                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
760                    .await
761            }
762            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
763                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
764                    handler_args,
765                    PbThrottleTarget::TableDml,
766                    name,
767                    rate_limit,
768                )
769                .await
770            }
771            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
772                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
773                    handler_args,
774                    PbThrottleTarget::CdcTable,
775                    name,
776                    rate_limit,
777                )
778                .await
779            }
780            AlterTableOperation::SwapRenameTable { target_table } => {
781                alter_swap_rename::handle_swap_rename(
782                    handler_args,
783                    name,
784                    target_table,
785                    StatementType::ALTER_TABLE,
786                )
787                .await
788            }
789            AlterTableOperation::AddConstraint { .. }
790            | AlterTableOperation::DropConstraint { .. }
791            | AlterTableOperation::RenameColumn { .. }
792            | AlterTableOperation::ChangeColumn { .. }
793            | AlterTableOperation::RenameConstraint { .. } => {
794                bail_not_implemented!(
795                    "Unhandled statement: {}",
796                    Statement::AlterTable { name, operation }
797                )
798            }
799        },
800        Statement::AlterIndex { name, operation } => match operation {
801            AlterIndexOperation::RenameIndex { index_name } => {
802                alter_rename::handle_rename_index(handler_args, name, index_name).await
803            }
804            AlterIndexOperation::SetParallelism {
805                parallelism,
806                deferred,
807            } => {
808                alter_parallelism::handle_alter_parallelism(
809                    handler_args,
810                    name,
811                    parallelism,
812                    StatementType::ALTER_INDEX,
813                    deferred,
814                )
815                .await
816            }
817        },
818        Statement::AlterView {
819            materialized,
820            name,
821            operation,
822        } => {
823            let statement_type = if materialized {
824                StatementType::ALTER_MATERIALIZED_VIEW
825            } else {
826                StatementType::ALTER_VIEW
827            };
828            match operation {
829                AlterViewOperation::RenameView { view_name } => {
830                    if materialized {
831                        alter_rename::handle_rename_table(
832                            handler_args,
833                            TableType::MaterializedView,
834                            name,
835                            view_name,
836                        )
837                        .await
838                    } else {
839                        alter_rename::handle_rename_view(handler_args, name, view_name).await
840                    }
841                }
842                AlterViewOperation::SetParallelism {
843                    parallelism,
844                    deferred,
845                } => {
846                    if !materialized {
847                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
848                    }
849                    alter_parallelism::handle_alter_parallelism(
850                        handler_args,
851                        name,
852                        parallelism,
853                        statement_type,
854                        deferred,
855                    )
856                    .await
857                }
858                AlterViewOperation::SetResourceGroup {
859                    resource_group,
860                    deferred,
861                } => {
862                    if !materialized {
863                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
864                    }
865                    alter_resource_group::handle_alter_resource_group(
866                        handler_args,
867                        name,
868                        resource_group,
869                        statement_type,
870                        deferred,
871                    )
872                    .await
873                }
874                AlterViewOperation::ChangeOwner { new_owner_name } => {
875                    alter_owner::handle_alter_owner(
876                        handler_args,
877                        name,
878                        new_owner_name,
879                        statement_type,
880                    )
881                    .await
882                }
883                AlterViewOperation::SetSchema { new_schema_name } => {
884                    alter_set_schema::handle_alter_set_schema(
885                        handler_args,
886                        name,
887                        new_schema_name,
888                        statement_type,
889                        None,
890                    )
891                    .await
892                }
893                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
894                    if !materialized {
895                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
896                    }
897                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
898                        handler_args,
899                        PbThrottleTarget::Mv,
900                        name,
901                        rate_limit,
902                    )
903                    .await
904                }
905                AlterViewOperation::SwapRenameView { target_view } => {
906                    alter_swap_rename::handle_swap_rename(
907                        handler_args,
908                        name,
909                        target_view,
910                        statement_type,
911                    )
912                    .await
913                }
914            }
915        }
916        Statement::AlterSink { name, operation } => match operation {
917            AlterSinkOperation::RenameSink { sink_name } => {
918                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
919            }
920            AlterSinkOperation::ChangeOwner { new_owner_name } => {
921                alter_owner::handle_alter_owner(
922                    handler_args,
923                    name,
924                    new_owner_name,
925                    StatementType::ALTER_SINK,
926                )
927                .await
928            }
929            AlterSinkOperation::SetSchema { new_schema_name } => {
930                alter_set_schema::handle_alter_set_schema(
931                    handler_args,
932                    name,
933                    new_schema_name,
934                    StatementType::ALTER_SINK,
935                    None,
936                )
937                .await
938            }
939            AlterSinkOperation::SetParallelism {
940                parallelism,
941                deferred,
942            } => {
943                alter_parallelism::handle_alter_parallelism(
944                    handler_args,
945                    name,
946                    parallelism,
947                    StatementType::ALTER_SINK,
948                    deferred,
949                )
950                .await
951            }
952            AlterSinkOperation::SwapRenameSink { target_sink } => {
953                alter_swap_rename::handle_swap_rename(
954                    handler_args,
955                    name,
956                    target_sink,
957                    StatementType::ALTER_SINK,
958                )
959                .await
960            }
961            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
962                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
963                    handler_args,
964                    PbThrottleTarget::Sink,
965                    name,
966                    rate_limit,
967                )
968                .await
969            }
970        },
971        Statement::AlterSubscription { name, operation } => match operation {
972            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
973                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
974                    .await
975            }
976            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
977                alter_owner::handle_alter_owner(
978                    handler_args,
979                    name,
980                    new_owner_name,
981                    StatementType::ALTER_SUBSCRIPTION,
982                )
983                .await
984            }
985            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
986                alter_set_schema::handle_alter_set_schema(
987                    handler_args,
988                    name,
989                    new_schema_name,
990                    StatementType::ALTER_SUBSCRIPTION,
991                    None,
992                )
993                .await
994            }
995            AlterSubscriptionOperation::SwapRenameSubscription {
996                target_subscription,
997            } => {
998                alter_swap_rename::handle_swap_rename(
999                    handler_args,
1000                    name,
1001                    target_subscription,
1002                    StatementType::ALTER_SUBSCRIPTION,
1003                )
1004                .await
1005            }
1006        },
1007        Statement::AlterSource { name, operation } => match operation {
1008            AlterSourceOperation::RenameSource { source_name } => {
1009                alter_rename::handle_rename_source(handler_args, name, source_name).await
1010            }
1011            AlterSourceOperation::AddColumn { .. } => {
1012                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1013            }
1014            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1015                alter_owner::handle_alter_owner(
1016                    handler_args,
1017                    name,
1018                    new_owner_name,
1019                    StatementType::ALTER_SOURCE,
1020                )
1021                .await
1022            }
1023            AlterSourceOperation::SetSchema { new_schema_name } => {
1024                alter_set_schema::handle_alter_set_schema(
1025                    handler_args,
1026                    name,
1027                    new_schema_name,
1028                    StatementType::ALTER_SOURCE,
1029                    None,
1030                )
1031                .await
1032            }
1033            AlterSourceOperation::FormatEncode { format_encode } => {
1034                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1035                    .await
1036            }
1037            AlterSourceOperation::RefreshSchema => {
1038                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1039            }
1040            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1041                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1042                    handler_args,
1043                    PbThrottleTarget::Source,
1044                    name,
1045                    rate_limit,
1046                )
1047                .await
1048            }
1049            AlterSourceOperation::SwapRenameSource { target_source } => {
1050                alter_swap_rename::handle_swap_rename(
1051                    handler_args,
1052                    name,
1053                    target_source,
1054                    StatementType::ALTER_SOURCE,
1055                )
1056                .await
1057            }
1058            AlterSourceOperation::SetParallelism {
1059                parallelism,
1060                deferred,
1061            } => {
1062                alter_parallelism::handle_alter_parallelism(
1063                    handler_args,
1064                    name,
1065                    parallelism,
1066                    StatementType::ALTER_SOURCE,
1067                    deferred,
1068                )
1069                .await
1070            }
1071        },
1072        Statement::AlterFunction {
1073            name,
1074            args,
1075            operation,
1076        } => match operation {
1077            AlterFunctionOperation::SetSchema { new_schema_name } => {
1078                alter_set_schema::handle_alter_set_schema(
1079                    handler_args,
1080                    name,
1081                    new_schema_name,
1082                    StatementType::ALTER_FUNCTION,
1083                    args,
1084                )
1085                .await
1086            }
1087        },
1088        Statement::AlterConnection { name, operation } => match operation {
1089            AlterConnectionOperation::SetSchema { new_schema_name } => {
1090                alter_set_schema::handle_alter_set_schema(
1091                    handler_args,
1092                    name,
1093                    new_schema_name,
1094                    StatementType::ALTER_CONNECTION,
1095                    None,
1096                )
1097                .await
1098            }
1099            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1100                alter_owner::handle_alter_owner(
1101                    handler_args,
1102                    name,
1103                    new_owner_name,
1104                    StatementType::ALTER_CONNECTION,
1105                )
1106                .await
1107            }
1108        },
1109        Statement::AlterSystem { param, value } => {
1110            alter_system::handle_alter_system(handler_args, param, value).await
1111        }
1112        Statement::AlterSecret {
1113            name,
1114            with_options,
1115            operation,
1116        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1117        Statement::AlterFragment {
1118            fragment_id,
1119            operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1120        } => {
1121            alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1122                &handler_args.session,
1123                PbThrottleTarget::Fragment,
1124                fragment_id,
1125                rate_limit,
1126                StatementType::SET_VARIABLE,
1127            )
1128            .await
1129        }
1130        Statement::StartTransaction { modes } => {
1131            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1132        }
1133        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1134        Statement::Commit { chain } => {
1135            transaction::handle_commit(handler_args, COMMIT, chain).await
1136        }
1137        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1138        Statement::Rollback { chain } => {
1139            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1140        }
1141        Statement::SetTransaction {
1142            modes,
1143            snapshot,
1144            session,
1145        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1146        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1147        Statement::Kill(process_id) => handle_kill(handler_args, process_id).await,
1148        Statement::Comment {
1149            object_type,
1150            object_name,
1151            comment,
1152        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1153        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1154        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1155    }
1156}
1157
1158fn check_ban_ddl_for_iceberg_engine_table(
1159    session: Arc<SessionImpl>,
1160    stmt: &Statement,
1161) -> Result<()> {
1162    match stmt {
1163        Statement::AlterTable {
1164            name,
1165            operation:
1166                operation @ (AlterTableOperation::AddColumn { .. }
1167                | AlterTableOperation::DropColumn { .. }),
1168        } => {
1169            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1170            if table.is_iceberg_engine_table() {
1171                bail!(
1172                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1173                    operation,
1174                    schema_name,
1175                    name
1176                );
1177            }
1178        }
1179
1180        Statement::AlterTable {
1181            name,
1182            operation: AlterTableOperation::RenameTable { .. },
1183        } => {
1184            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1185            if table.is_iceberg_engine_table() {
1186                bail!(
1187                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1188                    schema_name,
1189                    name
1190                );
1191            }
1192        }
1193
1194        Statement::AlterTable {
1195            name,
1196            operation: AlterTableOperation::ChangeOwner { .. },
1197        } => {
1198            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1199            if table.is_iceberg_engine_table() {
1200                bail!(
1201                    "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1202                    schema_name,
1203                    name
1204                );
1205            }
1206        }
1207
1208        Statement::AlterTable {
1209            name,
1210            operation: AlterTableOperation::SetParallelism { .. },
1211        } => {
1212            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1213            if table.is_iceberg_engine_table() {
1214                bail!(
1215                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1216                    schema_name,
1217                    name
1218                );
1219            }
1220        }
1221
1222        Statement::AlterTable {
1223            name,
1224            operation: AlterTableOperation::SetSchema { .. },
1225        } => {
1226            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1227            if table.is_iceberg_engine_table() {
1228                bail!(
1229                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1230                    schema_name,
1231                    name
1232                );
1233            }
1234        }
1235
1236        Statement::AlterTable {
1237            name,
1238            operation: AlterTableOperation::RefreshSchema,
1239        } => {
1240            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1241            if table.is_iceberg_engine_table() {
1242                bail!(
1243                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1244                    schema_name,
1245                    name
1246                );
1247            }
1248        }
1249
1250        Statement::AlterTable {
1251            name,
1252            operation: AlterTableOperation::SetSourceRateLimit { .. },
1253        } => {
1254            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1255            if table.is_iceberg_engine_table() {
1256                bail!(
1257                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1258                    schema_name,
1259                    name
1260                );
1261            }
1262        }
1263
1264        _ => {}
1265    }
1266
1267    Ok(())
1268}