risingwave_frontend/handler/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_database_param;
46mod alter_mv;
47mod alter_owner;
48mod alter_parallelism;
49mod alter_rename;
50mod alter_resource_group;
51mod alter_secret;
52mod alter_set_schema;
53mod alter_sink_props;
54mod alter_source_column;
55mod alter_source_props;
56mod alter_source_with_sr;
57mod alter_streaming_enable_unaligned_join;
58mod alter_streaming_rate_limit;
59mod alter_swap_rename;
60mod alter_system;
61mod alter_table_column;
62pub mod alter_table_drop_connector;
63mod alter_table_with_sr;
64pub mod alter_user;
65pub mod cancel_job;
66pub mod close_cursor;
67mod comment;
68pub mod create_aggregate;
69pub mod create_connection;
70mod create_database;
71pub mod create_function;
72pub mod create_index;
73pub mod create_mv;
74pub mod create_schema;
75pub mod create_secret;
76pub mod create_sink;
77pub mod create_source;
78pub mod create_sql_function;
79pub mod create_subscription;
80pub mod create_table;
81pub mod create_table_as;
82pub mod create_user;
83pub mod create_view;
84pub mod declare_cursor;
85pub mod describe;
86pub mod discard;
87mod drop_connection;
88mod drop_database;
89pub mod drop_function;
90mod drop_index;
91pub mod drop_mv;
92mod drop_schema;
93pub mod drop_secret;
94pub mod drop_sink;
95pub mod drop_source;
96pub mod drop_subscription;
97pub mod drop_table;
98pub mod drop_user;
99mod drop_view;
100pub mod explain;
101pub mod explain_analyze_stream_job;
102pub mod extended_handle;
103pub mod fetch_cursor;
104mod flush;
105pub mod handle_privilege;
106pub mod kill_process;
107mod prepared_statement;
108pub mod privilege;
109pub mod query;
110mod recover;
111mod refresh;
112pub mod show;
113mod transaction;
114mod use_db;
115pub mod util;
116pub mod vacuum;
117pub mod variable;
118mod wait;
119
120pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
121
122/// The [`PgResponseBuilder`] used by RisingWave.
123pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
124
125/// The [`PgResponse`] used by RisingWave.
126pub type RwPgResponse = PgResponse<PgResponseStream>;
127
128#[easy_ext::ext(RwPgResponseBuilderExt)]
129impl RwPgResponseBuilder {
130    /// Append rows to the response.
131    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
132        let fields = T::fields();
133        self.values(
134            rows.into_iter()
135                .map(|row| {
136                    Row::new(
137                        row.into_owned_row()
138                            .into_iter()
139                            .zip_eq_fast(&fields)
140                            .map(|(datum, (_, ty))| {
141                                datum.map(|scalar| {
142                                    scalar.as_scalar_ref_impl().text_format(ty).into()
143                                })
144                            })
145                            .collect(),
146                    )
147                })
148                .collect_vec()
149                .into(),
150            fields_to_descriptors(fields),
151        )
152    }
153}
154
155pub fn fields_to_descriptors(
156    fields: Vec<(&str, risingwave_common::types::DataType)>,
157) -> Vec<PgFieldDescriptor> {
158    fields
159        .iter()
160        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
161        .collect()
162}
163
164pub enum PgResponseStream {
165    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
166    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
167    Rows(BoxStream<'static, RowSetResult>),
168}
169
170impl Stream for PgResponseStream {
171    type Item = std::result::Result<Vec<Row>, BoxedError>;
172
173    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
174        match &mut *self {
175            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
176            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
177            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
178        }
179    }
180}
181
182impl From<Vec<Row>> for PgResponseStream {
183    fn from(rows: Vec<Row>) -> Self {
184        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
185    }
186}
187
188#[derive(Clone)]
189pub struct HandlerArgs {
190    pub session: Arc<SessionImpl>,
191    pub sql: Arc<str>,
192    pub normalized_sql: String,
193    pub with_options: WithOptions,
194}
195
196impl HandlerArgs {
197    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
198        Ok(Self {
199            session,
200            sql,
201            with_options: WithOptions::try_from(stmt)?,
202            normalized_sql: Self::normalize_sql(stmt),
203        })
204    }
205
206    /// Get normalized SQL from the statement.
207    ///
208    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
209    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
210    ///   make it suitable for the `SHOW CREATE` statements.
211    fn normalize_sql(stmt: &Statement) -> String {
212        let mut stmt = stmt.clone();
213        match &mut stmt {
214            Statement::CreateView {
215                or_replace,
216                if_not_exists,
217                ..
218            } => {
219                *or_replace = false;
220                *if_not_exists = false;
221            }
222            Statement::CreateTable {
223                or_replace,
224                if_not_exists,
225                ..
226            } => {
227                *or_replace = false;
228                *if_not_exists = false;
229            }
230            Statement::CreateIndex { if_not_exists, .. } => {
231                *if_not_exists = false;
232            }
233            Statement::CreateSource {
234                stmt: CreateSourceStatement { if_not_exists, .. },
235                ..
236            } => {
237                *if_not_exists = false;
238            }
239            Statement::CreateSink {
240                stmt: CreateSinkStatement { if_not_exists, .. },
241            } => {
242                *if_not_exists = false;
243            }
244            Statement::CreateSubscription {
245                stmt: CreateSubscriptionStatement { if_not_exists, .. },
246            } => {
247                *if_not_exists = false;
248            }
249            Statement::CreateConnection {
250                stmt: CreateConnectionStatement { if_not_exists, .. },
251            } => {
252                *if_not_exists = false;
253            }
254            _ => {}
255        }
256        stmt.to_string()
257    }
258}
259
260pub async fn handle(
261    session: Arc<SessionImpl>,
262    stmt: Statement,
263    sql: Arc<str>,
264    formats: Vec<Format>,
265) -> Result<RwPgResponse> {
266    session.clear_cancel_query_flag();
267    let _guard = session.txn_begin_implicit();
268    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
269
270    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
271
272    match stmt {
273        Statement::Explain {
274            statement,
275            analyze,
276            options,
277        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
278        Statement::ExplainAnalyzeStreamJob {
279            target,
280            duration_secs,
281        } => {
282            explain_analyze_stream_job::handle_explain_analyze_stream_job(
283                handler_args,
284                target,
285                duration_secs,
286            )
287            .await
288        }
289        Statement::CreateSource { stmt } => {
290            create_source::handle_create_source(handler_args, stmt).await
291        }
292        Statement::CreateSink { stmt } => {
293            create_sink::handle_create_sink(handler_args, stmt, false).await
294        }
295        Statement::CreateSubscription { stmt } => {
296            create_subscription::handle_create_subscription(handler_args, stmt).await
297        }
298        Statement::CreateConnection { stmt } => {
299            create_connection::handle_create_connection(handler_args, stmt).await
300        }
301        Statement::CreateSecret { stmt } => {
302            create_secret::handle_create_secret(handler_args, stmt).await
303        }
304        Statement::CreateFunction {
305            or_replace,
306            temporary,
307            if_not_exists,
308            name,
309            args,
310            returns,
311            params,
312            with_options,
313        } => {
314            // For general udf, `language` clause could be ignored
315            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
316            if params.language.is_none()
317                || !params
318                    .language
319                    .as_ref()
320                    .unwrap()
321                    .real_value()
322                    .eq_ignore_ascii_case("sql")
323            {
324                create_function::handle_create_function(
325                    handler_args,
326                    or_replace,
327                    temporary,
328                    if_not_exists,
329                    name,
330                    args,
331                    returns,
332                    params,
333                    with_options,
334                )
335                .await
336            } else {
337                create_sql_function::handle_create_sql_function(
338                    handler_args,
339                    or_replace,
340                    temporary,
341                    if_not_exists,
342                    name,
343                    args,
344                    returns,
345                    params,
346                )
347                .await
348            }
349        }
350        Statement::CreateAggregate {
351            or_replace,
352            if_not_exists,
353            name,
354            args,
355            returns,
356            params,
357            ..
358        } => {
359            create_aggregate::handle_create_aggregate(
360                handler_args,
361                or_replace,
362                if_not_exists,
363                name,
364                args,
365                returns,
366                params,
367            )
368            .await
369        }
370        Statement::CreateTable {
371            name,
372            columns,
373            wildcard_idx,
374            constraints,
375            query,
376            with_options: _, // It is put in OptimizerContext
377            // Not supported things
378            or_replace,
379            temporary,
380            if_not_exists,
381            format_encode,
382            source_watermarks,
383            append_only,
384            on_conflict,
385            with_version_column,
386            cdc_table_info,
387            include_column_options,
388            webhook_info,
389            engine,
390        } => {
391            if or_replace {
392                bail_not_implemented!("CREATE OR REPLACE TABLE");
393            }
394            if temporary {
395                bail_not_implemented!("CREATE TEMPORARY TABLE");
396            }
397            if let Some(query) = query {
398                return create_table_as::handle_create_as(
399                    handler_args,
400                    name,
401                    if_not_exists,
402                    query,
403                    columns,
404                    append_only,
405                    on_conflict,
406                    with_version_column.map(|x| x.real_value()),
407                    engine,
408                )
409                .await;
410            }
411            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
412            create_table::handle_create_table(
413                handler_args,
414                name,
415                columns,
416                wildcard_idx,
417                constraints,
418                if_not_exists,
419                format_encode,
420                source_watermarks,
421                append_only,
422                on_conflict,
423                with_version_column.map(|x| x.real_value()),
424                cdc_table_info,
425                include_column_options,
426                webhook_info,
427                engine,
428            )
429            .await
430        }
431        Statement::CreateDatabase {
432            db_name,
433            if_not_exists,
434            owner,
435            resource_group,
436            barrier_interval_ms,
437            checkpoint_frequency,
438        } => {
439            create_database::handle_create_database(
440                handler_args,
441                db_name,
442                if_not_exists,
443                owner,
444                resource_group,
445                barrier_interval_ms,
446                checkpoint_frequency,
447            )
448            .await
449        }
450        Statement::CreateSchema {
451            schema_name,
452            if_not_exists,
453            owner,
454        } => {
455            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
456                .await
457        }
458        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
459        Statement::DeclareCursor { stmt } => {
460            declare_cursor::handle_declare_cursor(handler_args, stmt).await
461        }
462        Statement::FetchCursor { stmt } => {
463            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
464        }
465        Statement::CloseCursor { stmt } => {
466            close_cursor::handle_close_cursor(handler_args, stmt).await
467        }
468        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
469        Statement::Grant { .. } => {
470            handle_privilege::handle_grant_privilege(handler_args, stmt).await
471        }
472        Statement::Revoke { .. } => {
473            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
474        }
475        Statement::Describe { name, kind } => match kind {
476            DescribeKind::Fragments => {
477                describe::handle_describe_fragments(handler_args, name).await
478            }
479            DescribeKind::Plain => describe::handle_describe(handler_args, name),
480        },
481        Statement::DescribeFragment { fragment_id } => {
482            describe::handle_describe_fragment(handler_args, fragment_id).await
483        }
484        Statement::Discard(..) => discard::handle_discard(handler_args),
485        Statement::ShowObjects {
486            object: show_object,
487            filter,
488        } => show::handle_show_object(handler_args, show_object, filter).await,
489        Statement::ShowCreateObject { create_type, name } => {
490            show::handle_show_create_object(handler_args, create_type, name)
491        }
492        Statement::ShowTransactionIsolationLevel => {
493            transaction::handle_show_isolation_level(handler_args)
494        }
495        Statement::Drop(DropStatement {
496            object_type,
497            object_name,
498            if_exists,
499            drop_mode,
500        }) => {
501            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
502                match object_type {
503                    ObjectType::MaterializedView
504                    | ObjectType::View
505                    | ObjectType::Sink
506                    | ObjectType::Source
507                    | ObjectType::Subscription
508                    | ObjectType::Index
509                    | ObjectType::Table
510                    | ObjectType::Schema
511                    | ObjectType::Connection => true,
512                    ObjectType::Database | ObjectType::User | ObjectType::Secret => {
513                        bail_not_implemented!("DROP CASCADE");
514                    }
515                }
516            } else {
517                false
518            };
519            match object_type {
520                ObjectType::Table => {
521                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
522                        .await
523                }
524                ObjectType::MaterializedView => {
525                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
526                }
527                ObjectType::Index => {
528                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
529                        .await
530                }
531                ObjectType::Source => {
532                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
533                        .await
534                }
535                ObjectType::Sink => {
536                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
537                }
538                ObjectType::Subscription => {
539                    drop_subscription::handle_drop_subscription(
540                        handler_args,
541                        object_name,
542                        if_exists,
543                        cascade,
544                    )
545                    .await
546                }
547                ObjectType::Database => {
548                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
549                }
550                ObjectType::Schema => {
551                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
552                        .await
553                }
554                ObjectType::User => {
555                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
556                }
557                ObjectType::View => {
558                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
559                }
560                ObjectType::Connection => {
561                    drop_connection::handle_drop_connection(
562                        handler_args,
563                        object_name,
564                        if_exists,
565                        cascade,
566                    )
567                    .await
568                }
569                ObjectType::Secret => {
570                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
571                }
572            }
573        }
574        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
575        Statement::DropFunction {
576            if_exists,
577            func_desc,
578            option,
579        } => {
580            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
581                .await
582        }
583        Statement::DropAggregate {
584            if_exists,
585            func_desc,
586            option,
587        } => {
588            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
589                .await
590        }
591        Statement::Query(_)
592        | Statement::Insert { .. }
593        | Statement::Delete { .. }
594        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
595        Statement::CreateView {
596            materialized,
597            if_not_exists,
598            name,
599            columns,
600            query,
601            with_options: _, // It is put in OptimizerContext
602            or_replace,      // not supported
603            emit_mode,
604        } => {
605            if or_replace {
606                bail_not_implemented!("CREATE OR REPLACE VIEW");
607            }
608            if materialized {
609                create_mv::handle_create_mv(
610                    handler_args,
611                    if_not_exists,
612                    name,
613                    *query,
614                    columns,
615                    emit_mode,
616                )
617                .await
618            } else {
619                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
620                    .await
621            }
622        }
623        Statement::Flush => flush::handle_flush(handler_args).await,
624        Statement::Wait => wait::handle_wait(handler_args).await,
625        Statement::Recover => recover::handle_recover(handler_args).await,
626        Statement::SetVariable {
627            local: _,
628            variable,
629            value,
630        } => {
631            // special handle for `use database`
632            if variable.real_value().eq_ignore_ascii_case("database") {
633                let x = variable::set_var_to_param_str(&value);
634                let res = use_db::handle_use_db(
635                    handler_args,
636                    ObjectName::from(vec![Ident::from_real_value(
637                        x.as_deref().unwrap_or("default"),
638                    )]),
639                )?;
640                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
641                for notice in res.notices() {
642                    builder = builder.notice(notice);
643                }
644                return Ok(builder.into());
645            }
646            variable::handle_set(handler_args, variable, value)
647        }
648        Statement::SetTimeZone { local: _, value } => {
649            variable::handle_set_time_zone(handler_args, value)
650        }
651        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
652        Statement::CreateIndex {
653            name,
654            table_name,
655            columns,
656            include,
657            distributed_by,
658            unique,
659            if_not_exists,
660        } => {
661            if unique {
662                bail_not_implemented!("create unique index");
663            }
664
665            create_index::handle_create_index(
666                handler_args,
667                if_not_exists,
668                name,
669                table_name,
670                columns.to_vec(),
671                include,
672                distributed_by,
673            )
674            .await
675        }
676        Statement::AlterDatabase { name, operation } => match operation {
677            AlterDatabaseOperation::RenameDatabase { database_name } => {
678                alter_rename::handle_rename_database(handler_args, name, database_name).await
679            }
680            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
681                alter_owner::handle_alter_owner(
682                    handler_args,
683                    name,
684                    new_owner_name,
685                    StatementType::ALTER_DATABASE,
686                )
687                .await
688            }
689            AlterDatabaseOperation::SetParam(config_param) => {
690                let ConfigParam { param, value } = config_param;
691
692                let database_param = match param.real_value().to_uppercase().as_str() {
693                    "BARRIER_INTERVAL_MS" => {
694                        let barrier_interval_ms = match value {
695                            SetVariableValue::Default => None,
696                            SetVariableValue::Single(SetVariableValueSingle::Literal(
697                                Value::Number(num),
698                            )) => {
699                                let num = num.parse::<u32>().map_err(|e| {
700                                    ErrorCode::InvalidInputSyntax(format!(
701                                        "barrier_interval_ms must be a u32 integer: {}",
702                                        e.as_report()
703                                    ))
704                                })?;
705                                Some(num)
706                            }
707                            _ => {
708                                return Err(ErrorCode::InvalidInputSyntax(
709                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
710                                        .to_owned(),
711                                )
712                                .into());
713                            }
714                        };
715                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
716                    }
717                    "CHECKPOINT_FREQUENCY" => {
718                        let checkpoint_frequency = match value {
719                            SetVariableValue::Default => None,
720                            SetVariableValue::Single(SetVariableValueSingle::Literal(
721                                Value::Number(num),
722                            )) => {
723                                let num = num.parse::<u64>().map_err(|e| {
724                                    ErrorCode::InvalidInputSyntax(format!(
725                                        "checkpoint_frequency must be a u64 integer: {}",
726                                        e.as_report()
727                                    ))
728                                })?;
729                                Some(num)
730                            }
731                            _ => {
732                                return Err(ErrorCode::InvalidInputSyntax(
733                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
734                                        .to_owned(),
735                                )
736                                .into());
737                            }
738                        };
739                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
740                    }
741                    _ => {
742                        return Err(ErrorCode::InvalidInputSyntax(format!(
743                            "Unsupported database config parameter: {}",
744                            param.real_value()
745                        ))
746                        .into());
747                    }
748                };
749
750                alter_database_param::handle_alter_database_param(
751                    handler_args,
752                    name,
753                    database_param,
754                )
755                .await
756            }
757        },
758        Statement::AlterSchema { name, operation } => match operation {
759            AlterSchemaOperation::RenameSchema { schema_name } => {
760                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
761            }
762            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
763                alter_owner::handle_alter_owner(
764                    handler_args,
765                    name,
766                    new_owner_name,
767                    StatementType::ALTER_SCHEMA,
768                )
769                .await
770            }
771            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
772                alter_swap_rename::handle_swap_rename(
773                    handler_args,
774                    name,
775                    target_schema,
776                    StatementType::ALTER_SCHEMA,
777                )
778                .await
779            }
780        },
781        Statement::AlterTable { name, operation } => match operation {
782            AlterTableOperation::AddColumn { .. }
783            | AlterTableOperation::DropColumn { .. }
784            | AlterTableOperation::AlterColumn { .. } => {
785                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
786            }
787            AlterTableOperation::RenameTable { table_name } => {
788                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
789                    .await
790            }
791            AlterTableOperation::ChangeOwner { new_owner_name } => {
792                alter_owner::handle_alter_owner(
793                    handler_args,
794                    name,
795                    new_owner_name,
796                    StatementType::ALTER_TABLE,
797                )
798                .await
799            }
800            AlterTableOperation::SetParallelism {
801                parallelism,
802                deferred,
803            } => {
804                alter_parallelism::handle_alter_parallelism(
805                    handler_args,
806                    name,
807                    parallelism,
808                    StatementType::ALTER_TABLE,
809                    deferred,
810                )
811                .await
812            }
813            AlterTableOperation::SetSchema { new_schema_name } => {
814                alter_set_schema::handle_alter_set_schema(
815                    handler_args,
816                    name,
817                    new_schema_name,
818                    StatementType::ALTER_TABLE,
819                    None,
820                )
821                .await
822            }
823            AlterTableOperation::RefreshSchema => {
824                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
825            }
826            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
827                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
828                    handler_args,
829                    PbThrottleTarget::TableWithSource,
830                    name,
831                    rate_limit,
832                )
833                .await
834            }
835            AlterTableOperation::DropConnector => {
836                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
837                    .await
838            }
839            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
840                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
841                    handler_args,
842                    PbThrottleTarget::TableDml,
843                    name,
844                    rate_limit,
845                )
846                .await
847            }
848            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
849                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
850                    handler_args,
851                    PbThrottleTarget::CdcTable,
852                    name,
853                    rate_limit,
854                )
855                .await
856            }
857            AlterTableOperation::SwapRenameTable { target_table } => {
858                alter_swap_rename::handle_swap_rename(
859                    handler_args,
860                    name,
861                    target_table,
862                    StatementType::ALTER_TABLE,
863                )
864                .await
865            }
866            AlterTableOperation::AlterConnectorProps { alter_props } => {
867                // If exists a associated source, it should be of the same name.
868                crate::handler::alter_source_props::handle_alter_table_connector_props(
869                    handler_args,
870                    name,
871                    alter_props,
872                )
873                .await
874            }
875            AlterTableOperation::AddConstraint { .. }
876            | AlterTableOperation::DropConstraint { .. }
877            | AlterTableOperation::RenameColumn { .. }
878            | AlterTableOperation::ChangeColumn { .. }
879            | AlterTableOperation::RenameConstraint { .. } => {
880                bail_not_implemented!(
881                    "Unhandled statement: {}",
882                    Statement::AlterTable { name, operation }
883                )
884            }
885        },
886        Statement::AlterIndex { name, operation } => match operation {
887            AlterIndexOperation::RenameIndex { index_name } => {
888                alter_rename::handle_rename_index(handler_args, name, index_name).await
889            }
890            AlterIndexOperation::SetParallelism {
891                parallelism,
892                deferred,
893            } => {
894                alter_parallelism::handle_alter_parallelism(
895                    handler_args,
896                    name,
897                    parallelism,
898                    StatementType::ALTER_INDEX,
899                    deferred,
900                )
901                .await
902            }
903        },
904        Statement::AlterView {
905            materialized,
906            name,
907            operation,
908        } => {
909            let statement_type = if materialized {
910                StatementType::ALTER_MATERIALIZED_VIEW
911            } else {
912                StatementType::ALTER_VIEW
913            };
914            match operation {
915                AlterViewOperation::RenameView { view_name } => {
916                    if materialized {
917                        alter_rename::handle_rename_table(
918                            handler_args,
919                            TableType::MaterializedView,
920                            name,
921                            view_name,
922                        )
923                        .await
924                    } else {
925                        alter_rename::handle_rename_view(handler_args, name, view_name).await
926                    }
927                }
928                AlterViewOperation::SetParallelism {
929                    parallelism,
930                    deferred,
931                } => {
932                    if !materialized {
933                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
934                    }
935                    alter_parallelism::handle_alter_parallelism(
936                        handler_args,
937                        name,
938                        parallelism,
939                        statement_type,
940                        deferred,
941                    )
942                    .await
943                }
944                AlterViewOperation::SetResourceGroup {
945                    resource_group,
946                    deferred,
947                } => {
948                    if !materialized {
949                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
950                    }
951                    alter_resource_group::handle_alter_resource_group(
952                        handler_args,
953                        name,
954                        resource_group,
955                        statement_type,
956                        deferred,
957                    )
958                    .await
959                }
960                AlterViewOperation::ChangeOwner { new_owner_name } => {
961                    alter_owner::handle_alter_owner(
962                        handler_args,
963                        name,
964                        new_owner_name,
965                        statement_type,
966                    )
967                    .await
968                }
969                AlterViewOperation::SetSchema { new_schema_name } => {
970                    alter_set_schema::handle_alter_set_schema(
971                        handler_args,
972                        name,
973                        new_schema_name,
974                        statement_type,
975                        None,
976                    )
977                    .await
978                }
979                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
980                    if !materialized {
981                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
982                    }
983                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
984                        handler_args,
985                        PbThrottleTarget::Mv,
986                        name,
987                        rate_limit,
988                    )
989                    .await
990                }
991                AlterViewOperation::SwapRenameView { target_view } => {
992                    alter_swap_rename::handle_swap_rename(
993                        handler_args,
994                        name,
995                        target_view,
996                        statement_type,
997                    )
998                    .await
999                }
1000                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1001                    if !materialized {
1002                        bail!(
1003                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1004                        );
1005                    }
1006                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1007                }
1008                AlterViewOperation::AsQuery { query } => {
1009                    if !materialized {
1010                        bail_not_implemented!("ALTER VIEW AS QUERY");
1011                    }
1012                    // `ALTER MATERIALIZED VIEW AS QUERY` is only available in development build now.
1013                    if !cfg!(debug_assertions) {
1014                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1015                    }
1016                    alter_mv::handle_alter_mv(handler_args, name, query).await
1017                }
1018            }
1019        }
1020
1021        Statement::AlterSink { name, operation } => match operation {
1022            AlterSinkOperation::AlterConnectorProps {
1023                alter_props: changed_props,
1024            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1025            AlterSinkOperation::RenameSink { sink_name } => {
1026                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1027            }
1028            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1029                alter_owner::handle_alter_owner(
1030                    handler_args,
1031                    name,
1032                    new_owner_name,
1033                    StatementType::ALTER_SINK,
1034                )
1035                .await
1036            }
1037            AlterSinkOperation::SetSchema { new_schema_name } => {
1038                alter_set_schema::handle_alter_set_schema(
1039                    handler_args,
1040                    name,
1041                    new_schema_name,
1042                    StatementType::ALTER_SINK,
1043                    None,
1044                )
1045                .await
1046            }
1047            AlterSinkOperation::SetParallelism {
1048                parallelism,
1049                deferred,
1050            } => {
1051                alter_parallelism::handle_alter_parallelism(
1052                    handler_args,
1053                    name,
1054                    parallelism,
1055                    StatementType::ALTER_SINK,
1056                    deferred,
1057                )
1058                .await
1059            }
1060            AlterSinkOperation::SwapRenameSink { target_sink } => {
1061                alter_swap_rename::handle_swap_rename(
1062                    handler_args,
1063                    name,
1064                    target_sink,
1065                    StatementType::ALTER_SINK,
1066                )
1067                .await
1068            }
1069            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1070                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1071                    handler_args,
1072                    PbThrottleTarget::Sink,
1073                    name,
1074                    rate_limit,
1075                )
1076                .await
1077            }
1078            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1079                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1080                    handler_args,
1081                    name,
1082                    enable,
1083                )
1084                .await
1085            }
1086        },
1087        Statement::AlterSubscription { name, operation } => match operation {
1088            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1089                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1090                    .await
1091            }
1092            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1093                alter_owner::handle_alter_owner(
1094                    handler_args,
1095                    name,
1096                    new_owner_name,
1097                    StatementType::ALTER_SUBSCRIPTION,
1098                )
1099                .await
1100            }
1101            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1102                alter_set_schema::handle_alter_set_schema(
1103                    handler_args,
1104                    name,
1105                    new_schema_name,
1106                    StatementType::ALTER_SUBSCRIPTION,
1107                    None,
1108                )
1109                .await
1110            }
1111            AlterSubscriptionOperation::SwapRenameSubscription {
1112                target_subscription,
1113            } => {
1114                alter_swap_rename::handle_swap_rename(
1115                    handler_args,
1116                    name,
1117                    target_subscription,
1118                    StatementType::ALTER_SUBSCRIPTION,
1119                )
1120                .await
1121            }
1122        },
1123        Statement::AlterSource { name, operation } => match operation {
1124            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1125                alter_source_props::handle_alter_source_connector_props(
1126                    handler_args,
1127                    name,
1128                    alter_props,
1129                )
1130                .await
1131            }
1132            AlterSourceOperation::RenameSource { source_name } => {
1133                alter_rename::handle_rename_source(handler_args, name, source_name).await
1134            }
1135            AlterSourceOperation::AddColumn { .. } => {
1136                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1137            }
1138            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1139                alter_owner::handle_alter_owner(
1140                    handler_args,
1141                    name,
1142                    new_owner_name,
1143                    StatementType::ALTER_SOURCE,
1144                )
1145                .await
1146            }
1147            AlterSourceOperation::SetSchema { new_schema_name } => {
1148                alter_set_schema::handle_alter_set_schema(
1149                    handler_args,
1150                    name,
1151                    new_schema_name,
1152                    StatementType::ALTER_SOURCE,
1153                    None,
1154                )
1155                .await
1156            }
1157            AlterSourceOperation::FormatEncode { format_encode } => {
1158                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1159                    .await
1160            }
1161            AlterSourceOperation::RefreshSchema => {
1162                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1163            }
1164            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1165                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1166                    handler_args,
1167                    PbThrottleTarget::Source,
1168                    name,
1169                    rate_limit,
1170                )
1171                .await
1172            }
1173            AlterSourceOperation::SwapRenameSource { target_source } => {
1174                alter_swap_rename::handle_swap_rename(
1175                    handler_args,
1176                    name,
1177                    target_source,
1178                    StatementType::ALTER_SOURCE,
1179                )
1180                .await
1181            }
1182            AlterSourceOperation::SetParallelism {
1183                parallelism,
1184                deferred,
1185            } => {
1186                alter_parallelism::handle_alter_parallelism(
1187                    handler_args,
1188                    name,
1189                    parallelism,
1190                    StatementType::ALTER_SOURCE,
1191                    deferred,
1192                )
1193                .await
1194            }
1195        },
1196        Statement::AlterFunction {
1197            name,
1198            args,
1199            operation,
1200        } => match operation {
1201            AlterFunctionOperation::SetSchema { new_schema_name } => {
1202                alter_set_schema::handle_alter_set_schema(
1203                    handler_args,
1204                    name,
1205                    new_schema_name,
1206                    StatementType::ALTER_FUNCTION,
1207                    args,
1208                )
1209                .await
1210            }
1211        },
1212        Statement::AlterConnection { name, operation } => match operation {
1213            AlterConnectionOperation::SetSchema { new_schema_name } => {
1214                alter_set_schema::handle_alter_set_schema(
1215                    handler_args,
1216                    name,
1217                    new_schema_name,
1218                    StatementType::ALTER_CONNECTION,
1219                    None,
1220                )
1221                .await
1222            }
1223            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1224                alter_owner::handle_alter_owner(
1225                    handler_args,
1226                    name,
1227                    new_owner_name,
1228                    StatementType::ALTER_CONNECTION,
1229                )
1230                .await
1231            }
1232        },
1233        Statement::AlterSystem { param, value } => {
1234            alter_system::handle_alter_system(handler_args, param, value).await
1235        }
1236        Statement::AlterSecret {
1237            name,
1238            with_options,
1239            operation,
1240        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1241        Statement::AlterFragment {
1242            fragment_id,
1243            operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1244        } => {
1245            alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1246                &handler_args.session,
1247                PbThrottleTarget::Fragment,
1248                fragment_id,
1249                rate_limit,
1250                StatementType::SET_VARIABLE,
1251            )
1252            .await
1253        }
1254        Statement::AlterDefaultPrivileges { .. } => {
1255            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1256        }
1257        Statement::StartTransaction { modes } => {
1258            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1259        }
1260        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1261        Statement::Commit { chain } => {
1262            transaction::handle_commit(handler_args, COMMIT, chain).await
1263        }
1264        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1265        Statement::Rollback { chain } => {
1266            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1267        }
1268        Statement::SetTransaction {
1269            modes,
1270            snapshot,
1271            session,
1272        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1273        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1274        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1275        Statement::Comment {
1276            object_type,
1277            object_name,
1278            comment,
1279        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1280        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1281        Statement::Prepare {
1282            name,
1283            data_types,
1284            statement,
1285        } => prepared_statement::handle_prepare(name, data_types, statement).await,
1286        Statement::Deallocate { name, prepare } => {
1287            prepared_statement::handle_deallocate(name, prepare).await
1288        }
1289        Statement::Vacuum { object_name } => vacuum::handle_vacuum(handler_args, object_name).await,
1290        Statement::Refresh { table_name } => {
1291            refresh::handle_refresh(handler_args, table_name).await
1292        }
1293        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1294    }
1295}
1296
1297fn check_ban_ddl_for_iceberg_engine_table(
1298    session: Arc<SessionImpl>,
1299    stmt: &Statement,
1300) -> Result<()> {
1301    match stmt {
1302        Statement::AlterTable {
1303            name,
1304            operation:
1305                operation @ (AlterTableOperation::AddColumn { .. }
1306                | AlterTableOperation::DropColumn { .. }),
1307        } => {
1308            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1309            if table.is_iceberg_engine_table() {
1310                bail!(
1311                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1312                    operation,
1313                    schema_name,
1314                    name
1315                );
1316            }
1317        }
1318
1319        Statement::AlterTable {
1320            name,
1321            operation: AlterTableOperation::RenameTable { .. },
1322        } => {
1323            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1324            if table.is_iceberg_engine_table() {
1325                bail!(
1326                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1327                    schema_name,
1328                    name
1329                );
1330            }
1331        }
1332
1333        Statement::AlterTable {
1334            name,
1335            operation: AlterTableOperation::ChangeOwner { .. },
1336        } => {
1337            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1338            if table.is_iceberg_engine_table() {
1339                bail!(
1340                    "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1341                    schema_name,
1342                    name
1343                );
1344            }
1345        }
1346
1347        Statement::AlterTable {
1348            name,
1349            operation: AlterTableOperation::SetParallelism { .. },
1350        } => {
1351            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1352            if table.is_iceberg_engine_table() {
1353                bail!(
1354                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1355                    schema_name,
1356                    name
1357                );
1358            }
1359        }
1360
1361        Statement::AlterTable {
1362            name,
1363            operation: AlterTableOperation::SetSchema { .. },
1364        } => {
1365            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1366            if table.is_iceberg_engine_table() {
1367                bail!(
1368                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1369                    schema_name,
1370                    name
1371                );
1372            }
1373        }
1374
1375        Statement::AlterTable {
1376            name,
1377            operation: AlterTableOperation::RefreshSchema,
1378        } => {
1379            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1380            if table.is_iceberg_engine_table() {
1381                bail!(
1382                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1383                    schema_name,
1384                    name
1385                );
1386            }
1387        }
1388
1389        Statement::AlterTable {
1390            name,
1391            operation: AlterTableOperation::SetSourceRateLimit { .. },
1392        } => {
1393            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1394            if table.is_iceberg_engine_table() {
1395                bail!(
1396                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1397                    schema_name,
1398                    name
1399                );
1400            }
1401        }
1402
1403        _ => {}
1404    }
1405
1406    Ok(())
1407}