risingwave_frontend/handler/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_connection_props;
46mod alter_database_param;
47mod alter_mv;
48mod alter_owner;
49mod alter_parallelism;
50mod alter_rename;
51mod alter_resource_group;
52mod alter_secret;
53mod alter_set_schema;
54mod alter_sink_props;
55mod alter_source_column;
56mod alter_source_props;
57mod alter_source_with_sr;
58mod alter_streaming_config;
59mod alter_streaming_enable_unaligned_join;
60mod alter_streaming_rate_limit;
61mod alter_swap_rename;
62mod alter_system;
63mod alter_table_column;
64pub mod alter_table_drop_connector;
65pub mod alter_table_props;
66mod alter_table_with_sr;
67pub mod alter_user;
68mod alter_utils;
69pub mod cancel_job;
70pub mod close_cursor;
71mod comment;
72pub mod create_aggregate;
73pub mod create_connection;
74mod create_database;
75pub mod create_function;
76pub mod create_index;
77pub mod create_mv;
78pub mod create_schema;
79pub mod create_secret;
80pub mod create_sink;
81pub mod create_source;
82pub mod create_sql_function;
83pub mod create_subscription;
84pub mod create_table;
85pub mod create_table_as;
86pub mod create_user;
87pub mod create_view;
88pub mod declare_cursor;
89pub mod describe;
90pub mod discard;
91mod drop_connection;
92mod drop_database;
93pub mod drop_function;
94mod drop_index;
95pub mod drop_mv;
96mod drop_schema;
97pub mod drop_secret;
98pub mod drop_sink;
99pub mod drop_source;
100pub mod drop_subscription;
101pub mod drop_table;
102pub mod drop_user;
103mod drop_view;
104pub mod explain;
105pub mod explain_analyze_stream_job;
106pub mod extended_handle;
107pub mod fetch_cursor;
108mod flush;
109pub mod handle_privilege;
110pub mod kill_process;
111mod prepared_statement;
112pub mod privilege;
113pub mod query;
114mod recover;
115mod refresh;
116pub mod show;
117mod transaction;
118mod use_db;
119pub mod util;
120pub mod vacuum;
121pub mod variable;
122mod wait;
123
124pub use alter_table_column::{
125    fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
126};
127
128/// The [`PgResponseBuilder`] used by RisingWave.
129pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
130
131/// The [`PgResponse`] used by RisingWave.
132pub type RwPgResponse = PgResponse<PgResponseStream>;
133
134#[easy_ext::ext(RwPgResponseBuilderExt)]
135impl RwPgResponseBuilder {
136    /// Append rows to the response.
137    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
138        let fields = T::fields();
139        self.values(
140            rows.into_iter()
141                .map(|row| {
142                    Row::new(
143                        row.into_owned_row()
144                            .into_iter()
145                            .zip_eq_fast(&fields)
146                            .map(|(datum, (_, ty))| {
147                                datum.map(|scalar| {
148                                    scalar.as_scalar_ref_impl().text_format(ty).into()
149                                })
150                            })
151                            .collect(),
152                    )
153                })
154                .collect_vec()
155                .into(),
156            fields_to_descriptors(fields),
157        )
158    }
159}
160
161pub fn fields_to_descriptors(
162    fields: Vec<(&str, risingwave_common::types::DataType)>,
163) -> Vec<PgFieldDescriptor> {
164    fields
165        .iter()
166        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
167        .collect()
168}
169
170pub enum PgResponseStream {
171    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
172    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
173    Rows(BoxStream<'static, RowSetResult>),
174}
175
176impl Stream for PgResponseStream {
177    type Item = std::result::Result<Vec<Row>, BoxedError>;
178
179    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
180        match &mut *self {
181            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
182            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
183            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
184        }
185    }
186}
187
188impl From<Vec<Row>> for PgResponseStream {
189    fn from(rows: Vec<Row>) -> Self {
190        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
191    }
192}
193
194#[derive(Clone)]
195pub struct HandlerArgs {
196    pub session: Arc<SessionImpl>,
197    pub sql: Arc<str>,
198    pub normalized_sql: String,
199    pub with_options: WithOptions,
200}
201
202impl HandlerArgs {
203    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
204        Ok(Self {
205            session,
206            sql,
207            with_options: WithOptions::try_from(stmt)?,
208            normalized_sql: Self::normalize_sql(stmt),
209        })
210    }
211
212    /// Get normalized SQL from the statement.
213    ///
214    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
215    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
216    ///   make it suitable for the `SHOW CREATE` statements.
217    fn normalize_sql(stmt: &Statement) -> String {
218        let mut stmt = stmt.clone();
219        match &mut stmt {
220            Statement::CreateView {
221                or_replace,
222                if_not_exists,
223                ..
224            } => {
225                *or_replace = false;
226                *if_not_exists = false;
227            }
228            Statement::CreateTable {
229                or_replace,
230                if_not_exists,
231                ..
232            } => {
233                *or_replace = false;
234                *if_not_exists = false;
235            }
236            Statement::CreateIndex { if_not_exists, .. } => {
237                *if_not_exists = false;
238            }
239            Statement::CreateSource {
240                stmt: CreateSourceStatement { if_not_exists, .. },
241                ..
242            } => {
243                *if_not_exists = false;
244            }
245            Statement::CreateSink {
246                stmt: CreateSinkStatement { if_not_exists, .. },
247            } => {
248                *if_not_exists = false;
249            }
250            Statement::CreateSubscription {
251                stmt: CreateSubscriptionStatement { if_not_exists, .. },
252            } => {
253                *if_not_exists = false;
254            }
255            Statement::CreateConnection {
256                stmt: CreateConnectionStatement { if_not_exists, .. },
257            } => {
258                *if_not_exists = false;
259            }
260            _ => {}
261        }
262        stmt.to_string()
263    }
264}
265
266pub async fn handle(
267    session: Arc<SessionImpl>,
268    stmt: Statement,
269    sql: Arc<str>,
270    formats: Vec<Format>,
271) -> Result<RwPgResponse> {
272    session.clear_cancel_query_flag();
273    let _guard = session.txn_begin_implicit();
274    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
275
276    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
277
278    match stmt {
279        Statement::Explain {
280            statement,
281            analyze,
282            options,
283        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
284        Statement::ExplainAnalyzeStreamJob {
285            target,
286            duration_secs,
287        } => {
288            explain_analyze_stream_job::handle_explain_analyze_stream_job(
289                handler_args,
290                target,
291                duration_secs,
292            )
293            .await
294        }
295        Statement::CreateSource { stmt } => {
296            create_source::handle_create_source(handler_args, stmt).await
297        }
298        Statement::CreateSink { stmt } => {
299            create_sink::handle_create_sink(handler_args, stmt, false).await
300        }
301        Statement::CreateSubscription { stmt } => {
302            create_subscription::handle_create_subscription(handler_args, stmt).await
303        }
304        Statement::CreateConnection { stmt } => {
305            create_connection::handle_create_connection(handler_args, stmt).await
306        }
307        Statement::CreateSecret { stmt } => {
308            create_secret::handle_create_secret(handler_args, stmt).await
309        }
310        Statement::CreateFunction {
311            or_replace,
312            temporary,
313            if_not_exists,
314            name,
315            args,
316            returns,
317            params,
318            with_options,
319        } => {
320            // For general udf, `language` clause could be ignored
321            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
322            if params.language.is_none()
323                || !params
324                    .language
325                    .as_ref()
326                    .unwrap()
327                    .real_value()
328                    .eq_ignore_ascii_case("sql")
329            {
330                create_function::handle_create_function(
331                    handler_args,
332                    or_replace,
333                    temporary,
334                    if_not_exists,
335                    name,
336                    args,
337                    returns,
338                    params,
339                    with_options,
340                )
341                .await
342            } else {
343                create_sql_function::handle_create_sql_function(
344                    handler_args,
345                    or_replace,
346                    temporary,
347                    if_not_exists,
348                    name,
349                    args,
350                    returns,
351                    params,
352                )
353                .await
354            }
355        }
356        Statement::CreateAggregate {
357            or_replace,
358            if_not_exists,
359            name,
360            args,
361            returns,
362            params,
363            ..
364        } => {
365            create_aggregate::handle_create_aggregate(
366                handler_args,
367                or_replace,
368                if_not_exists,
369                name,
370                args,
371                returns,
372                params,
373            )
374            .await
375        }
376        Statement::CreateTable {
377            name,
378            columns,
379            wildcard_idx,
380            constraints,
381            query,
382            with_options: _, // It is put in OptimizerContext
383            // Not supported things
384            or_replace,
385            temporary,
386            if_not_exists,
387            format_encode,
388            source_watermarks,
389            append_only,
390            on_conflict,
391            with_version_columns,
392            cdc_table_info,
393            include_column_options,
394            webhook_info,
395            engine,
396        } => {
397            if or_replace {
398                bail_not_implemented!("CREATE OR REPLACE TABLE");
399            }
400            if temporary {
401                bail_not_implemented!("CREATE TEMPORARY TABLE");
402            }
403            if let Some(query) = query {
404                return create_table_as::handle_create_as(
405                    handler_args,
406                    name,
407                    if_not_exists,
408                    query,
409                    columns,
410                    append_only,
411                    on_conflict,
412                    with_version_columns
413                        .iter()
414                        .map(|col| col.real_value())
415                        .collect(),
416                    engine,
417                )
418                .await;
419            }
420            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
421            create_table::handle_create_table(
422                handler_args,
423                name,
424                columns,
425                wildcard_idx,
426                constraints,
427                if_not_exists,
428                format_encode,
429                source_watermarks,
430                append_only,
431                on_conflict,
432                with_version_columns
433                    .iter()
434                    .map(|col| col.real_value())
435                    .collect(),
436                cdc_table_info,
437                include_column_options,
438                webhook_info,
439                engine,
440            )
441            .await
442        }
443        Statement::CreateDatabase {
444            db_name,
445            if_not_exists,
446            owner,
447            resource_group,
448            barrier_interval_ms,
449            checkpoint_frequency,
450        } => {
451            create_database::handle_create_database(
452                handler_args,
453                db_name,
454                if_not_exists,
455                owner,
456                resource_group,
457                barrier_interval_ms,
458                checkpoint_frequency,
459            )
460            .await
461        }
462        Statement::CreateSchema {
463            schema_name,
464            if_not_exists,
465            owner,
466        } => {
467            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
468                .await
469        }
470        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
471        Statement::DeclareCursor { stmt } => {
472            declare_cursor::handle_declare_cursor(handler_args, stmt).await
473        }
474        Statement::FetchCursor { stmt } => {
475            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
476        }
477        Statement::CloseCursor { stmt } => {
478            close_cursor::handle_close_cursor(handler_args, stmt).await
479        }
480        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
481        Statement::Grant { .. } => {
482            handle_privilege::handle_grant_privilege(handler_args, stmt).await
483        }
484        Statement::Revoke { .. } => {
485            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
486        }
487        Statement::Describe { name, kind } => match kind {
488            DescribeKind::Fragments => {
489                describe::handle_describe_fragments(handler_args, name).await
490            }
491            DescribeKind::Plain => describe::handle_describe(handler_args, name),
492        },
493        Statement::DescribeFragment { fragment_id } => {
494            describe::handle_describe_fragment(handler_args, fragment_id.into()).await
495        }
496        Statement::Discard(..) => discard::handle_discard(handler_args),
497        Statement::ShowObjects {
498            object: show_object,
499            filter,
500        } => show::handle_show_object(handler_args, show_object, filter).await,
501        Statement::ShowCreateObject { create_type, name } => {
502            show::handle_show_create_object(handler_args, create_type, name)
503        }
504        Statement::ShowTransactionIsolationLevel => {
505            transaction::handle_show_isolation_level(handler_args)
506        }
507        Statement::Drop(DropStatement {
508            object_type,
509            object_name,
510            if_exists,
511            drop_mode,
512        }) => {
513            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
514                match object_type {
515                    ObjectType::MaterializedView
516                    | ObjectType::View
517                    | ObjectType::Sink
518                    | ObjectType::Source
519                    | ObjectType::Subscription
520                    | ObjectType::Index
521                    | ObjectType::Table
522                    | ObjectType::Schema
523                    | ObjectType::Connection => true,
524                    ObjectType::Database | ObjectType::User | ObjectType::Secret => {
525                        bail_not_implemented!("DROP CASCADE");
526                    }
527                }
528            } else {
529                false
530            };
531            match object_type {
532                ObjectType::Table => {
533                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
534                        .await
535                }
536                ObjectType::MaterializedView => {
537                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
538                }
539                ObjectType::Index => {
540                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
541                        .await
542                }
543                ObjectType::Source => {
544                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
545                        .await
546                }
547                ObjectType::Sink => {
548                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
549                }
550                ObjectType::Subscription => {
551                    drop_subscription::handle_drop_subscription(
552                        handler_args,
553                        object_name,
554                        if_exists,
555                        cascade,
556                    )
557                    .await
558                }
559                ObjectType::Database => {
560                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
561                }
562                ObjectType::Schema => {
563                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
564                        .await
565                }
566                ObjectType::User => {
567                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
568                }
569                ObjectType::View => {
570                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
571                }
572                ObjectType::Connection => {
573                    drop_connection::handle_drop_connection(
574                        handler_args,
575                        object_name,
576                        if_exists,
577                        cascade,
578                    )
579                    .await
580                }
581                ObjectType::Secret => {
582                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
583                }
584            }
585        }
586        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
587        Statement::DropFunction {
588            if_exists,
589            func_desc,
590            option,
591        } => {
592            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
593                .await
594        }
595        Statement::DropAggregate {
596            if_exists,
597            func_desc,
598            option,
599        } => {
600            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
601                .await
602        }
603        Statement::Query(_)
604        | Statement::Insert { .. }
605        | Statement::Delete { .. }
606        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
607        Statement::Copy {
608            entity: CopyEntity::Query(query),
609            target: CopyTarget::Stdout,
610        } => {
611            let response =
612                query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
613                    .await?;
614            Ok(response.into_copy_query_to_stdout())
615        }
616        Statement::CreateView {
617            materialized,
618            if_not_exists,
619            name,
620            columns,
621            query,
622            with_options: _, // It is put in OptimizerContext
623            or_replace,      // not supported
624            emit_mode,
625        } => {
626            if or_replace {
627                bail_not_implemented!("CREATE OR REPLACE VIEW");
628            }
629            if materialized {
630                create_mv::handle_create_mv(
631                    handler_args,
632                    if_not_exists,
633                    name,
634                    *query,
635                    columns,
636                    emit_mode,
637                )
638                .await
639            } else {
640                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
641                    .await
642            }
643        }
644        Statement::Flush => flush::handle_flush(handler_args).await,
645        Statement::Wait => wait::handle_wait(handler_args).await,
646        Statement::Recover => recover::handle_recover(handler_args).await,
647        Statement::SetVariable {
648            local: _,
649            variable,
650            value,
651        } => {
652            // special handle for `use database`
653            if variable.real_value().eq_ignore_ascii_case("database") {
654                let x = variable::set_var_to_param_str(&value);
655                let res = use_db::handle_use_db(
656                    handler_args,
657                    ObjectName::from(vec![Ident::from_real_value(
658                        x.as_deref().unwrap_or("default"),
659                    )]),
660                )?;
661                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
662                for notice in res.notices() {
663                    builder = builder.notice(notice);
664                }
665                return Ok(builder.into());
666            }
667            variable::handle_set(handler_args, variable, value)
668        }
669        Statement::SetTimeZone { local: _, value } => {
670            variable::handle_set_time_zone(handler_args, value)
671        }
672        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
673        Statement::CreateIndex {
674            name,
675            table_name,
676            method,
677            columns,
678            include,
679            distributed_by,
680            unique,
681            if_not_exists,
682            with_properties: _,
683        } => {
684            if unique {
685                bail_not_implemented!("create unique index");
686            }
687
688            create_index::handle_create_index(
689                handler_args,
690                if_not_exists,
691                name,
692                table_name,
693                method,
694                columns.clone(),
695                include,
696                distributed_by,
697            )
698            .await
699        }
700        Statement::AlterDatabase { name, operation } => match operation {
701            AlterDatabaseOperation::RenameDatabase { database_name } => {
702                alter_rename::handle_rename_database(handler_args, name, database_name).await
703            }
704            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
705                alter_owner::handle_alter_owner(
706                    handler_args,
707                    name,
708                    new_owner_name,
709                    StatementType::ALTER_DATABASE,
710                )
711                .await
712            }
713            AlterDatabaseOperation::SetParam(config_param) => {
714                let ConfigParam { param, value } = config_param;
715
716                let database_param = match param.real_value().to_uppercase().as_str() {
717                    "BARRIER_INTERVAL_MS" => {
718                        let barrier_interval_ms = match value {
719                            SetVariableValue::Default => None,
720                            SetVariableValue::Single(SetVariableValueSingle::Literal(
721                                Value::Number(num),
722                            )) => {
723                                let num = num.parse::<u32>().map_err(|e| {
724                                    ErrorCode::InvalidInputSyntax(format!(
725                                        "barrier_interval_ms must be a u32 integer: {}",
726                                        e.as_report()
727                                    ))
728                                })?;
729                                Some(num)
730                            }
731                            _ => {
732                                return Err(ErrorCode::InvalidInputSyntax(
733                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
734                                        .to_owned(),
735                                )
736                                .into());
737                            }
738                        };
739                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
740                    }
741                    "CHECKPOINT_FREQUENCY" => {
742                        let checkpoint_frequency = match value {
743                            SetVariableValue::Default => None,
744                            SetVariableValue::Single(SetVariableValueSingle::Literal(
745                                Value::Number(num),
746                            )) => {
747                                let num = num.parse::<u64>().map_err(|e| {
748                                    ErrorCode::InvalidInputSyntax(format!(
749                                        "checkpoint_frequency must be a u64 integer: {}",
750                                        e.as_report()
751                                    ))
752                                })?;
753                                Some(num)
754                            }
755                            _ => {
756                                return Err(ErrorCode::InvalidInputSyntax(
757                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
758                                        .to_owned(),
759                                )
760                                .into());
761                            }
762                        };
763                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
764                    }
765                    _ => {
766                        return Err(ErrorCode::InvalidInputSyntax(format!(
767                            "Unsupported database config parameter: {}",
768                            param.real_value()
769                        ))
770                        .into());
771                    }
772                };
773
774                alter_database_param::handle_alter_database_param(
775                    handler_args,
776                    name,
777                    database_param,
778                )
779                .await
780            }
781        },
782        Statement::AlterSchema { name, operation } => match operation {
783            AlterSchemaOperation::RenameSchema { schema_name } => {
784                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
785            }
786            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
787                alter_owner::handle_alter_owner(
788                    handler_args,
789                    name,
790                    new_owner_name,
791                    StatementType::ALTER_SCHEMA,
792                )
793                .await
794            }
795            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
796                alter_swap_rename::handle_swap_rename(
797                    handler_args,
798                    name,
799                    target_schema,
800                    StatementType::ALTER_SCHEMA,
801                )
802                .await
803            }
804        },
805        Statement::AlterTable { name, operation } => match operation {
806            AlterTableOperation::AddColumn { .. }
807            | AlterTableOperation::DropColumn { .. }
808            | AlterTableOperation::AlterColumn { .. } => {
809                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
810            }
811            AlterTableOperation::RenameTable { table_name } => {
812                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
813                    .await
814            }
815            AlterTableOperation::ChangeOwner { new_owner_name } => {
816                alter_owner::handle_alter_owner(
817                    handler_args,
818                    name,
819                    new_owner_name,
820                    StatementType::ALTER_TABLE,
821                )
822                .await
823            }
824            AlterTableOperation::SetParallelism {
825                parallelism,
826                deferred,
827            } => {
828                alter_parallelism::handle_alter_parallelism(
829                    handler_args,
830                    name,
831                    parallelism,
832                    StatementType::ALTER_TABLE,
833                    deferred,
834                )
835                .await
836            }
837            AlterTableOperation::SetSchema { new_schema_name } => {
838                alter_set_schema::handle_alter_set_schema(
839                    handler_args,
840                    name,
841                    new_schema_name,
842                    StatementType::ALTER_TABLE,
843                    None,
844                )
845                .await
846            }
847            AlterTableOperation::RefreshSchema => {
848                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
849            }
850            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
851                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
852                    handler_args,
853                    PbThrottleTarget::TableWithSource,
854                    name,
855                    rate_limit,
856                )
857                .await
858            }
859            AlterTableOperation::DropConnector => {
860                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
861                    .await
862            }
863            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
864                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
865                    handler_args,
866                    PbThrottleTarget::TableDml,
867                    name,
868                    rate_limit,
869                )
870                .await
871            }
872            AlterTableOperation::SetConfig { entries } => {
873                alter_streaming_config::handle_alter_streaming_set_config(
874                    handler_args,
875                    name,
876                    entries,
877                    StatementType::ALTER_TABLE,
878                )
879                .await
880            }
881            AlterTableOperation::ResetConfig { keys } => {
882                alter_streaming_config::handle_alter_streaming_reset_config(
883                    handler_args,
884                    name,
885                    keys,
886                    StatementType::ALTER_TABLE,
887                )
888                .await
889            }
890            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
891                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
892                    handler_args,
893                    PbThrottleTarget::CdcTable,
894                    name,
895                    rate_limit,
896                )
897                .await
898            }
899            AlterTableOperation::SwapRenameTable { target_table } => {
900                alter_swap_rename::handle_swap_rename(
901                    handler_args,
902                    name,
903                    target_table,
904                    StatementType::ALTER_TABLE,
905                )
906                .await
907            }
908            AlterTableOperation::AlterConnectorProps { alter_props } => {
909                alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
910            }
911            AlterTableOperation::AddConstraint { .. }
912            | AlterTableOperation::DropConstraint { .. }
913            | AlterTableOperation::RenameColumn { .. }
914            | AlterTableOperation::ChangeColumn { .. }
915            | AlterTableOperation::RenameConstraint { .. } => {
916                bail_not_implemented!(
917                    "Unhandled statement: {}",
918                    Statement::AlterTable { name, operation }
919                )
920            }
921        },
922        Statement::AlterIndex { name, operation } => match operation {
923            AlterIndexOperation::RenameIndex { index_name } => {
924                alter_rename::handle_rename_index(handler_args, name, index_name).await
925            }
926            AlterIndexOperation::SetParallelism {
927                parallelism,
928                deferred,
929            } => {
930                alter_parallelism::handle_alter_parallelism(
931                    handler_args,
932                    name,
933                    parallelism,
934                    StatementType::ALTER_INDEX,
935                    deferred,
936                )
937                .await
938            }
939            AlterIndexOperation::SetConfig { entries } => {
940                alter_streaming_config::handle_alter_streaming_set_config(
941                    handler_args,
942                    name,
943                    entries,
944                    StatementType::ALTER_INDEX,
945                )
946                .await
947            }
948            AlterIndexOperation::ResetConfig { keys } => {
949                alter_streaming_config::handle_alter_streaming_reset_config(
950                    handler_args,
951                    name,
952                    keys,
953                    StatementType::ALTER_INDEX,
954                )
955                .await
956            }
957        },
958        Statement::AlterView {
959            materialized,
960            name,
961            operation,
962        } => {
963            let statement_type = if materialized {
964                StatementType::ALTER_MATERIALIZED_VIEW
965            } else {
966                StatementType::ALTER_VIEW
967            };
968            match operation {
969                AlterViewOperation::RenameView { view_name } => {
970                    if materialized {
971                        alter_rename::handle_rename_table(
972                            handler_args,
973                            TableType::MaterializedView,
974                            name,
975                            view_name,
976                        )
977                        .await
978                    } else {
979                        alter_rename::handle_rename_view(handler_args, name, view_name).await
980                    }
981                }
982                AlterViewOperation::SetParallelism {
983                    parallelism,
984                    deferred,
985                } => {
986                    if !materialized {
987                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
988                    }
989                    alter_parallelism::handle_alter_parallelism(
990                        handler_args,
991                        name,
992                        parallelism,
993                        statement_type,
994                        deferred,
995                    )
996                    .await
997                }
998                AlterViewOperation::SetResourceGroup {
999                    resource_group,
1000                    deferred,
1001                } => {
1002                    if !materialized {
1003                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1004                    }
1005                    alter_resource_group::handle_alter_resource_group(
1006                        handler_args,
1007                        name,
1008                        resource_group,
1009                        statement_type,
1010                        deferred,
1011                    )
1012                    .await
1013                }
1014                AlterViewOperation::ChangeOwner { new_owner_name } => {
1015                    alter_owner::handle_alter_owner(
1016                        handler_args,
1017                        name,
1018                        new_owner_name,
1019                        statement_type,
1020                    )
1021                    .await
1022                }
1023                AlterViewOperation::SetSchema { new_schema_name } => {
1024                    alter_set_schema::handle_alter_set_schema(
1025                        handler_args,
1026                        name,
1027                        new_schema_name,
1028                        statement_type,
1029                        None,
1030                    )
1031                    .await
1032                }
1033                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1034                    if !materialized {
1035                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1036                    }
1037                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1038                        handler_args,
1039                        PbThrottleTarget::Mv,
1040                        name,
1041                        rate_limit,
1042                    )
1043                    .await
1044                }
1045                AlterViewOperation::SwapRenameView { target_view } => {
1046                    alter_swap_rename::handle_swap_rename(
1047                        handler_args,
1048                        name,
1049                        target_view,
1050                        statement_type,
1051                    )
1052                    .await
1053                }
1054                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1055                    if !materialized {
1056                        bail!(
1057                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1058                        );
1059                    }
1060                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1061                }
1062                AlterViewOperation::AsQuery { query } => {
1063                    if !materialized {
1064                        bail_not_implemented!("ALTER VIEW AS QUERY");
1065                    }
1066                    // `ALTER MATERIALIZED VIEW AS QUERY` is only available in development build now.
1067                    if !cfg!(debug_assertions) {
1068                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1069                    }
1070                    alter_mv::handle_alter_mv(handler_args, name, query).await
1071                }
1072                AlterViewOperation::SetConfig { entries } => {
1073                    if !materialized {
1074                        bail!("SET CONFIG is only supported for materialized views");
1075                    }
1076                    alter_streaming_config::handle_alter_streaming_set_config(
1077                        handler_args,
1078                        name,
1079                        entries,
1080                        statement_type,
1081                    )
1082                    .await
1083                }
1084                AlterViewOperation::ResetConfig { keys } => {
1085                    if !materialized {
1086                        bail!("RESET CONFIG is only supported for materialized views");
1087                    }
1088                    alter_streaming_config::handle_alter_streaming_reset_config(
1089                        handler_args,
1090                        name,
1091                        keys,
1092                        statement_type,
1093                    )
1094                    .await
1095                }
1096            }
1097        }
1098
1099        Statement::AlterSink { name, operation } => match operation {
1100            AlterSinkOperation::AlterConnectorProps {
1101                alter_props: changed_props,
1102            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1103            AlterSinkOperation::RenameSink { sink_name } => {
1104                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1105            }
1106            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1107                alter_owner::handle_alter_owner(
1108                    handler_args,
1109                    name,
1110                    new_owner_name,
1111                    StatementType::ALTER_SINK,
1112                )
1113                .await
1114            }
1115            AlterSinkOperation::SetSchema { new_schema_name } => {
1116                alter_set_schema::handle_alter_set_schema(
1117                    handler_args,
1118                    name,
1119                    new_schema_name,
1120                    StatementType::ALTER_SINK,
1121                    None,
1122                )
1123                .await
1124            }
1125            AlterSinkOperation::SetParallelism {
1126                parallelism,
1127                deferred,
1128            } => {
1129                alter_parallelism::handle_alter_parallelism(
1130                    handler_args,
1131                    name,
1132                    parallelism,
1133                    StatementType::ALTER_SINK,
1134                    deferred,
1135                )
1136                .await
1137            }
1138            AlterSinkOperation::SetConfig { entries } => {
1139                alter_streaming_config::handle_alter_streaming_set_config(
1140                    handler_args,
1141                    name,
1142                    entries,
1143                    StatementType::ALTER_SINK,
1144                )
1145                .await
1146            }
1147            AlterSinkOperation::ResetConfig { keys } => {
1148                alter_streaming_config::handle_alter_streaming_reset_config(
1149                    handler_args,
1150                    name,
1151                    keys,
1152                    StatementType::ALTER_SINK,
1153                )
1154                .await
1155            }
1156            AlterSinkOperation::SwapRenameSink { target_sink } => {
1157                alter_swap_rename::handle_swap_rename(
1158                    handler_args,
1159                    name,
1160                    target_sink,
1161                    StatementType::ALTER_SINK,
1162                )
1163                .await
1164            }
1165            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1166                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1167                    handler_args,
1168                    PbThrottleTarget::Sink,
1169                    name,
1170                    rate_limit,
1171                )
1172                .await
1173            }
1174            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1175                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1176                    handler_args,
1177                    name,
1178                    enable,
1179                )
1180                .await
1181            }
1182        },
1183        Statement::AlterSubscription { name, operation } => match operation {
1184            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1185                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1186                    .await
1187            }
1188            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1189                alter_owner::handle_alter_owner(
1190                    handler_args,
1191                    name,
1192                    new_owner_name,
1193                    StatementType::ALTER_SUBSCRIPTION,
1194                )
1195                .await
1196            }
1197            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1198                alter_set_schema::handle_alter_set_schema(
1199                    handler_args,
1200                    name,
1201                    new_schema_name,
1202                    StatementType::ALTER_SUBSCRIPTION,
1203                    None,
1204                )
1205                .await
1206            }
1207            AlterSubscriptionOperation::SwapRenameSubscription {
1208                target_subscription,
1209            } => {
1210                alter_swap_rename::handle_swap_rename(
1211                    handler_args,
1212                    name,
1213                    target_subscription,
1214                    StatementType::ALTER_SUBSCRIPTION,
1215                )
1216                .await
1217            }
1218        },
1219        Statement::AlterSource { name, operation } => match operation {
1220            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1221                alter_source_props::handle_alter_source_connector_props(
1222                    handler_args,
1223                    name,
1224                    alter_props,
1225                )
1226                .await
1227            }
1228            AlterSourceOperation::RenameSource { source_name } => {
1229                alter_rename::handle_rename_source(handler_args, name, source_name).await
1230            }
1231            AlterSourceOperation::AddColumn { .. } => {
1232                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1233            }
1234            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1235                alter_owner::handle_alter_owner(
1236                    handler_args,
1237                    name,
1238                    new_owner_name,
1239                    StatementType::ALTER_SOURCE,
1240                )
1241                .await
1242            }
1243            AlterSourceOperation::SetSchema { new_schema_name } => {
1244                alter_set_schema::handle_alter_set_schema(
1245                    handler_args,
1246                    name,
1247                    new_schema_name,
1248                    StatementType::ALTER_SOURCE,
1249                    None,
1250                )
1251                .await
1252            }
1253            AlterSourceOperation::FormatEncode { format_encode } => {
1254                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1255                    .await
1256            }
1257            AlterSourceOperation::RefreshSchema => {
1258                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1259            }
1260            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1261                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1262                    handler_args,
1263                    PbThrottleTarget::Source,
1264                    name,
1265                    rate_limit,
1266                )
1267                .await
1268            }
1269            AlterSourceOperation::SwapRenameSource { target_source } => {
1270                alter_swap_rename::handle_swap_rename(
1271                    handler_args,
1272                    name,
1273                    target_source,
1274                    StatementType::ALTER_SOURCE,
1275                )
1276                .await
1277            }
1278            AlterSourceOperation::SetParallelism {
1279                parallelism,
1280                deferred,
1281            } => {
1282                alter_parallelism::handle_alter_parallelism(
1283                    handler_args,
1284                    name,
1285                    parallelism,
1286                    StatementType::ALTER_SOURCE,
1287                    deferred,
1288                )
1289                .await
1290            }
1291            AlterSourceOperation::SetConfig { entries } => {
1292                alter_streaming_config::handle_alter_streaming_set_config(
1293                    handler_args,
1294                    name,
1295                    entries,
1296                    StatementType::ALTER_SOURCE,
1297                )
1298                .await
1299            }
1300            AlterSourceOperation::ResetConfig { keys } => {
1301                alter_streaming_config::handle_alter_streaming_reset_config(
1302                    handler_args,
1303                    name,
1304                    keys,
1305                    StatementType::ALTER_SOURCE,
1306                )
1307                .await
1308            }
1309        },
1310        Statement::AlterFunction {
1311            name,
1312            args,
1313            operation,
1314        } => match operation {
1315            AlterFunctionOperation::SetSchema { new_schema_name } => {
1316                alter_set_schema::handle_alter_set_schema(
1317                    handler_args,
1318                    name,
1319                    new_schema_name,
1320                    StatementType::ALTER_FUNCTION,
1321                    args,
1322                )
1323                .await
1324            }
1325        },
1326        Statement::AlterConnection { name, operation } => match operation {
1327            AlterConnectionOperation::SetSchema { new_schema_name } => {
1328                alter_set_schema::handle_alter_set_schema(
1329                    handler_args,
1330                    name,
1331                    new_schema_name,
1332                    StatementType::ALTER_CONNECTION,
1333                    None,
1334                )
1335                .await
1336            }
1337            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1338                alter_owner::handle_alter_owner(
1339                    handler_args,
1340                    name,
1341                    new_owner_name,
1342                    StatementType::ALTER_CONNECTION,
1343                )
1344                .await
1345            }
1346            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1347                alter_connection_props::handle_alter_connection_connector_props(
1348                    handler_args,
1349                    name,
1350                    alter_props,
1351                )
1352                .await
1353            }
1354        },
1355        Statement::AlterSystem { param, value } => {
1356            alter_system::handle_alter_system(handler_args, param, value).await
1357        }
1358        Statement::AlterSecret {
1359            name,
1360            with_options,
1361            operation,
1362        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1363        Statement::AlterFragment {
1364            fragment_ids,
1365            operation,
1366        } => match operation {
1367            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1368                let [fragment_id] = fragment_ids.as_slice() else {
1369                    return Err(ErrorCode::InvalidInputSyntax(
1370                        "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1371                            .to_owned(),
1372                    )
1373                    .into());
1374                };
1375                alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1376                    &handler_args.session,
1377                    PbThrottleTarget::Fragment,
1378                    *fragment_id,
1379                    rate_limit,
1380                    StatementType::SET_VARIABLE,
1381                )
1382                .await
1383            }
1384            AlterFragmentOperation::SetParallelism { parallelism } => {
1385                alter_parallelism::handle_alter_fragment_parallelism(
1386                    handler_args,
1387                    fragment_ids.into_iter().map_into().collect(),
1388                    parallelism,
1389                )
1390                .await
1391            }
1392        },
1393        Statement::AlterDefaultPrivileges { .. } => {
1394            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1395        }
1396        Statement::StartTransaction { modes } => {
1397            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1398        }
1399        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1400        Statement::Commit { chain } => {
1401            transaction::handle_commit(handler_args, COMMIT, chain).await
1402        }
1403        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1404        Statement::Rollback { chain } => {
1405            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1406        }
1407        Statement::SetTransaction {
1408            modes,
1409            snapshot,
1410            session,
1411        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1412        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1413        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1414        Statement::Comment {
1415            object_type,
1416            object_name,
1417            comment,
1418        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1419        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1420        Statement::Prepare {
1421            name,
1422            data_types,
1423            statement,
1424        } => prepared_statement::handle_prepare(name, data_types, statement).await,
1425        Statement::Deallocate { name, prepare } => {
1426            prepared_statement::handle_deallocate(name, prepare).await
1427        }
1428        Statement::Vacuum { object_name, full } => {
1429            vacuum::handle_vacuum(handler_args, object_name, full).await
1430        }
1431        Statement::Refresh { table_name } => {
1432            refresh::handle_refresh(handler_args, table_name).await
1433        }
1434        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1435    }
1436}
1437
1438fn check_ban_ddl_for_iceberg_engine_table(
1439    session: Arc<SessionImpl>,
1440    stmt: &Statement,
1441) -> Result<()> {
1442    match stmt {
1443        Statement::AlterTable {
1444            name,
1445            operation:
1446                operation @ (AlterTableOperation::AddColumn { .. }
1447                | AlterTableOperation::DropColumn { .. }),
1448        } => {
1449            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1450            if table.is_iceberg_engine_table() {
1451                bail!(
1452                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1453                    operation,
1454                    schema_name,
1455                    name
1456                );
1457            }
1458        }
1459
1460        Statement::AlterTable {
1461            name,
1462            operation: AlterTableOperation::RenameTable { .. },
1463        } => {
1464            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1465            if table.is_iceberg_engine_table() {
1466                bail!(
1467                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1468                    schema_name,
1469                    name
1470                );
1471            }
1472        }
1473
1474        Statement::AlterTable {
1475            name,
1476            operation: AlterTableOperation::SetParallelism { .. },
1477        } => {
1478            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1479            if table.is_iceberg_engine_table() {
1480                bail!(
1481                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1482                    schema_name,
1483                    name
1484                );
1485            }
1486        }
1487
1488        Statement::AlterTable {
1489            name,
1490            operation: AlterTableOperation::SetSchema { .. },
1491        } => {
1492            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1493            if table.is_iceberg_engine_table() {
1494                bail!(
1495                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1496                    schema_name,
1497                    name
1498                );
1499            }
1500        }
1501
1502        Statement::AlterTable {
1503            name,
1504            operation: AlterTableOperation::RefreshSchema,
1505        } => {
1506            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1507            if table.is_iceberg_engine_table() {
1508                bail!(
1509                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1510                    schema_name,
1511                    name
1512                );
1513            }
1514        }
1515
1516        Statement::AlterTable {
1517            name,
1518            operation: AlterTableOperation::SetSourceRateLimit { .. },
1519        } => {
1520            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1521            if table.is_iceberg_engine_table() {
1522                bail!(
1523                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1524                    schema_name,
1525                    name
1526                );
1527            }
1528        }
1529
1530        _ => {}
1531    }
1532
1533    Ok(())
1534}