risingwave_frontend/handler/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_database_param;
46mod alter_owner;
47mod alter_parallelism;
48mod alter_rename;
49mod alter_resource_group;
50mod alter_secret;
51mod alter_set_schema;
52mod alter_sink_props;
53mod alter_source_column;
54mod alter_source_with_sr;
55mod alter_streaming_rate_limit;
56mod alter_swap_rename;
57mod alter_system;
58mod alter_table_column;
59pub mod alter_table_drop_connector;
60mod alter_table_with_sr;
61pub mod alter_user;
62pub mod cancel_job;
63pub mod close_cursor;
64mod comment;
65pub mod create_aggregate;
66pub mod create_connection;
67mod create_database;
68pub mod create_function;
69pub mod create_index;
70pub mod create_mv;
71pub mod create_schema;
72pub mod create_secret;
73pub mod create_sink;
74pub mod create_source;
75pub mod create_sql_function;
76pub mod create_subscription;
77pub mod create_table;
78pub mod create_table_as;
79pub mod create_user;
80pub mod create_view;
81pub mod declare_cursor;
82pub mod describe;
83pub mod discard;
84mod drop_connection;
85mod drop_database;
86pub mod drop_function;
87mod drop_index;
88pub mod drop_mv;
89mod drop_schema;
90pub mod drop_secret;
91pub mod drop_sink;
92pub mod drop_source;
93pub mod drop_subscription;
94pub mod drop_table;
95pub mod drop_user;
96mod drop_view;
97pub mod explain;
98pub mod explain_analyze_stream_job;
99pub mod extended_handle;
100pub mod fetch_cursor;
101mod flush;
102pub mod handle_privilege;
103pub mod kill_process;
104pub mod privilege;
105pub mod query;
106mod recover;
107pub mod show;
108mod transaction;
109mod use_db;
110pub mod util;
111pub mod variable;
112mod wait;
113
114pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
115
116/// The [`PgResponseBuilder`] used by RisingWave.
117pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
118
119/// The [`PgResponse`] used by RisingWave.
120pub type RwPgResponse = PgResponse<PgResponseStream>;
121
122#[easy_ext::ext(RwPgResponseBuilderExt)]
123impl RwPgResponseBuilder {
124    /// Append rows to the response.
125    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
126        let fields = T::fields();
127        self.values(
128            rows.into_iter()
129                .map(|row| {
130                    Row::new(
131                        row.into_owned_row()
132                            .into_iter()
133                            .zip_eq_fast(&fields)
134                            .map(|(datum, (_, ty))| {
135                                datum.map(|scalar| {
136                                    scalar.as_scalar_ref_impl().text_format(ty).into()
137                                })
138                            })
139                            .collect(),
140                    )
141                })
142                .collect_vec()
143                .into(),
144            fields_to_descriptors(fields),
145        )
146    }
147}
148
149pub fn fields_to_descriptors(
150    fields: Vec<(&str, risingwave_common::types::DataType)>,
151) -> Vec<PgFieldDescriptor> {
152    fields
153        .iter()
154        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
155        .collect()
156}
157
158pub enum PgResponseStream {
159    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
160    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
161    Rows(BoxStream<'static, RowSetResult>),
162}
163
164impl Stream for PgResponseStream {
165    type Item = std::result::Result<Vec<Row>, BoxedError>;
166
167    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
168        match &mut *self {
169            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
170            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
171            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
172        }
173    }
174}
175
176impl From<Vec<Row>> for PgResponseStream {
177    fn from(rows: Vec<Row>) -> Self {
178        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
179    }
180}
181
182#[derive(Clone)]
183pub struct HandlerArgs {
184    pub session: Arc<SessionImpl>,
185    pub sql: Arc<str>,
186    pub normalized_sql: String,
187    pub with_options: WithOptions,
188}
189
190impl HandlerArgs {
191    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
192        Ok(Self {
193            session,
194            sql,
195            with_options: WithOptions::try_from(stmt)?,
196            normalized_sql: Self::normalize_sql(stmt),
197        })
198    }
199
200    /// Get normalized SQL from the statement.
201    ///
202    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
203    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
204    ///   make it suitable for the `SHOW CREATE` statements.
205    fn normalize_sql(stmt: &Statement) -> String {
206        let mut stmt = stmt.clone();
207        match &mut stmt {
208            Statement::CreateView {
209                or_replace,
210                if_not_exists,
211                ..
212            } => {
213                *or_replace = false;
214                *if_not_exists = false;
215            }
216            Statement::CreateTable {
217                or_replace,
218                if_not_exists,
219                ..
220            } => {
221                *or_replace = false;
222                *if_not_exists = false;
223            }
224            Statement::CreateIndex { if_not_exists, .. } => {
225                *if_not_exists = false;
226            }
227            Statement::CreateSource {
228                stmt: CreateSourceStatement { if_not_exists, .. },
229                ..
230            } => {
231                *if_not_exists = false;
232            }
233            Statement::CreateSink {
234                stmt: CreateSinkStatement { if_not_exists, .. },
235            } => {
236                *if_not_exists = false;
237            }
238            Statement::CreateSubscription {
239                stmt: CreateSubscriptionStatement { if_not_exists, .. },
240            } => {
241                *if_not_exists = false;
242            }
243            Statement::CreateConnection {
244                stmt: CreateConnectionStatement { if_not_exists, .. },
245            } => {
246                *if_not_exists = false;
247            }
248            _ => {}
249        }
250        stmt.to_string()
251    }
252}
253
254pub async fn handle(
255    session: Arc<SessionImpl>,
256    stmt: Statement,
257    sql: Arc<str>,
258    formats: Vec<Format>,
259) -> Result<RwPgResponse> {
260    session.clear_cancel_query_flag();
261    let _guard = session.txn_begin_implicit();
262    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
263
264    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
265
266    match stmt {
267        Statement::Explain {
268            statement,
269            analyze,
270            options,
271        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
272        Statement::ExplainAnalyzeStreamJob {
273            target,
274            duration_secs,
275        } => {
276            explain_analyze_stream_job::handle_explain_analyze_stream_job(
277                handler_args,
278                target,
279                duration_secs,
280            )
281            .await
282        }
283        Statement::CreateSource { stmt } => {
284            create_source::handle_create_source(handler_args, stmt).await
285        }
286        Statement::CreateSink { stmt } => {
287            create_sink::handle_create_sink(handler_args, stmt, false).await
288        }
289        Statement::CreateSubscription { stmt } => {
290            create_subscription::handle_create_subscription(handler_args, stmt).await
291        }
292        Statement::CreateConnection { stmt } => {
293            create_connection::handle_create_connection(handler_args, stmt).await
294        }
295        Statement::CreateSecret { stmt } => {
296            create_secret::handle_create_secret(handler_args, stmt).await
297        }
298        Statement::CreateFunction {
299            or_replace,
300            temporary,
301            if_not_exists,
302            name,
303            args,
304            returns,
305            params,
306            with_options,
307        } => {
308            // For general udf, `language` clause could be ignored
309            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
310            if params.language.is_none()
311                || !params
312                    .language
313                    .as_ref()
314                    .unwrap()
315                    .real_value()
316                    .eq_ignore_ascii_case("sql")
317            {
318                create_function::handle_create_function(
319                    handler_args,
320                    or_replace,
321                    temporary,
322                    if_not_exists,
323                    name,
324                    args,
325                    returns,
326                    params,
327                    with_options,
328                )
329                .await
330            } else {
331                create_sql_function::handle_create_sql_function(
332                    handler_args,
333                    or_replace,
334                    temporary,
335                    if_not_exists,
336                    name,
337                    args,
338                    returns,
339                    params,
340                )
341                .await
342            }
343        }
344        Statement::CreateAggregate {
345            or_replace,
346            if_not_exists,
347            name,
348            args,
349            returns,
350            params,
351            ..
352        } => {
353            create_aggregate::handle_create_aggregate(
354                handler_args,
355                or_replace,
356                if_not_exists,
357                name,
358                args,
359                returns,
360                params,
361            )
362            .await
363        }
364        Statement::CreateTable {
365            name,
366            columns,
367            wildcard_idx,
368            constraints,
369            query,
370            with_options: _, // It is put in OptimizerContext
371            // Not supported things
372            or_replace,
373            temporary,
374            if_not_exists,
375            format_encode,
376            source_watermarks,
377            append_only,
378            on_conflict,
379            with_version_column,
380            cdc_table_info,
381            include_column_options,
382            webhook_info,
383            engine,
384        } => {
385            if or_replace {
386                bail_not_implemented!("CREATE OR REPLACE TABLE");
387            }
388            if temporary {
389                bail_not_implemented!("CREATE TEMPORARY TABLE");
390            }
391            if let Some(query) = query {
392                return create_table_as::handle_create_as(
393                    handler_args,
394                    name,
395                    if_not_exists,
396                    query,
397                    columns,
398                    append_only,
399                    on_conflict,
400                    with_version_column.map(|x| x.real_value()),
401                    engine,
402                )
403                .await;
404            }
405            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
406            create_table::handle_create_table(
407                handler_args,
408                name,
409                columns,
410                wildcard_idx,
411                constraints,
412                if_not_exists,
413                format_encode,
414                source_watermarks,
415                append_only,
416                on_conflict,
417                with_version_column.map(|x| x.real_value()),
418                cdc_table_info,
419                include_column_options,
420                webhook_info,
421                engine,
422            )
423            .await
424        }
425        Statement::CreateDatabase {
426            db_name,
427            if_not_exists,
428            owner,
429            resource_group,
430            barrier_interval_ms,
431            checkpoint_frequency,
432        } => {
433            create_database::handle_create_database(
434                handler_args,
435                db_name,
436                if_not_exists,
437                owner,
438                resource_group,
439                barrier_interval_ms,
440                checkpoint_frequency,
441            )
442            .await
443        }
444        Statement::CreateSchema {
445            schema_name,
446            if_not_exists,
447            owner,
448        } => {
449            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
450                .await
451        }
452        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
453        Statement::DeclareCursor { stmt } => {
454            declare_cursor::handle_declare_cursor(handler_args, stmt).await
455        }
456        Statement::FetchCursor { stmt } => {
457            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
458        }
459        Statement::CloseCursor { stmt } => {
460            close_cursor::handle_close_cursor(handler_args, stmt).await
461        }
462        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
463        Statement::Grant { .. } => {
464            handle_privilege::handle_grant_privilege(handler_args, stmt).await
465        }
466        Statement::Revoke { .. } => {
467            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
468        }
469        Statement::Describe { name, kind } => match kind {
470            DescribeKind::Fragments => {
471                describe::handle_describe_fragments(handler_args, name).await
472            }
473            DescribeKind::Plain => describe::handle_describe(handler_args, name),
474        },
475        Statement::DescribeFragment { fragment_id } => {
476            describe::handle_describe_fragment(handler_args, fragment_id).await
477        }
478        Statement::Discard(..) => discard::handle_discard(handler_args),
479        Statement::ShowObjects {
480            object: show_object,
481            filter,
482        } => show::handle_show_object(handler_args, show_object, filter).await,
483        Statement::ShowCreateObject { create_type, name } => {
484            show::handle_show_create_object(handler_args, create_type, name)
485        }
486        Statement::ShowTransactionIsolationLevel => {
487            transaction::handle_show_isolation_level(handler_args)
488        }
489        Statement::Drop(DropStatement {
490            object_type,
491            object_name,
492            if_exists,
493            drop_mode,
494        }) => {
495            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
496                match object_type {
497                    ObjectType::MaterializedView
498                    | ObjectType::View
499                    | ObjectType::Sink
500                    | ObjectType::Source
501                    | ObjectType::Subscription
502                    | ObjectType::Index
503                    | ObjectType::Table
504                    | ObjectType::Schema => true,
505                    ObjectType::Database
506                    | ObjectType::User
507                    | ObjectType::Connection
508                    | ObjectType::Secret => {
509                        bail_not_implemented!("DROP CASCADE");
510                    }
511                }
512            } else {
513                false
514            };
515            match object_type {
516                ObjectType::Table => {
517                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
518                        .await
519                }
520                ObjectType::MaterializedView => {
521                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
522                }
523                ObjectType::Index => {
524                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
525                        .await
526                }
527                ObjectType::Source => {
528                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
529                        .await
530                }
531                ObjectType::Sink => {
532                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
533                }
534                ObjectType::Subscription => {
535                    drop_subscription::handle_drop_subscription(
536                        handler_args,
537                        object_name,
538                        if_exists,
539                        cascade,
540                    )
541                    .await
542                }
543                ObjectType::Database => {
544                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
545                }
546                ObjectType::Schema => {
547                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
548                        .await
549                }
550                ObjectType::User => {
551                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
552                }
553                ObjectType::View => {
554                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
555                }
556                ObjectType::Connection => {
557                    drop_connection::handle_drop_connection(handler_args, object_name, if_exists)
558                        .await
559                }
560                ObjectType::Secret => {
561                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
562                }
563            }
564        }
565        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
566        Statement::DropFunction {
567            if_exists,
568            func_desc,
569            option,
570        } => {
571            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
572                .await
573        }
574        Statement::DropAggregate {
575            if_exists,
576            func_desc,
577            option,
578        } => {
579            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
580                .await
581        }
582        Statement::Query(_)
583        | Statement::Insert { .. }
584        | Statement::Delete { .. }
585        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
586        Statement::CreateView {
587            materialized,
588            if_not_exists,
589            name,
590            columns,
591            query,
592            with_options: _, // It is put in OptimizerContext
593            or_replace,      // not supported
594            emit_mode,
595        } => {
596            if or_replace {
597                bail_not_implemented!("CREATE OR REPLACE VIEW");
598            }
599            if materialized {
600                create_mv::handle_create_mv(
601                    handler_args,
602                    if_not_exists,
603                    name,
604                    *query,
605                    columns,
606                    emit_mode,
607                )
608                .await
609            } else {
610                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
611                    .await
612            }
613        }
614        Statement::Flush => flush::handle_flush(handler_args).await,
615        Statement::Wait => wait::handle_wait(handler_args).await,
616        Statement::Recover => recover::handle_recover(handler_args).await,
617        Statement::SetVariable {
618            local: _,
619            variable,
620            value,
621        } => {
622            // special handle for `use database`
623            if variable.real_value().eq_ignore_ascii_case("database") {
624                let x = variable::set_var_to_param_str(&value);
625                let res = use_db::handle_use_db(
626                    handler_args,
627                    ObjectName::from(vec![Ident::from_real_value(
628                        x.as_deref().unwrap_or("default"),
629                    )]),
630                )?;
631                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
632                for notice in res.notices() {
633                    builder = builder.notice(notice);
634                }
635                return Ok(builder.into());
636            }
637            variable::handle_set(handler_args, variable, value)
638        }
639        Statement::SetTimeZone { local: _, value } => {
640            variable::handle_set_time_zone(handler_args, value)
641        }
642        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable).await,
643        Statement::CreateIndex {
644            name,
645            table_name,
646            columns,
647            include,
648            distributed_by,
649            unique,
650            if_not_exists,
651        } => {
652            if unique {
653                bail_not_implemented!("create unique index");
654            }
655
656            create_index::handle_create_index(
657                handler_args,
658                if_not_exists,
659                name,
660                table_name,
661                columns.to_vec(),
662                include,
663                distributed_by,
664            )
665            .await
666        }
667        Statement::AlterDatabase { name, operation } => match operation {
668            AlterDatabaseOperation::RenameDatabase { database_name } => {
669                alter_rename::handle_rename_database(handler_args, name, database_name).await
670            }
671            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
672                alter_owner::handle_alter_owner(
673                    handler_args,
674                    name,
675                    new_owner_name,
676                    StatementType::ALTER_DATABASE,
677                )
678                .await
679            }
680            AlterDatabaseOperation::SetParam(config_param) => {
681                let ConfigParam { param, value } = config_param;
682
683                let database_param = match param.real_value().to_uppercase().as_str() {
684                    "BARRIER_INTERVAL_MS" => {
685                        let barrier_interval_ms = match value {
686                            SetVariableValue::Default => None,
687                            SetVariableValue::Single(SetVariableValueSingle::Literal(
688                                Value::Number(num),
689                            )) => {
690                                let num = num.parse::<u32>().map_err(|e| {
691                                    ErrorCode::InvalidInputSyntax(format!(
692                                        "barrier_interval_ms must be a u32 integer: {}",
693                                        e.as_report()
694                                    ))
695                                })?;
696                                Some(num)
697                            }
698                            _ => {
699                                return Err(ErrorCode::InvalidInputSyntax(
700                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
701                                        .to_owned(),
702                                )
703                                .into());
704                            }
705                        };
706                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
707                    }
708                    "CHECKPOINT_FREQUENCY" => {
709                        let checkpoint_frequency = match value {
710                            SetVariableValue::Default => None,
711                            SetVariableValue::Single(SetVariableValueSingle::Literal(
712                                Value::Number(num),
713                            )) => {
714                                let num = num.parse::<u64>().map_err(|e| {
715                                    ErrorCode::InvalidInputSyntax(format!(
716                                        "checkpoint_frequency must be a u64 integer: {}",
717                                        e.as_report()
718                                    ))
719                                })?;
720                                Some(num)
721                            }
722                            _ => {
723                                return Err(ErrorCode::InvalidInputSyntax(
724                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
725                                        .to_owned(),
726                                )
727                                .into());
728                            }
729                        };
730                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
731                    }
732                    _ => {
733                        return Err(ErrorCode::InvalidInputSyntax(format!(
734                            "Unsupported database config parameter: {}",
735                            param.real_value()
736                        ))
737                        .into());
738                    }
739                };
740
741                alter_database_param::handle_alter_database_param(
742                    handler_args,
743                    name,
744                    database_param,
745                )
746                .await
747            }
748        },
749        Statement::AlterSchema { name, operation } => match operation {
750            AlterSchemaOperation::RenameSchema { schema_name } => {
751                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
752            }
753            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
754                alter_owner::handle_alter_owner(
755                    handler_args,
756                    name,
757                    new_owner_name,
758                    StatementType::ALTER_SCHEMA,
759                )
760                .await
761            }
762            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
763                alter_swap_rename::handle_swap_rename(
764                    handler_args,
765                    name,
766                    target_schema,
767                    StatementType::ALTER_SCHEMA,
768                )
769                .await
770            }
771        },
772        Statement::AlterTable { name, operation } => match operation {
773            AlterTableOperation::AddColumn { .. }
774            | AlterTableOperation::DropColumn { .. }
775            | AlterTableOperation::AlterColumn { .. } => {
776                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
777            }
778            AlterTableOperation::RenameTable { table_name } => {
779                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
780                    .await
781            }
782            AlterTableOperation::ChangeOwner { new_owner_name } => {
783                alter_owner::handle_alter_owner(
784                    handler_args,
785                    name,
786                    new_owner_name,
787                    StatementType::ALTER_TABLE,
788                )
789                .await
790            }
791            AlterTableOperation::SetParallelism {
792                parallelism,
793                deferred,
794            } => {
795                alter_parallelism::handle_alter_parallelism(
796                    handler_args,
797                    name,
798                    parallelism,
799                    StatementType::ALTER_TABLE,
800                    deferred,
801                )
802                .await
803            }
804            AlterTableOperation::SetSchema { new_schema_name } => {
805                alter_set_schema::handle_alter_set_schema(
806                    handler_args,
807                    name,
808                    new_schema_name,
809                    StatementType::ALTER_TABLE,
810                    None,
811                )
812                .await
813            }
814            AlterTableOperation::RefreshSchema => {
815                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
816            }
817            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
818                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
819                    handler_args,
820                    PbThrottleTarget::TableWithSource,
821                    name,
822                    rate_limit,
823                )
824                .await
825            }
826            AlterTableOperation::DropConnector => {
827                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
828                    .await
829            }
830            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
831                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
832                    handler_args,
833                    PbThrottleTarget::TableDml,
834                    name,
835                    rate_limit,
836                )
837                .await
838            }
839            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
840                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
841                    handler_args,
842                    PbThrottleTarget::CdcTable,
843                    name,
844                    rate_limit,
845                )
846                .await
847            }
848            AlterTableOperation::SwapRenameTable { target_table } => {
849                alter_swap_rename::handle_swap_rename(
850                    handler_args,
851                    name,
852                    target_table,
853                    StatementType::ALTER_TABLE,
854                )
855                .await
856            }
857            AlterTableOperation::AddConstraint { .. }
858            | AlterTableOperation::DropConstraint { .. }
859            | AlterTableOperation::RenameColumn { .. }
860            | AlterTableOperation::ChangeColumn { .. }
861            | AlterTableOperation::RenameConstraint { .. } => {
862                bail_not_implemented!(
863                    "Unhandled statement: {}",
864                    Statement::AlterTable { name, operation }
865                )
866            }
867        },
868        Statement::AlterIndex { name, operation } => match operation {
869            AlterIndexOperation::RenameIndex { index_name } => {
870                alter_rename::handle_rename_index(handler_args, name, index_name).await
871            }
872            AlterIndexOperation::SetParallelism {
873                parallelism,
874                deferred,
875            } => {
876                alter_parallelism::handle_alter_parallelism(
877                    handler_args,
878                    name,
879                    parallelism,
880                    StatementType::ALTER_INDEX,
881                    deferred,
882                )
883                .await
884            }
885        },
886        Statement::AlterView {
887            materialized,
888            name,
889            operation,
890        } => {
891            let statement_type = if materialized {
892                StatementType::ALTER_MATERIALIZED_VIEW
893            } else {
894                StatementType::ALTER_VIEW
895            };
896            match operation {
897                AlterViewOperation::RenameView { view_name } => {
898                    if materialized {
899                        alter_rename::handle_rename_table(
900                            handler_args,
901                            TableType::MaterializedView,
902                            name,
903                            view_name,
904                        )
905                        .await
906                    } else {
907                        alter_rename::handle_rename_view(handler_args, name, view_name).await
908                    }
909                }
910                AlterViewOperation::SetParallelism {
911                    parallelism,
912                    deferred,
913                } => {
914                    if !materialized {
915                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
916                    }
917                    alter_parallelism::handle_alter_parallelism(
918                        handler_args,
919                        name,
920                        parallelism,
921                        statement_type,
922                        deferred,
923                    )
924                    .await
925                }
926                AlterViewOperation::SetResourceGroup {
927                    resource_group,
928                    deferred,
929                } => {
930                    if !materialized {
931                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
932                    }
933                    alter_resource_group::handle_alter_resource_group(
934                        handler_args,
935                        name,
936                        resource_group,
937                        statement_type,
938                        deferred,
939                    )
940                    .await
941                }
942                AlterViewOperation::ChangeOwner { new_owner_name } => {
943                    alter_owner::handle_alter_owner(
944                        handler_args,
945                        name,
946                        new_owner_name,
947                        statement_type,
948                    )
949                    .await
950                }
951                AlterViewOperation::SetSchema { new_schema_name } => {
952                    alter_set_schema::handle_alter_set_schema(
953                        handler_args,
954                        name,
955                        new_schema_name,
956                        statement_type,
957                        None,
958                    )
959                    .await
960                }
961                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
962                    if !materialized {
963                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
964                    }
965                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
966                        handler_args,
967                        PbThrottleTarget::Mv,
968                        name,
969                        rate_limit,
970                    )
971                    .await
972                }
973                AlterViewOperation::SwapRenameView { target_view } => {
974                    alter_swap_rename::handle_swap_rename(
975                        handler_args,
976                        name,
977                        target_view,
978                        statement_type,
979                    )
980                    .await
981                }
982            }
983        }
984
985        Statement::AlterSink { name, operation } => match operation {
986            AlterSinkOperation::SetSinkProps { changed_props } => {
987                alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await
988            }
989            AlterSinkOperation::RenameSink { sink_name } => {
990                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
991            }
992            AlterSinkOperation::ChangeOwner { new_owner_name } => {
993                alter_owner::handle_alter_owner(
994                    handler_args,
995                    name,
996                    new_owner_name,
997                    StatementType::ALTER_SINK,
998                )
999                .await
1000            }
1001            AlterSinkOperation::SetSchema { new_schema_name } => {
1002                alter_set_schema::handle_alter_set_schema(
1003                    handler_args,
1004                    name,
1005                    new_schema_name,
1006                    StatementType::ALTER_SINK,
1007                    None,
1008                )
1009                .await
1010            }
1011            AlterSinkOperation::SetParallelism {
1012                parallelism,
1013                deferred,
1014            } => {
1015                alter_parallelism::handle_alter_parallelism(
1016                    handler_args,
1017                    name,
1018                    parallelism,
1019                    StatementType::ALTER_SINK,
1020                    deferred,
1021                )
1022                .await
1023            }
1024            AlterSinkOperation::SwapRenameSink { target_sink } => {
1025                alter_swap_rename::handle_swap_rename(
1026                    handler_args,
1027                    name,
1028                    target_sink,
1029                    StatementType::ALTER_SINK,
1030                )
1031                .await
1032            }
1033            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1034                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1035                    handler_args,
1036                    PbThrottleTarget::Sink,
1037                    name,
1038                    rate_limit,
1039                )
1040                .await
1041            }
1042        },
1043        Statement::AlterSubscription { name, operation } => match operation {
1044            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1045                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1046                    .await
1047            }
1048            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1049                alter_owner::handle_alter_owner(
1050                    handler_args,
1051                    name,
1052                    new_owner_name,
1053                    StatementType::ALTER_SUBSCRIPTION,
1054                )
1055                .await
1056            }
1057            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1058                alter_set_schema::handle_alter_set_schema(
1059                    handler_args,
1060                    name,
1061                    new_schema_name,
1062                    StatementType::ALTER_SUBSCRIPTION,
1063                    None,
1064                )
1065                .await
1066            }
1067            AlterSubscriptionOperation::SwapRenameSubscription {
1068                target_subscription,
1069            } => {
1070                alter_swap_rename::handle_swap_rename(
1071                    handler_args,
1072                    name,
1073                    target_subscription,
1074                    StatementType::ALTER_SUBSCRIPTION,
1075                )
1076                .await
1077            }
1078        },
1079        Statement::AlterSource { name, operation } => match operation {
1080            AlterSourceOperation::RenameSource { source_name } => {
1081                alter_rename::handle_rename_source(handler_args, name, source_name).await
1082            }
1083            AlterSourceOperation::AddColumn { .. } => {
1084                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1085            }
1086            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1087                alter_owner::handle_alter_owner(
1088                    handler_args,
1089                    name,
1090                    new_owner_name,
1091                    StatementType::ALTER_SOURCE,
1092                )
1093                .await
1094            }
1095            AlterSourceOperation::SetSchema { new_schema_name } => {
1096                alter_set_schema::handle_alter_set_schema(
1097                    handler_args,
1098                    name,
1099                    new_schema_name,
1100                    StatementType::ALTER_SOURCE,
1101                    None,
1102                )
1103                .await
1104            }
1105            AlterSourceOperation::FormatEncode { format_encode } => {
1106                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1107                    .await
1108            }
1109            AlterSourceOperation::RefreshSchema => {
1110                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1111            }
1112            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1113                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1114                    handler_args,
1115                    PbThrottleTarget::Source,
1116                    name,
1117                    rate_limit,
1118                )
1119                .await
1120            }
1121            AlterSourceOperation::SwapRenameSource { target_source } => {
1122                alter_swap_rename::handle_swap_rename(
1123                    handler_args,
1124                    name,
1125                    target_source,
1126                    StatementType::ALTER_SOURCE,
1127                )
1128                .await
1129            }
1130            AlterSourceOperation::SetParallelism {
1131                parallelism,
1132                deferred,
1133            } => {
1134                alter_parallelism::handle_alter_parallelism(
1135                    handler_args,
1136                    name,
1137                    parallelism,
1138                    StatementType::ALTER_SOURCE,
1139                    deferred,
1140                )
1141                .await
1142            }
1143        },
1144        Statement::AlterFunction {
1145            name,
1146            args,
1147            operation,
1148        } => match operation {
1149            AlterFunctionOperation::SetSchema { new_schema_name } => {
1150                alter_set_schema::handle_alter_set_schema(
1151                    handler_args,
1152                    name,
1153                    new_schema_name,
1154                    StatementType::ALTER_FUNCTION,
1155                    args,
1156                )
1157                .await
1158            }
1159        },
1160        Statement::AlterConnection { name, operation } => match operation {
1161            AlterConnectionOperation::SetSchema { new_schema_name } => {
1162                alter_set_schema::handle_alter_set_schema(
1163                    handler_args,
1164                    name,
1165                    new_schema_name,
1166                    StatementType::ALTER_CONNECTION,
1167                    None,
1168                )
1169                .await
1170            }
1171            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1172                alter_owner::handle_alter_owner(
1173                    handler_args,
1174                    name,
1175                    new_owner_name,
1176                    StatementType::ALTER_CONNECTION,
1177                )
1178                .await
1179            }
1180        },
1181        Statement::AlterSystem { param, value } => {
1182            alter_system::handle_alter_system(handler_args, param, value).await
1183        }
1184        Statement::AlterSecret {
1185            name,
1186            with_options,
1187            operation,
1188        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1189        Statement::AlterFragment {
1190            fragment_id,
1191            operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1192        } => {
1193            alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1194                &handler_args.session,
1195                PbThrottleTarget::Fragment,
1196                fragment_id,
1197                rate_limit,
1198                StatementType::SET_VARIABLE,
1199            )
1200            .await
1201        }
1202        Statement::StartTransaction { modes } => {
1203            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1204        }
1205        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1206        Statement::Commit { chain } => {
1207            transaction::handle_commit(handler_args, COMMIT, chain).await
1208        }
1209        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1210        Statement::Rollback { chain } => {
1211            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1212        }
1213        Statement::SetTransaction {
1214            modes,
1215            snapshot,
1216            session,
1217        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1218        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1219        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1220        Statement::Comment {
1221            object_type,
1222            object_name,
1223            comment,
1224        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1225        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1226        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1227    }
1228}
1229
1230fn check_ban_ddl_for_iceberg_engine_table(
1231    session: Arc<SessionImpl>,
1232    stmt: &Statement,
1233) -> Result<()> {
1234    match stmt {
1235        Statement::AlterTable {
1236            name,
1237            operation:
1238                operation @ (AlterTableOperation::AddColumn { .. }
1239                | AlterTableOperation::DropColumn { .. }),
1240        } => {
1241            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1242            if table.is_iceberg_engine_table() {
1243                bail!(
1244                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1245                    operation,
1246                    schema_name,
1247                    name
1248                );
1249            }
1250        }
1251
1252        Statement::AlterTable {
1253            name,
1254            operation: AlterTableOperation::RenameTable { .. },
1255        } => {
1256            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1257            if table.is_iceberg_engine_table() {
1258                bail!(
1259                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1260                    schema_name,
1261                    name
1262                );
1263            }
1264        }
1265
1266        Statement::AlterTable {
1267            name,
1268            operation: AlterTableOperation::ChangeOwner { .. },
1269        } => {
1270            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1271            if table.is_iceberg_engine_table() {
1272                bail!(
1273                    "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1274                    schema_name,
1275                    name
1276                );
1277            }
1278        }
1279
1280        Statement::AlterTable {
1281            name,
1282            operation: AlterTableOperation::SetParallelism { .. },
1283        } => {
1284            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1285            if table.is_iceberg_engine_table() {
1286                bail!(
1287                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1288                    schema_name,
1289                    name
1290                );
1291            }
1292        }
1293
1294        Statement::AlterTable {
1295            name,
1296            operation: AlterTableOperation::SetSchema { .. },
1297        } => {
1298            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1299            if table.is_iceberg_engine_table() {
1300                bail!(
1301                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1302                    schema_name,
1303                    name
1304                );
1305            }
1306        }
1307
1308        Statement::AlterTable {
1309            name,
1310            operation: AlterTableOperation::RefreshSchema,
1311        } => {
1312            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1313            if table.is_iceberg_engine_table() {
1314                bail!(
1315                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1316                    schema_name,
1317                    name
1318                );
1319            }
1320        }
1321
1322        Statement::AlterTable {
1323            name,
1324            operation: AlterTableOperation::SetSourceRateLimit { .. },
1325        } => {
1326            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1327            if table.is_iceberg_engine_table() {
1328                bail!(
1329                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1330                    schema_name,
1331                    name
1332                );
1333            }
1334        }
1335
1336        _ => {}
1337    }
1338
1339    Ok(())
1340}