risingwave_frontend/handler/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_database_param;
46mod alter_mv;
47mod alter_owner;
48mod alter_parallelism;
49mod alter_rename;
50mod alter_resource_group;
51mod alter_secret;
52mod alter_set_schema;
53mod alter_sink_props;
54mod alter_source_column;
55mod alter_source_props;
56mod alter_source_with_sr;
57mod alter_streaming_config;
58mod alter_streaming_enable_unaligned_join;
59mod alter_streaming_rate_limit;
60mod alter_swap_rename;
61mod alter_system;
62mod alter_table_column;
63pub mod alter_table_drop_connector;
64pub mod alter_table_props;
65mod alter_table_with_sr;
66pub mod alter_user;
67mod alter_utils;
68pub mod cancel_job;
69pub mod close_cursor;
70mod comment;
71pub mod create_aggregate;
72pub mod create_connection;
73mod create_database;
74pub mod create_function;
75pub mod create_index;
76pub mod create_mv;
77pub mod create_schema;
78pub mod create_secret;
79pub mod create_sink;
80pub mod create_source;
81pub mod create_sql_function;
82pub mod create_subscription;
83pub mod create_table;
84pub mod create_table_as;
85pub mod create_user;
86pub mod create_view;
87pub mod declare_cursor;
88pub mod describe;
89pub mod discard;
90mod drop_connection;
91mod drop_database;
92pub mod drop_function;
93mod drop_index;
94pub mod drop_mv;
95mod drop_schema;
96pub mod drop_secret;
97pub mod drop_sink;
98pub mod drop_source;
99pub mod drop_subscription;
100pub mod drop_table;
101pub mod drop_user;
102mod drop_view;
103pub mod explain;
104pub mod explain_analyze_stream_job;
105pub mod extended_handle;
106pub mod fetch_cursor;
107mod flush;
108pub mod handle_privilege;
109pub mod kill_process;
110mod prepared_statement;
111pub mod privilege;
112pub mod query;
113mod recover;
114mod refresh;
115pub mod show;
116mod transaction;
117mod use_db;
118pub mod util;
119pub mod vacuum;
120pub mod variable;
121mod wait;
122
123pub use alter_table_column::{
124    fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
125};
126
127/// The [`PgResponseBuilder`] used by RisingWave.
128pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
129
130/// The [`PgResponse`] used by RisingWave.
131pub type RwPgResponse = PgResponse<PgResponseStream>;
132
133#[easy_ext::ext(RwPgResponseBuilderExt)]
134impl RwPgResponseBuilder {
135    /// Append rows to the response.
136    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
137        let fields = T::fields();
138        self.values(
139            rows.into_iter()
140                .map(|row| {
141                    Row::new(
142                        row.into_owned_row()
143                            .into_iter()
144                            .zip_eq_fast(&fields)
145                            .map(|(datum, (_, ty))| {
146                                datum.map(|scalar| {
147                                    scalar.as_scalar_ref_impl().text_format(ty).into()
148                                })
149                            })
150                            .collect(),
151                    )
152                })
153                .collect_vec()
154                .into(),
155            fields_to_descriptors(fields),
156        )
157    }
158}
159
160pub fn fields_to_descriptors(
161    fields: Vec<(&str, risingwave_common::types::DataType)>,
162) -> Vec<PgFieldDescriptor> {
163    fields
164        .iter()
165        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
166        .collect()
167}
168
169pub enum PgResponseStream {
170    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
171    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
172    Rows(BoxStream<'static, RowSetResult>),
173}
174
175impl Stream for PgResponseStream {
176    type Item = std::result::Result<Vec<Row>, BoxedError>;
177
178    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
179        match &mut *self {
180            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
181            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
182            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
183        }
184    }
185}
186
187impl From<Vec<Row>> for PgResponseStream {
188    fn from(rows: Vec<Row>) -> Self {
189        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
190    }
191}
192
193#[derive(Clone)]
194pub struct HandlerArgs {
195    pub session: Arc<SessionImpl>,
196    pub sql: Arc<str>,
197    pub normalized_sql: String,
198    pub with_options: WithOptions,
199}
200
201impl HandlerArgs {
202    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
203        Ok(Self {
204            session,
205            sql,
206            with_options: WithOptions::try_from(stmt)?,
207            normalized_sql: Self::normalize_sql(stmt),
208        })
209    }
210
211    /// Get normalized SQL from the statement.
212    ///
213    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
214    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
215    ///   make it suitable for the `SHOW CREATE` statements.
216    fn normalize_sql(stmt: &Statement) -> String {
217        let mut stmt = stmt.clone();
218        match &mut stmt {
219            Statement::CreateView {
220                or_replace,
221                if_not_exists,
222                ..
223            } => {
224                *or_replace = false;
225                *if_not_exists = false;
226            }
227            Statement::CreateTable {
228                or_replace,
229                if_not_exists,
230                ..
231            } => {
232                *or_replace = false;
233                *if_not_exists = false;
234            }
235            Statement::CreateIndex { if_not_exists, .. } => {
236                *if_not_exists = false;
237            }
238            Statement::CreateSource {
239                stmt: CreateSourceStatement { if_not_exists, .. },
240                ..
241            } => {
242                *if_not_exists = false;
243            }
244            Statement::CreateSink {
245                stmt: CreateSinkStatement { if_not_exists, .. },
246            } => {
247                *if_not_exists = false;
248            }
249            Statement::CreateSubscription {
250                stmt: CreateSubscriptionStatement { if_not_exists, .. },
251            } => {
252                *if_not_exists = false;
253            }
254            Statement::CreateConnection {
255                stmt: CreateConnectionStatement { if_not_exists, .. },
256            } => {
257                *if_not_exists = false;
258            }
259            _ => {}
260        }
261        stmt.to_string()
262    }
263}
264
265pub async fn handle(
266    session: Arc<SessionImpl>,
267    stmt: Statement,
268    sql: Arc<str>,
269    formats: Vec<Format>,
270) -> Result<RwPgResponse> {
271    session.clear_cancel_query_flag();
272    let _guard = session.txn_begin_implicit();
273    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
274
275    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
276
277    match stmt {
278        Statement::Explain {
279            statement,
280            analyze,
281            options,
282        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
283        Statement::ExplainAnalyzeStreamJob {
284            target,
285            duration_secs,
286        } => {
287            explain_analyze_stream_job::handle_explain_analyze_stream_job(
288                handler_args,
289                target,
290                duration_secs,
291            )
292            .await
293        }
294        Statement::CreateSource { stmt } => {
295            create_source::handle_create_source(handler_args, stmt).await
296        }
297        Statement::CreateSink { stmt } => {
298            create_sink::handle_create_sink(handler_args, stmt, false).await
299        }
300        Statement::CreateSubscription { stmt } => {
301            create_subscription::handle_create_subscription(handler_args, stmt).await
302        }
303        Statement::CreateConnection { stmt } => {
304            create_connection::handle_create_connection(handler_args, stmt).await
305        }
306        Statement::CreateSecret { stmt } => {
307            create_secret::handle_create_secret(handler_args, stmt).await
308        }
309        Statement::CreateFunction {
310            or_replace,
311            temporary,
312            if_not_exists,
313            name,
314            args,
315            returns,
316            params,
317            with_options,
318        } => {
319            // For general udf, `language` clause could be ignored
320            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
321            if params.language.is_none()
322                || !params
323                    .language
324                    .as_ref()
325                    .unwrap()
326                    .real_value()
327                    .eq_ignore_ascii_case("sql")
328            {
329                create_function::handle_create_function(
330                    handler_args,
331                    or_replace,
332                    temporary,
333                    if_not_exists,
334                    name,
335                    args,
336                    returns,
337                    params,
338                    with_options,
339                )
340                .await
341            } else {
342                create_sql_function::handle_create_sql_function(
343                    handler_args,
344                    or_replace,
345                    temporary,
346                    if_not_exists,
347                    name,
348                    args,
349                    returns,
350                    params,
351                )
352                .await
353            }
354        }
355        Statement::CreateAggregate {
356            or_replace,
357            if_not_exists,
358            name,
359            args,
360            returns,
361            params,
362            ..
363        } => {
364            create_aggregate::handle_create_aggregate(
365                handler_args,
366                or_replace,
367                if_not_exists,
368                name,
369                args,
370                returns,
371                params,
372            )
373            .await
374        }
375        Statement::CreateTable {
376            name,
377            columns,
378            wildcard_idx,
379            constraints,
380            query,
381            with_options: _, // It is put in OptimizerContext
382            // Not supported things
383            or_replace,
384            temporary,
385            if_not_exists,
386            format_encode,
387            source_watermarks,
388            append_only,
389            on_conflict,
390            with_version_columns,
391            cdc_table_info,
392            include_column_options,
393            webhook_info,
394            engine,
395        } => {
396            if or_replace {
397                bail_not_implemented!("CREATE OR REPLACE TABLE");
398            }
399            if temporary {
400                bail_not_implemented!("CREATE TEMPORARY TABLE");
401            }
402            if let Some(query) = query {
403                return create_table_as::handle_create_as(
404                    handler_args,
405                    name,
406                    if_not_exists,
407                    query,
408                    columns,
409                    append_only,
410                    on_conflict,
411                    with_version_columns
412                        .iter()
413                        .map(|col| col.real_value())
414                        .collect(),
415                    engine,
416                )
417                .await;
418            }
419            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
420            create_table::handle_create_table(
421                handler_args,
422                name,
423                columns,
424                wildcard_idx,
425                constraints,
426                if_not_exists,
427                format_encode,
428                source_watermarks,
429                append_only,
430                on_conflict,
431                with_version_columns
432                    .iter()
433                    .map(|col| col.real_value())
434                    .collect(),
435                cdc_table_info,
436                include_column_options,
437                webhook_info,
438                engine,
439            )
440            .await
441        }
442        Statement::CreateDatabase {
443            db_name,
444            if_not_exists,
445            owner,
446            resource_group,
447            barrier_interval_ms,
448            checkpoint_frequency,
449        } => {
450            create_database::handle_create_database(
451                handler_args,
452                db_name,
453                if_not_exists,
454                owner,
455                resource_group,
456                barrier_interval_ms,
457                checkpoint_frequency,
458            )
459            .await
460        }
461        Statement::CreateSchema {
462            schema_name,
463            if_not_exists,
464            owner,
465        } => {
466            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
467                .await
468        }
469        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
470        Statement::DeclareCursor { stmt } => {
471            declare_cursor::handle_declare_cursor(handler_args, stmt).await
472        }
473        Statement::FetchCursor { stmt } => {
474            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
475        }
476        Statement::CloseCursor { stmt } => {
477            close_cursor::handle_close_cursor(handler_args, stmt).await
478        }
479        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
480        Statement::Grant { .. } => {
481            handle_privilege::handle_grant_privilege(handler_args, stmt).await
482        }
483        Statement::Revoke { .. } => {
484            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
485        }
486        Statement::Describe { name, kind } => match kind {
487            DescribeKind::Fragments => {
488                describe::handle_describe_fragments(handler_args, name).await
489            }
490            DescribeKind::Plain => describe::handle_describe(handler_args, name),
491        },
492        Statement::DescribeFragment { fragment_id } => {
493            describe::handle_describe_fragment(handler_args, fragment_id.into()).await
494        }
495        Statement::Discard(..) => discard::handle_discard(handler_args),
496        Statement::ShowObjects {
497            object: show_object,
498            filter,
499        } => show::handle_show_object(handler_args, show_object, filter).await,
500        Statement::ShowCreateObject { create_type, name } => {
501            show::handle_show_create_object(handler_args, create_type, name)
502        }
503        Statement::ShowTransactionIsolationLevel => {
504            transaction::handle_show_isolation_level(handler_args)
505        }
506        Statement::Drop(DropStatement {
507            object_type,
508            object_name,
509            if_exists,
510            drop_mode,
511        }) => {
512            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
513                match object_type {
514                    ObjectType::MaterializedView
515                    | ObjectType::View
516                    | ObjectType::Sink
517                    | ObjectType::Source
518                    | ObjectType::Subscription
519                    | ObjectType::Index
520                    | ObjectType::Table
521                    | ObjectType::Schema
522                    | ObjectType::Connection => true,
523                    ObjectType::Database | ObjectType::User | ObjectType::Secret => {
524                        bail_not_implemented!("DROP CASCADE");
525                    }
526                }
527            } else {
528                false
529            };
530            match object_type {
531                ObjectType::Table => {
532                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
533                        .await
534                }
535                ObjectType::MaterializedView => {
536                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
537                }
538                ObjectType::Index => {
539                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
540                        .await
541                }
542                ObjectType::Source => {
543                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
544                        .await
545                }
546                ObjectType::Sink => {
547                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
548                }
549                ObjectType::Subscription => {
550                    drop_subscription::handle_drop_subscription(
551                        handler_args,
552                        object_name,
553                        if_exists,
554                        cascade,
555                    )
556                    .await
557                }
558                ObjectType::Database => {
559                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
560                }
561                ObjectType::Schema => {
562                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
563                        .await
564                }
565                ObjectType::User => {
566                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
567                }
568                ObjectType::View => {
569                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
570                }
571                ObjectType::Connection => {
572                    drop_connection::handle_drop_connection(
573                        handler_args,
574                        object_name,
575                        if_exists,
576                        cascade,
577                    )
578                    .await
579                }
580                ObjectType::Secret => {
581                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
582                }
583            }
584        }
585        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
586        Statement::DropFunction {
587            if_exists,
588            func_desc,
589            option,
590        } => {
591            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
592                .await
593        }
594        Statement::DropAggregate {
595            if_exists,
596            func_desc,
597            option,
598        } => {
599            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
600                .await
601        }
602        Statement::Query(_)
603        | Statement::Insert { .. }
604        | Statement::Delete { .. }
605        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
606        Statement::Copy {
607            entity: CopyEntity::Query(query),
608            target: CopyTarget::Stdout,
609        } => {
610            let response =
611                query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
612                    .await?;
613            Ok(response.into_copy_query_to_stdout())
614        }
615        Statement::CreateView {
616            materialized,
617            if_not_exists,
618            name,
619            columns,
620            query,
621            with_options: _, // It is put in OptimizerContext
622            or_replace,      // not supported
623            emit_mode,
624        } => {
625            if or_replace {
626                bail_not_implemented!("CREATE OR REPLACE VIEW");
627            }
628            if materialized {
629                create_mv::handle_create_mv(
630                    handler_args,
631                    if_not_exists,
632                    name,
633                    *query,
634                    columns,
635                    emit_mode,
636                )
637                .await
638            } else {
639                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
640                    .await
641            }
642        }
643        Statement::Flush => flush::handle_flush(handler_args).await,
644        Statement::Wait => wait::handle_wait(handler_args).await,
645        Statement::Recover => recover::handle_recover(handler_args).await,
646        Statement::SetVariable {
647            local: _,
648            variable,
649            value,
650        } => {
651            // special handle for `use database`
652            if variable.real_value().eq_ignore_ascii_case("database") {
653                let x = variable::set_var_to_param_str(&value);
654                let res = use_db::handle_use_db(
655                    handler_args,
656                    ObjectName::from(vec![Ident::from_real_value(
657                        x.as_deref().unwrap_or("default"),
658                    )]),
659                )?;
660                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
661                for notice in res.notices() {
662                    builder = builder.notice(notice);
663                }
664                return Ok(builder.into());
665            }
666            variable::handle_set(handler_args, variable, value)
667        }
668        Statement::SetTimeZone { local: _, value } => {
669            variable::handle_set_time_zone(handler_args, value)
670        }
671        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
672        Statement::CreateIndex {
673            name,
674            table_name,
675            method,
676            columns,
677            include,
678            distributed_by,
679            unique,
680            if_not_exists,
681            with_properties: _,
682        } => {
683            if unique {
684                bail_not_implemented!("create unique index");
685            }
686
687            create_index::handle_create_index(
688                handler_args,
689                if_not_exists,
690                name,
691                table_name,
692                method,
693                columns.clone(),
694                include,
695                distributed_by,
696            )
697            .await
698        }
699        Statement::AlterDatabase { name, operation } => match operation {
700            AlterDatabaseOperation::RenameDatabase { database_name } => {
701                alter_rename::handle_rename_database(handler_args, name, database_name).await
702            }
703            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
704                alter_owner::handle_alter_owner(
705                    handler_args,
706                    name,
707                    new_owner_name,
708                    StatementType::ALTER_DATABASE,
709                )
710                .await
711            }
712            AlterDatabaseOperation::SetParam(config_param) => {
713                let ConfigParam { param, value } = config_param;
714
715                let database_param = match param.real_value().to_uppercase().as_str() {
716                    "BARRIER_INTERVAL_MS" => {
717                        let barrier_interval_ms = match value {
718                            SetVariableValue::Default => None,
719                            SetVariableValue::Single(SetVariableValueSingle::Literal(
720                                Value::Number(num),
721                            )) => {
722                                let num = num.parse::<u32>().map_err(|e| {
723                                    ErrorCode::InvalidInputSyntax(format!(
724                                        "barrier_interval_ms must be a u32 integer: {}",
725                                        e.as_report()
726                                    ))
727                                })?;
728                                Some(num)
729                            }
730                            _ => {
731                                return Err(ErrorCode::InvalidInputSyntax(
732                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
733                                        .to_owned(),
734                                )
735                                .into());
736                            }
737                        };
738                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
739                    }
740                    "CHECKPOINT_FREQUENCY" => {
741                        let checkpoint_frequency = match value {
742                            SetVariableValue::Default => None,
743                            SetVariableValue::Single(SetVariableValueSingle::Literal(
744                                Value::Number(num),
745                            )) => {
746                                let num = num.parse::<u64>().map_err(|e| {
747                                    ErrorCode::InvalidInputSyntax(format!(
748                                        "checkpoint_frequency must be a u64 integer: {}",
749                                        e.as_report()
750                                    ))
751                                })?;
752                                Some(num)
753                            }
754                            _ => {
755                                return Err(ErrorCode::InvalidInputSyntax(
756                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
757                                        .to_owned(),
758                                )
759                                .into());
760                            }
761                        };
762                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
763                    }
764                    _ => {
765                        return Err(ErrorCode::InvalidInputSyntax(format!(
766                            "Unsupported database config parameter: {}",
767                            param.real_value()
768                        ))
769                        .into());
770                    }
771                };
772
773                alter_database_param::handle_alter_database_param(
774                    handler_args,
775                    name,
776                    database_param,
777                )
778                .await
779            }
780        },
781        Statement::AlterSchema { name, operation } => match operation {
782            AlterSchemaOperation::RenameSchema { schema_name } => {
783                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
784            }
785            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
786                alter_owner::handle_alter_owner(
787                    handler_args,
788                    name,
789                    new_owner_name,
790                    StatementType::ALTER_SCHEMA,
791                )
792                .await
793            }
794            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
795                alter_swap_rename::handle_swap_rename(
796                    handler_args,
797                    name,
798                    target_schema,
799                    StatementType::ALTER_SCHEMA,
800                )
801                .await
802            }
803        },
804        Statement::AlterTable { name, operation } => match operation {
805            AlterTableOperation::AddColumn { .. }
806            | AlterTableOperation::DropColumn { .. }
807            | AlterTableOperation::AlterColumn { .. } => {
808                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
809            }
810            AlterTableOperation::RenameTable { table_name } => {
811                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
812                    .await
813            }
814            AlterTableOperation::ChangeOwner { new_owner_name } => {
815                alter_owner::handle_alter_owner(
816                    handler_args,
817                    name,
818                    new_owner_name,
819                    StatementType::ALTER_TABLE,
820                )
821                .await
822            }
823            AlterTableOperation::SetParallelism {
824                parallelism,
825                deferred,
826            } => {
827                alter_parallelism::handle_alter_parallelism(
828                    handler_args,
829                    name,
830                    parallelism,
831                    StatementType::ALTER_TABLE,
832                    deferred,
833                )
834                .await
835            }
836            AlterTableOperation::SetSchema { new_schema_name } => {
837                alter_set_schema::handle_alter_set_schema(
838                    handler_args,
839                    name,
840                    new_schema_name,
841                    StatementType::ALTER_TABLE,
842                    None,
843                )
844                .await
845            }
846            AlterTableOperation::RefreshSchema => {
847                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
848            }
849            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
850                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
851                    handler_args,
852                    PbThrottleTarget::TableWithSource,
853                    name,
854                    rate_limit,
855                )
856                .await
857            }
858            AlterTableOperation::DropConnector => {
859                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
860                    .await
861            }
862            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
863                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
864                    handler_args,
865                    PbThrottleTarget::TableDml,
866                    name,
867                    rate_limit,
868                )
869                .await
870            }
871            AlterTableOperation::SetConfig { entries } => {
872                alter_streaming_config::handle_alter_streaming_set_config(
873                    handler_args,
874                    name,
875                    entries,
876                    StatementType::ALTER_TABLE,
877                )
878                .await
879            }
880            AlterTableOperation::ResetConfig { keys } => {
881                alter_streaming_config::handle_alter_streaming_reset_config(
882                    handler_args,
883                    name,
884                    keys,
885                    StatementType::ALTER_TABLE,
886                )
887                .await
888            }
889            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
890                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
891                    handler_args,
892                    PbThrottleTarget::CdcTable,
893                    name,
894                    rate_limit,
895                )
896                .await
897            }
898            AlterTableOperation::SwapRenameTable { target_table } => {
899                alter_swap_rename::handle_swap_rename(
900                    handler_args,
901                    name,
902                    target_table,
903                    StatementType::ALTER_TABLE,
904                )
905                .await
906            }
907            AlterTableOperation::AlterConnectorProps { alter_props } => {
908                alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
909            }
910            AlterTableOperation::AddConstraint { .. }
911            | AlterTableOperation::DropConstraint { .. }
912            | AlterTableOperation::RenameColumn { .. }
913            | AlterTableOperation::ChangeColumn { .. }
914            | AlterTableOperation::RenameConstraint { .. } => {
915                bail_not_implemented!(
916                    "Unhandled statement: {}",
917                    Statement::AlterTable { name, operation }
918                )
919            }
920        },
921        Statement::AlterIndex { name, operation } => match operation {
922            AlterIndexOperation::RenameIndex { index_name } => {
923                alter_rename::handle_rename_index(handler_args, name, index_name).await
924            }
925            AlterIndexOperation::SetParallelism {
926                parallelism,
927                deferred,
928            } => {
929                alter_parallelism::handle_alter_parallelism(
930                    handler_args,
931                    name,
932                    parallelism,
933                    StatementType::ALTER_INDEX,
934                    deferred,
935                )
936                .await
937            }
938            AlterIndexOperation::SetConfig { entries } => {
939                alter_streaming_config::handle_alter_streaming_set_config(
940                    handler_args,
941                    name,
942                    entries,
943                    StatementType::ALTER_INDEX,
944                )
945                .await
946            }
947            AlterIndexOperation::ResetConfig { keys } => {
948                alter_streaming_config::handle_alter_streaming_reset_config(
949                    handler_args,
950                    name,
951                    keys,
952                    StatementType::ALTER_INDEX,
953                )
954                .await
955            }
956        },
957        Statement::AlterView {
958            materialized,
959            name,
960            operation,
961        } => {
962            let statement_type = if materialized {
963                StatementType::ALTER_MATERIALIZED_VIEW
964            } else {
965                StatementType::ALTER_VIEW
966            };
967            match operation {
968                AlterViewOperation::RenameView { view_name } => {
969                    if materialized {
970                        alter_rename::handle_rename_table(
971                            handler_args,
972                            TableType::MaterializedView,
973                            name,
974                            view_name,
975                        )
976                        .await
977                    } else {
978                        alter_rename::handle_rename_view(handler_args, name, view_name).await
979                    }
980                }
981                AlterViewOperation::SetParallelism {
982                    parallelism,
983                    deferred,
984                } => {
985                    if !materialized {
986                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
987                    }
988                    alter_parallelism::handle_alter_parallelism(
989                        handler_args,
990                        name,
991                        parallelism,
992                        statement_type,
993                        deferred,
994                    )
995                    .await
996                }
997                AlterViewOperation::SetResourceGroup {
998                    resource_group,
999                    deferred,
1000                } => {
1001                    if !materialized {
1002                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1003                    }
1004                    alter_resource_group::handle_alter_resource_group(
1005                        handler_args,
1006                        name,
1007                        resource_group,
1008                        statement_type,
1009                        deferred,
1010                    )
1011                    .await
1012                }
1013                AlterViewOperation::ChangeOwner { new_owner_name } => {
1014                    alter_owner::handle_alter_owner(
1015                        handler_args,
1016                        name,
1017                        new_owner_name,
1018                        statement_type,
1019                    )
1020                    .await
1021                }
1022                AlterViewOperation::SetSchema { new_schema_name } => {
1023                    alter_set_schema::handle_alter_set_schema(
1024                        handler_args,
1025                        name,
1026                        new_schema_name,
1027                        statement_type,
1028                        None,
1029                    )
1030                    .await
1031                }
1032                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1033                    if !materialized {
1034                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1035                    }
1036                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1037                        handler_args,
1038                        PbThrottleTarget::Mv,
1039                        name,
1040                        rate_limit,
1041                    )
1042                    .await
1043                }
1044                AlterViewOperation::SwapRenameView { target_view } => {
1045                    alter_swap_rename::handle_swap_rename(
1046                        handler_args,
1047                        name,
1048                        target_view,
1049                        statement_type,
1050                    )
1051                    .await
1052                }
1053                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1054                    if !materialized {
1055                        bail!(
1056                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1057                        );
1058                    }
1059                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1060                }
1061                AlterViewOperation::AsQuery { query } => {
1062                    if !materialized {
1063                        bail_not_implemented!("ALTER VIEW AS QUERY");
1064                    }
1065                    // `ALTER MATERIALIZED VIEW AS QUERY` is only available in development build now.
1066                    if !cfg!(debug_assertions) {
1067                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1068                    }
1069                    alter_mv::handle_alter_mv(handler_args, name, query).await
1070                }
1071                AlterViewOperation::SetConfig { entries } => {
1072                    if !materialized {
1073                        bail!("SET CONFIG is only supported for materialized views");
1074                    }
1075                    alter_streaming_config::handle_alter_streaming_set_config(
1076                        handler_args,
1077                        name,
1078                        entries,
1079                        statement_type,
1080                    )
1081                    .await
1082                }
1083                AlterViewOperation::ResetConfig { keys } => {
1084                    if !materialized {
1085                        bail!("RESET CONFIG is only supported for materialized views");
1086                    }
1087                    alter_streaming_config::handle_alter_streaming_reset_config(
1088                        handler_args,
1089                        name,
1090                        keys,
1091                        statement_type,
1092                    )
1093                    .await
1094                }
1095            }
1096        }
1097
1098        Statement::AlterSink { name, operation } => match operation {
1099            AlterSinkOperation::AlterConnectorProps {
1100                alter_props: changed_props,
1101            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1102            AlterSinkOperation::RenameSink { sink_name } => {
1103                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1104            }
1105            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1106                alter_owner::handle_alter_owner(
1107                    handler_args,
1108                    name,
1109                    new_owner_name,
1110                    StatementType::ALTER_SINK,
1111                )
1112                .await
1113            }
1114            AlterSinkOperation::SetSchema { new_schema_name } => {
1115                alter_set_schema::handle_alter_set_schema(
1116                    handler_args,
1117                    name,
1118                    new_schema_name,
1119                    StatementType::ALTER_SINK,
1120                    None,
1121                )
1122                .await
1123            }
1124            AlterSinkOperation::SetParallelism {
1125                parallelism,
1126                deferred,
1127            } => {
1128                alter_parallelism::handle_alter_parallelism(
1129                    handler_args,
1130                    name,
1131                    parallelism,
1132                    StatementType::ALTER_SINK,
1133                    deferred,
1134                )
1135                .await
1136            }
1137            AlterSinkOperation::SetConfig { entries } => {
1138                alter_streaming_config::handle_alter_streaming_set_config(
1139                    handler_args,
1140                    name,
1141                    entries,
1142                    StatementType::ALTER_SINK,
1143                )
1144                .await
1145            }
1146            AlterSinkOperation::ResetConfig { keys } => {
1147                alter_streaming_config::handle_alter_streaming_reset_config(
1148                    handler_args,
1149                    name,
1150                    keys,
1151                    StatementType::ALTER_SINK,
1152                )
1153                .await
1154            }
1155            AlterSinkOperation::SwapRenameSink { target_sink } => {
1156                alter_swap_rename::handle_swap_rename(
1157                    handler_args,
1158                    name,
1159                    target_sink,
1160                    StatementType::ALTER_SINK,
1161                )
1162                .await
1163            }
1164            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1165                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1166                    handler_args,
1167                    PbThrottleTarget::Sink,
1168                    name,
1169                    rate_limit,
1170                )
1171                .await
1172            }
1173            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1174                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1175                    handler_args,
1176                    name,
1177                    enable,
1178                )
1179                .await
1180            }
1181        },
1182        Statement::AlterSubscription { name, operation } => match operation {
1183            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1184                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1185                    .await
1186            }
1187            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1188                alter_owner::handle_alter_owner(
1189                    handler_args,
1190                    name,
1191                    new_owner_name,
1192                    StatementType::ALTER_SUBSCRIPTION,
1193                )
1194                .await
1195            }
1196            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1197                alter_set_schema::handle_alter_set_schema(
1198                    handler_args,
1199                    name,
1200                    new_schema_name,
1201                    StatementType::ALTER_SUBSCRIPTION,
1202                    None,
1203                )
1204                .await
1205            }
1206            AlterSubscriptionOperation::SwapRenameSubscription {
1207                target_subscription,
1208            } => {
1209                alter_swap_rename::handle_swap_rename(
1210                    handler_args,
1211                    name,
1212                    target_subscription,
1213                    StatementType::ALTER_SUBSCRIPTION,
1214                )
1215                .await
1216            }
1217        },
1218        Statement::AlterSource { name, operation } => match operation {
1219            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1220                alter_source_props::handle_alter_source_connector_props(
1221                    handler_args,
1222                    name,
1223                    alter_props,
1224                )
1225                .await
1226            }
1227            AlterSourceOperation::RenameSource { source_name } => {
1228                alter_rename::handle_rename_source(handler_args, name, source_name).await
1229            }
1230            AlterSourceOperation::AddColumn { .. } => {
1231                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1232            }
1233            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1234                alter_owner::handle_alter_owner(
1235                    handler_args,
1236                    name,
1237                    new_owner_name,
1238                    StatementType::ALTER_SOURCE,
1239                )
1240                .await
1241            }
1242            AlterSourceOperation::SetSchema { new_schema_name } => {
1243                alter_set_schema::handle_alter_set_schema(
1244                    handler_args,
1245                    name,
1246                    new_schema_name,
1247                    StatementType::ALTER_SOURCE,
1248                    None,
1249                )
1250                .await
1251            }
1252            AlterSourceOperation::FormatEncode { format_encode } => {
1253                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1254                    .await
1255            }
1256            AlterSourceOperation::RefreshSchema => {
1257                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1258            }
1259            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1260                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1261                    handler_args,
1262                    PbThrottleTarget::Source,
1263                    name,
1264                    rate_limit,
1265                )
1266                .await
1267            }
1268            AlterSourceOperation::SwapRenameSource { target_source } => {
1269                alter_swap_rename::handle_swap_rename(
1270                    handler_args,
1271                    name,
1272                    target_source,
1273                    StatementType::ALTER_SOURCE,
1274                )
1275                .await
1276            }
1277            AlterSourceOperation::SetParallelism {
1278                parallelism,
1279                deferred,
1280            } => {
1281                alter_parallelism::handle_alter_parallelism(
1282                    handler_args,
1283                    name,
1284                    parallelism,
1285                    StatementType::ALTER_SOURCE,
1286                    deferred,
1287                )
1288                .await
1289            }
1290            AlterSourceOperation::SetConfig { entries } => {
1291                alter_streaming_config::handle_alter_streaming_set_config(
1292                    handler_args,
1293                    name,
1294                    entries,
1295                    StatementType::ALTER_SOURCE,
1296                )
1297                .await
1298            }
1299            AlterSourceOperation::ResetConfig { keys } => {
1300                alter_streaming_config::handle_alter_streaming_reset_config(
1301                    handler_args,
1302                    name,
1303                    keys,
1304                    StatementType::ALTER_SOURCE,
1305                )
1306                .await
1307            }
1308        },
1309        Statement::AlterFunction {
1310            name,
1311            args,
1312            operation,
1313        } => match operation {
1314            AlterFunctionOperation::SetSchema { new_schema_name } => {
1315                alter_set_schema::handle_alter_set_schema(
1316                    handler_args,
1317                    name,
1318                    new_schema_name,
1319                    StatementType::ALTER_FUNCTION,
1320                    args,
1321                )
1322                .await
1323            }
1324        },
1325        Statement::AlterConnection { name, operation } => match operation {
1326            AlterConnectionOperation::SetSchema { new_schema_name } => {
1327                alter_set_schema::handle_alter_set_schema(
1328                    handler_args,
1329                    name,
1330                    new_schema_name,
1331                    StatementType::ALTER_CONNECTION,
1332                    None,
1333                )
1334                .await
1335            }
1336            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1337                alter_owner::handle_alter_owner(
1338                    handler_args,
1339                    name,
1340                    new_owner_name,
1341                    StatementType::ALTER_CONNECTION,
1342                )
1343                .await
1344            }
1345        },
1346        Statement::AlterSystem { param, value } => {
1347            alter_system::handle_alter_system(handler_args, param, value).await
1348        }
1349        Statement::AlterSecret {
1350            name,
1351            with_options,
1352            operation,
1353        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1354        Statement::AlterFragment {
1355            fragment_ids,
1356            operation,
1357        } => match operation {
1358            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1359                let [fragment_id] = fragment_ids.as_slice() else {
1360                    return Err(ErrorCode::InvalidInputSyntax(
1361                        "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1362                            .to_owned(),
1363                    )
1364                    .into());
1365                };
1366                alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1367                    &handler_args.session,
1368                    PbThrottleTarget::Fragment,
1369                    *fragment_id,
1370                    rate_limit,
1371                    StatementType::SET_VARIABLE,
1372                )
1373                .await
1374            }
1375            AlterFragmentOperation::SetParallelism { parallelism } => {
1376                alter_parallelism::handle_alter_fragment_parallelism(
1377                    handler_args,
1378                    fragment_ids.into_iter().map_into().collect(),
1379                    parallelism,
1380                )
1381                .await
1382            }
1383        },
1384        Statement::AlterDefaultPrivileges { .. } => {
1385            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1386        }
1387        Statement::StartTransaction { modes } => {
1388            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1389        }
1390        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1391        Statement::Commit { chain } => {
1392            transaction::handle_commit(handler_args, COMMIT, chain).await
1393        }
1394        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1395        Statement::Rollback { chain } => {
1396            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1397        }
1398        Statement::SetTransaction {
1399            modes,
1400            snapshot,
1401            session,
1402        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1403        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1404        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1405        Statement::Comment {
1406            object_type,
1407            object_name,
1408            comment,
1409        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1410        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1411        Statement::Prepare {
1412            name,
1413            data_types,
1414            statement,
1415        } => prepared_statement::handle_prepare(name, data_types, statement).await,
1416        Statement::Deallocate { name, prepare } => {
1417            prepared_statement::handle_deallocate(name, prepare).await
1418        }
1419        Statement::Vacuum { object_name, full } => {
1420            vacuum::handle_vacuum(handler_args, object_name, full).await
1421        }
1422        Statement::Refresh { table_name } => {
1423            refresh::handle_refresh(handler_args, table_name).await
1424        }
1425        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1426    }
1427}
1428
1429fn check_ban_ddl_for_iceberg_engine_table(
1430    session: Arc<SessionImpl>,
1431    stmt: &Statement,
1432) -> Result<()> {
1433    match stmt {
1434        Statement::AlterTable {
1435            name,
1436            operation:
1437                operation @ (AlterTableOperation::AddColumn { .. }
1438                | AlterTableOperation::DropColumn { .. }),
1439        } => {
1440            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1441            if table.is_iceberg_engine_table() {
1442                bail!(
1443                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1444                    operation,
1445                    schema_name,
1446                    name
1447                );
1448            }
1449        }
1450
1451        Statement::AlterTable {
1452            name,
1453            operation: AlterTableOperation::RenameTable { .. },
1454        } => {
1455            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1456            if table.is_iceberg_engine_table() {
1457                bail!(
1458                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1459                    schema_name,
1460                    name
1461                );
1462            }
1463        }
1464
1465        Statement::AlterTable {
1466            name,
1467            operation: AlterTableOperation::ChangeOwner { .. },
1468        } => {
1469            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1470            if table.is_iceberg_engine_table() {
1471                bail!(
1472                    "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1473                    schema_name,
1474                    name
1475                );
1476            }
1477        }
1478
1479        Statement::AlterTable {
1480            name,
1481            operation: AlterTableOperation::SetParallelism { .. },
1482        } => {
1483            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1484            if table.is_iceberg_engine_table() {
1485                bail!(
1486                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1487                    schema_name,
1488                    name
1489                );
1490            }
1491        }
1492
1493        Statement::AlterTable {
1494            name,
1495            operation: AlterTableOperation::SetSchema { .. },
1496        } => {
1497            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1498            if table.is_iceberg_engine_table() {
1499                bail!(
1500                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1501                    schema_name,
1502                    name
1503                );
1504            }
1505        }
1506
1507        Statement::AlterTable {
1508            name,
1509            operation: AlterTableOperation::RefreshSchema,
1510        } => {
1511            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1512            if table.is_iceberg_engine_table() {
1513                bail!(
1514                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1515                    schema_name,
1516                    name
1517                );
1518            }
1519        }
1520
1521        Statement::AlterTable {
1522            name,
1523            operation: AlterTableOperation::SetSourceRateLimit { .. },
1524        } => {
1525            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1526            if table.is_iceberg_engine_table() {
1527                bail!(
1528                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1529                    schema_name,
1530                    name
1531                );
1532            }
1533        }
1534
1535        _ => {}
1536    }
1537
1538    Ok(())
1539}