risingwave_frontend/handler/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_database_param;
46mod alter_mv;
47mod alter_owner;
48mod alter_parallelism;
49mod alter_rename;
50mod alter_resource_group;
51mod alter_secret;
52mod alter_set_schema;
53mod alter_sink_props;
54mod alter_source_column;
55mod alter_source_props;
56mod alter_source_with_sr;
57mod alter_streaming_enable_unaligned_join;
58mod alter_streaming_rate_limit;
59mod alter_swap_rename;
60mod alter_system;
61mod alter_table_column;
62pub mod alter_table_drop_connector;
63pub mod alter_table_props;
64mod alter_table_with_sr;
65pub mod alter_user;
66pub mod cancel_job;
67pub mod close_cursor;
68mod comment;
69pub mod create_aggregate;
70pub mod create_connection;
71mod create_database;
72pub mod create_function;
73pub mod create_index;
74pub mod create_mv;
75pub mod create_schema;
76pub mod create_secret;
77pub mod create_sink;
78pub mod create_source;
79pub mod create_sql_function;
80pub mod create_subscription;
81pub mod create_table;
82pub mod create_table_as;
83pub mod create_user;
84pub mod create_view;
85pub mod declare_cursor;
86pub mod describe;
87pub mod discard;
88mod drop_connection;
89mod drop_database;
90pub mod drop_function;
91mod drop_index;
92pub mod drop_mv;
93mod drop_schema;
94pub mod drop_secret;
95pub mod drop_sink;
96pub mod drop_source;
97pub mod drop_subscription;
98pub mod drop_table;
99pub mod drop_user;
100mod drop_view;
101pub mod explain;
102pub mod explain_analyze_stream_job;
103pub mod extended_handle;
104pub mod fetch_cursor;
105mod flush;
106pub mod handle_privilege;
107pub mod kill_process;
108mod prepared_statement;
109pub mod privilege;
110pub mod query;
111mod recover;
112mod refresh;
113pub mod show;
114mod transaction;
115mod use_db;
116pub mod util;
117pub mod vacuum;
118pub mod variable;
119mod wait;
120
121pub use alter_table_column::{
122    fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
123};
124
125/// The [`PgResponseBuilder`] used by RisingWave.
126pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
127
128/// The [`PgResponse`] used by RisingWave.
129pub type RwPgResponse = PgResponse<PgResponseStream>;
130
131#[easy_ext::ext(RwPgResponseBuilderExt)]
132impl RwPgResponseBuilder {
133    /// Append rows to the response.
134    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
135        let fields = T::fields();
136        self.values(
137            rows.into_iter()
138                .map(|row| {
139                    Row::new(
140                        row.into_owned_row()
141                            .into_iter()
142                            .zip_eq_fast(&fields)
143                            .map(|(datum, (_, ty))| {
144                                datum.map(|scalar| {
145                                    scalar.as_scalar_ref_impl().text_format(ty).into()
146                                })
147                            })
148                            .collect(),
149                    )
150                })
151                .collect_vec()
152                .into(),
153            fields_to_descriptors(fields),
154        )
155    }
156}
157
158pub fn fields_to_descriptors(
159    fields: Vec<(&str, risingwave_common::types::DataType)>,
160) -> Vec<PgFieldDescriptor> {
161    fields
162        .iter()
163        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
164        .collect()
165}
166
167pub enum PgResponseStream {
168    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
169    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
170    Rows(BoxStream<'static, RowSetResult>),
171}
172
173impl Stream for PgResponseStream {
174    type Item = std::result::Result<Vec<Row>, BoxedError>;
175
176    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
177        match &mut *self {
178            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
179            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
180            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
181        }
182    }
183}
184
185impl From<Vec<Row>> for PgResponseStream {
186    fn from(rows: Vec<Row>) -> Self {
187        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
188    }
189}
190
191#[derive(Clone)]
192pub struct HandlerArgs {
193    pub session: Arc<SessionImpl>,
194    pub sql: Arc<str>,
195    pub normalized_sql: String,
196    pub with_options: WithOptions,
197}
198
199impl HandlerArgs {
200    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
201        Ok(Self {
202            session,
203            sql,
204            with_options: WithOptions::try_from(stmt)?,
205            normalized_sql: Self::normalize_sql(stmt),
206        })
207    }
208
209    /// Get normalized SQL from the statement.
210    ///
211    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
212    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
213    ///   make it suitable for the `SHOW CREATE` statements.
214    fn normalize_sql(stmt: &Statement) -> String {
215        let mut stmt = stmt.clone();
216        match &mut stmt {
217            Statement::CreateView {
218                or_replace,
219                if_not_exists,
220                ..
221            } => {
222                *or_replace = false;
223                *if_not_exists = false;
224            }
225            Statement::CreateTable {
226                or_replace,
227                if_not_exists,
228                ..
229            } => {
230                *or_replace = false;
231                *if_not_exists = false;
232            }
233            Statement::CreateIndex { if_not_exists, .. } => {
234                *if_not_exists = false;
235            }
236            Statement::CreateSource {
237                stmt: CreateSourceStatement { if_not_exists, .. },
238                ..
239            } => {
240                *if_not_exists = false;
241            }
242            Statement::CreateSink {
243                stmt: CreateSinkStatement { if_not_exists, .. },
244            } => {
245                *if_not_exists = false;
246            }
247            Statement::CreateSubscription {
248                stmt: CreateSubscriptionStatement { if_not_exists, .. },
249            } => {
250                *if_not_exists = false;
251            }
252            Statement::CreateConnection {
253                stmt: CreateConnectionStatement { if_not_exists, .. },
254            } => {
255                *if_not_exists = false;
256            }
257            _ => {}
258        }
259        stmt.to_string()
260    }
261}
262
263pub async fn handle(
264    session: Arc<SessionImpl>,
265    stmt: Statement,
266    sql: Arc<str>,
267    formats: Vec<Format>,
268) -> Result<RwPgResponse> {
269    session.clear_cancel_query_flag();
270    let _guard = session.txn_begin_implicit();
271    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
272
273    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
274
275    match stmt {
276        Statement::Explain {
277            statement,
278            analyze,
279            options,
280        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
281        Statement::ExplainAnalyzeStreamJob {
282            target,
283            duration_secs,
284        } => {
285            explain_analyze_stream_job::handle_explain_analyze_stream_job(
286                handler_args,
287                target,
288                duration_secs,
289            )
290            .await
291        }
292        Statement::CreateSource { stmt } => {
293            create_source::handle_create_source(handler_args, stmt).await
294        }
295        Statement::CreateSink { stmt } => {
296            create_sink::handle_create_sink(handler_args, stmt, false).await
297        }
298        Statement::CreateSubscription { stmt } => {
299            create_subscription::handle_create_subscription(handler_args, stmt).await
300        }
301        Statement::CreateConnection { stmt } => {
302            create_connection::handle_create_connection(handler_args, stmt).await
303        }
304        Statement::CreateSecret { stmt } => {
305            create_secret::handle_create_secret(handler_args, stmt).await
306        }
307        Statement::CreateFunction {
308            or_replace,
309            temporary,
310            if_not_exists,
311            name,
312            args,
313            returns,
314            params,
315            with_options,
316        } => {
317            // For general udf, `language` clause could be ignored
318            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
319            if params.language.is_none()
320                || !params
321                    .language
322                    .as_ref()
323                    .unwrap()
324                    .real_value()
325                    .eq_ignore_ascii_case("sql")
326            {
327                create_function::handle_create_function(
328                    handler_args,
329                    or_replace,
330                    temporary,
331                    if_not_exists,
332                    name,
333                    args,
334                    returns,
335                    params,
336                    with_options,
337                )
338                .await
339            } else {
340                create_sql_function::handle_create_sql_function(
341                    handler_args,
342                    or_replace,
343                    temporary,
344                    if_not_exists,
345                    name,
346                    args,
347                    returns,
348                    params,
349                )
350                .await
351            }
352        }
353        Statement::CreateAggregate {
354            or_replace,
355            if_not_exists,
356            name,
357            args,
358            returns,
359            params,
360            ..
361        } => {
362            create_aggregate::handle_create_aggregate(
363                handler_args,
364                or_replace,
365                if_not_exists,
366                name,
367                args,
368                returns,
369                params,
370            )
371            .await
372        }
373        Statement::CreateTable {
374            name,
375            columns,
376            wildcard_idx,
377            constraints,
378            query,
379            with_options: _, // It is put in OptimizerContext
380            // Not supported things
381            or_replace,
382            temporary,
383            if_not_exists,
384            format_encode,
385            source_watermarks,
386            append_only,
387            on_conflict,
388            with_version_columns,
389            cdc_table_info,
390            include_column_options,
391            webhook_info,
392            engine,
393        } => {
394            if or_replace {
395                bail_not_implemented!("CREATE OR REPLACE TABLE");
396            }
397            if temporary {
398                bail_not_implemented!("CREATE TEMPORARY TABLE");
399            }
400            if let Some(query) = query {
401                return create_table_as::handle_create_as(
402                    handler_args,
403                    name,
404                    if_not_exists,
405                    query,
406                    columns,
407                    append_only,
408                    on_conflict,
409                    with_version_columns
410                        .iter()
411                        .map(|col| col.real_value())
412                        .collect(),
413                    engine,
414                )
415                .await;
416            }
417            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
418            create_table::handle_create_table(
419                handler_args,
420                name,
421                columns,
422                wildcard_idx,
423                constraints,
424                if_not_exists,
425                format_encode,
426                source_watermarks,
427                append_only,
428                on_conflict,
429                with_version_columns
430                    .iter()
431                    .map(|col| col.real_value())
432                    .collect(),
433                cdc_table_info,
434                include_column_options,
435                webhook_info,
436                engine,
437            )
438            .await
439        }
440        Statement::CreateDatabase {
441            db_name,
442            if_not_exists,
443            owner,
444            resource_group,
445            barrier_interval_ms,
446            checkpoint_frequency,
447        } => {
448            create_database::handle_create_database(
449                handler_args,
450                db_name,
451                if_not_exists,
452                owner,
453                resource_group,
454                barrier_interval_ms,
455                checkpoint_frequency,
456            )
457            .await
458        }
459        Statement::CreateSchema {
460            schema_name,
461            if_not_exists,
462            owner,
463        } => {
464            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
465                .await
466        }
467        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
468        Statement::DeclareCursor { stmt } => {
469            declare_cursor::handle_declare_cursor(handler_args, stmt).await
470        }
471        Statement::FetchCursor { stmt } => {
472            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
473        }
474        Statement::CloseCursor { stmt } => {
475            close_cursor::handle_close_cursor(handler_args, stmt).await
476        }
477        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
478        Statement::Grant { .. } => {
479            handle_privilege::handle_grant_privilege(handler_args, stmt).await
480        }
481        Statement::Revoke { .. } => {
482            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
483        }
484        Statement::Describe { name, kind } => match kind {
485            DescribeKind::Fragments => {
486                describe::handle_describe_fragments(handler_args, name).await
487            }
488            DescribeKind::Plain => describe::handle_describe(handler_args, name),
489        },
490        Statement::DescribeFragment { fragment_id } => {
491            describe::handle_describe_fragment(handler_args, fragment_id).await
492        }
493        Statement::Discard(..) => discard::handle_discard(handler_args),
494        Statement::ShowObjects {
495            object: show_object,
496            filter,
497        } => show::handle_show_object(handler_args, show_object, filter).await,
498        Statement::ShowCreateObject { create_type, name } => {
499            show::handle_show_create_object(handler_args, create_type, name)
500        }
501        Statement::ShowTransactionIsolationLevel => {
502            transaction::handle_show_isolation_level(handler_args)
503        }
504        Statement::Drop(DropStatement {
505            object_type,
506            object_name,
507            if_exists,
508            drop_mode,
509        }) => {
510            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
511                match object_type {
512                    ObjectType::MaterializedView
513                    | ObjectType::View
514                    | ObjectType::Sink
515                    | ObjectType::Source
516                    | ObjectType::Subscription
517                    | ObjectType::Index
518                    | ObjectType::Table
519                    | ObjectType::Schema
520                    | ObjectType::Connection => true,
521                    ObjectType::Database | ObjectType::User | ObjectType::Secret => {
522                        bail_not_implemented!("DROP CASCADE");
523                    }
524                }
525            } else {
526                false
527            };
528            match object_type {
529                ObjectType::Table => {
530                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
531                        .await
532                }
533                ObjectType::MaterializedView => {
534                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
535                }
536                ObjectType::Index => {
537                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
538                        .await
539                }
540                ObjectType::Source => {
541                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
542                        .await
543                }
544                ObjectType::Sink => {
545                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
546                }
547                ObjectType::Subscription => {
548                    drop_subscription::handle_drop_subscription(
549                        handler_args,
550                        object_name,
551                        if_exists,
552                        cascade,
553                    )
554                    .await
555                }
556                ObjectType::Database => {
557                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
558                }
559                ObjectType::Schema => {
560                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
561                        .await
562                }
563                ObjectType::User => {
564                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
565                }
566                ObjectType::View => {
567                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
568                }
569                ObjectType::Connection => {
570                    drop_connection::handle_drop_connection(
571                        handler_args,
572                        object_name,
573                        if_exists,
574                        cascade,
575                    )
576                    .await
577                }
578                ObjectType::Secret => {
579                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
580                }
581            }
582        }
583        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
584        Statement::DropFunction {
585            if_exists,
586            func_desc,
587            option,
588        } => {
589            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
590                .await
591        }
592        Statement::DropAggregate {
593            if_exists,
594            func_desc,
595            option,
596        } => {
597            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
598                .await
599        }
600        Statement::Query(_)
601        | Statement::Insert { .. }
602        | Statement::Delete { .. }
603        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
604        Statement::Copy {
605            entity: CopyEntity::Query(query),
606            target: CopyTarget::Stdout,
607        } => {
608            let response =
609                query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
610                    .await?;
611            Ok(response.into_copy_query_to_stdout())
612        }
613        Statement::CreateView {
614            materialized,
615            if_not_exists,
616            name,
617            columns,
618            query,
619            with_options: _, // It is put in OptimizerContext
620            or_replace,      // not supported
621            emit_mode,
622        } => {
623            if or_replace {
624                bail_not_implemented!("CREATE OR REPLACE VIEW");
625            }
626            if materialized {
627                create_mv::handle_create_mv(
628                    handler_args,
629                    if_not_exists,
630                    name,
631                    *query,
632                    columns,
633                    emit_mode,
634                )
635                .await
636            } else {
637                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
638                    .await
639            }
640        }
641        Statement::Flush => flush::handle_flush(handler_args).await,
642        Statement::Wait => wait::handle_wait(handler_args).await,
643        Statement::Recover => recover::handle_recover(handler_args).await,
644        Statement::SetVariable {
645            local: _,
646            variable,
647            value,
648        } => {
649            // special handle for `use database`
650            if variable.real_value().eq_ignore_ascii_case("database") {
651                let x = variable::set_var_to_param_str(&value);
652                let res = use_db::handle_use_db(
653                    handler_args,
654                    ObjectName::from(vec![Ident::from_real_value(
655                        x.as_deref().unwrap_or("default"),
656                    )]),
657                )?;
658                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
659                for notice in res.notices() {
660                    builder = builder.notice(notice);
661                }
662                return Ok(builder.into());
663            }
664            variable::handle_set(handler_args, variable, value)
665        }
666        Statement::SetTimeZone { local: _, value } => {
667            variable::handle_set_time_zone(handler_args, value)
668        }
669        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
670        Statement::CreateIndex {
671            name,
672            table_name,
673            method,
674            columns,
675            include,
676            distributed_by,
677            unique,
678            if_not_exists,
679            with_properties: _,
680        } => {
681            if unique {
682                bail_not_implemented!("create unique index");
683            }
684
685            create_index::handle_create_index(
686                handler_args,
687                if_not_exists,
688                name,
689                table_name,
690                method,
691                columns.to_vec(),
692                include,
693                distributed_by,
694            )
695            .await
696        }
697        Statement::AlterDatabase { name, operation } => match operation {
698            AlterDatabaseOperation::RenameDatabase { database_name } => {
699                alter_rename::handle_rename_database(handler_args, name, database_name).await
700            }
701            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
702                alter_owner::handle_alter_owner(
703                    handler_args,
704                    name,
705                    new_owner_name,
706                    StatementType::ALTER_DATABASE,
707                )
708                .await
709            }
710            AlterDatabaseOperation::SetParam(config_param) => {
711                let ConfigParam { param, value } = config_param;
712
713                let database_param = match param.real_value().to_uppercase().as_str() {
714                    "BARRIER_INTERVAL_MS" => {
715                        let barrier_interval_ms = match value {
716                            SetVariableValue::Default => None,
717                            SetVariableValue::Single(SetVariableValueSingle::Literal(
718                                Value::Number(num),
719                            )) => {
720                                let num = num.parse::<u32>().map_err(|e| {
721                                    ErrorCode::InvalidInputSyntax(format!(
722                                        "barrier_interval_ms must be a u32 integer: {}",
723                                        e.as_report()
724                                    ))
725                                })?;
726                                Some(num)
727                            }
728                            _ => {
729                                return Err(ErrorCode::InvalidInputSyntax(
730                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
731                                        .to_owned(),
732                                )
733                                .into());
734                            }
735                        };
736                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
737                    }
738                    "CHECKPOINT_FREQUENCY" => {
739                        let checkpoint_frequency = match value {
740                            SetVariableValue::Default => None,
741                            SetVariableValue::Single(SetVariableValueSingle::Literal(
742                                Value::Number(num),
743                            )) => {
744                                let num = num.parse::<u64>().map_err(|e| {
745                                    ErrorCode::InvalidInputSyntax(format!(
746                                        "checkpoint_frequency must be a u64 integer: {}",
747                                        e.as_report()
748                                    ))
749                                })?;
750                                Some(num)
751                            }
752                            _ => {
753                                return Err(ErrorCode::InvalidInputSyntax(
754                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
755                                        .to_owned(),
756                                )
757                                .into());
758                            }
759                        };
760                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
761                    }
762                    _ => {
763                        return Err(ErrorCode::InvalidInputSyntax(format!(
764                            "Unsupported database config parameter: {}",
765                            param.real_value()
766                        ))
767                        .into());
768                    }
769                };
770
771                alter_database_param::handle_alter_database_param(
772                    handler_args,
773                    name,
774                    database_param,
775                )
776                .await
777            }
778        },
779        Statement::AlterSchema { name, operation } => match operation {
780            AlterSchemaOperation::RenameSchema { schema_name } => {
781                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
782            }
783            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
784                alter_owner::handle_alter_owner(
785                    handler_args,
786                    name,
787                    new_owner_name,
788                    StatementType::ALTER_SCHEMA,
789                )
790                .await
791            }
792            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
793                alter_swap_rename::handle_swap_rename(
794                    handler_args,
795                    name,
796                    target_schema,
797                    StatementType::ALTER_SCHEMA,
798                )
799                .await
800            }
801        },
802        Statement::AlterTable { name, operation } => match operation {
803            AlterTableOperation::AddColumn { .. }
804            | AlterTableOperation::DropColumn { .. }
805            | AlterTableOperation::AlterColumn { .. } => {
806                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
807            }
808            AlterTableOperation::RenameTable { table_name } => {
809                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
810                    .await
811            }
812            AlterTableOperation::ChangeOwner { new_owner_name } => {
813                alter_owner::handle_alter_owner(
814                    handler_args,
815                    name,
816                    new_owner_name,
817                    StatementType::ALTER_TABLE,
818                )
819                .await
820            }
821            AlterTableOperation::SetParallelism {
822                parallelism,
823                deferred,
824            } => {
825                alter_parallelism::handle_alter_parallelism(
826                    handler_args,
827                    name,
828                    parallelism,
829                    StatementType::ALTER_TABLE,
830                    deferred,
831                )
832                .await
833            }
834            AlterTableOperation::SetSchema { new_schema_name } => {
835                alter_set_schema::handle_alter_set_schema(
836                    handler_args,
837                    name,
838                    new_schema_name,
839                    StatementType::ALTER_TABLE,
840                    None,
841                )
842                .await
843            }
844            AlterTableOperation::RefreshSchema => {
845                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
846            }
847            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
848                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
849                    handler_args,
850                    PbThrottleTarget::TableWithSource,
851                    name,
852                    rate_limit,
853                )
854                .await
855            }
856            AlterTableOperation::DropConnector => {
857                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
858                    .await
859            }
860            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
861                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
862                    handler_args,
863                    PbThrottleTarget::TableDml,
864                    name,
865                    rate_limit,
866                )
867                .await
868            }
869            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
870                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
871                    handler_args,
872                    PbThrottleTarget::CdcTable,
873                    name,
874                    rate_limit,
875                )
876                .await
877            }
878            AlterTableOperation::SwapRenameTable { target_table } => {
879                alter_swap_rename::handle_swap_rename(
880                    handler_args,
881                    name,
882                    target_table,
883                    StatementType::ALTER_TABLE,
884                )
885                .await
886            }
887            AlterTableOperation::AlterConnectorProps { alter_props } => {
888                alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
889            }
890            AlterTableOperation::AddConstraint { .. }
891            | AlterTableOperation::DropConstraint { .. }
892            | AlterTableOperation::RenameColumn { .. }
893            | AlterTableOperation::ChangeColumn { .. }
894            | AlterTableOperation::RenameConstraint { .. } => {
895                bail_not_implemented!(
896                    "Unhandled statement: {}",
897                    Statement::AlterTable { name, operation }
898                )
899            }
900        },
901        Statement::AlterIndex { name, operation } => match operation {
902            AlterIndexOperation::RenameIndex { index_name } => {
903                alter_rename::handle_rename_index(handler_args, name, index_name).await
904            }
905            AlterIndexOperation::SetParallelism {
906                parallelism,
907                deferred,
908            } => {
909                alter_parallelism::handle_alter_parallelism(
910                    handler_args,
911                    name,
912                    parallelism,
913                    StatementType::ALTER_INDEX,
914                    deferred,
915                )
916                .await
917            }
918        },
919        Statement::AlterView {
920            materialized,
921            name,
922            operation,
923        } => {
924            let statement_type = if materialized {
925                StatementType::ALTER_MATERIALIZED_VIEW
926            } else {
927                StatementType::ALTER_VIEW
928            };
929            match operation {
930                AlterViewOperation::RenameView { view_name } => {
931                    if materialized {
932                        alter_rename::handle_rename_table(
933                            handler_args,
934                            TableType::MaterializedView,
935                            name,
936                            view_name,
937                        )
938                        .await
939                    } else {
940                        alter_rename::handle_rename_view(handler_args, name, view_name).await
941                    }
942                }
943                AlterViewOperation::SetParallelism {
944                    parallelism,
945                    deferred,
946                } => {
947                    if !materialized {
948                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
949                    }
950                    alter_parallelism::handle_alter_parallelism(
951                        handler_args,
952                        name,
953                        parallelism,
954                        statement_type,
955                        deferred,
956                    )
957                    .await
958                }
959                AlterViewOperation::SetResourceGroup {
960                    resource_group,
961                    deferred,
962                } => {
963                    if !materialized {
964                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
965                    }
966                    alter_resource_group::handle_alter_resource_group(
967                        handler_args,
968                        name,
969                        resource_group,
970                        statement_type,
971                        deferred,
972                    )
973                    .await
974                }
975                AlterViewOperation::ChangeOwner { new_owner_name } => {
976                    alter_owner::handle_alter_owner(
977                        handler_args,
978                        name,
979                        new_owner_name,
980                        statement_type,
981                    )
982                    .await
983                }
984                AlterViewOperation::SetSchema { new_schema_name } => {
985                    alter_set_schema::handle_alter_set_schema(
986                        handler_args,
987                        name,
988                        new_schema_name,
989                        statement_type,
990                        None,
991                    )
992                    .await
993                }
994                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
995                    if !materialized {
996                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
997                    }
998                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
999                        handler_args,
1000                        PbThrottleTarget::Mv,
1001                        name,
1002                        rate_limit,
1003                    )
1004                    .await
1005                }
1006                AlterViewOperation::SwapRenameView { target_view } => {
1007                    alter_swap_rename::handle_swap_rename(
1008                        handler_args,
1009                        name,
1010                        target_view,
1011                        statement_type,
1012                    )
1013                    .await
1014                }
1015                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1016                    if !materialized {
1017                        bail!(
1018                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1019                        );
1020                    }
1021                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1022                }
1023                AlterViewOperation::AsQuery { query } => {
1024                    if !materialized {
1025                        bail_not_implemented!("ALTER VIEW AS QUERY");
1026                    }
1027                    // `ALTER MATERIALIZED VIEW AS QUERY` is only available in development build now.
1028                    if !cfg!(debug_assertions) {
1029                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1030                    }
1031                    alter_mv::handle_alter_mv(handler_args, name, query).await
1032                }
1033            }
1034        }
1035
1036        Statement::AlterSink { name, operation } => match operation {
1037            AlterSinkOperation::AlterConnectorProps {
1038                alter_props: changed_props,
1039            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1040            AlterSinkOperation::RenameSink { sink_name } => {
1041                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1042            }
1043            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1044                alter_owner::handle_alter_owner(
1045                    handler_args,
1046                    name,
1047                    new_owner_name,
1048                    StatementType::ALTER_SINK,
1049                )
1050                .await
1051            }
1052            AlterSinkOperation::SetSchema { new_schema_name } => {
1053                alter_set_schema::handle_alter_set_schema(
1054                    handler_args,
1055                    name,
1056                    new_schema_name,
1057                    StatementType::ALTER_SINK,
1058                    None,
1059                )
1060                .await
1061            }
1062            AlterSinkOperation::SetParallelism {
1063                parallelism,
1064                deferred,
1065            } => {
1066                alter_parallelism::handle_alter_parallelism(
1067                    handler_args,
1068                    name,
1069                    parallelism,
1070                    StatementType::ALTER_SINK,
1071                    deferred,
1072                )
1073                .await
1074            }
1075            AlterSinkOperation::SwapRenameSink { target_sink } => {
1076                alter_swap_rename::handle_swap_rename(
1077                    handler_args,
1078                    name,
1079                    target_sink,
1080                    StatementType::ALTER_SINK,
1081                )
1082                .await
1083            }
1084            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1085                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1086                    handler_args,
1087                    PbThrottleTarget::Sink,
1088                    name,
1089                    rate_limit,
1090                )
1091                .await
1092            }
1093            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1094                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1095                    handler_args,
1096                    name,
1097                    enable,
1098                )
1099                .await
1100            }
1101        },
1102        Statement::AlterSubscription { name, operation } => match operation {
1103            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1104                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1105                    .await
1106            }
1107            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1108                alter_owner::handle_alter_owner(
1109                    handler_args,
1110                    name,
1111                    new_owner_name,
1112                    StatementType::ALTER_SUBSCRIPTION,
1113                )
1114                .await
1115            }
1116            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1117                alter_set_schema::handle_alter_set_schema(
1118                    handler_args,
1119                    name,
1120                    new_schema_name,
1121                    StatementType::ALTER_SUBSCRIPTION,
1122                    None,
1123                )
1124                .await
1125            }
1126            AlterSubscriptionOperation::SwapRenameSubscription {
1127                target_subscription,
1128            } => {
1129                alter_swap_rename::handle_swap_rename(
1130                    handler_args,
1131                    name,
1132                    target_subscription,
1133                    StatementType::ALTER_SUBSCRIPTION,
1134                )
1135                .await
1136            }
1137        },
1138        Statement::AlterSource { name, operation } => match operation {
1139            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1140                alter_source_props::handle_alter_source_connector_props(
1141                    handler_args,
1142                    name,
1143                    alter_props,
1144                )
1145                .await
1146            }
1147            AlterSourceOperation::RenameSource { source_name } => {
1148                alter_rename::handle_rename_source(handler_args, name, source_name).await
1149            }
1150            AlterSourceOperation::AddColumn { .. } => {
1151                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1152            }
1153            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1154                alter_owner::handle_alter_owner(
1155                    handler_args,
1156                    name,
1157                    new_owner_name,
1158                    StatementType::ALTER_SOURCE,
1159                )
1160                .await
1161            }
1162            AlterSourceOperation::SetSchema { new_schema_name } => {
1163                alter_set_schema::handle_alter_set_schema(
1164                    handler_args,
1165                    name,
1166                    new_schema_name,
1167                    StatementType::ALTER_SOURCE,
1168                    None,
1169                )
1170                .await
1171            }
1172            AlterSourceOperation::FormatEncode { format_encode } => {
1173                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1174                    .await
1175            }
1176            AlterSourceOperation::RefreshSchema => {
1177                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1178            }
1179            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1180                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1181                    handler_args,
1182                    PbThrottleTarget::Source,
1183                    name,
1184                    rate_limit,
1185                )
1186                .await
1187            }
1188            AlterSourceOperation::SwapRenameSource { target_source } => {
1189                alter_swap_rename::handle_swap_rename(
1190                    handler_args,
1191                    name,
1192                    target_source,
1193                    StatementType::ALTER_SOURCE,
1194                )
1195                .await
1196            }
1197            AlterSourceOperation::SetParallelism {
1198                parallelism,
1199                deferred,
1200            } => {
1201                alter_parallelism::handle_alter_parallelism(
1202                    handler_args,
1203                    name,
1204                    parallelism,
1205                    StatementType::ALTER_SOURCE,
1206                    deferred,
1207                )
1208                .await
1209            }
1210        },
1211        Statement::AlterFunction {
1212            name,
1213            args,
1214            operation,
1215        } => match operation {
1216            AlterFunctionOperation::SetSchema { new_schema_name } => {
1217                alter_set_schema::handle_alter_set_schema(
1218                    handler_args,
1219                    name,
1220                    new_schema_name,
1221                    StatementType::ALTER_FUNCTION,
1222                    args,
1223                )
1224                .await
1225            }
1226        },
1227        Statement::AlterConnection { name, operation } => match operation {
1228            AlterConnectionOperation::SetSchema { new_schema_name } => {
1229                alter_set_schema::handle_alter_set_schema(
1230                    handler_args,
1231                    name,
1232                    new_schema_name,
1233                    StatementType::ALTER_CONNECTION,
1234                    None,
1235                )
1236                .await
1237            }
1238            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1239                alter_owner::handle_alter_owner(
1240                    handler_args,
1241                    name,
1242                    new_owner_name,
1243                    StatementType::ALTER_CONNECTION,
1244                )
1245                .await
1246            }
1247        },
1248        Statement::AlterSystem { param, value } => {
1249            alter_system::handle_alter_system(handler_args, param, value).await
1250        }
1251        Statement::AlterSecret {
1252            name,
1253            with_options,
1254            operation,
1255        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1256        Statement::AlterFragment {
1257            fragment_id,
1258            operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1259        } => {
1260            alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1261                &handler_args.session,
1262                PbThrottleTarget::Fragment,
1263                fragment_id,
1264                rate_limit,
1265                StatementType::SET_VARIABLE,
1266            )
1267            .await
1268        }
1269        Statement::AlterDefaultPrivileges { .. } => {
1270            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1271        }
1272        Statement::StartTransaction { modes } => {
1273            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1274        }
1275        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1276        Statement::Commit { chain } => {
1277            transaction::handle_commit(handler_args, COMMIT, chain).await
1278        }
1279        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1280        Statement::Rollback { chain } => {
1281            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1282        }
1283        Statement::SetTransaction {
1284            modes,
1285            snapshot,
1286            session,
1287        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1288        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1289        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1290        Statement::Comment {
1291            object_type,
1292            object_name,
1293            comment,
1294        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1295        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1296        Statement::Prepare {
1297            name,
1298            data_types,
1299            statement,
1300        } => prepared_statement::handle_prepare(name, data_types, statement).await,
1301        Statement::Deallocate { name, prepare } => {
1302            prepared_statement::handle_deallocate(name, prepare).await
1303        }
1304        Statement::Vacuum { object_name, full } => {
1305            vacuum::handle_vacuum(handler_args, object_name, full).await
1306        }
1307        Statement::Refresh { table_name } => {
1308            refresh::handle_refresh(handler_args, table_name).await
1309        }
1310        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1311    }
1312}
1313
1314fn check_ban_ddl_for_iceberg_engine_table(
1315    session: Arc<SessionImpl>,
1316    stmt: &Statement,
1317) -> Result<()> {
1318    match stmt {
1319        Statement::AlterTable {
1320            name,
1321            operation:
1322                operation @ (AlterTableOperation::AddColumn { .. }
1323                | AlterTableOperation::DropColumn { .. }),
1324        } => {
1325            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1326            if table.is_iceberg_engine_table() {
1327                bail!(
1328                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1329                    operation,
1330                    schema_name,
1331                    name
1332                );
1333            }
1334        }
1335
1336        Statement::AlterTable {
1337            name,
1338            operation: AlterTableOperation::RenameTable { .. },
1339        } => {
1340            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1341            if table.is_iceberg_engine_table() {
1342                bail!(
1343                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1344                    schema_name,
1345                    name
1346                );
1347            }
1348        }
1349
1350        Statement::AlterTable {
1351            name,
1352            operation: AlterTableOperation::ChangeOwner { .. },
1353        } => {
1354            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1355            if table.is_iceberg_engine_table() {
1356                bail!(
1357                    "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1358                    schema_name,
1359                    name
1360                );
1361            }
1362        }
1363
1364        Statement::AlterTable {
1365            name,
1366            operation: AlterTableOperation::SetParallelism { .. },
1367        } => {
1368            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1369            if table.is_iceberg_engine_table() {
1370                bail!(
1371                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1372                    schema_name,
1373                    name
1374                );
1375            }
1376        }
1377
1378        Statement::AlterTable {
1379            name,
1380            operation: AlterTableOperation::SetSchema { .. },
1381        } => {
1382            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1383            if table.is_iceberg_engine_table() {
1384                bail!(
1385                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1386                    schema_name,
1387                    name
1388                );
1389            }
1390        }
1391
1392        Statement::AlterTable {
1393            name,
1394            operation: AlterTableOperation::RefreshSchema,
1395        } => {
1396            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1397            if table.is_iceberg_engine_table() {
1398                bail!(
1399                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1400                    schema_name,
1401                    name
1402                );
1403            }
1404        }
1405
1406        Statement::AlterTable {
1407            name,
1408            operation: AlterTableOperation::SetSourceRateLimit { .. },
1409        } => {
1410            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1411            if table.is_iceberg_engine_table() {
1412                bail!(
1413                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1414                    schema_name,
1415                    name
1416                );
1417            }
1418        }
1419
1420        _ => {}
1421    }
1422
1423    Ok(())
1424}