risingwave_frontend/handler/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_database_param;
46mod alter_mv;
47mod alter_owner;
48mod alter_parallelism;
49mod alter_rename;
50mod alter_resource_group;
51mod alter_secret;
52mod alter_set_schema;
53mod alter_sink_props;
54mod alter_source_column;
55mod alter_source_props;
56mod alter_source_with_sr;
57mod alter_streaming_enable_unaligned_join;
58mod alter_streaming_rate_limit;
59mod alter_swap_rename;
60mod alter_system;
61mod alter_table_column;
62pub mod alter_table_drop_connector;
63mod alter_table_with_sr;
64pub mod alter_user;
65pub mod cancel_job;
66pub mod close_cursor;
67mod comment;
68pub mod create_aggregate;
69pub mod create_connection;
70mod create_database;
71pub mod create_function;
72pub mod create_index;
73pub mod create_mv;
74pub mod create_schema;
75pub mod create_secret;
76pub mod create_sink;
77pub mod create_source;
78pub mod create_sql_function;
79pub mod create_subscription;
80pub mod create_table;
81pub mod create_table_as;
82pub mod create_user;
83pub mod create_view;
84pub mod declare_cursor;
85pub mod describe;
86pub mod discard;
87mod drop_connection;
88mod drop_database;
89pub mod drop_function;
90mod drop_index;
91pub mod drop_mv;
92mod drop_schema;
93pub mod drop_secret;
94pub mod drop_sink;
95pub mod drop_source;
96pub mod drop_subscription;
97pub mod drop_table;
98pub mod drop_user;
99mod drop_view;
100pub mod explain;
101pub mod explain_analyze_stream_job;
102pub mod extended_handle;
103pub mod fetch_cursor;
104mod flush;
105pub mod handle_privilege;
106pub mod kill_process;
107mod prepared_statement;
108pub mod privilege;
109pub mod query;
110mod recover;
111pub mod show;
112mod transaction;
113mod use_db;
114pub mod util;
115pub mod variable;
116mod wait;
117
118pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
119
/// The [`PgResponseBuilder`] used by RisingWave, parameterized by [`PgResponseStream`].
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;

/// The [`PgResponse`] used by RisingWave, parameterized by [`PgResponseStream`].
pub type RwPgResponse = PgResponse<PgResponseStream>;
125
126#[easy_ext::ext(RwPgResponseBuilderExt)]
127impl RwPgResponseBuilder {
128    /// Append rows to the response.
129    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
130        let fields = T::fields();
131        self.values(
132            rows.into_iter()
133                .map(|row| {
134                    Row::new(
135                        row.into_owned_row()
136                            .into_iter()
137                            .zip_eq_fast(&fields)
138                            .map(|(datum, (_, ty))| {
139                                datum.map(|scalar| {
140                                    scalar.as_scalar_ref_impl().text_format(ty).into()
141                                })
142                            })
143                            .collect(),
144                    )
145                })
146                .collect_vec()
147                .into(),
148            fields_to_descriptors(fields),
149        )
150    }
151}
152
153pub fn fields_to_descriptors(
154    fields: Vec<(&str, risingwave_common::types::DataType)>,
155) -> Vec<PgFieldDescriptor> {
156    fields
157        .iter()
158        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
159        .collect()
160}
161
/// Row stream carried by RisingWave's pgwire responses.
///
/// As a [`Stream`], every variant yields `Result<Vec<Row>, BoxedError>` items.
pub enum PgResponseStream {
    /// A [`LocalQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// A [`DistributedQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// An arbitrary boxed stream of row sets, e.g. one built from an in-memory `Vec<Row>`.
    Rows(BoxStream<'static, RowSetResult>),
}
167
168impl Stream for PgResponseStream {
169    type Item = std::result::Result<Vec<Row>, BoxedError>;
170
171    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
172        match &mut *self {
173            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
174            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
175            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
176        }
177    }
178}
179
180impl From<Vec<Row>> for PgResponseStream {
181    fn from(rows: Vec<Row>) -> Self {
182        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
183    }
184}
185
/// Arguments handed to the individual statement handlers.
#[derive(Clone)]
pub struct HandlerArgs {
    /// The session passed to [`HandlerArgs::new`].
    pub session: Arc<SessionImpl>,
    /// The SQL text passed to [`HandlerArgs::new`].
    pub sql: Arc<str>,
    /// The statement rendered back to text after normalization (for some `CREATE` statements,
    /// `OR REPLACE` / `IF NOT EXISTS` are cleared first).
    pub normalized_sql: String,
    /// Options obtained from the statement via `WithOptions::try_from`.
    pub with_options: WithOptions,
}
193
194impl HandlerArgs {
195    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
196        Ok(Self {
197            session,
198            sql,
199            with_options: WithOptions::try_from(stmt)?,
200            normalized_sql: Self::normalize_sql(stmt),
201        })
202    }
203
204    /// Get normalized SQL from the statement.
205    ///
206    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
207    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
208    ///   make it suitable for the `SHOW CREATE` statements.
209    fn normalize_sql(stmt: &Statement) -> String {
210        let mut stmt = stmt.clone();
211        match &mut stmt {
212            Statement::CreateView {
213                or_replace,
214                if_not_exists,
215                ..
216            } => {
217                *or_replace = false;
218                *if_not_exists = false;
219            }
220            Statement::CreateTable {
221                or_replace,
222                if_not_exists,
223                ..
224            } => {
225                *or_replace = false;
226                *if_not_exists = false;
227            }
228            Statement::CreateIndex { if_not_exists, .. } => {
229                *if_not_exists = false;
230            }
231            Statement::CreateSource {
232                stmt: CreateSourceStatement { if_not_exists, .. },
233                ..
234            } => {
235                *if_not_exists = false;
236            }
237            Statement::CreateSink {
238                stmt: CreateSinkStatement { if_not_exists, .. },
239            } => {
240                *if_not_exists = false;
241            }
242            Statement::CreateSubscription {
243                stmt: CreateSubscriptionStatement { if_not_exists, .. },
244            } => {
245                *if_not_exists = false;
246            }
247            Statement::CreateConnection {
248                stmt: CreateConnectionStatement { if_not_exists, .. },
249            } => {
250                *if_not_exists = false;
251            }
252            _ => {}
253        }
254        stmt.to_string()
255    }
256}
257
258pub async fn handle(
259    session: Arc<SessionImpl>,
260    stmt: Statement,
261    sql: Arc<str>,
262    formats: Vec<Format>,
263) -> Result<RwPgResponse> {
264    session.clear_cancel_query_flag();
265    let _guard = session.txn_begin_implicit();
266    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
267
268    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
269
270    match stmt {
271        Statement::Explain {
272            statement,
273            analyze,
274            options,
275        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
276        Statement::ExplainAnalyzeStreamJob {
277            target,
278            duration_secs,
279        } => {
280            explain_analyze_stream_job::handle_explain_analyze_stream_job(
281                handler_args,
282                target,
283                duration_secs,
284            )
285            .await
286        }
287        Statement::CreateSource { stmt } => {
288            create_source::handle_create_source(handler_args, stmt).await
289        }
290        Statement::CreateSink { stmt } => {
291            create_sink::handle_create_sink(handler_args, stmt, false).await
292        }
293        Statement::CreateSubscription { stmt } => {
294            create_subscription::handle_create_subscription(handler_args, stmt).await
295        }
296        Statement::CreateConnection { stmt } => {
297            create_connection::handle_create_connection(handler_args, stmt).await
298        }
299        Statement::CreateSecret { stmt } => {
300            create_secret::handle_create_secret(handler_args, stmt).await
301        }
302        Statement::CreateFunction {
303            or_replace,
304            temporary,
305            if_not_exists,
306            name,
307            args,
308            returns,
309            params,
310            with_options,
311        } => {
312            // For general udf, `language` clause could be ignored
313            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
314            if params.language.is_none()
315                || !params
316                    .language
317                    .as_ref()
318                    .unwrap()
319                    .real_value()
320                    .eq_ignore_ascii_case("sql")
321            {
322                create_function::handle_create_function(
323                    handler_args,
324                    or_replace,
325                    temporary,
326                    if_not_exists,
327                    name,
328                    args,
329                    returns,
330                    params,
331                    with_options,
332                )
333                .await
334            } else {
335                create_sql_function::handle_create_sql_function(
336                    handler_args,
337                    or_replace,
338                    temporary,
339                    if_not_exists,
340                    name,
341                    args,
342                    returns,
343                    params,
344                )
345                .await
346            }
347        }
348        Statement::CreateAggregate {
349            or_replace,
350            if_not_exists,
351            name,
352            args,
353            returns,
354            params,
355            ..
356        } => {
357            create_aggregate::handle_create_aggregate(
358                handler_args,
359                or_replace,
360                if_not_exists,
361                name,
362                args,
363                returns,
364                params,
365            )
366            .await
367        }
368        Statement::CreateTable {
369            name,
370            columns,
371            wildcard_idx,
372            constraints,
373            query,
374            with_options: _, // It is put in OptimizerContext
375            // Not supported things
376            or_replace,
377            temporary,
378            if_not_exists,
379            format_encode,
380            source_watermarks,
381            append_only,
382            on_conflict,
383            with_version_column,
384            cdc_table_info,
385            include_column_options,
386            webhook_info,
387            engine,
388        } => {
389            if or_replace {
390                bail_not_implemented!("CREATE OR REPLACE TABLE");
391            }
392            if temporary {
393                bail_not_implemented!("CREATE TEMPORARY TABLE");
394            }
395            if let Some(query) = query {
396                return create_table_as::handle_create_as(
397                    handler_args,
398                    name,
399                    if_not_exists,
400                    query,
401                    columns,
402                    append_only,
403                    on_conflict,
404                    with_version_column.map(|x| x.real_value()),
405                    engine,
406                )
407                .await;
408            }
409            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
410            create_table::handle_create_table(
411                handler_args,
412                name,
413                columns,
414                wildcard_idx,
415                constraints,
416                if_not_exists,
417                format_encode,
418                source_watermarks,
419                append_only,
420                on_conflict,
421                with_version_column.map(|x| x.real_value()),
422                cdc_table_info,
423                include_column_options,
424                webhook_info,
425                engine,
426            )
427            .await
428        }
429        Statement::CreateDatabase {
430            db_name,
431            if_not_exists,
432            owner,
433            resource_group,
434            barrier_interval_ms,
435            checkpoint_frequency,
436        } => {
437            create_database::handle_create_database(
438                handler_args,
439                db_name,
440                if_not_exists,
441                owner,
442                resource_group,
443                barrier_interval_ms,
444                checkpoint_frequency,
445            )
446            .await
447        }
448        Statement::CreateSchema {
449            schema_name,
450            if_not_exists,
451            owner,
452        } => {
453            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
454                .await
455        }
456        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
457        Statement::DeclareCursor { stmt } => {
458            declare_cursor::handle_declare_cursor(handler_args, stmt).await
459        }
460        Statement::FetchCursor { stmt } => {
461            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
462        }
463        Statement::CloseCursor { stmt } => {
464            close_cursor::handle_close_cursor(handler_args, stmt).await
465        }
466        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
467        Statement::Grant { .. } => {
468            handle_privilege::handle_grant_privilege(handler_args, stmt).await
469        }
470        Statement::Revoke { .. } => {
471            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
472        }
473        Statement::Describe { name, kind } => match kind {
474            DescribeKind::Fragments => {
475                describe::handle_describe_fragments(handler_args, name).await
476            }
477            DescribeKind::Plain => describe::handle_describe(handler_args, name),
478        },
479        Statement::DescribeFragment { fragment_id } => {
480            describe::handle_describe_fragment(handler_args, fragment_id).await
481        }
482        Statement::Discard(..) => discard::handle_discard(handler_args),
483        Statement::ShowObjects {
484            object: show_object,
485            filter,
486        } => show::handle_show_object(handler_args, show_object, filter).await,
487        Statement::ShowCreateObject { create_type, name } => {
488            show::handle_show_create_object(handler_args, create_type, name)
489        }
490        Statement::ShowTransactionIsolationLevel => {
491            transaction::handle_show_isolation_level(handler_args)
492        }
493        Statement::Drop(DropStatement {
494            object_type,
495            object_name,
496            if_exists,
497            drop_mode,
498        }) => {
499            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
500                match object_type {
501                    ObjectType::MaterializedView
502                    | ObjectType::View
503                    | ObjectType::Sink
504                    | ObjectType::Source
505                    | ObjectType::Subscription
506                    | ObjectType::Index
507                    | ObjectType::Table
508                    | ObjectType::Schema => true,
509                    ObjectType::Database
510                    | ObjectType::User
511                    | ObjectType::Connection
512                    | ObjectType::Secret => {
513                        bail_not_implemented!("DROP CASCADE");
514                    }
515                }
516            } else {
517                false
518            };
519            match object_type {
520                ObjectType::Table => {
521                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
522                        .await
523                }
524                ObjectType::MaterializedView => {
525                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
526                }
527                ObjectType::Index => {
528                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
529                        .await
530                }
531                ObjectType::Source => {
532                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
533                        .await
534                }
535                ObjectType::Sink => {
536                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
537                }
538                ObjectType::Subscription => {
539                    drop_subscription::handle_drop_subscription(
540                        handler_args,
541                        object_name,
542                        if_exists,
543                        cascade,
544                    )
545                    .await
546                }
547                ObjectType::Database => {
548                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
549                }
550                ObjectType::Schema => {
551                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
552                        .await
553                }
554                ObjectType::User => {
555                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
556                }
557                ObjectType::View => {
558                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
559                }
560                ObjectType::Connection => {
561                    drop_connection::handle_drop_connection(handler_args, object_name, if_exists)
562                        .await
563                }
564                ObjectType::Secret => {
565                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
566                }
567            }
568        }
569        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
570        Statement::DropFunction {
571            if_exists,
572            func_desc,
573            option,
574        } => {
575            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
576                .await
577        }
578        Statement::DropAggregate {
579            if_exists,
580            func_desc,
581            option,
582        } => {
583            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
584                .await
585        }
586        Statement::Query(_)
587        | Statement::Insert { .. }
588        | Statement::Delete { .. }
589        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
590        Statement::CreateView {
591            materialized,
592            if_not_exists,
593            name,
594            columns,
595            query,
596            with_options: _, // It is put in OptimizerContext
597            or_replace,      // not supported
598            emit_mode,
599        } => {
600            if or_replace {
601                bail_not_implemented!("CREATE OR REPLACE VIEW");
602            }
603            if materialized {
604                create_mv::handle_create_mv(
605                    handler_args,
606                    if_not_exists,
607                    name,
608                    *query,
609                    columns,
610                    emit_mode,
611                )
612                .await
613            } else {
614                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
615                    .await
616            }
617        }
618        Statement::Flush => flush::handle_flush(handler_args).await,
619        Statement::Wait => wait::handle_wait(handler_args).await,
620        Statement::Recover => recover::handle_recover(handler_args).await,
621        Statement::SetVariable {
622            local: _,
623            variable,
624            value,
625        } => {
626            // special handle for `use database`
627            if variable.real_value().eq_ignore_ascii_case("database") {
628                let x = variable::set_var_to_param_str(&value);
629                let res = use_db::handle_use_db(
630                    handler_args,
631                    ObjectName::from(vec![Ident::from_real_value(
632                        x.as_deref().unwrap_or("default"),
633                    )]),
634                )?;
635                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
636                for notice in res.notices() {
637                    builder = builder.notice(notice);
638                }
639                return Ok(builder.into());
640            }
641            variable::handle_set(handler_args, variable, value)
642        }
643        Statement::SetTimeZone { local: _, value } => {
644            variable::handle_set_time_zone(handler_args, value)
645        }
646        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
647        Statement::CreateIndex {
648            name,
649            table_name,
650            columns,
651            include,
652            distributed_by,
653            unique,
654            if_not_exists,
655        } => {
656            if unique {
657                bail_not_implemented!("create unique index");
658            }
659
660            create_index::handle_create_index(
661                handler_args,
662                if_not_exists,
663                name,
664                table_name,
665                columns.to_vec(),
666                include,
667                distributed_by,
668            )
669            .await
670        }
671        Statement::AlterDatabase { name, operation } => match operation {
672            AlterDatabaseOperation::RenameDatabase { database_name } => {
673                alter_rename::handle_rename_database(handler_args, name, database_name).await
674            }
675            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
676                alter_owner::handle_alter_owner(
677                    handler_args,
678                    name,
679                    new_owner_name,
680                    StatementType::ALTER_DATABASE,
681                )
682                .await
683            }
684            AlterDatabaseOperation::SetParam(config_param) => {
685                let ConfigParam { param, value } = config_param;
686
687                let database_param = match param.real_value().to_uppercase().as_str() {
688                    "BARRIER_INTERVAL_MS" => {
689                        let barrier_interval_ms = match value {
690                            SetVariableValue::Default => None,
691                            SetVariableValue::Single(SetVariableValueSingle::Literal(
692                                Value::Number(num),
693                            )) => {
694                                let num = num.parse::<u32>().map_err(|e| {
695                                    ErrorCode::InvalidInputSyntax(format!(
696                                        "barrier_interval_ms must be a u32 integer: {}",
697                                        e.as_report()
698                                    ))
699                                })?;
700                                Some(num)
701                            }
702                            _ => {
703                                return Err(ErrorCode::InvalidInputSyntax(
704                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
705                                        .to_owned(),
706                                )
707                                .into());
708                            }
709                        };
710                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
711                    }
712                    "CHECKPOINT_FREQUENCY" => {
713                        let checkpoint_frequency = match value {
714                            SetVariableValue::Default => None,
715                            SetVariableValue::Single(SetVariableValueSingle::Literal(
716                                Value::Number(num),
717                            )) => {
718                                let num = num.parse::<u64>().map_err(|e| {
719                                    ErrorCode::InvalidInputSyntax(format!(
720                                        "checkpoint_frequency must be a u64 integer: {}",
721                                        e.as_report()
722                                    ))
723                                })?;
724                                Some(num)
725                            }
726                            _ => {
727                                return Err(ErrorCode::InvalidInputSyntax(
728                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
729                                        .to_owned(),
730                                )
731                                .into());
732                            }
733                        };
734                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
735                    }
736                    _ => {
737                        return Err(ErrorCode::InvalidInputSyntax(format!(
738                            "Unsupported database config parameter: {}",
739                            param.real_value()
740                        ))
741                        .into());
742                    }
743                };
744
745                alter_database_param::handle_alter_database_param(
746                    handler_args,
747                    name,
748                    database_param,
749                )
750                .await
751            }
752        },
753        Statement::AlterSchema { name, operation } => match operation {
754            AlterSchemaOperation::RenameSchema { schema_name } => {
755                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
756            }
757            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
758                alter_owner::handle_alter_owner(
759                    handler_args,
760                    name,
761                    new_owner_name,
762                    StatementType::ALTER_SCHEMA,
763                )
764                .await
765            }
766            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
767                alter_swap_rename::handle_swap_rename(
768                    handler_args,
769                    name,
770                    target_schema,
771                    StatementType::ALTER_SCHEMA,
772                )
773                .await
774            }
775        },
776        Statement::AlterTable { name, operation } => match operation {
777            AlterTableOperation::AddColumn { .. }
778            | AlterTableOperation::DropColumn { .. }
779            | AlterTableOperation::AlterColumn { .. } => {
780                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
781            }
782            AlterTableOperation::RenameTable { table_name } => {
783                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
784                    .await
785            }
786            AlterTableOperation::ChangeOwner { new_owner_name } => {
787                alter_owner::handle_alter_owner(
788                    handler_args,
789                    name,
790                    new_owner_name,
791                    StatementType::ALTER_TABLE,
792                )
793                .await
794            }
795            AlterTableOperation::SetParallelism {
796                parallelism,
797                deferred,
798            } => {
799                alter_parallelism::handle_alter_parallelism(
800                    handler_args,
801                    name,
802                    parallelism,
803                    StatementType::ALTER_TABLE,
804                    deferred,
805                )
806                .await
807            }
808            AlterTableOperation::SetSchema { new_schema_name } => {
809                alter_set_schema::handle_alter_set_schema(
810                    handler_args,
811                    name,
812                    new_schema_name,
813                    StatementType::ALTER_TABLE,
814                    None,
815                )
816                .await
817            }
818            AlterTableOperation::RefreshSchema => {
819                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
820            }
821            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
822                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
823                    handler_args,
824                    PbThrottleTarget::TableWithSource,
825                    name,
826                    rate_limit,
827                )
828                .await
829            }
830            AlterTableOperation::DropConnector => {
831                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
832                    .await
833            }
834            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
835                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
836                    handler_args,
837                    PbThrottleTarget::TableDml,
838                    name,
839                    rate_limit,
840                )
841                .await
842            }
843            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
844                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
845                    handler_args,
846                    PbThrottleTarget::CdcTable,
847                    name,
848                    rate_limit,
849                )
850                .await
851            }
852            AlterTableOperation::SwapRenameTable { target_table } => {
853                alter_swap_rename::handle_swap_rename(
854                    handler_args,
855                    name,
856                    target_table,
857                    StatementType::ALTER_TABLE,
858                )
859                .await
860            }
861            AlterTableOperation::AlterConnectorProps { alter_props } => {
862                // If exists a associated source, it should be of the same name.
863                crate::handler::alter_source_props::handle_alter_table_connector_props(
864                    handler_args,
865                    name,
866                    alter_props,
867                )
868                .await
869            }
870            AlterTableOperation::AddConstraint { .. }
871            | AlterTableOperation::DropConstraint { .. }
872            | AlterTableOperation::RenameColumn { .. }
873            | AlterTableOperation::ChangeColumn { .. }
874            | AlterTableOperation::RenameConstraint { .. } => {
875                bail_not_implemented!(
876                    "Unhandled statement: {}",
877                    Statement::AlterTable { name, operation }
878                )
879            }
880        },
881        Statement::AlterIndex { name, operation } => match operation {
882            AlterIndexOperation::RenameIndex { index_name } => {
883                alter_rename::handle_rename_index(handler_args, name, index_name).await
884            }
885            AlterIndexOperation::SetParallelism {
886                parallelism,
887                deferred,
888            } => {
889                alter_parallelism::handle_alter_parallelism(
890                    handler_args,
891                    name,
892                    parallelism,
893                    StatementType::ALTER_INDEX,
894                    deferred,
895                )
896                .await
897            }
898        },
899        Statement::AlterView {
900            materialized,
901            name,
902            operation,
903        } => {
904            let statement_type = if materialized {
905                StatementType::ALTER_MATERIALIZED_VIEW
906            } else {
907                StatementType::ALTER_VIEW
908            };
909            match operation {
910                AlterViewOperation::RenameView { view_name } => {
911                    if materialized {
912                        alter_rename::handle_rename_table(
913                            handler_args,
914                            TableType::MaterializedView,
915                            name,
916                            view_name,
917                        )
918                        .await
919                    } else {
920                        alter_rename::handle_rename_view(handler_args, name, view_name).await
921                    }
922                }
923                AlterViewOperation::SetParallelism {
924                    parallelism,
925                    deferred,
926                } => {
927                    if !materialized {
928                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
929                    }
930                    alter_parallelism::handle_alter_parallelism(
931                        handler_args,
932                        name,
933                        parallelism,
934                        statement_type,
935                        deferred,
936                    )
937                    .await
938                }
939                AlterViewOperation::SetResourceGroup {
940                    resource_group,
941                    deferred,
942                } => {
943                    if !materialized {
944                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
945                    }
946                    alter_resource_group::handle_alter_resource_group(
947                        handler_args,
948                        name,
949                        resource_group,
950                        statement_type,
951                        deferred,
952                    )
953                    .await
954                }
955                AlterViewOperation::ChangeOwner { new_owner_name } => {
956                    alter_owner::handle_alter_owner(
957                        handler_args,
958                        name,
959                        new_owner_name,
960                        statement_type,
961                    )
962                    .await
963                }
964                AlterViewOperation::SetSchema { new_schema_name } => {
965                    alter_set_schema::handle_alter_set_schema(
966                        handler_args,
967                        name,
968                        new_schema_name,
969                        statement_type,
970                        None,
971                    )
972                    .await
973                }
974                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
975                    if !materialized {
976                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
977                    }
978                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
979                        handler_args,
980                        PbThrottleTarget::Mv,
981                        name,
982                        rate_limit,
983                    )
984                    .await
985                }
986                AlterViewOperation::SwapRenameView { target_view } => {
987                    alter_swap_rename::handle_swap_rename(
988                        handler_args,
989                        name,
990                        target_view,
991                        statement_type,
992                    )
993                    .await
994                }
995                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
996                    if !materialized {
997                        bail!(
998                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
999                        );
1000                    }
1001                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1002                }
1003                AlterViewOperation::AsQuery { query } => {
1004                    if !materialized {
1005                        bail_not_implemented!("ALTER VIEW AS QUERY");
1006                    }
1007                    // `ALTER MATERIALIZED VIEW AS QUERY` is only available in development build now.
1008                    if !cfg!(debug_assertions) {
1009                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1010                    }
1011                    alter_mv::handle_alter_mv(handler_args, name, query).await
1012                }
1013            }
1014        }
1015
1016        Statement::AlterSink { name, operation } => match operation {
1017            AlterSinkOperation::AlterConnectorProps {
1018                alter_props: changed_props,
1019            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1020            AlterSinkOperation::RenameSink { sink_name } => {
1021                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1022            }
1023            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1024                alter_owner::handle_alter_owner(
1025                    handler_args,
1026                    name,
1027                    new_owner_name,
1028                    StatementType::ALTER_SINK,
1029                )
1030                .await
1031            }
1032            AlterSinkOperation::SetSchema { new_schema_name } => {
1033                alter_set_schema::handle_alter_set_schema(
1034                    handler_args,
1035                    name,
1036                    new_schema_name,
1037                    StatementType::ALTER_SINK,
1038                    None,
1039                )
1040                .await
1041            }
1042            AlterSinkOperation::SetParallelism {
1043                parallelism,
1044                deferred,
1045            } => {
1046                alter_parallelism::handle_alter_parallelism(
1047                    handler_args,
1048                    name,
1049                    parallelism,
1050                    StatementType::ALTER_SINK,
1051                    deferred,
1052                )
1053                .await
1054            }
1055            AlterSinkOperation::SwapRenameSink { target_sink } => {
1056                alter_swap_rename::handle_swap_rename(
1057                    handler_args,
1058                    name,
1059                    target_sink,
1060                    StatementType::ALTER_SINK,
1061                )
1062                .await
1063            }
1064            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1065                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1066                    handler_args,
1067                    PbThrottleTarget::Sink,
1068                    name,
1069                    rate_limit,
1070                )
1071                .await
1072            }
1073            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1074                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1075                    handler_args,
1076                    name,
1077                    enable,
1078                )
1079                .await
1080            }
1081        },
1082        Statement::AlterSubscription { name, operation } => match operation {
1083            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1084                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1085                    .await
1086            }
1087            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1088                alter_owner::handle_alter_owner(
1089                    handler_args,
1090                    name,
1091                    new_owner_name,
1092                    StatementType::ALTER_SUBSCRIPTION,
1093                )
1094                .await
1095            }
1096            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1097                alter_set_schema::handle_alter_set_schema(
1098                    handler_args,
1099                    name,
1100                    new_schema_name,
1101                    StatementType::ALTER_SUBSCRIPTION,
1102                    None,
1103                )
1104                .await
1105            }
1106            AlterSubscriptionOperation::SwapRenameSubscription {
1107                target_subscription,
1108            } => {
1109                alter_swap_rename::handle_swap_rename(
1110                    handler_args,
1111                    name,
1112                    target_subscription,
1113                    StatementType::ALTER_SUBSCRIPTION,
1114                )
1115                .await
1116            }
1117        },
1118        Statement::AlterSource { name, operation } => match operation {
1119            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1120                alter_source_props::handle_alter_source_connector_props(
1121                    handler_args,
1122                    name,
1123                    alter_props,
1124                )
1125                .await
1126            }
1127            AlterSourceOperation::RenameSource { source_name } => {
1128                alter_rename::handle_rename_source(handler_args, name, source_name).await
1129            }
1130            AlterSourceOperation::AddColumn { .. } => {
1131                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1132            }
1133            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1134                alter_owner::handle_alter_owner(
1135                    handler_args,
1136                    name,
1137                    new_owner_name,
1138                    StatementType::ALTER_SOURCE,
1139                )
1140                .await
1141            }
1142            AlterSourceOperation::SetSchema { new_schema_name } => {
1143                alter_set_schema::handle_alter_set_schema(
1144                    handler_args,
1145                    name,
1146                    new_schema_name,
1147                    StatementType::ALTER_SOURCE,
1148                    None,
1149                )
1150                .await
1151            }
1152            AlterSourceOperation::FormatEncode { format_encode } => {
1153                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1154                    .await
1155            }
1156            AlterSourceOperation::RefreshSchema => {
1157                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1158            }
1159            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1160                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1161                    handler_args,
1162                    PbThrottleTarget::Source,
1163                    name,
1164                    rate_limit,
1165                )
1166                .await
1167            }
1168            AlterSourceOperation::SwapRenameSource { target_source } => {
1169                alter_swap_rename::handle_swap_rename(
1170                    handler_args,
1171                    name,
1172                    target_source,
1173                    StatementType::ALTER_SOURCE,
1174                )
1175                .await
1176            }
1177            AlterSourceOperation::SetParallelism {
1178                parallelism,
1179                deferred,
1180            } => {
1181                alter_parallelism::handle_alter_parallelism(
1182                    handler_args,
1183                    name,
1184                    parallelism,
1185                    StatementType::ALTER_SOURCE,
1186                    deferred,
1187                )
1188                .await
1189            }
1190        },
1191        Statement::AlterFunction {
1192            name,
1193            args,
1194            operation,
1195        } => match operation {
1196            AlterFunctionOperation::SetSchema { new_schema_name } => {
1197                alter_set_schema::handle_alter_set_schema(
1198                    handler_args,
1199                    name,
1200                    new_schema_name,
1201                    StatementType::ALTER_FUNCTION,
1202                    args,
1203                )
1204                .await
1205            }
1206        },
1207        Statement::AlterConnection { name, operation } => match operation {
1208            AlterConnectionOperation::SetSchema { new_schema_name } => {
1209                alter_set_schema::handle_alter_set_schema(
1210                    handler_args,
1211                    name,
1212                    new_schema_name,
1213                    StatementType::ALTER_CONNECTION,
1214                    None,
1215                )
1216                .await
1217            }
1218            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1219                alter_owner::handle_alter_owner(
1220                    handler_args,
1221                    name,
1222                    new_owner_name,
1223                    StatementType::ALTER_CONNECTION,
1224                )
1225                .await
1226            }
1227        },
1228        Statement::AlterSystem { param, value } => {
1229            alter_system::handle_alter_system(handler_args, param, value).await
1230        }
1231        Statement::AlterSecret {
1232            name,
1233            with_options,
1234            operation,
1235        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1236        Statement::AlterFragment {
1237            fragment_id,
1238            operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1239        } => {
1240            alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1241                &handler_args.session,
1242                PbThrottleTarget::Fragment,
1243                fragment_id,
1244                rate_limit,
1245                StatementType::SET_VARIABLE,
1246            )
1247            .await
1248        }
1249        Statement::AlterDefaultPrivileges { .. } => {
1250            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1251        }
1252        Statement::StartTransaction { modes } => {
1253            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1254        }
1255        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1256        Statement::Commit { chain } => {
1257            transaction::handle_commit(handler_args, COMMIT, chain).await
1258        }
1259        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1260        Statement::Rollback { chain } => {
1261            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1262        }
1263        Statement::SetTransaction {
1264            modes,
1265            snapshot,
1266            session,
1267        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1268        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1269        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1270        Statement::Comment {
1271            object_type,
1272            object_name,
1273            comment,
1274        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1275        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1276        Statement::Prepare {
1277            name,
1278            data_types,
1279            statement,
1280        } => prepared_statement::handle_prepare(name, data_types, statement).await,
1281        Statement::Deallocate { name, prepare } => {
1282            prepared_statement::handle_deallocate(name, prepare).await
1283        }
1284        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1285    }
1286}
1287
1288fn check_ban_ddl_for_iceberg_engine_table(
1289    session: Arc<SessionImpl>,
1290    stmt: &Statement,
1291) -> Result<()> {
1292    match stmt {
1293        Statement::AlterTable {
1294            name,
1295            operation:
1296                operation @ (AlterTableOperation::AddColumn { .. }
1297                | AlterTableOperation::DropColumn { .. }),
1298        } => {
1299            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1300            if table.is_iceberg_engine_table() {
1301                bail!(
1302                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1303                    operation,
1304                    schema_name,
1305                    name
1306                );
1307            }
1308        }
1309
1310        Statement::AlterTable {
1311            name,
1312            operation: AlterTableOperation::RenameTable { .. },
1313        } => {
1314            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1315            if table.is_iceberg_engine_table() {
1316                bail!(
1317                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1318                    schema_name,
1319                    name
1320                );
1321            }
1322        }
1323
1324        Statement::AlterTable {
1325            name,
1326            operation: AlterTableOperation::ChangeOwner { .. },
1327        } => {
1328            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1329            if table.is_iceberg_engine_table() {
1330                bail!(
1331                    "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1332                    schema_name,
1333                    name
1334                );
1335            }
1336        }
1337
1338        Statement::AlterTable {
1339            name,
1340            operation: AlterTableOperation::SetParallelism { .. },
1341        } => {
1342            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1343            if table.is_iceberg_engine_table() {
1344                bail!(
1345                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1346                    schema_name,
1347                    name
1348                );
1349            }
1350        }
1351
1352        Statement::AlterTable {
1353            name,
1354            operation: AlterTableOperation::SetSchema { .. },
1355        } => {
1356            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1357            if table.is_iceberg_engine_table() {
1358                bail!(
1359                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1360                    schema_name,
1361                    name
1362                );
1363            }
1364        }
1365
1366        Statement::AlterTable {
1367            name,
1368            operation: AlterTableOperation::RefreshSchema,
1369        } => {
1370            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1371            if table.is_iceberg_engine_table() {
1372                bail!(
1373                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1374                    schema_name,
1375                    name
1376                );
1377            }
1378        }
1379
1380        Statement::AlterTable {
1381            name,
1382            operation: AlterTableOperation::SetSourceRateLimit { .. },
1383        } => {
1384            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1385            if table.is_iceberg_engine_table() {
1386                bail!(
1387                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1388                    schema_name,
1389                    name
1390                );
1391            }
1392        }
1393
1394        _ => {}
1395    }
1396
1397    Ok(())
1398}