risingwave_frontend/handler/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_database_param;
46mod alter_mv;
47mod alter_owner;
48mod alter_parallelism;
49mod alter_rename;
50mod alter_resource_group;
51mod alter_secret;
52mod alter_set_schema;
53mod alter_sink_props;
54mod alter_source_column;
55mod alter_source_props;
56mod alter_source_with_sr;
57mod alter_streaming_enable_unaligned_join;
58mod alter_streaming_rate_limit;
59mod alter_swap_rename;
60mod alter_system;
61mod alter_table_column;
62pub mod alter_table_drop_connector;
63mod alter_table_with_sr;
64pub mod alter_user;
65pub mod cancel_job;
66pub mod close_cursor;
67mod comment;
68pub mod create_aggregate;
69pub mod create_connection;
70mod create_database;
71pub mod create_function;
72pub mod create_index;
73pub mod create_mv;
74pub mod create_schema;
75pub mod create_secret;
76pub mod create_sink;
77pub mod create_source;
78pub mod create_sql_function;
79pub mod create_subscription;
80pub mod create_table;
81pub mod create_table_as;
82pub mod create_user;
83pub mod create_view;
84pub mod declare_cursor;
85pub mod describe;
86pub mod discard;
87mod drop_connection;
88mod drop_database;
89pub mod drop_function;
90mod drop_index;
91pub mod drop_mv;
92mod drop_schema;
93pub mod drop_secret;
94pub mod drop_sink;
95pub mod drop_source;
96pub mod drop_subscription;
97pub mod drop_table;
98pub mod drop_user;
99mod drop_view;
100pub mod explain;
101pub mod explain_analyze_stream_job;
102pub mod extended_handle;
103pub mod fetch_cursor;
104mod flush;
105pub mod handle_privilege;
106pub mod kill_process;
107mod prepared_statement;
108pub mod privilege;
109pub mod query;
110mod recover;
111mod refresh;
112pub mod show;
113mod transaction;
114mod use_db;
115pub mod util;
116pub mod vacuum;
117pub mod variable;
118mod wait;
119
120pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
121
122/// The [`PgResponseBuilder`] used by RisingWave.
123pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
124
125/// The [`PgResponse`] used by RisingWave.
126pub type RwPgResponse = PgResponse<PgResponseStream>;
127
128#[easy_ext::ext(RwPgResponseBuilderExt)]
129impl RwPgResponseBuilder {
130    /// Append rows to the response.
131    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
132        let fields = T::fields();
133        self.values(
134            rows.into_iter()
135                .map(|row| {
136                    Row::new(
137                        row.into_owned_row()
138                            .into_iter()
139                            .zip_eq_fast(&fields)
140                            .map(|(datum, (_, ty))| {
141                                datum.map(|scalar| {
142                                    scalar.as_scalar_ref_impl().text_format(ty).into()
143                                })
144                            })
145                            .collect(),
146                    )
147                })
148                .collect_vec()
149                .into(),
150            fields_to_descriptors(fields),
151        )
152    }
153}
154
155pub fn fields_to_descriptors(
156    fields: Vec<(&str, risingwave_common::types::DataType)>,
157) -> Vec<PgFieldDescriptor> {
158    fields
159        .iter()
160        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
161        .collect()
162}
163
164pub enum PgResponseStream {
165    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
166    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
167    Rows(BoxStream<'static, RowSetResult>),
168}
169
170impl Stream for PgResponseStream {
171    type Item = std::result::Result<Vec<Row>, BoxedError>;
172
173    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
174        match &mut *self {
175            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
176            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
177            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
178        }
179    }
180}
181
182impl From<Vec<Row>> for PgResponseStream {
183    fn from(rows: Vec<Row>) -> Self {
184        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
185    }
186}
187
188#[derive(Clone)]
189pub struct HandlerArgs {
190    pub session: Arc<SessionImpl>,
191    pub sql: Arc<str>,
192    pub normalized_sql: String,
193    pub with_options: WithOptions,
194}
195
196impl HandlerArgs {
197    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
198        Ok(Self {
199            session,
200            sql,
201            with_options: WithOptions::try_from(stmt)?,
202            normalized_sql: Self::normalize_sql(stmt),
203        })
204    }
205
206    /// Get normalized SQL from the statement.
207    ///
208    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
209    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
210    ///   make it suitable for the `SHOW CREATE` statements.
211    fn normalize_sql(stmt: &Statement) -> String {
212        let mut stmt = stmt.clone();
213        match &mut stmt {
214            Statement::CreateView {
215                or_replace,
216                if_not_exists,
217                ..
218            } => {
219                *or_replace = false;
220                *if_not_exists = false;
221            }
222            Statement::CreateTable {
223                or_replace,
224                if_not_exists,
225                ..
226            } => {
227                *or_replace = false;
228                *if_not_exists = false;
229            }
230            Statement::CreateIndex { if_not_exists, .. } => {
231                *if_not_exists = false;
232            }
233            Statement::CreateSource {
234                stmt: CreateSourceStatement { if_not_exists, .. },
235                ..
236            } => {
237                *if_not_exists = false;
238            }
239            Statement::CreateSink {
240                stmt: CreateSinkStatement { if_not_exists, .. },
241            } => {
242                *if_not_exists = false;
243            }
244            Statement::CreateSubscription {
245                stmt: CreateSubscriptionStatement { if_not_exists, .. },
246            } => {
247                *if_not_exists = false;
248            }
249            Statement::CreateConnection {
250                stmt: CreateConnectionStatement { if_not_exists, .. },
251            } => {
252                *if_not_exists = false;
253            }
254            _ => {}
255        }
256        stmt.to_string()
257    }
258}
259
260pub async fn handle(
261    session: Arc<SessionImpl>,
262    stmt: Statement,
263    sql: Arc<str>,
264    formats: Vec<Format>,
265) -> Result<RwPgResponse> {
266    session.clear_cancel_query_flag();
267    let _guard = session.txn_begin_implicit();
268    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
269
270    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
271
272    match stmt {
273        Statement::Explain {
274            statement,
275            analyze,
276            options,
277        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
278        Statement::ExplainAnalyzeStreamJob {
279            target,
280            duration_secs,
281        } => {
282            explain_analyze_stream_job::handle_explain_analyze_stream_job(
283                handler_args,
284                target,
285                duration_secs,
286            )
287            .await
288        }
289        Statement::CreateSource { stmt } => {
290            create_source::handle_create_source(handler_args, stmt).await
291        }
292        Statement::CreateSink { stmt } => {
293            create_sink::handle_create_sink(handler_args, stmt, false).await
294        }
295        Statement::CreateSubscription { stmt } => {
296            create_subscription::handle_create_subscription(handler_args, stmt).await
297        }
298        Statement::CreateConnection { stmt } => {
299            create_connection::handle_create_connection(handler_args, stmt).await
300        }
301        Statement::CreateSecret { stmt } => {
302            create_secret::handle_create_secret(handler_args, stmt).await
303        }
304        Statement::CreateFunction {
305            or_replace,
306            temporary,
307            if_not_exists,
308            name,
309            args,
310            returns,
311            params,
312            with_options,
313        } => {
314            // For general udf, `language` clause could be ignored
315            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
316            if params.language.is_none()
317                || !params
318                    .language
319                    .as_ref()
320                    .unwrap()
321                    .real_value()
322                    .eq_ignore_ascii_case("sql")
323            {
324                create_function::handle_create_function(
325                    handler_args,
326                    or_replace,
327                    temporary,
328                    if_not_exists,
329                    name,
330                    args,
331                    returns,
332                    params,
333                    with_options,
334                )
335                .await
336            } else {
337                create_sql_function::handle_create_sql_function(
338                    handler_args,
339                    or_replace,
340                    temporary,
341                    if_not_exists,
342                    name,
343                    args,
344                    returns,
345                    params,
346                )
347                .await
348            }
349        }
350        Statement::CreateAggregate {
351            or_replace,
352            if_not_exists,
353            name,
354            args,
355            returns,
356            params,
357            ..
358        } => {
359            create_aggregate::handle_create_aggregate(
360                handler_args,
361                or_replace,
362                if_not_exists,
363                name,
364                args,
365                returns,
366                params,
367            )
368            .await
369        }
370        Statement::CreateTable {
371            name,
372            columns,
373            wildcard_idx,
374            constraints,
375            query,
376            with_options: _, // It is put in OptimizerContext
377            // Not supported things
378            or_replace,
379            temporary,
380            if_not_exists,
381            format_encode,
382            source_watermarks,
383            append_only,
384            on_conflict,
385            with_version_columns,
386            cdc_table_info,
387            include_column_options,
388            webhook_info,
389            engine,
390        } => {
391            if or_replace {
392                bail_not_implemented!("CREATE OR REPLACE TABLE");
393            }
394            if temporary {
395                bail_not_implemented!("CREATE TEMPORARY TABLE");
396            }
397            if let Some(query) = query {
398                return create_table_as::handle_create_as(
399                    handler_args,
400                    name,
401                    if_not_exists,
402                    query,
403                    columns,
404                    append_only,
405                    on_conflict,
406                    with_version_columns
407                        .iter()
408                        .map(|col| col.real_value())
409                        .collect(),
410                    engine,
411                )
412                .await;
413            }
414            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
415            create_table::handle_create_table(
416                handler_args,
417                name,
418                columns,
419                wildcard_idx,
420                constraints,
421                if_not_exists,
422                format_encode,
423                source_watermarks,
424                append_only,
425                on_conflict,
426                with_version_columns
427                    .iter()
428                    .map(|col| col.real_value())
429                    .collect(),
430                cdc_table_info,
431                include_column_options,
432                webhook_info,
433                engine,
434            )
435            .await
436        }
437        Statement::CreateDatabase {
438            db_name,
439            if_not_exists,
440            owner,
441            resource_group,
442            barrier_interval_ms,
443            checkpoint_frequency,
444        } => {
445            create_database::handle_create_database(
446                handler_args,
447                db_name,
448                if_not_exists,
449                owner,
450                resource_group,
451                barrier_interval_ms,
452                checkpoint_frequency,
453            )
454            .await
455        }
456        Statement::CreateSchema {
457            schema_name,
458            if_not_exists,
459            owner,
460        } => {
461            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
462                .await
463        }
464        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
465        Statement::DeclareCursor { stmt } => {
466            declare_cursor::handle_declare_cursor(handler_args, stmt).await
467        }
468        Statement::FetchCursor { stmt } => {
469            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
470        }
471        Statement::CloseCursor { stmt } => {
472            close_cursor::handle_close_cursor(handler_args, stmt).await
473        }
474        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
475        Statement::Grant { .. } => {
476            handle_privilege::handle_grant_privilege(handler_args, stmt).await
477        }
478        Statement::Revoke { .. } => {
479            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
480        }
481        Statement::Describe { name, kind } => match kind {
482            DescribeKind::Fragments => {
483                describe::handle_describe_fragments(handler_args, name).await
484            }
485            DescribeKind::Plain => describe::handle_describe(handler_args, name),
486        },
487        Statement::DescribeFragment { fragment_id } => {
488            describe::handle_describe_fragment(handler_args, fragment_id).await
489        }
490        Statement::Discard(..) => discard::handle_discard(handler_args),
491        Statement::ShowObjects {
492            object: show_object,
493            filter,
494        } => show::handle_show_object(handler_args, show_object, filter).await,
495        Statement::ShowCreateObject { create_type, name } => {
496            show::handle_show_create_object(handler_args, create_type, name)
497        }
498        Statement::ShowTransactionIsolationLevel => {
499            transaction::handle_show_isolation_level(handler_args)
500        }
501        Statement::Drop(DropStatement {
502            object_type,
503            object_name,
504            if_exists,
505            drop_mode,
506        }) => {
507            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
508                match object_type {
509                    ObjectType::MaterializedView
510                    | ObjectType::View
511                    | ObjectType::Sink
512                    | ObjectType::Source
513                    | ObjectType::Subscription
514                    | ObjectType::Index
515                    | ObjectType::Table
516                    | ObjectType::Schema
517                    | ObjectType::Connection => true,
518                    ObjectType::Database | ObjectType::User | ObjectType::Secret => {
519                        bail_not_implemented!("DROP CASCADE");
520                    }
521                }
522            } else {
523                false
524            };
525            match object_type {
526                ObjectType::Table => {
527                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
528                        .await
529                }
530                ObjectType::MaterializedView => {
531                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
532                }
533                ObjectType::Index => {
534                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
535                        .await
536                }
537                ObjectType::Source => {
538                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
539                        .await
540                }
541                ObjectType::Sink => {
542                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
543                }
544                ObjectType::Subscription => {
545                    drop_subscription::handle_drop_subscription(
546                        handler_args,
547                        object_name,
548                        if_exists,
549                        cascade,
550                    )
551                    .await
552                }
553                ObjectType::Database => {
554                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
555                }
556                ObjectType::Schema => {
557                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
558                        .await
559                }
560                ObjectType::User => {
561                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
562                }
563                ObjectType::View => {
564                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
565                }
566                ObjectType::Connection => {
567                    drop_connection::handle_drop_connection(
568                        handler_args,
569                        object_name,
570                        if_exists,
571                        cascade,
572                    )
573                    .await
574                }
575                ObjectType::Secret => {
576                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
577                }
578            }
579        }
580        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
581        Statement::DropFunction {
582            if_exists,
583            func_desc,
584            option,
585        } => {
586            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
587                .await
588        }
589        Statement::DropAggregate {
590            if_exists,
591            func_desc,
592            option,
593        } => {
594            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
595                .await
596        }
597        Statement::Query(_)
598        | Statement::Insert { .. }
599        | Statement::Delete { .. }
600        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
601        Statement::CreateView {
602            materialized,
603            if_not_exists,
604            name,
605            columns,
606            query,
607            with_options: _, // It is put in OptimizerContext
608            or_replace,      // not supported
609            emit_mode,
610        } => {
611            if or_replace {
612                bail_not_implemented!("CREATE OR REPLACE VIEW");
613            }
614            if materialized {
615                create_mv::handle_create_mv(
616                    handler_args,
617                    if_not_exists,
618                    name,
619                    *query,
620                    columns,
621                    emit_mode,
622                )
623                .await
624            } else {
625                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
626                    .await
627            }
628        }
629        Statement::Flush => flush::handle_flush(handler_args).await,
630        Statement::Wait => wait::handle_wait(handler_args).await,
631        Statement::Recover => recover::handle_recover(handler_args).await,
632        Statement::SetVariable {
633            local: _,
634            variable,
635            value,
636        } => {
637            // special handle for `use database`
638            if variable.real_value().eq_ignore_ascii_case("database") {
639                let x = variable::set_var_to_param_str(&value);
640                let res = use_db::handle_use_db(
641                    handler_args,
642                    ObjectName::from(vec![Ident::from_real_value(
643                        x.as_deref().unwrap_or("default"),
644                    )]),
645                )?;
646                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
647                for notice in res.notices() {
648                    builder = builder.notice(notice);
649                }
650                return Ok(builder.into());
651            }
652            variable::handle_set(handler_args, variable, value)
653        }
654        Statement::SetTimeZone { local: _, value } => {
655            variable::handle_set_time_zone(handler_args, value)
656        }
657        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
658        Statement::CreateIndex {
659            name,
660            table_name,
661            method,
662            columns,
663            include,
664            distributed_by,
665            unique,
666            if_not_exists,
667            with_properties: _,
668        } => {
669            if unique {
670                bail_not_implemented!("create unique index");
671            }
672
673            create_index::handle_create_index(
674                handler_args,
675                if_not_exists,
676                name,
677                table_name,
678                method,
679                columns.to_vec(),
680                include,
681                distributed_by,
682            )
683            .await
684        }
685        Statement::AlterDatabase { name, operation } => match operation {
686            AlterDatabaseOperation::RenameDatabase { database_name } => {
687                alter_rename::handle_rename_database(handler_args, name, database_name).await
688            }
689            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
690                alter_owner::handle_alter_owner(
691                    handler_args,
692                    name,
693                    new_owner_name,
694                    StatementType::ALTER_DATABASE,
695                )
696                .await
697            }
698            AlterDatabaseOperation::SetParam(config_param) => {
699                let ConfigParam { param, value } = config_param;
700
701                let database_param = match param.real_value().to_uppercase().as_str() {
702                    "BARRIER_INTERVAL_MS" => {
703                        let barrier_interval_ms = match value {
704                            SetVariableValue::Default => None,
705                            SetVariableValue::Single(SetVariableValueSingle::Literal(
706                                Value::Number(num),
707                            )) => {
708                                let num = num.parse::<u32>().map_err(|e| {
709                                    ErrorCode::InvalidInputSyntax(format!(
710                                        "barrier_interval_ms must be a u32 integer: {}",
711                                        e.as_report()
712                                    ))
713                                })?;
714                                Some(num)
715                            }
716                            _ => {
717                                return Err(ErrorCode::InvalidInputSyntax(
718                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
719                                        .to_owned(),
720                                )
721                                .into());
722                            }
723                        };
724                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
725                    }
726                    "CHECKPOINT_FREQUENCY" => {
727                        let checkpoint_frequency = match value {
728                            SetVariableValue::Default => None,
729                            SetVariableValue::Single(SetVariableValueSingle::Literal(
730                                Value::Number(num),
731                            )) => {
732                                let num = num.parse::<u64>().map_err(|e| {
733                                    ErrorCode::InvalidInputSyntax(format!(
734                                        "checkpoint_frequency must be a u64 integer: {}",
735                                        e.as_report()
736                                    ))
737                                })?;
738                                Some(num)
739                            }
740                            _ => {
741                                return Err(ErrorCode::InvalidInputSyntax(
742                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
743                                        .to_owned(),
744                                )
745                                .into());
746                            }
747                        };
748                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
749                    }
750                    _ => {
751                        return Err(ErrorCode::InvalidInputSyntax(format!(
752                            "Unsupported database config parameter: {}",
753                            param.real_value()
754                        ))
755                        .into());
756                    }
757                };
758
759                alter_database_param::handle_alter_database_param(
760                    handler_args,
761                    name,
762                    database_param,
763                )
764                .await
765            }
766        },
767        Statement::AlterSchema { name, operation } => match operation {
768            AlterSchemaOperation::RenameSchema { schema_name } => {
769                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
770            }
771            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
772                alter_owner::handle_alter_owner(
773                    handler_args,
774                    name,
775                    new_owner_name,
776                    StatementType::ALTER_SCHEMA,
777                )
778                .await
779            }
780            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
781                alter_swap_rename::handle_swap_rename(
782                    handler_args,
783                    name,
784                    target_schema,
785                    StatementType::ALTER_SCHEMA,
786                )
787                .await
788            }
789        },
790        Statement::AlterTable { name, operation } => match operation {
791            AlterTableOperation::AddColumn { .. }
792            | AlterTableOperation::DropColumn { .. }
793            | AlterTableOperation::AlterColumn { .. } => {
794                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
795            }
796            AlterTableOperation::RenameTable { table_name } => {
797                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
798                    .await
799            }
800            AlterTableOperation::ChangeOwner { new_owner_name } => {
801                alter_owner::handle_alter_owner(
802                    handler_args,
803                    name,
804                    new_owner_name,
805                    StatementType::ALTER_TABLE,
806                )
807                .await
808            }
809            AlterTableOperation::SetParallelism {
810                parallelism,
811                deferred,
812            } => {
813                alter_parallelism::handle_alter_parallelism(
814                    handler_args,
815                    name,
816                    parallelism,
817                    StatementType::ALTER_TABLE,
818                    deferred,
819                )
820                .await
821            }
822            AlterTableOperation::SetSchema { new_schema_name } => {
823                alter_set_schema::handle_alter_set_schema(
824                    handler_args,
825                    name,
826                    new_schema_name,
827                    StatementType::ALTER_TABLE,
828                    None,
829                )
830                .await
831            }
832            AlterTableOperation::RefreshSchema => {
833                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
834            }
835            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
836                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
837                    handler_args,
838                    PbThrottleTarget::TableWithSource,
839                    name,
840                    rate_limit,
841                )
842                .await
843            }
844            AlterTableOperation::DropConnector => {
845                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
846                    .await
847            }
848            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
849                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
850                    handler_args,
851                    PbThrottleTarget::TableDml,
852                    name,
853                    rate_limit,
854                )
855                .await
856            }
857            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
858                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
859                    handler_args,
860                    PbThrottleTarget::CdcTable,
861                    name,
862                    rate_limit,
863                )
864                .await
865            }
866            AlterTableOperation::SwapRenameTable { target_table } => {
867                alter_swap_rename::handle_swap_rename(
868                    handler_args,
869                    name,
870                    target_table,
871                    StatementType::ALTER_TABLE,
872                )
873                .await
874            }
875            AlterTableOperation::AlterConnectorProps { alter_props } => {
876                // If exists a associated source, it should be of the same name.
877                crate::handler::alter_source_props::handle_alter_table_connector_props(
878                    handler_args,
879                    name,
880                    alter_props,
881                )
882                .await
883            }
884            AlterTableOperation::AddConstraint { .. }
885            | AlterTableOperation::DropConstraint { .. }
886            | AlterTableOperation::RenameColumn { .. }
887            | AlterTableOperation::ChangeColumn { .. }
888            | AlterTableOperation::RenameConstraint { .. } => {
889                bail_not_implemented!(
890                    "Unhandled statement: {}",
891                    Statement::AlterTable { name, operation }
892                )
893            }
894        },
895        Statement::AlterIndex { name, operation } => match operation {
896            AlterIndexOperation::RenameIndex { index_name } => {
897                alter_rename::handle_rename_index(handler_args, name, index_name).await
898            }
899            AlterIndexOperation::SetParallelism {
900                parallelism,
901                deferred,
902            } => {
903                alter_parallelism::handle_alter_parallelism(
904                    handler_args,
905                    name,
906                    parallelism,
907                    StatementType::ALTER_INDEX,
908                    deferred,
909                )
910                .await
911            }
912        },
913        Statement::AlterView {
914            materialized,
915            name,
916            operation,
917        } => {
918            let statement_type = if materialized {
919                StatementType::ALTER_MATERIALIZED_VIEW
920            } else {
921                StatementType::ALTER_VIEW
922            };
923            match operation {
924                AlterViewOperation::RenameView { view_name } => {
925                    if materialized {
926                        alter_rename::handle_rename_table(
927                            handler_args,
928                            TableType::MaterializedView,
929                            name,
930                            view_name,
931                        )
932                        .await
933                    } else {
934                        alter_rename::handle_rename_view(handler_args, name, view_name).await
935                    }
936                }
937                AlterViewOperation::SetParallelism {
938                    parallelism,
939                    deferred,
940                } => {
941                    if !materialized {
942                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
943                    }
944                    alter_parallelism::handle_alter_parallelism(
945                        handler_args,
946                        name,
947                        parallelism,
948                        statement_type,
949                        deferred,
950                    )
951                    .await
952                }
953                AlterViewOperation::SetResourceGroup {
954                    resource_group,
955                    deferred,
956                } => {
957                    if !materialized {
958                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
959                    }
960                    alter_resource_group::handle_alter_resource_group(
961                        handler_args,
962                        name,
963                        resource_group,
964                        statement_type,
965                        deferred,
966                    )
967                    .await
968                }
969                AlterViewOperation::ChangeOwner { new_owner_name } => {
970                    alter_owner::handle_alter_owner(
971                        handler_args,
972                        name,
973                        new_owner_name,
974                        statement_type,
975                    )
976                    .await
977                }
978                AlterViewOperation::SetSchema { new_schema_name } => {
979                    alter_set_schema::handle_alter_set_schema(
980                        handler_args,
981                        name,
982                        new_schema_name,
983                        statement_type,
984                        None,
985                    )
986                    .await
987                }
988                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
989                    if !materialized {
990                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
991                    }
992                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
993                        handler_args,
994                        PbThrottleTarget::Mv,
995                        name,
996                        rate_limit,
997                    )
998                    .await
999                }
1000                AlterViewOperation::SwapRenameView { target_view } => {
1001                    alter_swap_rename::handle_swap_rename(
1002                        handler_args,
1003                        name,
1004                        target_view,
1005                        statement_type,
1006                    )
1007                    .await
1008                }
1009                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1010                    if !materialized {
1011                        bail!(
1012                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1013                        );
1014                    }
1015                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1016                }
1017                AlterViewOperation::AsQuery { query } => {
1018                    if !materialized {
1019                        bail_not_implemented!("ALTER VIEW AS QUERY");
1020                    }
1021                    // `ALTER MATERIALIZED VIEW AS QUERY` is only available in development build now.
1022                    if !cfg!(debug_assertions) {
1023                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1024                    }
1025                    alter_mv::handle_alter_mv(handler_args, name, query).await
1026                }
1027            }
1028        }
1029
1030        Statement::AlterSink { name, operation } => match operation {
1031            AlterSinkOperation::AlterConnectorProps {
1032                alter_props: changed_props,
1033            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1034            AlterSinkOperation::RenameSink { sink_name } => {
1035                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1036            }
1037            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1038                alter_owner::handle_alter_owner(
1039                    handler_args,
1040                    name,
1041                    new_owner_name,
1042                    StatementType::ALTER_SINK,
1043                )
1044                .await
1045            }
1046            AlterSinkOperation::SetSchema { new_schema_name } => {
1047                alter_set_schema::handle_alter_set_schema(
1048                    handler_args,
1049                    name,
1050                    new_schema_name,
1051                    StatementType::ALTER_SINK,
1052                    None,
1053                )
1054                .await
1055            }
1056            AlterSinkOperation::SetParallelism {
1057                parallelism,
1058                deferred,
1059            } => {
1060                alter_parallelism::handle_alter_parallelism(
1061                    handler_args,
1062                    name,
1063                    parallelism,
1064                    StatementType::ALTER_SINK,
1065                    deferred,
1066                )
1067                .await
1068            }
1069            AlterSinkOperation::SwapRenameSink { target_sink } => {
1070                alter_swap_rename::handle_swap_rename(
1071                    handler_args,
1072                    name,
1073                    target_sink,
1074                    StatementType::ALTER_SINK,
1075                )
1076                .await
1077            }
1078            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1079                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1080                    handler_args,
1081                    PbThrottleTarget::Sink,
1082                    name,
1083                    rate_limit,
1084                )
1085                .await
1086            }
1087            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1088                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1089                    handler_args,
1090                    name,
1091                    enable,
1092                )
1093                .await
1094            }
1095        },
1096        Statement::AlterSubscription { name, operation } => match operation {
1097            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1098                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1099                    .await
1100            }
1101            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1102                alter_owner::handle_alter_owner(
1103                    handler_args,
1104                    name,
1105                    new_owner_name,
1106                    StatementType::ALTER_SUBSCRIPTION,
1107                )
1108                .await
1109            }
1110            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1111                alter_set_schema::handle_alter_set_schema(
1112                    handler_args,
1113                    name,
1114                    new_schema_name,
1115                    StatementType::ALTER_SUBSCRIPTION,
1116                    None,
1117                )
1118                .await
1119            }
1120            AlterSubscriptionOperation::SwapRenameSubscription {
1121                target_subscription,
1122            } => {
1123                alter_swap_rename::handle_swap_rename(
1124                    handler_args,
1125                    name,
1126                    target_subscription,
1127                    StatementType::ALTER_SUBSCRIPTION,
1128                )
1129                .await
1130            }
1131        },
1132        Statement::AlterSource { name, operation } => match operation {
1133            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1134                alter_source_props::handle_alter_source_connector_props(
1135                    handler_args,
1136                    name,
1137                    alter_props,
1138                )
1139                .await
1140            }
1141            AlterSourceOperation::RenameSource { source_name } => {
1142                alter_rename::handle_rename_source(handler_args, name, source_name).await
1143            }
1144            AlterSourceOperation::AddColumn { .. } => {
1145                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1146            }
1147            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1148                alter_owner::handle_alter_owner(
1149                    handler_args,
1150                    name,
1151                    new_owner_name,
1152                    StatementType::ALTER_SOURCE,
1153                )
1154                .await
1155            }
1156            AlterSourceOperation::SetSchema { new_schema_name } => {
1157                alter_set_schema::handle_alter_set_schema(
1158                    handler_args,
1159                    name,
1160                    new_schema_name,
1161                    StatementType::ALTER_SOURCE,
1162                    None,
1163                )
1164                .await
1165            }
1166            AlterSourceOperation::FormatEncode { format_encode } => {
1167                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1168                    .await
1169            }
1170            AlterSourceOperation::RefreshSchema => {
1171                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1172            }
1173            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1174                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1175                    handler_args,
1176                    PbThrottleTarget::Source,
1177                    name,
1178                    rate_limit,
1179                )
1180                .await
1181            }
1182            AlterSourceOperation::SwapRenameSource { target_source } => {
1183                alter_swap_rename::handle_swap_rename(
1184                    handler_args,
1185                    name,
1186                    target_source,
1187                    StatementType::ALTER_SOURCE,
1188                )
1189                .await
1190            }
1191            AlterSourceOperation::SetParallelism {
1192                parallelism,
1193                deferred,
1194            } => {
1195                alter_parallelism::handle_alter_parallelism(
1196                    handler_args,
1197                    name,
1198                    parallelism,
1199                    StatementType::ALTER_SOURCE,
1200                    deferred,
1201                )
1202                .await
1203            }
1204        },
1205        Statement::AlterFunction {
1206            name,
1207            args,
1208            operation,
1209        } => match operation {
1210            AlterFunctionOperation::SetSchema { new_schema_name } => {
1211                alter_set_schema::handle_alter_set_schema(
1212                    handler_args,
1213                    name,
1214                    new_schema_name,
1215                    StatementType::ALTER_FUNCTION,
1216                    args,
1217                )
1218                .await
1219            }
1220        },
1221        Statement::AlterConnection { name, operation } => match operation {
1222            AlterConnectionOperation::SetSchema { new_schema_name } => {
1223                alter_set_schema::handle_alter_set_schema(
1224                    handler_args,
1225                    name,
1226                    new_schema_name,
1227                    StatementType::ALTER_CONNECTION,
1228                    None,
1229                )
1230                .await
1231            }
1232            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1233                alter_owner::handle_alter_owner(
1234                    handler_args,
1235                    name,
1236                    new_owner_name,
1237                    StatementType::ALTER_CONNECTION,
1238                )
1239                .await
1240            }
1241        },
1242        Statement::AlterSystem { param, value } => {
1243            alter_system::handle_alter_system(handler_args, param, value).await
1244        }
1245        Statement::AlterSecret {
1246            name,
1247            with_options,
1248            operation,
1249        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1250        Statement::AlterFragment {
1251            fragment_id,
1252            operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1253        } => {
1254            alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1255                &handler_args.session,
1256                PbThrottleTarget::Fragment,
1257                fragment_id,
1258                rate_limit,
1259                StatementType::SET_VARIABLE,
1260            )
1261            .await
1262        }
1263        Statement::AlterDefaultPrivileges { .. } => {
1264            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1265        }
1266        Statement::StartTransaction { modes } => {
1267            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1268        }
1269        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1270        Statement::Commit { chain } => {
1271            transaction::handle_commit(handler_args, COMMIT, chain).await
1272        }
1273        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1274        Statement::Rollback { chain } => {
1275            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1276        }
1277        Statement::SetTransaction {
1278            modes,
1279            snapshot,
1280            session,
1281        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1282        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1283        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1284        Statement::Comment {
1285            object_type,
1286            object_name,
1287            comment,
1288        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1289        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1290        Statement::Prepare {
1291            name,
1292            data_types,
1293            statement,
1294        } => prepared_statement::handle_prepare(name, data_types, statement).await,
1295        Statement::Deallocate { name, prepare } => {
1296            prepared_statement::handle_deallocate(name, prepare).await
1297        }
1298        Statement::Vacuum { object_name, full } => {
1299            vacuum::handle_vacuum(handler_args, object_name, full).await
1300        }
1301        Statement::Refresh { table_name } => {
1302            refresh::handle_refresh(handler_args, table_name).await
1303        }
1304        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1305    }
1306}
1307
1308fn check_ban_ddl_for_iceberg_engine_table(
1309    session: Arc<SessionImpl>,
1310    stmt: &Statement,
1311) -> Result<()> {
1312    match stmt {
1313        Statement::AlterTable {
1314            name,
1315            operation:
1316                operation @ (AlterTableOperation::AddColumn { .. }
1317                | AlterTableOperation::DropColumn { .. }),
1318        } => {
1319            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1320            if table.is_iceberg_engine_table() {
1321                bail!(
1322                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1323                    operation,
1324                    schema_name,
1325                    name
1326                );
1327            }
1328        }
1329
1330        Statement::AlterTable {
1331            name,
1332            operation: AlterTableOperation::RenameTable { .. },
1333        } => {
1334            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1335            if table.is_iceberg_engine_table() {
1336                bail!(
1337                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1338                    schema_name,
1339                    name
1340                );
1341            }
1342        }
1343
1344        Statement::AlterTable {
1345            name,
1346            operation: AlterTableOperation::ChangeOwner { .. },
1347        } => {
1348            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1349            if table.is_iceberg_engine_table() {
1350                bail!(
1351                    "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1352                    schema_name,
1353                    name
1354                );
1355            }
1356        }
1357
1358        Statement::AlterTable {
1359            name,
1360            operation: AlterTableOperation::SetParallelism { .. },
1361        } => {
1362            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1363            if table.is_iceberg_engine_table() {
1364                bail!(
1365                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1366                    schema_name,
1367                    name
1368                );
1369            }
1370        }
1371
1372        Statement::AlterTable {
1373            name,
1374            operation: AlterTableOperation::SetSchema { .. },
1375        } => {
1376            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1377            if table.is_iceberg_engine_table() {
1378                bail!(
1379                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1380                    schema_name,
1381                    name
1382                );
1383            }
1384        }
1385
1386        Statement::AlterTable {
1387            name,
1388            operation: AlterTableOperation::RefreshSchema,
1389        } => {
1390            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1391            if table.is_iceberg_engine_table() {
1392                bail!(
1393                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1394                    schema_name,
1395                    name
1396                );
1397            }
1398        }
1399
1400        Statement::AlterTable {
1401            name,
1402            operation: AlterTableOperation::SetSourceRateLimit { .. },
1403        } => {
1404            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1405            if table.is_iceberg_engine_table() {
1406                bail!(
1407                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1408                    schema_name,
1409                    name
1410                );
1411            }
1412        }
1413
1414        _ => {}
1415    }
1416
1417    Ok(())
1418}