risingwave_frontend/handler/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::types::Fields;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_common::{bail, bail_not_implemented};
30use risingwave_pb::meta::PbThrottleTarget;
31use risingwave_sqlparser::ast::*;
32use util::get_table_catalog_by_table_name;
33
34use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
35use crate::catalog::table_catalog::TableType;
36use crate::error::{ErrorCode, Result};
37use crate::handler::cancel_job::handle_cancel;
38use crate::handler::kill_process::handle_kill;
39use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
40use crate::session::SessionImpl;
41use crate::utils::WithOptions;
42
43mod alter_owner;
44mod alter_parallelism;
45mod alter_rename;
46mod alter_resource_group;
47mod alter_secret;
48mod alter_set_schema;
49mod alter_sink_props;
50mod alter_source_column;
51mod alter_source_with_sr;
52mod alter_streaming_rate_limit;
53mod alter_swap_rename;
54mod alter_system;
55mod alter_table_column;
56pub mod alter_table_drop_connector;
57mod alter_table_with_sr;
58pub mod alter_user;
59pub mod cancel_job;
60pub mod close_cursor;
61mod comment;
62pub mod create_aggregate;
63pub mod create_connection;
64mod create_database;
65pub mod create_function;
66pub mod create_index;
67pub mod create_mv;
68pub mod create_schema;
69pub mod create_secret;
70pub mod create_sink;
71pub mod create_source;
72pub mod create_sql_function;
73pub mod create_subscription;
74pub mod create_table;
75pub mod create_table_as;
76pub mod create_user;
77pub mod create_view;
78pub mod declare_cursor;
79pub mod describe;
80pub mod discard;
81mod drop_connection;
82mod drop_database;
83pub mod drop_function;
84mod drop_index;
85pub mod drop_mv;
86mod drop_schema;
87pub mod drop_secret;
88pub mod drop_sink;
89pub mod drop_source;
90pub mod drop_subscription;
91pub mod drop_table;
92pub mod drop_user;
93mod drop_view;
94pub mod explain;
95pub mod explain_analyze_stream_job;
96pub mod extended_handle;
97pub mod fetch_cursor;
98mod flush;
99pub mod handle_privilege;
100mod kill_process;
101pub mod privilege;
102pub mod query;
103mod recover;
104pub mod show;
105mod transaction;
106mod use_db;
107pub mod util;
108pub mod variable;
109mod wait;
110
111pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
112
/// The [`PgResponseBuilder`] used by RisingWave, specialized so that the row stream it carries
/// is a [`PgResponseStream`].
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
115
/// The [`PgResponse`] used by RisingWave, specialized so that the row stream it carries is a
/// [`PgResponseStream`].
pub type RwPgResponse = PgResponse<PgResponseStream>;
118
119#[easy_ext::ext(RwPgResponseBuilderExt)]
120impl RwPgResponseBuilder {
121    /// Append rows to the response.
122    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
123        let fields = T::fields();
124        self.values(
125            rows.into_iter()
126                .map(|row| {
127                    Row::new(
128                        row.into_owned_row()
129                            .into_iter()
130                            .zip_eq_fast(&fields)
131                            .map(|(datum, (_, ty))| {
132                                datum.map(|scalar| {
133                                    scalar.as_scalar_ref_impl().text_format(ty).into()
134                                })
135                            })
136                            .collect(),
137                    )
138                })
139                .collect_vec()
140                .into(),
141            fields_to_descriptors(fields),
142        )
143    }
144}
145
146pub fn fields_to_descriptors(
147    fields: Vec<(&str, risingwave_common::types::DataType)>,
148) -> Vec<PgFieldDescriptor> {
149    fields
150        .iter()
151        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
152        .collect()
153}
154
/// The row stream carried by [`RwPgResponse`] and [`RwPgResponseBuilder`].
///
/// Each variant wraps a different underlying stream; polling is forwarded to whichever inner
/// stream is held by the [`Stream`] implementation of this type.
pub enum PgResponseStream {
    /// A [`LocalQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// A [`DistributedQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// An arbitrary boxed stream of [`RowSetResult`] items.
    Rows(BoxStream<'static, RowSetResult>),
}
160
161impl Stream for PgResponseStream {
162    type Item = std::result::Result<Vec<Row>, BoxedError>;
163
164    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
165        match &mut *self {
166            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
167            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
168            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
169        }
170    }
171}
172
173impl From<Vec<Row>> for PgResponseStream {
174    fn from(rows: Vec<Row>) -> Self {
175        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
176    }
177}
178
/// The arguments bundled together for a statement handler: the session plus several views of
/// the statement being handled.
#[derive(Clone)]
pub struct HandlerArgs {
    /// The session the statement is handled in.
    pub session: Arc<SessionImpl>,
    /// The SQL text as passed to [`HandlerArgs::new`].
    pub sql: Arc<str>,
    /// The statement unparsed back to text, with `OR REPLACE` / `IF NOT EXISTS` cleared for the
    /// `CREATE` statements handled by `normalize_sql`.
    pub normalized_sql: String,
    /// The options obtained from the statement via `WithOptions::try_from`.
    pub with_options: WithOptions,
}
186
187impl HandlerArgs {
188    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
189        Ok(Self {
190            session,
191            sql,
192            with_options: WithOptions::try_from(stmt)?,
193            normalized_sql: Self::normalize_sql(stmt),
194        })
195    }
196
197    /// Get normalized SQL from the statement.
198    ///
199    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
200    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
201    ///   make it suitable for the `SHOW CREATE` statements.
202    fn normalize_sql(stmt: &Statement) -> String {
203        let mut stmt = stmt.clone();
204        match &mut stmt {
205            Statement::CreateView {
206                or_replace,
207                if_not_exists,
208                ..
209            } => {
210                *or_replace = false;
211                *if_not_exists = false;
212            }
213            Statement::CreateTable {
214                or_replace,
215                if_not_exists,
216                ..
217            } => {
218                *or_replace = false;
219                *if_not_exists = false;
220            }
221            Statement::CreateIndex { if_not_exists, .. } => {
222                *if_not_exists = false;
223            }
224            Statement::CreateSource {
225                stmt: CreateSourceStatement { if_not_exists, .. },
226                ..
227            } => {
228                *if_not_exists = false;
229            }
230            Statement::CreateSink {
231                stmt: CreateSinkStatement { if_not_exists, .. },
232            } => {
233                *if_not_exists = false;
234            }
235            Statement::CreateSubscription {
236                stmt: CreateSubscriptionStatement { if_not_exists, .. },
237            } => {
238                *if_not_exists = false;
239            }
240            Statement::CreateConnection {
241                stmt: CreateConnectionStatement { if_not_exists, .. },
242            } => {
243                *if_not_exists = false;
244            }
245            _ => {}
246        }
247        stmt.to_string()
248    }
249}
250
251pub async fn handle(
252    session: Arc<SessionImpl>,
253    stmt: Statement,
254    sql: Arc<str>,
255    formats: Vec<Format>,
256) -> Result<RwPgResponse> {
257    session.clear_cancel_query_flag();
258    let _guard = session.txn_begin_implicit();
259    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
260
261    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
262
263    match stmt {
264        Statement::Explain {
265            statement,
266            analyze,
267            options,
268        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
269        Statement::ExplainAnalyzeStreamJob {
270            target,
271            duration_secs,
272        } => {
273            explain_analyze_stream_job::handle_explain_analyze_stream_job(
274                handler_args,
275                target,
276                duration_secs,
277            )
278            .await
279        }
280        Statement::CreateSource { stmt } => {
281            create_source::handle_create_source(handler_args, stmt).await
282        }
283        Statement::CreateSink { stmt } => {
284            create_sink::handle_create_sink(handler_args, stmt, false).await
285        }
286        Statement::CreateSubscription { stmt } => {
287            create_subscription::handle_create_subscription(handler_args, stmt).await
288        }
289        Statement::CreateConnection { stmt } => {
290            create_connection::handle_create_connection(handler_args, stmt).await
291        }
292        Statement::CreateSecret { stmt } => {
293            create_secret::handle_create_secret(handler_args, stmt).await
294        }
295        Statement::CreateFunction {
296            or_replace,
297            temporary,
298            if_not_exists,
299            name,
300            args,
301            returns,
302            params,
303            with_options,
304        } => {
305            // For general udf, `language` clause could be ignored
306            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
307            if params.language.is_none()
308                || !params
309                    .language
310                    .as_ref()
311                    .unwrap()
312                    .real_value()
313                    .eq_ignore_ascii_case("sql")
314            {
315                create_function::handle_create_function(
316                    handler_args,
317                    or_replace,
318                    temporary,
319                    if_not_exists,
320                    name,
321                    args,
322                    returns,
323                    params,
324                    with_options,
325                )
326                .await
327            } else {
328                create_sql_function::handle_create_sql_function(
329                    handler_args,
330                    or_replace,
331                    temporary,
332                    if_not_exists,
333                    name,
334                    args,
335                    returns,
336                    params,
337                )
338                .await
339            }
340        }
341        Statement::CreateAggregate {
342            or_replace,
343            if_not_exists,
344            name,
345            args,
346            returns,
347            params,
348            ..
349        } => {
350            create_aggregate::handle_create_aggregate(
351                handler_args,
352                or_replace,
353                if_not_exists,
354                name,
355                args,
356                returns,
357                params,
358            )
359            .await
360        }
361        Statement::CreateTable {
362            name,
363            columns,
364            wildcard_idx,
365            constraints,
366            query,
367            with_options: _, // It is put in OptimizerContext
368            // Not supported things
369            or_replace,
370            temporary,
371            if_not_exists,
372            format_encode,
373            source_watermarks,
374            append_only,
375            on_conflict,
376            with_version_column,
377            cdc_table_info,
378            include_column_options,
379            webhook_info,
380            engine,
381        } => {
382            if or_replace {
383                bail_not_implemented!("CREATE OR REPLACE TABLE");
384            }
385            if temporary {
386                bail_not_implemented!("CREATE TEMPORARY TABLE");
387            }
388            if let Some(query) = query {
389                return create_table_as::handle_create_as(
390                    handler_args,
391                    name,
392                    if_not_exists,
393                    query,
394                    columns,
395                    append_only,
396                    on_conflict,
397                    with_version_column.map(|x| x.real_value()),
398                    engine,
399                )
400                .await;
401            }
402            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
403            create_table::handle_create_table(
404                handler_args,
405                name,
406                columns,
407                wildcard_idx,
408                constraints,
409                if_not_exists,
410                format_encode,
411                source_watermarks,
412                append_only,
413                on_conflict,
414                with_version_column.map(|x| x.real_value()),
415                cdc_table_info,
416                include_column_options,
417                webhook_info,
418                engine,
419            )
420            .await
421        }
422        Statement::CreateDatabase {
423            db_name,
424            if_not_exists,
425            owner,
426            resource_group,
427        } => {
428            create_database::handle_create_database(
429                handler_args,
430                db_name,
431                if_not_exists,
432                owner,
433                resource_group,
434            )
435            .await
436        }
437        Statement::CreateSchema {
438            schema_name,
439            if_not_exists,
440            owner,
441        } => {
442            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
443                .await
444        }
445        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
446        Statement::DeclareCursor { stmt } => {
447            declare_cursor::handle_declare_cursor(handler_args, stmt).await
448        }
449        Statement::FetchCursor { stmt } => {
450            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
451        }
452        Statement::CloseCursor { stmt } => {
453            close_cursor::handle_close_cursor(handler_args, stmt).await
454        }
455        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
456        Statement::Grant { .. } => {
457            handle_privilege::handle_grant_privilege(handler_args, stmt).await
458        }
459        Statement::Revoke { .. } => {
460            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
461        }
462        Statement::Describe { name, kind } => match kind {
463            DescribeKind::Fragments => {
464                describe::handle_describe_fragments(handler_args, name).await
465            }
466            DescribeKind::Plain => describe::handle_describe(handler_args, name),
467        },
468        Statement::DescribeFragment { fragment_id } => {
469            describe::handle_describe_fragment(handler_args, fragment_id).await
470        }
471        Statement::Discard(..) => discard::handle_discard(handler_args),
472        Statement::ShowObjects {
473            object: show_object,
474            filter,
475        } => show::handle_show_object(handler_args, show_object, filter).await,
476        Statement::ShowCreateObject { create_type, name } => {
477            show::handle_show_create_object(handler_args, create_type, name)
478        }
479        Statement::ShowTransactionIsolationLevel => {
480            transaction::handle_show_isolation_level(handler_args)
481        }
482        Statement::Drop(DropStatement {
483            object_type,
484            object_name,
485            if_exists,
486            drop_mode,
487        }) => {
488            let mut cascade = false;
489            if let AstOption::Some(DropMode::Cascade) = drop_mode {
490                match object_type {
491                    ObjectType::MaterializedView
492                    | ObjectType::View
493                    | ObjectType::Sink
494                    | ObjectType::Source
495                    | ObjectType::Subscription
496                    | ObjectType::Index
497                    | ObjectType::Table
498                    | ObjectType::Schema => {
499                        cascade = true;
500                    }
501                    ObjectType::Database
502                    | ObjectType::User
503                    | ObjectType::Connection
504                    | ObjectType::Secret => {
505                        bail_not_implemented!("DROP CASCADE");
506                    }
507                };
508            };
509            match object_type {
510                ObjectType::Table => {
511                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
512                        .await
513                }
514                ObjectType::MaterializedView => {
515                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
516                }
517                ObjectType::Index => {
518                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
519                        .await
520                }
521                ObjectType::Source => {
522                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
523                        .await
524                }
525                ObjectType::Sink => {
526                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
527                }
528                ObjectType::Subscription => {
529                    drop_subscription::handle_drop_subscription(
530                        handler_args,
531                        object_name,
532                        if_exists,
533                        cascade,
534                    )
535                    .await
536                }
537                ObjectType::Database => {
538                    drop_database::handle_drop_database(
539                        handler_args,
540                        object_name,
541                        if_exists,
542                        drop_mode.into(),
543                    )
544                    .await
545                }
546                ObjectType::Schema => {
547                    drop_schema::handle_drop_schema(
548                        handler_args,
549                        object_name,
550                        if_exists,
551                        drop_mode.into(),
552                    )
553                    .await
554                }
555                ObjectType::User => {
556                    drop_user::handle_drop_user(
557                        handler_args,
558                        object_name,
559                        if_exists,
560                        drop_mode.into(),
561                    )
562                    .await
563                }
564                ObjectType::View => {
565                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
566                }
567                ObjectType::Connection => {
568                    drop_connection::handle_drop_connection(handler_args, object_name, if_exists)
569                        .await
570                }
571                ObjectType::Secret => {
572                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
573                }
574            }
575        }
576        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
577        Statement::DropFunction {
578            if_exists,
579            func_desc,
580            option,
581        } => {
582            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
583                .await
584        }
585        Statement::DropAggregate {
586            if_exists,
587            func_desc,
588            option,
589        } => {
590            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
591                .await
592        }
593        Statement::Query(_)
594        | Statement::Insert { .. }
595        | Statement::Delete { .. }
596        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
597        Statement::CreateView {
598            materialized,
599            if_not_exists,
600            name,
601            columns,
602            query,
603            with_options: _, // It is put in OptimizerContext
604            or_replace,      // not supported
605            emit_mode,
606        } => {
607            if or_replace {
608                bail_not_implemented!("CREATE OR REPLACE VIEW");
609            }
610            if materialized {
611                create_mv::handle_create_mv(
612                    handler_args,
613                    if_not_exists,
614                    name,
615                    *query,
616                    columns,
617                    emit_mode,
618                )
619                .await
620            } else {
621                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
622                    .await
623            }
624        }
625        Statement::Flush => flush::handle_flush(handler_args).await,
626        Statement::Wait => wait::handle_wait(handler_args).await,
627        Statement::Recover => recover::handle_recover(handler_args).await,
628        Statement::SetVariable {
629            local: _,
630            variable,
631            value,
632        } => {
633            // special handle for `use database`
634            if variable.real_value().eq_ignore_ascii_case("database") {
635                let x = variable::set_var_to_param_str(&value);
636                let res = use_db::handle_use_db(
637                    handler_args,
638                    ObjectName::from(vec![Ident::new_unchecked(
639                        x.unwrap_or("default".to_owned()),
640                    )]),
641                )?;
642                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
643                for notice in res.notices() {
644                    builder = builder.notice(notice);
645                }
646                return Ok(builder.into());
647            }
648            variable::handle_set(handler_args, variable, value)
649        }
650        Statement::SetTimeZone { local: _, value } => {
651            variable::handle_set_time_zone(handler_args, value)
652        }
653        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable).await,
654        Statement::CreateIndex {
655            name,
656            table_name,
657            columns,
658            include,
659            distributed_by,
660            unique,
661            if_not_exists,
662        } => {
663            if unique {
664                bail_not_implemented!("create unique index");
665            }
666
667            create_index::handle_create_index(
668                handler_args,
669                if_not_exists,
670                name,
671                table_name,
672                columns.to_vec(),
673                include,
674                distributed_by,
675            )
676            .await
677        }
678        Statement::AlterDatabase { name, operation } => match operation {
679            AlterDatabaseOperation::RenameDatabase { database_name } => {
680                alter_rename::handle_rename_database(handler_args, name, database_name).await
681            }
682            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
683                alter_owner::handle_alter_owner(
684                    handler_args,
685                    name,
686                    new_owner_name,
687                    StatementType::ALTER_DATABASE,
688                )
689                .await
690            }
691        },
692        Statement::AlterSchema { name, operation } => match operation {
693            AlterSchemaOperation::RenameSchema { schema_name } => {
694                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
695            }
696            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
697                alter_owner::handle_alter_owner(
698                    handler_args,
699                    name,
700                    new_owner_name,
701                    StatementType::ALTER_SCHEMA,
702                )
703                .await
704            }
705            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
706                alter_swap_rename::handle_swap_rename(
707                    handler_args,
708                    name,
709                    target_schema,
710                    StatementType::ALTER_SCHEMA,
711                )
712                .await
713            }
714        },
715        Statement::AlterTable { name, operation } => match operation {
716            AlterTableOperation::AddColumn { .. }
717            | AlterTableOperation::DropColumn { .. }
718            | AlterTableOperation::AlterColumn { .. } => {
719                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
720            }
721            AlterTableOperation::RenameTable { table_name } => {
722                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
723                    .await
724            }
725            AlterTableOperation::ChangeOwner { new_owner_name } => {
726                alter_owner::handle_alter_owner(
727                    handler_args,
728                    name,
729                    new_owner_name,
730                    StatementType::ALTER_TABLE,
731                )
732                .await
733            }
734            AlterTableOperation::SetParallelism {
735                parallelism,
736                deferred,
737            } => {
738                alter_parallelism::handle_alter_parallelism(
739                    handler_args,
740                    name,
741                    parallelism,
742                    StatementType::ALTER_TABLE,
743                    deferred,
744                )
745                .await
746            }
747            AlterTableOperation::SetSchema { new_schema_name } => {
748                alter_set_schema::handle_alter_set_schema(
749                    handler_args,
750                    name,
751                    new_schema_name,
752                    StatementType::ALTER_TABLE,
753                    None,
754                )
755                .await
756            }
757            AlterTableOperation::RefreshSchema => {
758                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
759            }
760            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
761                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
762                    handler_args,
763                    PbThrottleTarget::TableWithSource,
764                    name,
765                    rate_limit,
766                )
767                .await
768            }
769            AlterTableOperation::DropConnector => {
770                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
771                    .await
772            }
773            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
774                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
775                    handler_args,
776                    PbThrottleTarget::TableDml,
777                    name,
778                    rate_limit,
779                )
780                .await
781            }
782            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
783                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
784                    handler_args,
785                    PbThrottleTarget::CdcTable,
786                    name,
787                    rate_limit,
788                )
789                .await
790            }
791            AlterTableOperation::SwapRenameTable { target_table } => {
792                alter_swap_rename::handle_swap_rename(
793                    handler_args,
794                    name,
795                    target_table,
796                    StatementType::ALTER_TABLE,
797                )
798                .await
799            }
800            AlterTableOperation::AddConstraint { .. }
801            | AlterTableOperation::DropConstraint { .. }
802            | AlterTableOperation::RenameColumn { .. }
803            | AlterTableOperation::ChangeColumn { .. }
804            | AlterTableOperation::RenameConstraint { .. } => {
805                bail_not_implemented!(
806                    "Unhandled statement: {}",
807                    Statement::AlterTable { name, operation }
808                )
809            }
810        },
811        Statement::AlterIndex { name, operation } => match operation {
812            AlterIndexOperation::RenameIndex { index_name } => {
813                alter_rename::handle_rename_index(handler_args, name, index_name).await
814            }
815            AlterIndexOperation::SetParallelism {
816                parallelism,
817                deferred,
818            } => {
819                alter_parallelism::handle_alter_parallelism(
820                    handler_args,
821                    name,
822                    parallelism,
823                    StatementType::ALTER_INDEX,
824                    deferred,
825                )
826                .await
827            }
828        },
829        Statement::AlterView {
830            materialized,
831            name,
832            operation,
833        } => {
834            let statement_type = if materialized {
835                StatementType::ALTER_MATERIALIZED_VIEW
836            } else {
837                StatementType::ALTER_VIEW
838            };
839            match operation {
840                AlterViewOperation::RenameView { view_name } => {
841                    if materialized {
842                        alter_rename::handle_rename_table(
843                            handler_args,
844                            TableType::MaterializedView,
845                            name,
846                            view_name,
847                        )
848                        .await
849                    } else {
850                        alter_rename::handle_rename_view(handler_args, name, view_name).await
851                    }
852                }
853                AlterViewOperation::SetParallelism {
854                    parallelism,
855                    deferred,
856                } => {
857                    if !materialized {
858                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
859                    }
860                    alter_parallelism::handle_alter_parallelism(
861                        handler_args,
862                        name,
863                        parallelism,
864                        statement_type,
865                        deferred,
866                    )
867                    .await
868                }
869                AlterViewOperation::SetResourceGroup {
870                    resource_group,
871                    deferred,
872                } => {
873                    if !materialized {
874                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
875                    }
876                    alter_resource_group::handle_alter_resource_group(
877                        handler_args,
878                        name,
879                        resource_group,
880                        statement_type,
881                        deferred,
882                    )
883                    .await
884                }
885                AlterViewOperation::ChangeOwner { new_owner_name } => {
886                    alter_owner::handle_alter_owner(
887                        handler_args,
888                        name,
889                        new_owner_name,
890                        statement_type,
891                    )
892                    .await
893                }
894                AlterViewOperation::SetSchema { new_schema_name } => {
895                    alter_set_schema::handle_alter_set_schema(
896                        handler_args,
897                        name,
898                        new_schema_name,
899                        statement_type,
900                        None,
901                    )
902                    .await
903                }
904                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
905                    if !materialized {
906                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
907                    }
908                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
909                        handler_args,
910                        PbThrottleTarget::Mv,
911                        name,
912                        rate_limit,
913                    )
914                    .await
915                }
916                AlterViewOperation::SwapRenameView { target_view } => {
917                    alter_swap_rename::handle_swap_rename(
918                        handler_args,
919                        name,
920                        target_view,
921                        statement_type,
922                    )
923                    .await
924                }
925            }
926        }
927
928        Statement::AlterSink { name, operation } => match operation {
929            AlterSinkOperation::SetSinkProps { changed_props } => {
930                alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await
931            }
932            AlterSinkOperation::RenameSink { sink_name } => {
933                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
934            }
935            AlterSinkOperation::ChangeOwner { new_owner_name } => {
936                alter_owner::handle_alter_owner(
937                    handler_args,
938                    name,
939                    new_owner_name,
940                    StatementType::ALTER_SINK,
941                )
942                .await
943            }
944            AlterSinkOperation::SetSchema { new_schema_name } => {
945                alter_set_schema::handle_alter_set_schema(
946                    handler_args,
947                    name,
948                    new_schema_name,
949                    StatementType::ALTER_SINK,
950                    None,
951                )
952                .await
953            }
954            AlterSinkOperation::SetParallelism {
955                parallelism,
956                deferred,
957            } => {
958                alter_parallelism::handle_alter_parallelism(
959                    handler_args,
960                    name,
961                    parallelism,
962                    StatementType::ALTER_SINK,
963                    deferred,
964                )
965                .await
966            }
967            AlterSinkOperation::SwapRenameSink { target_sink } => {
968                alter_swap_rename::handle_swap_rename(
969                    handler_args,
970                    name,
971                    target_sink,
972                    StatementType::ALTER_SINK,
973                )
974                .await
975            }
976            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
977                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
978                    handler_args,
979                    PbThrottleTarget::Sink,
980                    name,
981                    rate_limit,
982                )
983                .await
984            }
985        },
986        Statement::AlterSubscription { name, operation } => match operation {
987            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
988                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
989                    .await
990            }
991            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
992                alter_owner::handle_alter_owner(
993                    handler_args,
994                    name,
995                    new_owner_name,
996                    StatementType::ALTER_SUBSCRIPTION,
997                )
998                .await
999            }
1000            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1001                alter_set_schema::handle_alter_set_schema(
1002                    handler_args,
1003                    name,
1004                    new_schema_name,
1005                    StatementType::ALTER_SUBSCRIPTION,
1006                    None,
1007                )
1008                .await
1009            }
1010            AlterSubscriptionOperation::SwapRenameSubscription {
1011                target_subscription,
1012            } => {
1013                alter_swap_rename::handle_swap_rename(
1014                    handler_args,
1015                    name,
1016                    target_subscription,
1017                    StatementType::ALTER_SUBSCRIPTION,
1018                )
1019                .await
1020            }
1021        },
1022        Statement::AlterSource { name, operation } => match operation {
1023            AlterSourceOperation::RenameSource { source_name } => {
1024                alter_rename::handle_rename_source(handler_args, name, source_name).await
1025            }
1026            AlterSourceOperation::AddColumn { .. } => {
1027                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1028            }
1029            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1030                alter_owner::handle_alter_owner(
1031                    handler_args,
1032                    name,
1033                    new_owner_name,
1034                    StatementType::ALTER_SOURCE,
1035                )
1036                .await
1037            }
1038            AlterSourceOperation::SetSchema { new_schema_name } => {
1039                alter_set_schema::handle_alter_set_schema(
1040                    handler_args,
1041                    name,
1042                    new_schema_name,
1043                    StatementType::ALTER_SOURCE,
1044                    None,
1045                )
1046                .await
1047            }
1048            AlterSourceOperation::FormatEncode { format_encode } => {
1049                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1050                    .await
1051            }
1052            AlterSourceOperation::RefreshSchema => {
1053                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1054            }
1055            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1056                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1057                    handler_args,
1058                    PbThrottleTarget::Source,
1059                    name,
1060                    rate_limit,
1061                )
1062                .await
1063            }
1064            AlterSourceOperation::SwapRenameSource { target_source } => {
1065                alter_swap_rename::handle_swap_rename(
1066                    handler_args,
1067                    name,
1068                    target_source,
1069                    StatementType::ALTER_SOURCE,
1070                )
1071                .await
1072            }
1073            AlterSourceOperation::SetParallelism {
1074                parallelism,
1075                deferred,
1076            } => {
1077                alter_parallelism::handle_alter_parallelism(
1078                    handler_args,
1079                    name,
1080                    parallelism,
1081                    StatementType::ALTER_SOURCE,
1082                    deferred,
1083                )
1084                .await
1085            }
1086        },
1087        Statement::AlterFunction {
1088            name,
1089            args,
1090            operation,
1091        } => match operation {
1092            AlterFunctionOperation::SetSchema { new_schema_name } => {
1093                alter_set_schema::handle_alter_set_schema(
1094                    handler_args,
1095                    name,
1096                    new_schema_name,
1097                    StatementType::ALTER_FUNCTION,
1098                    args,
1099                )
1100                .await
1101            }
1102        },
1103        Statement::AlterConnection { name, operation } => match operation {
1104            AlterConnectionOperation::SetSchema { new_schema_name } => {
1105                alter_set_schema::handle_alter_set_schema(
1106                    handler_args,
1107                    name,
1108                    new_schema_name,
1109                    StatementType::ALTER_CONNECTION,
1110                    None,
1111                )
1112                .await
1113            }
1114            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1115                alter_owner::handle_alter_owner(
1116                    handler_args,
1117                    name,
1118                    new_owner_name,
1119                    StatementType::ALTER_CONNECTION,
1120                )
1121                .await
1122            }
1123        },
1124        Statement::AlterSystem { param, value } => {
1125            alter_system::handle_alter_system(handler_args, param, value).await
1126        }
1127        Statement::AlterSecret {
1128            name,
1129            with_options,
1130            operation,
1131        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1132        Statement::AlterFragment {
1133            fragment_id,
1134            operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1135        } => {
1136            alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1137                &handler_args.session,
1138                PbThrottleTarget::Fragment,
1139                fragment_id,
1140                rate_limit,
1141                StatementType::SET_VARIABLE,
1142            )
1143            .await
1144        }
1145        Statement::StartTransaction { modes } => {
1146            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1147        }
1148        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1149        Statement::Commit { chain } => {
1150            transaction::handle_commit(handler_args, COMMIT, chain).await
1151        }
1152        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1153        Statement::Rollback { chain } => {
1154            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1155        }
1156        Statement::SetTransaction {
1157            modes,
1158            snapshot,
1159            session,
1160        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1161        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1162        Statement::Kill(process_id) => handle_kill(handler_args, process_id).await,
1163        Statement::Comment {
1164            object_type,
1165            object_name,
1166            comment,
1167        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1168        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1169        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1170    }
1171}
1172
1173fn check_ban_ddl_for_iceberg_engine_table(
1174    session: Arc<SessionImpl>,
1175    stmt: &Statement,
1176) -> Result<()> {
1177    match stmt {
1178        Statement::AlterTable {
1179            name,
1180            operation:
1181                operation @ (AlterTableOperation::AddColumn { .. }
1182                | AlterTableOperation::DropColumn { .. }),
1183        } => {
1184            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1185            if table.is_iceberg_engine_table() {
1186                bail!(
1187                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1188                    operation,
1189                    schema_name,
1190                    name
1191                );
1192            }
1193        }
1194
1195        Statement::AlterTable {
1196            name,
1197            operation: AlterTableOperation::RenameTable { .. },
1198        } => {
1199            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1200            if table.is_iceberg_engine_table() {
1201                bail!(
1202                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1203                    schema_name,
1204                    name
1205                );
1206            }
1207        }
1208
1209        Statement::AlterTable {
1210            name,
1211            operation: AlterTableOperation::ChangeOwner { .. },
1212        } => {
1213            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1214            if table.is_iceberg_engine_table() {
1215                bail!(
1216                    "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1217                    schema_name,
1218                    name
1219                );
1220            }
1221        }
1222
1223        Statement::AlterTable {
1224            name,
1225            operation: AlterTableOperation::SetParallelism { .. },
1226        } => {
1227            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1228            if table.is_iceberg_engine_table() {
1229                bail!(
1230                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1231                    schema_name,
1232                    name
1233                );
1234            }
1235        }
1236
1237        Statement::AlterTable {
1238            name,
1239            operation: AlterTableOperation::SetSchema { .. },
1240        } => {
1241            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1242            if table.is_iceberg_engine_table() {
1243                bail!(
1244                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1245                    schema_name,
1246                    name
1247                );
1248            }
1249        }
1250
1251        Statement::AlterTable {
1252            name,
1253            operation: AlterTableOperation::RefreshSchema,
1254        } => {
1255            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1256            if table.is_iceberg_engine_table() {
1257                bail!(
1258                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1259                    schema_name,
1260                    name
1261                );
1262            }
1263        }
1264
1265        Statement::AlterTable {
1266            name,
1267            operation: AlterTableOperation::SetSourceRateLimit { .. },
1268        } => {
1269            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1270            if table.is_iceberg_engine_table() {
1271                bail!(
1272                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1273                    schema_name,
1274                    name
1275                );
1276            }
1277        }
1278
1279        _ => {}
1280    }
1281
1282    Ok(())
1283}