risingwave_frontend/handler/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_database_param;
46mod alter_mv;
47mod alter_owner;
48mod alter_parallelism;
49mod alter_rename;
50mod alter_resource_group;
51mod alter_secret;
52mod alter_set_schema;
53mod alter_sink_props;
54mod alter_source_column;
55mod alter_source_props;
56mod alter_source_with_sr;
57mod alter_streaming_enable_unaligned_join;
58mod alter_streaming_rate_limit;
59mod alter_swap_rename;
60mod alter_system;
61mod alter_table_column;
62pub mod alter_table_drop_connector;
63pub mod alter_table_props;
64mod alter_table_with_sr;
65pub mod alter_user;
66pub mod cancel_job;
67pub mod close_cursor;
68mod comment;
69pub mod create_aggregate;
70pub mod create_connection;
71mod create_database;
72pub mod create_function;
73pub mod create_index;
74pub mod create_mv;
75pub mod create_schema;
76pub mod create_secret;
77pub mod create_sink;
78pub mod create_source;
79pub mod create_sql_function;
80pub mod create_subscription;
81pub mod create_table;
82pub mod create_table_as;
83pub mod create_user;
84pub mod create_view;
85pub mod declare_cursor;
86pub mod describe;
87pub mod discard;
88mod drop_connection;
89mod drop_database;
90pub mod drop_function;
91mod drop_index;
92pub mod drop_mv;
93mod drop_schema;
94pub mod drop_secret;
95pub mod drop_sink;
96pub mod drop_source;
97pub mod drop_subscription;
98pub mod drop_table;
99pub mod drop_user;
100mod drop_view;
101pub mod explain;
102pub mod explain_analyze_stream_job;
103pub mod extended_handle;
104pub mod fetch_cursor;
105mod flush;
106pub mod handle_privilege;
107pub mod kill_process;
108mod prepared_statement;
109pub mod privilege;
110pub mod query;
111mod recover;
112mod refresh;
113pub mod show;
114mod transaction;
115mod use_db;
116pub mod util;
117pub mod vacuum;
118pub mod variable;
119mod wait;
120
121pub use alter_table_column::{
122    fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
123};
124
/// The [`PgResponseBuilder`] used by RisingWave, parameterized by [`PgResponseStream`].
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;

/// The [`PgResponse`] used by RisingWave, parameterized by [`PgResponseStream`].
pub type RwPgResponse = PgResponse<PgResponseStream>;
130
131#[easy_ext::ext(RwPgResponseBuilderExt)]
132impl RwPgResponseBuilder {
133    /// Append rows to the response.
134    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
135        let fields = T::fields();
136        self.values(
137            rows.into_iter()
138                .map(|row| {
139                    Row::new(
140                        row.into_owned_row()
141                            .into_iter()
142                            .zip_eq_fast(&fields)
143                            .map(|(datum, (_, ty))| {
144                                datum.map(|scalar| {
145                                    scalar.as_scalar_ref_impl().text_format(ty).into()
146                                })
147                            })
148                            .collect(),
149                    )
150                })
151                .collect_vec()
152                .into(),
153            fields_to_descriptors(fields),
154        )
155    }
156}
157
158pub fn fields_to_descriptors(
159    fields: Vec<(&str, risingwave_common::types::DataType)>,
160) -> Vec<PgFieldDescriptor> {
161    fields
162        .iter()
163        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
164        .collect()
165}
166
/// The stream type plugged into [`PgResponse`] and [`PgResponseBuilder`] by RisingWave.
///
/// It implements [`Stream`] by delegating to whichever inner stream the variant wraps.
pub enum PgResponseStream {
    /// A [`LocalQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// A [`DistributedQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// A boxed stream of row-set results, e.g. the one built from an in-memory `Vec<Row>`.
    Rows(BoxStream<'static, RowSetResult>),
}
172
173impl Stream for PgResponseStream {
174    type Item = std::result::Result<Vec<Row>, BoxedError>;
175
176    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
177        match &mut *self {
178            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
179            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
180            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
181        }
182    }
183}
184
185impl From<Vec<Row>> for PgResponseStream {
186    fn from(rows: Vec<Row>) -> Self {
187        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
188    }
189}
190
/// Arguments bundled for the statement handlers dispatched from `handle`.
#[derive(Clone)]
pub struct HandlerArgs {
    /// The session passed to [`HandlerArgs::new`].
    pub session: Arc<SessionImpl>,
    /// The SQL text exactly as passed to [`HandlerArgs::new`].
    pub sql: Arc<str>,
    /// The statement rendered back to text by `normalize_sql`, with `OR REPLACE` /
    /// `IF NOT EXISTS` cleared for the `CREATE` statements that function handles.
    pub normalized_sql: String,
    /// The options obtained from the statement through `WithOptions::try_from`.
    pub with_options: WithOptions,
}
198
199impl HandlerArgs {
200    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
201        Ok(Self {
202            session,
203            sql,
204            with_options: WithOptions::try_from(stmt)?,
205            normalized_sql: Self::normalize_sql(stmt),
206        })
207    }
208
209    /// Get normalized SQL from the statement.
210    ///
211    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
212    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
213    ///   make it suitable for the `SHOW CREATE` statements.
214    fn normalize_sql(stmt: &Statement) -> String {
215        let mut stmt = stmt.clone();
216        match &mut stmt {
217            Statement::CreateView {
218                or_replace,
219                if_not_exists,
220                ..
221            } => {
222                *or_replace = false;
223                *if_not_exists = false;
224            }
225            Statement::CreateTable {
226                or_replace,
227                if_not_exists,
228                ..
229            } => {
230                *or_replace = false;
231                *if_not_exists = false;
232            }
233            Statement::CreateIndex { if_not_exists, .. } => {
234                *if_not_exists = false;
235            }
236            Statement::CreateSource {
237                stmt: CreateSourceStatement { if_not_exists, .. },
238                ..
239            } => {
240                *if_not_exists = false;
241            }
242            Statement::CreateSink {
243                stmt: CreateSinkStatement { if_not_exists, .. },
244            } => {
245                *if_not_exists = false;
246            }
247            Statement::CreateSubscription {
248                stmt: CreateSubscriptionStatement { if_not_exists, .. },
249            } => {
250                *if_not_exists = false;
251            }
252            Statement::CreateConnection {
253                stmt: CreateConnectionStatement { if_not_exists, .. },
254            } => {
255                *if_not_exists = false;
256            }
257            _ => {}
258        }
259        stmt.to_string()
260    }
261}
262
263pub async fn handle(
264    session: Arc<SessionImpl>,
265    stmt: Statement,
266    sql: Arc<str>,
267    formats: Vec<Format>,
268) -> Result<RwPgResponse> {
269    session.clear_cancel_query_flag();
270    let _guard = session.txn_begin_implicit();
271    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
272
273    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
274
275    match stmt {
276        Statement::Explain {
277            statement,
278            analyze,
279            options,
280        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
281        Statement::ExplainAnalyzeStreamJob {
282            target,
283            duration_secs,
284        } => {
285            explain_analyze_stream_job::handle_explain_analyze_stream_job(
286                handler_args,
287                target,
288                duration_secs,
289            )
290            .await
291        }
292        Statement::CreateSource { stmt } => {
293            create_source::handle_create_source(handler_args, stmt).await
294        }
295        Statement::CreateSink { stmt } => {
296            create_sink::handle_create_sink(handler_args, stmt, false).await
297        }
298        Statement::CreateSubscription { stmt } => {
299            create_subscription::handle_create_subscription(handler_args, stmt).await
300        }
301        Statement::CreateConnection { stmt } => {
302            create_connection::handle_create_connection(handler_args, stmt).await
303        }
304        Statement::CreateSecret { stmt } => {
305            create_secret::handle_create_secret(handler_args, stmt).await
306        }
307        Statement::CreateFunction {
308            or_replace,
309            temporary,
310            if_not_exists,
311            name,
312            args,
313            returns,
314            params,
315            with_options,
316        } => {
317            // For general udf, `language` clause could be ignored
318            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
319            if params.language.is_none()
320                || !params
321                    .language
322                    .as_ref()
323                    .unwrap()
324                    .real_value()
325                    .eq_ignore_ascii_case("sql")
326            {
327                create_function::handle_create_function(
328                    handler_args,
329                    or_replace,
330                    temporary,
331                    if_not_exists,
332                    name,
333                    args,
334                    returns,
335                    params,
336                    with_options,
337                )
338                .await
339            } else {
340                create_sql_function::handle_create_sql_function(
341                    handler_args,
342                    or_replace,
343                    temporary,
344                    if_not_exists,
345                    name,
346                    args,
347                    returns,
348                    params,
349                )
350                .await
351            }
352        }
353        Statement::CreateAggregate {
354            or_replace,
355            if_not_exists,
356            name,
357            args,
358            returns,
359            params,
360            ..
361        } => {
362            create_aggregate::handle_create_aggregate(
363                handler_args,
364                or_replace,
365                if_not_exists,
366                name,
367                args,
368                returns,
369                params,
370            )
371            .await
372        }
373        Statement::CreateTable {
374            name,
375            columns,
376            wildcard_idx,
377            constraints,
378            query,
379            with_options: _, // It is put in OptimizerContext
380            // Not supported things
381            or_replace,
382            temporary,
383            if_not_exists,
384            format_encode,
385            source_watermarks,
386            append_only,
387            on_conflict,
388            with_version_columns,
389            cdc_table_info,
390            include_column_options,
391            webhook_info,
392            engine,
393        } => {
394            if or_replace {
395                bail_not_implemented!("CREATE OR REPLACE TABLE");
396            }
397            if temporary {
398                bail_not_implemented!("CREATE TEMPORARY TABLE");
399            }
400            if let Some(query) = query {
401                return create_table_as::handle_create_as(
402                    handler_args,
403                    name,
404                    if_not_exists,
405                    query,
406                    columns,
407                    append_only,
408                    on_conflict,
409                    with_version_columns
410                        .iter()
411                        .map(|col| col.real_value())
412                        .collect(),
413                    engine,
414                )
415                .await;
416            }
417            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
418            create_table::handle_create_table(
419                handler_args,
420                name,
421                columns,
422                wildcard_idx,
423                constraints,
424                if_not_exists,
425                format_encode,
426                source_watermarks,
427                append_only,
428                on_conflict,
429                with_version_columns
430                    .iter()
431                    .map(|col| col.real_value())
432                    .collect(),
433                cdc_table_info,
434                include_column_options,
435                webhook_info,
436                engine,
437            )
438            .await
439        }
440        Statement::CreateDatabase {
441            db_name,
442            if_not_exists,
443            owner,
444            resource_group,
445            barrier_interval_ms,
446            checkpoint_frequency,
447        } => {
448            create_database::handle_create_database(
449                handler_args,
450                db_name,
451                if_not_exists,
452                owner,
453                resource_group,
454                barrier_interval_ms,
455                checkpoint_frequency,
456            )
457            .await
458        }
459        Statement::CreateSchema {
460            schema_name,
461            if_not_exists,
462            owner,
463        } => {
464            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
465                .await
466        }
467        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
468        Statement::DeclareCursor { stmt } => {
469            declare_cursor::handle_declare_cursor(handler_args, stmt).await
470        }
471        Statement::FetchCursor { stmt } => {
472            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
473        }
474        Statement::CloseCursor { stmt } => {
475            close_cursor::handle_close_cursor(handler_args, stmt).await
476        }
477        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
478        Statement::Grant { .. } => {
479            handle_privilege::handle_grant_privilege(handler_args, stmt).await
480        }
481        Statement::Revoke { .. } => {
482            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
483        }
484        Statement::Describe { name, kind } => match kind {
485            DescribeKind::Fragments => {
486                describe::handle_describe_fragments(handler_args, name).await
487            }
488            DescribeKind::Plain => describe::handle_describe(handler_args, name),
489        },
490        Statement::DescribeFragment { fragment_id } => {
491            describe::handle_describe_fragment(handler_args, fragment_id.into()).await
492        }
493        Statement::Discard(..) => discard::handle_discard(handler_args),
494        Statement::ShowObjects {
495            object: show_object,
496            filter,
497        } => show::handle_show_object(handler_args, show_object, filter).await,
498        Statement::ShowCreateObject { create_type, name } => {
499            show::handle_show_create_object(handler_args, create_type, name)
500        }
501        Statement::ShowTransactionIsolationLevel => {
502            transaction::handle_show_isolation_level(handler_args)
503        }
504        Statement::Drop(DropStatement {
505            object_type,
506            object_name,
507            if_exists,
508            drop_mode,
509        }) => {
510            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
511                match object_type {
512                    ObjectType::MaterializedView
513                    | ObjectType::View
514                    | ObjectType::Sink
515                    | ObjectType::Source
516                    | ObjectType::Subscription
517                    | ObjectType::Index
518                    | ObjectType::Table
519                    | ObjectType::Schema
520                    | ObjectType::Connection => true,
521                    ObjectType::Database | ObjectType::User | ObjectType::Secret => {
522                        bail_not_implemented!("DROP CASCADE");
523                    }
524                }
525            } else {
526                false
527            };
528            match object_type {
529                ObjectType::Table => {
530                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
531                        .await
532                }
533                ObjectType::MaterializedView => {
534                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
535                }
536                ObjectType::Index => {
537                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
538                        .await
539                }
540                ObjectType::Source => {
541                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
542                        .await
543                }
544                ObjectType::Sink => {
545                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
546                }
547                ObjectType::Subscription => {
548                    drop_subscription::handle_drop_subscription(
549                        handler_args,
550                        object_name,
551                        if_exists,
552                        cascade,
553                    )
554                    .await
555                }
556                ObjectType::Database => {
557                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
558                }
559                ObjectType::Schema => {
560                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
561                        .await
562                }
563                ObjectType::User => {
564                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
565                }
566                ObjectType::View => {
567                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
568                }
569                ObjectType::Connection => {
570                    drop_connection::handle_drop_connection(
571                        handler_args,
572                        object_name,
573                        if_exists,
574                        cascade,
575                    )
576                    .await
577                }
578                ObjectType::Secret => {
579                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
580                }
581            }
582        }
583        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
584        Statement::DropFunction {
585            if_exists,
586            func_desc,
587            option,
588        } => {
589            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
590                .await
591        }
592        Statement::DropAggregate {
593            if_exists,
594            func_desc,
595            option,
596        } => {
597            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
598                .await
599        }
600        Statement::Query(_)
601        | Statement::Insert { .. }
602        | Statement::Delete { .. }
603        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
604        Statement::Copy {
605            entity: CopyEntity::Query(query),
606            target: CopyTarget::Stdout,
607        } => {
608            let response =
609                query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
610                    .await?;
611            Ok(response.into_copy_query_to_stdout())
612        }
613        Statement::CreateView {
614            materialized,
615            if_not_exists,
616            name,
617            columns,
618            query,
619            with_options: _, // It is put in OptimizerContext
620            or_replace,      // not supported
621            emit_mode,
622        } => {
623            if or_replace {
624                bail_not_implemented!("CREATE OR REPLACE VIEW");
625            }
626            if materialized {
627                create_mv::handle_create_mv(
628                    handler_args,
629                    if_not_exists,
630                    name,
631                    *query,
632                    columns,
633                    emit_mode,
634                )
635                .await
636            } else {
637                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
638                    .await
639            }
640        }
641        Statement::Flush => flush::handle_flush(handler_args).await,
642        Statement::Wait => wait::handle_wait(handler_args).await,
643        Statement::Recover => recover::handle_recover(handler_args).await,
644        Statement::SetVariable {
645            local: _,
646            variable,
647            value,
648        } => {
649            // special handle for `use database`
650            if variable.real_value().eq_ignore_ascii_case("database") {
651                let x = variable::set_var_to_param_str(&value);
652                let res = use_db::handle_use_db(
653                    handler_args,
654                    ObjectName::from(vec![Ident::from_real_value(
655                        x.as_deref().unwrap_or("default"),
656                    )]),
657                )?;
658                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
659                for notice in res.notices() {
660                    builder = builder.notice(notice);
661                }
662                return Ok(builder.into());
663            }
664            variable::handle_set(handler_args, variable, value)
665        }
666        Statement::SetTimeZone { local: _, value } => {
667            variable::handle_set_time_zone(handler_args, value)
668        }
669        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
670        Statement::CreateIndex {
671            name,
672            table_name,
673            method,
674            columns,
675            include,
676            distributed_by,
677            unique,
678            if_not_exists,
679            with_properties: _,
680        } => {
681            if unique {
682                bail_not_implemented!("create unique index");
683            }
684
685            create_index::handle_create_index(
686                handler_args,
687                if_not_exists,
688                name,
689                table_name,
690                method,
691                columns.clone(),
692                include,
693                distributed_by,
694            )
695            .await
696        }
697        Statement::AlterDatabase { name, operation } => match operation {
698            AlterDatabaseOperation::RenameDatabase { database_name } => {
699                alter_rename::handle_rename_database(handler_args, name, database_name).await
700            }
701            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
702                alter_owner::handle_alter_owner(
703                    handler_args,
704                    name,
705                    new_owner_name,
706                    StatementType::ALTER_DATABASE,
707                )
708                .await
709            }
710            AlterDatabaseOperation::SetParam(config_param) => {
711                let ConfigParam { param, value } = config_param;
712
713                let database_param = match param.real_value().to_uppercase().as_str() {
714                    "BARRIER_INTERVAL_MS" => {
715                        let barrier_interval_ms = match value {
716                            SetVariableValue::Default => None,
717                            SetVariableValue::Single(SetVariableValueSingle::Literal(
718                                Value::Number(num),
719                            )) => {
720                                let num = num.parse::<u32>().map_err(|e| {
721                                    ErrorCode::InvalidInputSyntax(format!(
722                                        "barrier_interval_ms must be a u32 integer: {}",
723                                        e.as_report()
724                                    ))
725                                })?;
726                                Some(num)
727                            }
728                            _ => {
729                                return Err(ErrorCode::InvalidInputSyntax(
730                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
731                                        .to_owned(),
732                                )
733                                .into());
734                            }
735                        };
736                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
737                    }
738                    "CHECKPOINT_FREQUENCY" => {
739                        let checkpoint_frequency = match value {
740                            SetVariableValue::Default => None,
741                            SetVariableValue::Single(SetVariableValueSingle::Literal(
742                                Value::Number(num),
743                            )) => {
744                                let num = num.parse::<u64>().map_err(|e| {
745                                    ErrorCode::InvalidInputSyntax(format!(
746                                        "checkpoint_frequency must be a u64 integer: {}",
747                                        e.as_report()
748                                    ))
749                                })?;
750                                Some(num)
751                            }
752                            _ => {
753                                return Err(ErrorCode::InvalidInputSyntax(
754                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
755                                        .to_owned(),
756                                )
757                                .into());
758                            }
759                        };
760                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
761                    }
762                    _ => {
763                        return Err(ErrorCode::InvalidInputSyntax(format!(
764                            "Unsupported database config parameter: {}",
765                            param.real_value()
766                        ))
767                        .into());
768                    }
769                };
770
771                alter_database_param::handle_alter_database_param(
772                    handler_args,
773                    name,
774                    database_param,
775                )
776                .await
777            }
778        },
779        Statement::AlterSchema { name, operation } => match operation {
780            AlterSchemaOperation::RenameSchema { schema_name } => {
781                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
782            }
783            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
784                alter_owner::handle_alter_owner(
785                    handler_args,
786                    name,
787                    new_owner_name,
788                    StatementType::ALTER_SCHEMA,
789                )
790                .await
791            }
792            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
793                alter_swap_rename::handle_swap_rename(
794                    handler_args,
795                    name,
796                    target_schema,
797                    StatementType::ALTER_SCHEMA,
798                )
799                .await
800            }
801        },
802        Statement::AlterTable { name, operation } => match operation {
803            AlterTableOperation::AddColumn { .. }
804            | AlterTableOperation::DropColumn { .. }
805            | AlterTableOperation::AlterColumn { .. } => {
806                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
807            }
808            AlterTableOperation::RenameTable { table_name } => {
809                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
810                    .await
811            }
812            AlterTableOperation::ChangeOwner { new_owner_name } => {
813                alter_owner::handle_alter_owner(
814                    handler_args,
815                    name,
816                    new_owner_name,
817                    StatementType::ALTER_TABLE,
818                )
819                .await
820            }
821            AlterTableOperation::SetParallelism {
822                parallelism,
823                deferred,
824            } => {
825                alter_parallelism::handle_alter_parallelism(
826                    handler_args,
827                    name,
828                    parallelism,
829                    StatementType::ALTER_TABLE,
830                    deferred,
831                )
832                .await
833            }
834            AlterTableOperation::SetSchema { new_schema_name } => {
835                alter_set_schema::handle_alter_set_schema(
836                    handler_args,
837                    name,
838                    new_schema_name,
839                    StatementType::ALTER_TABLE,
840                    None,
841                )
842                .await
843            }
844            AlterTableOperation::RefreshSchema => {
845                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
846            }
847            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
848                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
849                    handler_args,
850                    PbThrottleTarget::TableWithSource,
851                    name,
852                    rate_limit,
853                )
854                .await
855            }
856            AlterTableOperation::DropConnector => {
857                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
858                    .await
859            }
860            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
861                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
862                    handler_args,
863                    PbThrottleTarget::TableDml,
864                    name,
865                    rate_limit,
866                )
867                .await
868            }
869            AlterTableOperation::SetConfig { .. } => {
870                bail_not_implemented!("ALTER TABLE SET CONFIG")
871            }
872            AlterTableOperation::ResetConfig { .. } => {
873                bail_not_implemented!("ALTER TABLE RESET CONFIG")
874            }
875            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
876                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
877                    handler_args,
878                    PbThrottleTarget::CdcTable,
879                    name,
880                    rate_limit,
881                )
882                .await
883            }
884            AlterTableOperation::SwapRenameTable { target_table } => {
885                alter_swap_rename::handle_swap_rename(
886                    handler_args,
887                    name,
888                    target_table,
889                    StatementType::ALTER_TABLE,
890                )
891                .await
892            }
893            AlterTableOperation::AlterConnectorProps { alter_props } => {
894                alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
895            }
896            AlterTableOperation::AddConstraint { .. }
897            | AlterTableOperation::DropConstraint { .. }
898            | AlterTableOperation::RenameColumn { .. }
899            | AlterTableOperation::ChangeColumn { .. }
900            | AlterTableOperation::RenameConstraint { .. } => {
901                bail_not_implemented!(
902                    "Unhandled statement: {}",
903                    Statement::AlterTable { name, operation }
904                )
905            }
906        },
907        Statement::AlterIndex { name, operation } => match operation {
908            AlterIndexOperation::RenameIndex { index_name } => {
909                alter_rename::handle_rename_index(handler_args, name, index_name).await
910            }
911            AlterIndexOperation::SetParallelism {
912                parallelism,
913                deferred,
914            } => {
915                alter_parallelism::handle_alter_parallelism(
916                    handler_args,
917                    name,
918                    parallelism,
919                    StatementType::ALTER_INDEX,
920                    deferred,
921                )
922                .await
923            }
924            AlterIndexOperation::SetConfig { .. } => {
925                bail_not_implemented!("ALTER INDEX SET CONFIG")
926            }
927            AlterIndexOperation::ResetConfig { .. } => {
928                bail_not_implemented!("ALTER INDEX RESET CONFIG")
929            }
930        },
931        Statement::AlterView {
932            materialized,
933            name,
934            operation,
935        } => {
936            let statement_type = if materialized {
937                StatementType::ALTER_MATERIALIZED_VIEW
938            } else {
939                StatementType::ALTER_VIEW
940            };
941            match operation {
942                AlterViewOperation::RenameView { view_name } => {
943                    if materialized {
944                        alter_rename::handle_rename_table(
945                            handler_args,
946                            TableType::MaterializedView,
947                            name,
948                            view_name,
949                        )
950                        .await
951                    } else {
952                        alter_rename::handle_rename_view(handler_args, name, view_name).await
953                    }
954                }
955                AlterViewOperation::SetParallelism {
956                    parallelism,
957                    deferred,
958                } => {
959                    if !materialized {
960                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
961                    }
962                    alter_parallelism::handle_alter_parallelism(
963                        handler_args,
964                        name,
965                        parallelism,
966                        statement_type,
967                        deferred,
968                    )
969                    .await
970                }
971                AlterViewOperation::SetResourceGroup {
972                    resource_group,
973                    deferred,
974                } => {
975                    if !materialized {
976                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
977                    }
978                    alter_resource_group::handle_alter_resource_group(
979                        handler_args,
980                        name,
981                        resource_group,
982                        statement_type,
983                        deferred,
984                    )
985                    .await
986                }
987                AlterViewOperation::ChangeOwner { new_owner_name } => {
988                    alter_owner::handle_alter_owner(
989                        handler_args,
990                        name,
991                        new_owner_name,
992                        statement_type,
993                    )
994                    .await
995                }
996                AlterViewOperation::SetSchema { new_schema_name } => {
997                    alter_set_schema::handle_alter_set_schema(
998                        handler_args,
999                        name,
1000                        new_schema_name,
1001                        statement_type,
1002                        None,
1003                    )
1004                    .await
1005                }
1006                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1007                    if !materialized {
1008                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1009                    }
1010                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1011                        handler_args,
1012                        PbThrottleTarget::Mv,
1013                        name,
1014                        rate_limit,
1015                    )
1016                    .await
1017                }
1018                AlterViewOperation::SwapRenameView { target_view } => {
1019                    alter_swap_rename::handle_swap_rename(
1020                        handler_args,
1021                        name,
1022                        target_view,
1023                        statement_type,
1024                    )
1025                    .await
1026                }
1027                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1028                    if !materialized {
1029                        bail!(
1030                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1031                        );
1032                    }
1033                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1034                }
1035                AlterViewOperation::AsQuery { query } => {
1036                    if !materialized {
1037                        bail_not_implemented!("ALTER VIEW AS QUERY");
1038                    }
1039                    // `ALTER MATERIALIZED VIEW AS QUERY` is only available in development build now.
1040                    if !cfg!(debug_assertions) {
1041                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1042                    }
1043                    alter_mv::handle_alter_mv(handler_args, name, query).await
1044                }
1045                AlterViewOperation::SetConfig { .. } => {
1046                    if !materialized {
1047                        bail!("SET CONFIG is only supported for materialized views");
1048                    }
1049                    bail_not_implemented!("ALTER MATERIALIZED VIEW SET CONFIG")
1050                }
1051                AlterViewOperation::ResetConfig { .. } => {
1052                    if !materialized {
1053                        bail!("RESET CONFIG is only supported for materialized views");
1054                    }
1055                    bail_not_implemented!("ALTER MATERIALIZED VIEW RESET CONFIG")
1056                }
1057            }
1058        }
1059
1060        Statement::AlterSink { name, operation } => match operation {
1061            AlterSinkOperation::AlterConnectorProps {
1062                alter_props: changed_props,
1063            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1064            AlterSinkOperation::RenameSink { sink_name } => {
1065                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1066            }
1067            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1068                alter_owner::handle_alter_owner(
1069                    handler_args,
1070                    name,
1071                    new_owner_name,
1072                    StatementType::ALTER_SINK,
1073                )
1074                .await
1075            }
1076            AlterSinkOperation::SetSchema { new_schema_name } => {
1077                alter_set_schema::handle_alter_set_schema(
1078                    handler_args,
1079                    name,
1080                    new_schema_name,
1081                    StatementType::ALTER_SINK,
1082                    None,
1083                )
1084                .await
1085            }
1086            AlterSinkOperation::SetParallelism {
1087                parallelism,
1088                deferred,
1089            } => {
1090                alter_parallelism::handle_alter_parallelism(
1091                    handler_args,
1092                    name,
1093                    parallelism,
1094                    StatementType::ALTER_SINK,
1095                    deferred,
1096                )
1097                .await
1098            }
1099            AlterSinkOperation::SetConfig { .. } => {
1100                bail_not_implemented!("ALTER SINK SET CONFIG")
1101            }
1102            AlterSinkOperation::ResetConfig { .. } => {
1103                bail_not_implemented!("ALTER SINK RESET CONFIG")
1104            }
1105            AlterSinkOperation::SwapRenameSink { target_sink } => {
1106                alter_swap_rename::handle_swap_rename(
1107                    handler_args,
1108                    name,
1109                    target_sink,
1110                    StatementType::ALTER_SINK,
1111                )
1112                .await
1113            }
1114            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1115                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1116                    handler_args,
1117                    PbThrottleTarget::Sink,
1118                    name,
1119                    rate_limit,
1120                )
1121                .await
1122            }
1123            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1124                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1125                    handler_args,
1126                    name,
1127                    enable,
1128                )
1129                .await
1130            }
1131        },
1132        Statement::AlterSubscription { name, operation } => match operation {
1133            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1134                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1135                    .await
1136            }
1137            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1138                alter_owner::handle_alter_owner(
1139                    handler_args,
1140                    name,
1141                    new_owner_name,
1142                    StatementType::ALTER_SUBSCRIPTION,
1143                )
1144                .await
1145            }
1146            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1147                alter_set_schema::handle_alter_set_schema(
1148                    handler_args,
1149                    name,
1150                    new_schema_name,
1151                    StatementType::ALTER_SUBSCRIPTION,
1152                    None,
1153                )
1154                .await
1155            }
1156            AlterSubscriptionOperation::SwapRenameSubscription {
1157                target_subscription,
1158            } => {
1159                alter_swap_rename::handle_swap_rename(
1160                    handler_args,
1161                    name,
1162                    target_subscription,
1163                    StatementType::ALTER_SUBSCRIPTION,
1164                )
1165                .await
1166            }
1167        },
1168        Statement::AlterSource { name, operation } => match operation {
1169            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1170                alter_source_props::handle_alter_source_connector_props(
1171                    handler_args,
1172                    name,
1173                    alter_props,
1174                )
1175                .await
1176            }
1177            AlterSourceOperation::RenameSource { source_name } => {
1178                alter_rename::handle_rename_source(handler_args, name, source_name).await
1179            }
1180            AlterSourceOperation::AddColumn { .. } => {
1181                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1182            }
1183            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1184                alter_owner::handle_alter_owner(
1185                    handler_args,
1186                    name,
1187                    new_owner_name,
1188                    StatementType::ALTER_SOURCE,
1189                )
1190                .await
1191            }
1192            AlterSourceOperation::SetSchema { new_schema_name } => {
1193                alter_set_schema::handle_alter_set_schema(
1194                    handler_args,
1195                    name,
1196                    new_schema_name,
1197                    StatementType::ALTER_SOURCE,
1198                    None,
1199                )
1200                .await
1201            }
1202            AlterSourceOperation::FormatEncode { format_encode } => {
1203                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1204                    .await
1205            }
1206            AlterSourceOperation::RefreshSchema => {
1207                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1208            }
1209            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1210                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1211                    handler_args,
1212                    PbThrottleTarget::Source,
1213                    name,
1214                    rate_limit,
1215                )
1216                .await
1217            }
1218            AlterSourceOperation::SwapRenameSource { target_source } => {
1219                alter_swap_rename::handle_swap_rename(
1220                    handler_args,
1221                    name,
1222                    target_source,
1223                    StatementType::ALTER_SOURCE,
1224                )
1225                .await
1226            }
1227            AlterSourceOperation::SetParallelism {
1228                parallelism,
1229                deferred,
1230            } => {
1231                alter_parallelism::handle_alter_parallelism(
1232                    handler_args,
1233                    name,
1234                    parallelism,
1235                    StatementType::ALTER_SOURCE,
1236                    deferred,
1237                )
1238                .await
1239            }
1240            AlterSourceOperation::SetConfig { .. } => {
1241                bail_not_implemented!("ALTER SOURCE SET CONFIG")
1242            }
1243            AlterSourceOperation::ResetConfig { .. } => {
1244                bail_not_implemented!("ALTER SOURCE RESET CONFIG")
1245            }
1246        },
1247        Statement::AlterFunction {
1248            name,
1249            args,
1250            operation,
1251        } => match operation {
1252            AlterFunctionOperation::SetSchema { new_schema_name } => {
1253                alter_set_schema::handle_alter_set_schema(
1254                    handler_args,
1255                    name,
1256                    new_schema_name,
1257                    StatementType::ALTER_FUNCTION,
1258                    args,
1259                )
1260                .await
1261            }
1262        },
1263        Statement::AlterConnection { name, operation } => match operation {
1264            AlterConnectionOperation::SetSchema { new_schema_name } => {
1265                alter_set_schema::handle_alter_set_schema(
1266                    handler_args,
1267                    name,
1268                    new_schema_name,
1269                    StatementType::ALTER_CONNECTION,
1270                    None,
1271                )
1272                .await
1273            }
1274            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1275                alter_owner::handle_alter_owner(
1276                    handler_args,
1277                    name,
1278                    new_owner_name,
1279                    StatementType::ALTER_CONNECTION,
1280                )
1281                .await
1282            }
1283        },
1284        Statement::AlterSystem { param, value } => {
1285            alter_system::handle_alter_system(handler_args, param, value).await
1286        }
1287        Statement::AlterSecret {
1288            name,
1289            with_options,
1290            operation,
1291        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1292        Statement::AlterFragment {
1293            fragment_ids,
1294            operation,
1295        } => match operation {
1296            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1297                let [fragment_id] = fragment_ids.as_slice() else {
1298                    return Err(ErrorCode::InvalidInputSyntax(
1299                        "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1300                            .to_owned(),
1301                    )
1302                    .into());
1303                };
1304                alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1305                    &handler_args.session,
1306                    PbThrottleTarget::Fragment,
1307                    *fragment_id,
1308                    rate_limit,
1309                    StatementType::SET_VARIABLE,
1310                )
1311                .await
1312            }
1313            AlterFragmentOperation::SetParallelism { parallelism } => {
1314                alter_parallelism::handle_alter_fragment_parallelism(
1315                    handler_args,
1316                    fragment_ids.into_iter().map_into().collect(),
1317                    parallelism,
1318                )
1319                .await
1320            }
1321        },
1322        Statement::AlterDefaultPrivileges { .. } => {
1323            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1324        }
1325        Statement::StartTransaction { modes } => {
1326            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1327        }
1328        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1329        Statement::Commit { chain } => {
1330            transaction::handle_commit(handler_args, COMMIT, chain).await
1331        }
1332        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1333        Statement::Rollback { chain } => {
1334            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1335        }
1336        Statement::SetTransaction {
1337            modes,
1338            snapshot,
1339            session,
1340        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1341        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1342        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1343        Statement::Comment {
1344            object_type,
1345            object_name,
1346            comment,
1347        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1348        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1349        Statement::Prepare {
1350            name,
1351            data_types,
1352            statement,
1353        } => prepared_statement::handle_prepare(name, data_types, statement).await,
1354        Statement::Deallocate { name, prepare } => {
1355            prepared_statement::handle_deallocate(name, prepare).await
1356        }
1357        Statement::Vacuum { object_name, full } => {
1358            vacuum::handle_vacuum(handler_args, object_name, full).await
1359        }
1360        Statement::Refresh { table_name } => {
1361            refresh::handle_refresh(handler_args, table_name).await
1362        }
1363        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1364    }
1365}
1366
1367fn check_ban_ddl_for_iceberg_engine_table(
1368    session: Arc<SessionImpl>,
1369    stmt: &Statement,
1370) -> Result<()> {
1371    match stmt {
1372        Statement::AlterTable {
1373            name,
1374            operation:
1375                operation @ (AlterTableOperation::AddColumn { .. }
1376                | AlterTableOperation::DropColumn { .. }),
1377        } => {
1378            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1379            if table.is_iceberg_engine_table() {
1380                bail!(
1381                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1382                    operation,
1383                    schema_name,
1384                    name
1385                );
1386            }
1387        }
1388
1389        Statement::AlterTable {
1390            name,
1391            operation: AlterTableOperation::RenameTable { .. },
1392        } => {
1393            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1394            if table.is_iceberg_engine_table() {
1395                bail!(
1396                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1397                    schema_name,
1398                    name
1399                );
1400            }
1401        }
1402
1403        Statement::AlterTable {
1404            name,
1405            operation: AlterTableOperation::ChangeOwner { .. },
1406        } => {
1407            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1408            if table.is_iceberg_engine_table() {
1409                bail!(
1410                    "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1411                    schema_name,
1412                    name
1413                );
1414            }
1415        }
1416
1417        Statement::AlterTable {
1418            name,
1419            operation: AlterTableOperation::SetParallelism { .. },
1420        } => {
1421            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1422            if table.is_iceberg_engine_table() {
1423                bail!(
1424                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1425                    schema_name,
1426                    name
1427                );
1428            }
1429        }
1430
1431        Statement::AlterTable {
1432            name,
1433            operation: AlterTableOperation::SetSchema { .. },
1434        } => {
1435            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1436            if table.is_iceberg_engine_table() {
1437                bail!(
1438                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1439                    schema_name,
1440                    name
1441                );
1442            }
1443        }
1444
1445        Statement::AlterTable {
1446            name,
1447            operation: AlterTableOperation::RefreshSchema,
1448        } => {
1449            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1450            if table.is_iceberg_engine_table() {
1451                bail!(
1452                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1453                    schema_name,
1454                    name
1455                );
1456            }
1457        }
1458
1459        Statement::AlterTable {
1460            name,
1461            operation: AlterTableOperation::SetSourceRateLimit { .. },
1462        } => {
1463            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1464            if table.is_iceberg_engine_table() {
1465                bail!(
1466                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1467                    schema_name,
1468                    name
1469                );
1470            }
1471        }
1472
1473        _ => {}
1474    }
1475
1476    Ok(())
1477}