risingwave_frontend/handler/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::types::Fields;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_common::{bail, bail_not_implemented};
30use risingwave_pb::meta::PbThrottleTarget;
31use risingwave_sqlparser::ast::*;
32use util::get_table_catalog_by_table_name;
33
34use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
35use crate::catalog::table_catalog::TableType;
36use crate::error::{ErrorCode, Result};
37use crate::handler::cancel_job::handle_cancel;
38use crate::handler::kill_process::handle_kill;
39use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
40use crate::session::SessionImpl;
41use crate::utils::WithOptions;
42
43mod alter_owner;
44mod alter_parallelism;
45mod alter_rename;
46mod alter_resource_group;
47mod alter_secret;
48mod alter_set_schema;
49mod alter_sink_props;
50mod alter_source_column;
51mod alter_source_with_sr;
52mod alter_streaming_rate_limit;
53mod alter_swap_rename;
54mod alter_system;
55mod alter_table_column;
56pub mod alter_table_drop_connector;
57mod alter_table_with_sr;
58pub mod alter_user;
59pub mod cancel_job;
60pub mod close_cursor;
61mod comment;
62pub mod create_aggregate;
63pub mod create_connection;
64mod create_database;
65pub mod create_function;
66pub mod create_index;
67pub mod create_mv;
68pub mod create_schema;
69pub mod create_secret;
70pub mod create_sink;
71pub mod create_source;
72pub mod create_sql_function;
73pub mod create_subscription;
74pub mod create_table;
75pub mod create_table_as;
76pub mod create_user;
77pub mod create_view;
78pub mod declare_cursor;
79pub mod describe;
80pub mod discard;
81mod drop_connection;
82mod drop_database;
83pub mod drop_function;
84mod drop_index;
85pub mod drop_mv;
86mod drop_schema;
87pub mod drop_secret;
88pub mod drop_sink;
89pub mod drop_source;
90pub mod drop_subscription;
91pub mod drop_table;
92pub mod drop_user;
93mod drop_view;
94pub mod explain;
95pub mod explain_analyze_stream_job;
96pub mod extended_handle;
97pub mod fetch_cursor;
98mod flush;
99pub mod handle_privilege;
100mod kill_process;
101pub mod privilege;
102pub mod query;
103mod recover;
104pub mod show;
105mod transaction;
106mod use_db;
107pub mod util;
108pub mod variable;
109mod wait;
110
111pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
112
113/// The [`PgResponseBuilder`] used by RisingWave.
114pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
115
116/// The [`PgResponse`] used by RisingWave.
117pub type RwPgResponse = PgResponse<PgResponseStream>;
118
119#[easy_ext::ext(RwPgResponseBuilderExt)]
120impl RwPgResponseBuilder {
121    /// Append rows to the response.
122    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
123        let fields = T::fields();
124        self.values(
125            rows.into_iter()
126                .map(|row| {
127                    Row::new(
128                        row.into_owned_row()
129                            .into_iter()
130                            .zip_eq_fast(&fields)
131                            .map(|(datum, (_, ty))| {
132                                datum.map(|scalar| {
133                                    scalar.as_scalar_ref_impl().text_format(ty).into()
134                                })
135                            })
136                            .collect(),
137                    )
138                })
139                .collect_vec()
140                .into(),
141            fields_to_descriptors(fields),
142        )
143    }
144}
145
146pub fn fields_to_descriptors(
147    fields: Vec<(&str, risingwave_common::types::DataType)>,
148) -> Vec<PgFieldDescriptor> {
149    fields
150        .iter()
151        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
152        .collect()
153}
154
155pub enum PgResponseStream {
156    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
157    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
158    Rows(BoxStream<'static, RowSetResult>),
159}
160
161impl Stream for PgResponseStream {
162    type Item = std::result::Result<Vec<Row>, BoxedError>;
163
164    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
165        match &mut *self {
166            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
167            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
168            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
169        }
170    }
171}
172
173impl From<Vec<Row>> for PgResponseStream {
174    fn from(rows: Vec<Row>) -> Self {
175        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
176    }
177}
178
179#[derive(Clone)]
180pub struct HandlerArgs {
181    pub session: Arc<SessionImpl>,
182    pub sql: Arc<str>,
183    pub normalized_sql: String,
184    pub with_options: WithOptions,
185}
186
187impl HandlerArgs {
188    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
189        Ok(Self {
190            session,
191            sql,
192            with_options: WithOptions::try_from(stmt)?,
193            normalized_sql: Self::normalize_sql(stmt),
194        })
195    }
196
197    /// Get normalized SQL from the statement.
198    ///
199    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
200    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
201    ///   make it suitable for the `SHOW CREATE` statements.
202    fn normalize_sql(stmt: &Statement) -> String {
203        let mut stmt = stmt.clone();
204        match &mut stmt {
205            Statement::CreateView {
206                or_replace,
207                if_not_exists,
208                ..
209            } => {
210                *or_replace = false;
211                *if_not_exists = false;
212            }
213            Statement::CreateTable {
214                or_replace,
215                if_not_exists,
216                ..
217            } => {
218                *or_replace = false;
219                *if_not_exists = false;
220            }
221            Statement::CreateIndex { if_not_exists, .. } => {
222                *if_not_exists = false;
223            }
224            Statement::CreateSource {
225                stmt: CreateSourceStatement { if_not_exists, .. },
226                ..
227            } => {
228                *if_not_exists = false;
229            }
230            Statement::CreateSink {
231                stmt: CreateSinkStatement { if_not_exists, .. },
232            } => {
233                *if_not_exists = false;
234            }
235            Statement::CreateSubscription {
236                stmt: CreateSubscriptionStatement { if_not_exists, .. },
237            } => {
238                *if_not_exists = false;
239            }
240            Statement::CreateConnection {
241                stmt: CreateConnectionStatement { if_not_exists, .. },
242            } => {
243                *if_not_exists = false;
244            }
245            _ => {}
246        }
247        stmt.to_string()
248    }
249}
250
251pub async fn handle(
252    session: Arc<SessionImpl>,
253    stmt: Statement,
254    sql: Arc<str>,
255    formats: Vec<Format>,
256) -> Result<RwPgResponse> {
257    session.clear_cancel_query_flag();
258    let _guard = session.txn_begin_implicit();
259    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
260
261    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
262
263    match stmt {
264        Statement::Explain {
265            statement,
266            analyze,
267            options,
268        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
269        Statement::ExplainAnalyzeStreamJob {
270            target,
271            duration_secs,
272        } => {
273            explain_analyze_stream_job::handle_explain_analyze_stream_job(
274                handler_args,
275                target,
276                duration_secs,
277            )
278            .await
279        }
280        Statement::CreateSource { stmt } => {
281            create_source::handle_create_source(handler_args, stmt).await
282        }
283        Statement::CreateSink { stmt } => create_sink::handle_create_sink(handler_args, stmt).await,
284        Statement::CreateSubscription { stmt } => {
285            create_subscription::handle_create_subscription(handler_args, stmt).await
286        }
287        Statement::CreateConnection { stmt } => {
288            create_connection::handle_create_connection(handler_args, stmt).await
289        }
290        Statement::CreateSecret { stmt } => {
291            create_secret::handle_create_secret(handler_args, stmt).await
292        }
293        Statement::CreateFunction {
294            or_replace,
295            temporary,
296            if_not_exists,
297            name,
298            args,
299            returns,
300            params,
301            with_options,
302        } => {
303            // For general udf, `language` clause could be ignored
304            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
305            if params.language.is_none()
306                || !params
307                    .language
308                    .as_ref()
309                    .unwrap()
310                    .real_value()
311                    .eq_ignore_ascii_case("sql")
312            {
313                create_function::handle_create_function(
314                    handler_args,
315                    or_replace,
316                    temporary,
317                    if_not_exists,
318                    name,
319                    args,
320                    returns,
321                    params,
322                    with_options,
323                )
324                .await
325            } else {
326                create_sql_function::handle_create_sql_function(
327                    handler_args,
328                    or_replace,
329                    temporary,
330                    if_not_exists,
331                    name,
332                    args,
333                    returns,
334                    params,
335                )
336                .await
337            }
338        }
339        Statement::CreateAggregate {
340            or_replace,
341            if_not_exists,
342            name,
343            args,
344            returns,
345            params,
346            ..
347        } => {
348            create_aggregate::handle_create_aggregate(
349                handler_args,
350                or_replace,
351                if_not_exists,
352                name,
353                args,
354                returns,
355                params,
356            )
357            .await
358        }
359        Statement::CreateTable {
360            name,
361            columns,
362            wildcard_idx,
363            constraints,
364            query,
365            with_options: _, // It is put in OptimizerContext
366            // Not supported things
367            or_replace,
368            temporary,
369            if_not_exists,
370            format_encode,
371            source_watermarks,
372            append_only,
373            on_conflict,
374            with_version_column,
375            cdc_table_info,
376            include_column_options,
377            webhook_info,
378            engine,
379        } => {
380            if or_replace {
381                bail_not_implemented!("CREATE OR REPLACE TABLE");
382            }
383            if temporary {
384                bail_not_implemented!("CREATE TEMPORARY TABLE");
385            }
386            if let Some(query) = query {
387                return create_table_as::handle_create_as(
388                    handler_args,
389                    name,
390                    if_not_exists,
391                    query,
392                    columns,
393                    append_only,
394                    on_conflict,
395                    with_version_column.map(|x| x.real_value()),
396                    engine,
397                )
398                .await;
399            }
400            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
401            create_table::handle_create_table(
402                handler_args,
403                name,
404                columns,
405                wildcard_idx,
406                constraints,
407                if_not_exists,
408                format_encode,
409                source_watermarks,
410                append_only,
411                on_conflict,
412                with_version_column.map(|x| x.real_value()),
413                cdc_table_info,
414                include_column_options,
415                webhook_info,
416                engine,
417            )
418            .await
419        }
420        Statement::CreateDatabase {
421            db_name,
422            if_not_exists,
423            owner,
424            resource_group,
425        } => {
426            create_database::handle_create_database(
427                handler_args,
428                db_name,
429                if_not_exists,
430                owner,
431                resource_group,
432            )
433            .await
434        }
435        Statement::CreateSchema {
436            schema_name,
437            if_not_exists,
438            owner,
439        } => {
440            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
441                .await
442        }
443        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
444        Statement::DeclareCursor { stmt } => {
445            declare_cursor::handle_declare_cursor(handler_args, stmt).await
446        }
447        Statement::FetchCursor { stmt } => {
448            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
449        }
450        Statement::CloseCursor { stmt } => {
451            close_cursor::handle_close_cursor(handler_args, stmt).await
452        }
453        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
454        Statement::Grant { .. } => {
455            handle_privilege::handle_grant_privilege(handler_args, stmt).await
456        }
457        Statement::Revoke { .. } => {
458            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
459        }
460        Statement::Describe { name, kind } => match kind {
461            DescribeKind::Fragments => {
462                describe::handle_describe_fragments(handler_args, name).await
463            }
464            DescribeKind::Plain => describe::handle_describe(handler_args, name),
465        },
466        Statement::Discard(..) => discard::handle_discard(handler_args),
467        Statement::ShowObjects {
468            object: show_object,
469            filter,
470        } => show::handle_show_object(handler_args, show_object, filter).await,
471        Statement::ShowCreateObject { create_type, name } => {
472            show::handle_show_create_object(handler_args, create_type, name)
473        }
474        Statement::ShowTransactionIsolationLevel => {
475            transaction::handle_show_isolation_level(handler_args)
476        }
477        Statement::Drop(DropStatement {
478            object_type,
479            object_name,
480            if_exists,
481            drop_mode,
482        }) => {
483            let mut cascade = false;
484            if let AstOption::Some(DropMode::Cascade) = drop_mode {
485                match object_type {
486                    ObjectType::MaterializedView
487                    | ObjectType::View
488                    | ObjectType::Sink
489                    | ObjectType::Source
490                    | ObjectType::Subscription
491                    | ObjectType::Index
492                    | ObjectType::Table
493                    | ObjectType::Schema => {
494                        cascade = true;
495                    }
496                    ObjectType::Database
497                    | ObjectType::User
498                    | ObjectType::Connection
499                    | ObjectType::Secret => {
500                        bail_not_implemented!("DROP CASCADE");
501                    }
502                };
503            };
504            match object_type {
505                ObjectType::Table => {
506                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
507                        .await
508                }
509                ObjectType::MaterializedView => {
510                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
511                }
512                ObjectType::Index => {
513                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
514                        .await
515                }
516                ObjectType::Source => {
517                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
518                        .await
519                }
520                ObjectType::Sink => {
521                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
522                }
523                ObjectType::Subscription => {
524                    drop_subscription::handle_drop_subscription(
525                        handler_args,
526                        object_name,
527                        if_exists,
528                        cascade,
529                    )
530                    .await
531                }
532                ObjectType::Database => {
533                    drop_database::handle_drop_database(
534                        handler_args,
535                        object_name,
536                        if_exists,
537                        drop_mode.into(),
538                    )
539                    .await
540                }
541                ObjectType::Schema => {
542                    drop_schema::handle_drop_schema(
543                        handler_args,
544                        object_name,
545                        if_exists,
546                        drop_mode.into(),
547                    )
548                    .await
549                }
550                ObjectType::User => {
551                    drop_user::handle_drop_user(
552                        handler_args,
553                        object_name,
554                        if_exists,
555                        drop_mode.into(),
556                    )
557                    .await
558                }
559                ObjectType::View => {
560                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
561                }
562                ObjectType::Connection => {
563                    drop_connection::handle_drop_connection(handler_args, object_name, if_exists)
564                        .await
565                }
566                ObjectType::Secret => {
567                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
568                }
569            }
570        }
571        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
572        Statement::DropFunction {
573            if_exists,
574            func_desc,
575            option,
576        } => {
577            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
578                .await
579        }
580        Statement::DropAggregate {
581            if_exists,
582            func_desc,
583            option,
584        } => {
585            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
586                .await
587        }
588        Statement::Query(_)
589        | Statement::Insert { .. }
590        | Statement::Delete { .. }
591        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
592        Statement::CreateView {
593            materialized,
594            if_not_exists,
595            name,
596            columns,
597            query,
598            with_options: _, // It is put in OptimizerContext
599            or_replace,      // not supported
600            emit_mode,
601        } => {
602            if or_replace {
603                bail_not_implemented!("CREATE OR REPLACE VIEW");
604            }
605            if materialized {
606                create_mv::handle_create_mv(
607                    handler_args,
608                    if_not_exists,
609                    name,
610                    *query,
611                    columns,
612                    emit_mode,
613                )
614                .await
615            } else {
616                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
617                    .await
618            }
619        }
620        Statement::Flush => flush::handle_flush(handler_args).await,
621        Statement::Wait => wait::handle_wait(handler_args).await,
622        Statement::Recover => recover::handle_recover(handler_args).await,
623        Statement::SetVariable {
624            local: _,
625            variable,
626            value,
627        } => {
628            // special handle for `use database`
629            if variable.real_value().eq_ignore_ascii_case("database") {
630                let x = variable::set_var_to_param_str(&value);
631                let res = use_db::handle_use_db(
632                    handler_args,
633                    ObjectName::from(vec![Ident::new_unchecked(
634                        x.unwrap_or("default".to_owned()),
635                    )]),
636                )?;
637                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
638                for notice in res.notices() {
639                    builder = builder.notice(notice);
640                }
641                return Ok(builder.into());
642            }
643            variable::handle_set(handler_args, variable, value)
644        }
645        Statement::SetTimeZone { local: _, value } => {
646            variable::handle_set_time_zone(handler_args, value)
647        }
648        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable).await,
649        Statement::CreateIndex {
650            name,
651            table_name,
652            columns,
653            include,
654            distributed_by,
655            unique,
656            if_not_exists,
657        } => {
658            if unique {
659                bail_not_implemented!("create unique index");
660            }
661
662            create_index::handle_create_index(
663                handler_args,
664                if_not_exists,
665                name,
666                table_name,
667                columns.to_vec(),
668                include,
669                distributed_by,
670            )
671            .await
672        }
673        Statement::AlterDatabase { name, operation } => match operation {
674            AlterDatabaseOperation::RenameDatabase { database_name } => {
675                alter_rename::handle_rename_database(handler_args, name, database_name).await
676            }
677            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
678                alter_owner::handle_alter_owner(
679                    handler_args,
680                    name,
681                    new_owner_name,
682                    StatementType::ALTER_DATABASE,
683                )
684                .await
685            }
686        },
687        Statement::AlterSchema { name, operation } => match operation {
688            AlterSchemaOperation::RenameSchema { schema_name } => {
689                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
690            }
691            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
692                alter_owner::handle_alter_owner(
693                    handler_args,
694                    name,
695                    new_owner_name,
696                    StatementType::ALTER_SCHEMA,
697                )
698                .await
699            }
700            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
701                alter_swap_rename::handle_swap_rename(
702                    handler_args,
703                    name,
704                    target_schema,
705                    StatementType::ALTER_SCHEMA,
706                )
707                .await
708            }
709        },
710        Statement::AlterTable { name, operation } => match operation {
711            AlterTableOperation::AddColumn { .. }
712            | AlterTableOperation::DropColumn { .. }
713            | AlterTableOperation::AlterColumn { .. } => {
714                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
715            }
716            AlterTableOperation::RenameTable { table_name } => {
717                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
718                    .await
719            }
720            AlterTableOperation::ChangeOwner { new_owner_name } => {
721                alter_owner::handle_alter_owner(
722                    handler_args,
723                    name,
724                    new_owner_name,
725                    StatementType::ALTER_TABLE,
726                )
727                .await
728            }
729            AlterTableOperation::SetParallelism {
730                parallelism,
731                deferred,
732            } => {
733                alter_parallelism::handle_alter_parallelism(
734                    handler_args,
735                    name,
736                    parallelism,
737                    StatementType::ALTER_TABLE,
738                    deferred,
739                )
740                .await
741            }
742            AlterTableOperation::SetSchema { new_schema_name } => {
743                alter_set_schema::handle_alter_set_schema(
744                    handler_args,
745                    name,
746                    new_schema_name,
747                    StatementType::ALTER_TABLE,
748                    None,
749                )
750                .await
751            }
752            AlterTableOperation::RefreshSchema => {
753                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
754            }
755            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
756                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
757                    handler_args,
758                    PbThrottleTarget::TableWithSource,
759                    name,
760                    rate_limit,
761                )
762                .await
763            }
764            AlterTableOperation::DropConnector => {
765                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
766                    .await
767            }
768            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
769                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
770                    handler_args,
771                    PbThrottleTarget::TableDml,
772                    name,
773                    rate_limit,
774                )
775                .await
776            }
777            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
778                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
779                    handler_args,
780                    PbThrottleTarget::CdcTable,
781                    name,
782                    rate_limit,
783                )
784                .await
785            }
786            AlterTableOperation::SwapRenameTable { target_table } => {
787                alter_swap_rename::handle_swap_rename(
788                    handler_args,
789                    name,
790                    target_table,
791                    StatementType::ALTER_TABLE,
792                )
793                .await
794            }
795            AlterTableOperation::AddConstraint { .. }
796            | AlterTableOperation::DropConstraint { .. }
797            | AlterTableOperation::RenameColumn { .. }
798            | AlterTableOperation::ChangeColumn { .. }
799            | AlterTableOperation::RenameConstraint { .. } => {
800                bail_not_implemented!(
801                    "Unhandled statement: {}",
802                    Statement::AlterTable { name, operation }
803                )
804            }
805        },
806        Statement::AlterIndex { name, operation } => match operation {
807            AlterIndexOperation::RenameIndex { index_name } => {
808                alter_rename::handle_rename_index(handler_args, name, index_name).await
809            }
810            AlterIndexOperation::SetParallelism {
811                parallelism,
812                deferred,
813            } => {
814                alter_parallelism::handle_alter_parallelism(
815                    handler_args,
816                    name,
817                    parallelism,
818                    StatementType::ALTER_INDEX,
819                    deferred,
820                )
821                .await
822            }
823        },
824        Statement::AlterView {
825            materialized,
826            name,
827            operation,
828        } => {
829            let statement_type = if materialized {
830                StatementType::ALTER_MATERIALIZED_VIEW
831            } else {
832                StatementType::ALTER_VIEW
833            };
834            match operation {
835                AlterViewOperation::RenameView { view_name } => {
836                    if materialized {
837                        alter_rename::handle_rename_table(
838                            handler_args,
839                            TableType::MaterializedView,
840                            name,
841                            view_name,
842                        )
843                        .await
844                    } else {
845                        alter_rename::handle_rename_view(handler_args, name, view_name).await
846                    }
847                }
848                AlterViewOperation::SetParallelism {
849                    parallelism,
850                    deferred,
851                } => {
852                    if !materialized {
853                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
854                    }
855                    alter_parallelism::handle_alter_parallelism(
856                        handler_args,
857                        name,
858                        parallelism,
859                        statement_type,
860                        deferred,
861                    )
862                    .await
863                }
864                AlterViewOperation::SetResourceGroup {
865                    resource_group,
866                    deferred,
867                } => {
868                    if !materialized {
869                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
870                    }
871                    alter_resource_group::handle_alter_resource_group(
872                        handler_args,
873                        name,
874                        resource_group,
875                        statement_type,
876                        deferred,
877                    )
878                    .await
879                }
880                AlterViewOperation::ChangeOwner { new_owner_name } => {
881                    alter_owner::handle_alter_owner(
882                        handler_args,
883                        name,
884                        new_owner_name,
885                        statement_type,
886                    )
887                    .await
888                }
889                AlterViewOperation::SetSchema { new_schema_name } => {
890                    alter_set_schema::handle_alter_set_schema(
891                        handler_args,
892                        name,
893                        new_schema_name,
894                        statement_type,
895                        None,
896                    )
897                    .await
898                }
899                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
900                    if !materialized {
901                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
902                    }
903                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
904                        handler_args,
905                        PbThrottleTarget::Mv,
906                        name,
907                        rate_limit,
908                    )
909                    .await
910                }
911                AlterViewOperation::SwapRenameView { target_view } => {
912                    alter_swap_rename::handle_swap_rename(
913                        handler_args,
914                        name,
915                        target_view,
916                        statement_type,
917                    )
918                    .await
919                }
920            }
921        }
922
923        Statement::AlterSink { name, operation } => match operation {
924            AlterSinkOperation::SetSinkProps { changed_props } => {
925                alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await
926            }
927            AlterSinkOperation::RenameSink { sink_name } => {
928                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
929            }
930            AlterSinkOperation::ChangeOwner { new_owner_name } => {
931                alter_owner::handle_alter_owner(
932                    handler_args,
933                    name,
934                    new_owner_name,
935                    StatementType::ALTER_SINK,
936                )
937                .await
938            }
939            AlterSinkOperation::SetSchema { new_schema_name } => {
940                alter_set_schema::handle_alter_set_schema(
941                    handler_args,
942                    name,
943                    new_schema_name,
944                    StatementType::ALTER_SINK,
945                    None,
946                )
947                .await
948            }
949            AlterSinkOperation::SetParallelism {
950                parallelism,
951                deferred,
952            } => {
953                alter_parallelism::handle_alter_parallelism(
954                    handler_args,
955                    name,
956                    parallelism,
957                    StatementType::ALTER_SINK,
958                    deferred,
959                )
960                .await
961            }
962            AlterSinkOperation::SwapRenameSink { target_sink } => {
963                alter_swap_rename::handle_swap_rename(
964                    handler_args,
965                    name,
966                    target_sink,
967                    StatementType::ALTER_SINK,
968                )
969                .await
970            }
971            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
972                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
973                    handler_args,
974                    PbThrottleTarget::Sink,
975                    name,
976                    rate_limit,
977                )
978                .await
979            }
980        },
981        Statement::AlterSubscription { name, operation } => match operation {
982            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
983                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
984                    .await
985            }
986            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
987                alter_owner::handle_alter_owner(
988                    handler_args,
989                    name,
990                    new_owner_name,
991                    StatementType::ALTER_SUBSCRIPTION,
992                )
993                .await
994            }
995            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
996                alter_set_schema::handle_alter_set_schema(
997                    handler_args,
998                    name,
999                    new_schema_name,
1000                    StatementType::ALTER_SUBSCRIPTION,
1001                    None,
1002                )
1003                .await
1004            }
1005            AlterSubscriptionOperation::SwapRenameSubscription {
1006                target_subscription,
1007            } => {
1008                alter_swap_rename::handle_swap_rename(
1009                    handler_args,
1010                    name,
1011                    target_subscription,
1012                    StatementType::ALTER_SUBSCRIPTION,
1013                )
1014                .await
1015            }
1016        },
1017        Statement::AlterSource { name, operation } => match operation {
1018            AlterSourceOperation::RenameSource { source_name } => {
1019                alter_rename::handle_rename_source(handler_args, name, source_name).await
1020            }
1021            AlterSourceOperation::AddColumn { .. } => {
1022                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1023            }
1024            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1025                alter_owner::handle_alter_owner(
1026                    handler_args,
1027                    name,
1028                    new_owner_name,
1029                    StatementType::ALTER_SOURCE,
1030                )
1031                .await
1032            }
1033            AlterSourceOperation::SetSchema { new_schema_name } => {
1034                alter_set_schema::handle_alter_set_schema(
1035                    handler_args,
1036                    name,
1037                    new_schema_name,
1038                    StatementType::ALTER_SOURCE,
1039                    None,
1040                )
1041                .await
1042            }
1043            AlterSourceOperation::FormatEncode { format_encode } => {
1044                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1045                    .await
1046            }
1047            AlterSourceOperation::RefreshSchema => {
1048                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1049            }
1050            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1051                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1052                    handler_args,
1053                    PbThrottleTarget::Source,
1054                    name,
1055                    rate_limit,
1056                )
1057                .await
1058            }
1059            AlterSourceOperation::SwapRenameSource { target_source } => {
1060                alter_swap_rename::handle_swap_rename(
1061                    handler_args,
1062                    name,
1063                    target_source,
1064                    StatementType::ALTER_SOURCE,
1065                )
1066                .await
1067            }
1068            AlterSourceOperation::SetParallelism {
1069                parallelism,
1070                deferred,
1071            } => {
1072                alter_parallelism::handle_alter_parallelism(
1073                    handler_args,
1074                    name,
1075                    parallelism,
1076                    StatementType::ALTER_SOURCE,
1077                    deferred,
1078                )
1079                .await
1080            }
1081        },
1082        Statement::AlterFunction {
1083            name,
1084            args,
1085            operation,
1086        } => match operation {
1087            AlterFunctionOperation::SetSchema { new_schema_name } => {
1088                alter_set_schema::handle_alter_set_schema(
1089                    handler_args,
1090                    name,
1091                    new_schema_name,
1092                    StatementType::ALTER_FUNCTION,
1093                    args,
1094                )
1095                .await
1096            }
1097        },
1098        Statement::AlterConnection { name, operation } => match operation {
1099            AlterConnectionOperation::SetSchema { new_schema_name } => {
1100                alter_set_schema::handle_alter_set_schema(
1101                    handler_args,
1102                    name,
1103                    new_schema_name,
1104                    StatementType::ALTER_CONNECTION,
1105                    None,
1106                )
1107                .await
1108            }
1109            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1110                alter_owner::handle_alter_owner(
1111                    handler_args,
1112                    name,
1113                    new_owner_name,
1114                    StatementType::ALTER_CONNECTION,
1115                )
1116                .await
1117            }
1118        },
1119        Statement::AlterSystem { param, value } => {
1120            alter_system::handle_alter_system(handler_args, param, value).await
1121        }
1122        Statement::AlterSecret {
1123            name,
1124            with_options,
1125            operation,
1126        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1127        Statement::AlterFragment {
1128            fragment_id,
1129            operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1130        } => {
1131            alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1132                &handler_args.session,
1133                PbThrottleTarget::Fragment,
1134                fragment_id,
1135                rate_limit,
1136                StatementType::SET_VARIABLE,
1137            )
1138            .await
1139        }
1140        Statement::StartTransaction { modes } => {
1141            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1142        }
1143        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1144        Statement::Commit { chain } => {
1145            transaction::handle_commit(handler_args, COMMIT, chain).await
1146        }
1147        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1148        Statement::Rollback { chain } => {
1149            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1150        }
1151        Statement::SetTransaction {
1152            modes,
1153            snapshot,
1154            session,
1155        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1156        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1157        Statement::Kill(process_id) => handle_kill(handler_args, process_id).await,
1158        Statement::Comment {
1159            object_type,
1160            object_name,
1161            comment,
1162        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1163        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1164        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1165    }
1166}
1167
1168fn check_ban_ddl_for_iceberg_engine_table(
1169    session: Arc<SessionImpl>,
1170    stmt: &Statement,
1171) -> Result<()> {
1172    match stmt {
1173        Statement::AlterTable {
1174            name,
1175            operation:
1176                operation @ (AlterTableOperation::AddColumn { .. }
1177                | AlterTableOperation::DropColumn { .. }),
1178        } => {
1179            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1180            if table.is_iceberg_engine_table() {
1181                bail!(
1182                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1183                    operation,
1184                    schema_name,
1185                    name
1186                );
1187            }
1188        }
1189
1190        Statement::AlterTable {
1191            name,
1192            operation: AlterTableOperation::RenameTable { .. },
1193        } => {
1194            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1195            if table.is_iceberg_engine_table() {
1196                bail!(
1197                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1198                    schema_name,
1199                    name
1200                );
1201            }
1202        }
1203
1204        Statement::AlterTable {
1205            name,
1206            operation: AlterTableOperation::ChangeOwner { .. },
1207        } => {
1208            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1209            if table.is_iceberg_engine_table() {
1210                bail!(
1211                    "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1212                    schema_name,
1213                    name
1214                );
1215            }
1216        }
1217
1218        Statement::AlterTable {
1219            name,
1220            operation: AlterTableOperation::SetParallelism { .. },
1221        } => {
1222            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1223            if table.is_iceberg_engine_table() {
1224                bail!(
1225                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1226                    schema_name,
1227                    name
1228                );
1229            }
1230        }
1231
1232        Statement::AlterTable {
1233            name,
1234            operation: AlterTableOperation::SetSchema { .. },
1235        } => {
1236            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1237            if table.is_iceberg_engine_table() {
1238                bail!(
1239                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1240                    schema_name,
1241                    name
1242                );
1243            }
1244        }
1245
1246        Statement::AlterTable {
1247            name,
1248            operation: AlterTableOperation::RefreshSchema,
1249        } => {
1250            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1251            if table.is_iceberg_engine_table() {
1252                bail!(
1253                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1254                    schema_name,
1255                    name
1256                );
1257            }
1258        }
1259
1260        Statement::AlterTable {
1261            name,
1262            operation: AlterTableOperation::SetSourceRateLimit { .. },
1263        } => {
1264            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1265            if table.is_iceberg_engine_table() {
1266                bail!(
1267                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1268                    schema_name,
1269                    name
1270                );
1271            }
1272        }
1273
1274        _ => {}
1275    }
1276
1277    Ok(())
1278}