risingwave_frontend/handler/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_connection_props;
46mod alter_database_param;
47mod alter_mv;
48mod alter_owner;
49mod alter_parallelism;
50mod alter_rename;
51mod alter_resource_group;
52mod alter_secret;
53mod alter_set_schema;
54mod alter_sink_props;
55mod alter_source_column;
56mod alter_source_props;
57mod alter_source_with_sr;
58mod alter_streaming_config;
59mod alter_streaming_enable_unaligned_join;
60mod alter_streaming_rate_limit;
61mod alter_swap_rename;
62mod alter_system;
63mod alter_table_column;
64pub mod alter_table_drop_connector;
65pub mod alter_table_props;
66mod alter_table_with_sr;
67pub mod alter_user;
68mod alter_utils;
69pub mod cancel_job;
70pub mod close_cursor;
71mod comment;
72pub mod create_aggregate;
73pub mod create_connection;
74mod create_database;
75pub mod create_function;
76pub mod create_index;
77pub mod create_mv;
78pub mod create_schema;
79pub mod create_secret;
80pub mod create_sink;
81pub mod create_source;
82pub mod create_sql_function;
83pub mod create_subscription;
84pub mod create_table;
85pub mod create_table_as;
86pub mod create_user;
87pub mod create_view;
88pub mod declare_cursor;
89pub mod describe;
90pub mod discard;
91mod drop_connection;
92mod drop_database;
93pub mod drop_function;
94mod drop_index;
95pub mod drop_mv;
96mod drop_schema;
97pub mod drop_secret;
98pub mod drop_sink;
99pub mod drop_source;
100pub mod drop_subscription;
101pub mod drop_table;
102pub mod drop_user;
103mod drop_view;
104pub mod explain;
105pub mod explain_analyze_stream_job;
106pub mod extended_handle;
107pub mod fetch_cursor;
108mod flush;
109pub mod handle_privilege;
110pub mod kill_process;
111mod prepared_statement;
112pub mod privilege;
113pub mod query;
114mod recover;
115mod refresh;
116mod reset_source;
117pub mod show;
118mod transaction;
119mod use_db;
120pub mod util;
121pub mod vacuum;
122pub mod variable;
123mod wait;
124
125pub use alter_table_column::{
126    fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
127};
128
/// The [`PgResponseBuilder`] used by RisingWave, specialized to [`PgResponseStream`].
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;

/// The [`PgResponse`] used by RisingWave, specialized to [`PgResponseStream`].
pub type RwPgResponse = PgResponse<PgResponseStream>;
134
135#[easy_ext::ext(RwPgResponseBuilderExt)]
136impl RwPgResponseBuilder {
137    /// Append rows to the response.
138    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
139        let fields = T::fields();
140        self.values(
141            rows.into_iter()
142                .map(|row| {
143                    Row::new(
144                        row.into_owned_row()
145                            .into_iter()
146                            .zip_eq_fast(&fields)
147                            .map(|(datum, (_, ty))| {
148                                datum.map(|scalar| {
149                                    scalar.as_scalar_ref_impl().text_format(ty).into()
150                                })
151                            })
152                            .collect(),
153                    )
154                })
155                .collect_vec()
156                .into(),
157            fields_to_descriptors(fields),
158        )
159    }
160}
161
162pub fn fields_to_descriptors(
163    fields: Vec<(&str, risingwave_common::types::DataType)>,
164) -> Vec<PgFieldDescriptor> {
165    fields
166        .iter()
167        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
168        .collect()
169}
170
/// The row stream type plugged into [`RwPgResponse`] and [`RwPgResponseBuilder`].
///
/// It implements [`Stream`] by delegating to the stream held by the active variant.
pub enum PgResponseStream {
    /// A [`LocalQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// A [`DistributedQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// Any boxed stream of row sets, e.g. the one built from a `Vec<Row>` via `From`.
    Rows(BoxStream<'static, RowSetResult>),
}
176
177impl Stream for PgResponseStream {
178    type Item = std::result::Result<Vec<Row>, BoxedError>;
179
180    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
181        match &mut *self {
182            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
183            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
184            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
185        }
186    }
187}
188
189impl From<Vec<Row>> for PgResponseStream {
190    fn from(rows: Vec<Row>) -> Self {
191        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
192    }
193}
194
/// Arguments bundled once per statement and handed to the individual statement handlers.
#[derive(Clone)]
pub struct HandlerArgs {
    /// The session the statement is handled for.
    pub session: Arc<SessionImpl>,
    /// The SQL text passed to [`HandlerArgs::new`].
    // NOTE(review): presumably the original, un-normalized statement text — confirm with callers.
    pub sql: Arc<str>,
    /// The statement unparsed back into SQL, with `OR REPLACE` / `IF NOT EXISTS` cleared for
    /// the `CREATE` statements covered by `normalize_sql`.
    pub normalized_sql: String,
    /// Options obtained from the statement via `WithOptions::try_from`.
    pub with_options: WithOptions,
}
202
203impl HandlerArgs {
204    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
205        Ok(Self {
206            session,
207            sql,
208            with_options: WithOptions::try_from(stmt)?,
209            normalized_sql: Self::normalize_sql(stmt),
210        })
211    }
212
213    /// Get normalized SQL from the statement.
214    ///
215    /// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
216    /// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
217    ///   make it suitable for the `SHOW CREATE` statements.
218    fn normalize_sql(stmt: &Statement) -> String {
219        let mut stmt = stmt.clone();
220        match &mut stmt {
221            Statement::CreateView {
222                or_replace,
223                if_not_exists,
224                ..
225            } => {
226                *or_replace = false;
227                *if_not_exists = false;
228            }
229            Statement::CreateTable {
230                or_replace,
231                if_not_exists,
232                ..
233            } => {
234                *or_replace = false;
235                *if_not_exists = false;
236            }
237            Statement::CreateIndex { if_not_exists, .. } => {
238                *if_not_exists = false;
239            }
240            Statement::CreateSource {
241                stmt: CreateSourceStatement { if_not_exists, .. },
242                ..
243            } => {
244                *if_not_exists = false;
245            }
246            Statement::CreateSink {
247                stmt: CreateSinkStatement { if_not_exists, .. },
248            } => {
249                *if_not_exists = false;
250            }
251            Statement::CreateSubscription {
252                stmt: CreateSubscriptionStatement { if_not_exists, .. },
253            } => {
254                *if_not_exists = false;
255            }
256            Statement::CreateConnection {
257                stmt: CreateConnectionStatement { if_not_exists, .. },
258            } => {
259                *if_not_exists = false;
260            }
261            _ => {}
262        }
263        stmt.to_string()
264    }
265}
266
267pub async fn handle(
268    session: Arc<SessionImpl>,
269    stmt: Statement,
270    sql: Arc<str>,
271    formats: Vec<Format>,
272) -> Result<RwPgResponse> {
273    session.clear_cancel_query_flag();
274    let _guard = session.txn_begin_implicit();
275    let handler_args = HandlerArgs::new(session, &stmt, sql)?;
276
277    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
278
279    match stmt {
280        Statement::Explain {
281            statement,
282            analyze,
283            options,
284        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
285        Statement::ExplainAnalyzeStreamJob {
286            target,
287            duration_secs,
288        } => {
289            explain_analyze_stream_job::handle_explain_analyze_stream_job(
290                handler_args,
291                target,
292                duration_secs,
293            )
294            .await
295        }
296        Statement::CreateSource { stmt } => {
297            create_source::handle_create_source(handler_args, stmt).await
298        }
299        Statement::CreateSink { stmt } => {
300            create_sink::handle_create_sink(handler_args, stmt, false).await
301        }
302        Statement::CreateSubscription { stmt } => {
303            create_subscription::handle_create_subscription(handler_args, stmt).await
304        }
305        Statement::CreateConnection { stmt } => {
306            create_connection::handle_create_connection(handler_args, stmt).await
307        }
308        Statement::CreateSecret { stmt } => {
309            create_secret::handle_create_secret(handler_args, stmt).await
310        }
311        Statement::CreateFunction {
312            or_replace,
313            temporary,
314            if_not_exists,
315            name,
316            args,
317            returns,
318            params,
319            with_options,
320        } => {
321            // For general udf, `language` clause could be ignored
322            // refer: https://github.com/risingwavelabs/risingwave/pull/10608
323            if params.language.is_none()
324                || !params
325                    .language
326                    .as_ref()
327                    .unwrap()
328                    .real_value()
329                    .eq_ignore_ascii_case("sql")
330            {
331                create_function::handle_create_function(
332                    handler_args,
333                    or_replace,
334                    temporary,
335                    if_not_exists,
336                    name,
337                    args,
338                    returns,
339                    params,
340                    with_options,
341                )
342                .await
343            } else {
344                create_sql_function::handle_create_sql_function(
345                    handler_args,
346                    or_replace,
347                    temporary,
348                    if_not_exists,
349                    name,
350                    args,
351                    returns,
352                    params,
353                )
354                .await
355            }
356        }
357        Statement::CreateAggregate {
358            or_replace,
359            if_not_exists,
360            name,
361            args,
362            returns,
363            params,
364            ..
365        } => {
366            create_aggregate::handle_create_aggregate(
367                handler_args,
368                or_replace,
369                if_not_exists,
370                name,
371                args,
372                returns,
373                params,
374            )
375            .await
376        }
377        Statement::CreateTable {
378            name,
379            columns,
380            wildcard_idx,
381            constraints,
382            query,
383            with_options: _, // It is put in OptimizerContext
384            // Not supported things
385            or_replace,
386            temporary,
387            if_not_exists,
388            format_encode,
389            source_watermarks,
390            append_only,
391            on_conflict,
392            with_version_columns,
393            cdc_table_info,
394            include_column_options,
395            webhook_info,
396            engine,
397        } => {
398            if or_replace {
399                bail_not_implemented!("CREATE OR REPLACE TABLE");
400            }
401            if temporary {
402                bail_not_implemented!("CREATE TEMPORARY TABLE");
403            }
404            if let Some(query) = query {
405                return create_table_as::handle_create_as(
406                    handler_args,
407                    name,
408                    if_not_exists,
409                    query,
410                    columns,
411                    append_only,
412                    on_conflict,
413                    with_version_columns
414                        .iter()
415                        .map(|col| col.real_value())
416                        .collect(),
417                    engine,
418                )
419                .await;
420            }
421            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
422            create_table::handle_create_table(
423                handler_args,
424                name,
425                columns,
426                wildcard_idx,
427                constraints,
428                if_not_exists,
429                format_encode,
430                source_watermarks,
431                append_only,
432                on_conflict,
433                with_version_columns
434                    .iter()
435                    .map(|col| col.real_value())
436                    .collect(),
437                cdc_table_info,
438                include_column_options,
439                webhook_info,
440                engine,
441            )
442            .await
443        }
444        Statement::CreateDatabase {
445            db_name,
446            if_not_exists,
447            owner,
448            resource_group,
449            barrier_interval_ms,
450            checkpoint_frequency,
451        } => {
452            create_database::handle_create_database(
453                handler_args,
454                db_name,
455                if_not_exists,
456                owner,
457                resource_group,
458                barrier_interval_ms,
459                checkpoint_frequency,
460            )
461            .await
462        }
463        Statement::CreateSchema {
464            schema_name,
465            if_not_exists,
466            owner,
467        } => {
468            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
469                .await
470        }
471        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
472        Statement::DeclareCursor { stmt } => {
473            declare_cursor::handle_declare_cursor(handler_args, stmt).await
474        }
475        Statement::FetchCursor { stmt } => {
476            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
477        }
478        Statement::CloseCursor { stmt } => {
479            close_cursor::handle_close_cursor(handler_args, stmt).await
480        }
481        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
482        Statement::Grant { .. } => {
483            handle_privilege::handle_grant_privilege(handler_args, stmt).await
484        }
485        Statement::Revoke { .. } => {
486            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
487        }
488        Statement::Describe { name, kind } => match kind {
489            DescribeKind::Fragments => {
490                describe::handle_describe_fragments(handler_args, name).await
491            }
492            DescribeKind::Plain => describe::handle_describe(handler_args, name),
493        },
494        Statement::DescribeFragment { fragment_id } => {
495            describe::handle_describe_fragment(handler_args, fragment_id.into()).await
496        }
497        Statement::Discard(..) => discard::handle_discard(handler_args),
498        Statement::ShowObjects {
499            object: show_object,
500            filter,
501        } => show::handle_show_object(handler_args, show_object, filter).await,
502        Statement::ShowCreateObject { create_type, name } => {
503            show::handle_show_create_object(handler_args, create_type, name)
504        }
505        Statement::ShowTransactionIsolationLevel => {
506            transaction::handle_show_isolation_level(handler_args)
507        }
508        Statement::Drop(DropStatement {
509            object_type,
510            object_name,
511            if_exists,
512            drop_mode,
513        }) => {
514            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
515                match object_type {
516                    ObjectType::MaterializedView
517                    | ObjectType::View
518                    | ObjectType::Sink
519                    | ObjectType::Source
520                    | ObjectType::Subscription
521                    | ObjectType::Index
522                    | ObjectType::Table
523                    | ObjectType::Schema
524                    | ObjectType::Connection => true,
525                    ObjectType::Database | ObjectType::User | ObjectType::Secret => {
526                        bail_not_implemented!("DROP CASCADE");
527                    }
528                }
529            } else {
530                false
531            };
532            match object_type {
533                ObjectType::Table => {
534                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
535                        .await
536                }
537                ObjectType::MaterializedView => {
538                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
539                }
540                ObjectType::Index => {
541                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
542                        .await
543                }
544                ObjectType::Source => {
545                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
546                        .await
547                }
548                ObjectType::Sink => {
549                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
550                }
551                ObjectType::Subscription => {
552                    drop_subscription::handle_drop_subscription(
553                        handler_args,
554                        object_name,
555                        if_exists,
556                        cascade,
557                    )
558                    .await
559                }
560                ObjectType::Database => {
561                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
562                }
563                ObjectType::Schema => {
564                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
565                        .await
566                }
567                ObjectType::User => {
568                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
569                }
570                ObjectType::View => {
571                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
572                }
573                ObjectType::Connection => {
574                    drop_connection::handle_drop_connection(
575                        handler_args,
576                        object_name,
577                        if_exists,
578                        cascade,
579                    )
580                    .await
581                }
582                ObjectType::Secret => {
583                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
584                }
585            }
586        }
587        // XXX: should we reuse Statement::Drop for DROP FUNCTION?
588        Statement::DropFunction {
589            if_exists,
590            func_desc,
591            option,
592        } => {
593            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
594                .await
595        }
596        Statement::DropAggregate {
597            if_exists,
598            func_desc,
599            option,
600        } => {
601            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
602                .await
603        }
604        Statement::Query(_)
605        | Statement::Insert { .. }
606        | Statement::Delete { .. }
607        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
608        Statement::Copy {
609            entity: CopyEntity::Query(query),
610            target: CopyTarget::Stdout,
611        } => {
612            let response =
613                query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
614                    .await?;
615            Ok(response.into_copy_query_to_stdout())
616        }
617        Statement::CreateView {
618            materialized,
619            if_not_exists,
620            name,
621            columns,
622            query,
623            with_options: _, // It is put in OptimizerContext
624            or_replace,      // not supported
625            emit_mode,
626        } => {
627            if or_replace {
628                bail_not_implemented!("CREATE OR REPLACE VIEW");
629            }
630            if materialized {
631                create_mv::handle_create_mv(
632                    handler_args,
633                    if_not_exists,
634                    name,
635                    *query,
636                    columns,
637                    emit_mode,
638                )
639                .await
640            } else {
641                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
642                    .await
643            }
644        }
645        Statement::Flush => flush::handle_flush(handler_args).await,
646        Statement::Wait => wait::handle_wait(handler_args).await,
647        Statement::Recover => recover::handle_recover(handler_args).await,
648        Statement::SetVariable {
649            local: _,
650            variable,
651            value,
652        } => {
653            // special handle for `use database`
654            if variable.real_value().eq_ignore_ascii_case("database") {
655                let x = variable::set_var_to_param_str(&value);
656                let res = use_db::handle_use_db(
657                    handler_args,
658                    ObjectName::from(vec![Ident::from_real_value(
659                        x.as_deref().unwrap_or("default"),
660                    )]),
661                )?;
662                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
663                for notice in res.notices() {
664                    builder = builder.notice(notice);
665                }
666                return Ok(builder.into());
667            }
668            variable::handle_set(handler_args, variable, value)
669        }
670        Statement::SetTimeZone { local: _, value } => {
671            variable::handle_set_time_zone(handler_args, value)
672        }
673        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
674        Statement::CreateIndex {
675            name,
676            table_name,
677            method,
678            columns,
679            include,
680            distributed_by,
681            unique,
682            if_not_exists,
683            with_properties: _,
684        } => {
685            if unique {
686                bail_not_implemented!("create unique index");
687            }
688
689            create_index::handle_create_index(
690                handler_args,
691                if_not_exists,
692                name,
693                table_name,
694                method,
695                columns.clone(),
696                include,
697                distributed_by,
698            )
699            .await
700        }
701        Statement::AlterDatabase { name, operation } => match operation {
702            AlterDatabaseOperation::RenameDatabase { database_name } => {
703                alter_rename::handle_rename_database(handler_args, name, database_name).await
704            }
705            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
706                alter_owner::handle_alter_owner(
707                    handler_args,
708                    name,
709                    new_owner_name,
710                    StatementType::ALTER_DATABASE,
711                )
712                .await
713            }
714            AlterDatabaseOperation::SetParam(config_param) => {
715                let ConfigParam { param, value } = config_param;
716
717                let database_param = match param.real_value().to_uppercase().as_str() {
718                    "BARRIER_INTERVAL_MS" => {
719                        let barrier_interval_ms = match value {
720                            SetVariableValue::Default => None,
721                            SetVariableValue::Single(SetVariableValueSingle::Literal(
722                                Value::Number(num),
723                            )) => {
724                                let num = num.parse::<u32>().map_err(|e| {
725                                    ErrorCode::InvalidInputSyntax(format!(
726                                        "barrier_interval_ms must be a u32 integer: {}",
727                                        e.as_report()
728                                    ))
729                                })?;
730                                Some(num)
731                            }
732                            _ => {
733                                return Err(ErrorCode::InvalidInputSyntax(
734                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
735                                        .to_owned(),
736                                )
737                                .into());
738                            }
739                        };
740                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
741                    }
742                    "CHECKPOINT_FREQUENCY" => {
743                        let checkpoint_frequency = match value {
744                            SetVariableValue::Default => None,
745                            SetVariableValue::Single(SetVariableValueSingle::Literal(
746                                Value::Number(num),
747                            )) => {
748                                let num = num.parse::<u64>().map_err(|e| {
749                                    ErrorCode::InvalidInputSyntax(format!(
750                                        "checkpoint_frequency must be a u64 integer: {}",
751                                        e.as_report()
752                                    ))
753                                })?;
754                                Some(num)
755                            }
756                            _ => {
757                                return Err(ErrorCode::InvalidInputSyntax(
758                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
759                                        .to_owned(),
760                                )
761                                .into());
762                            }
763                        };
764                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
765                    }
766                    _ => {
767                        return Err(ErrorCode::InvalidInputSyntax(format!(
768                            "Unsupported database config parameter: {}",
769                            param.real_value()
770                        ))
771                        .into());
772                    }
773                };
774
775                alter_database_param::handle_alter_database_param(
776                    handler_args,
777                    name,
778                    database_param,
779                )
780                .await
781            }
782        },
783        Statement::AlterSchema { name, operation } => match operation {
784            AlterSchemaOperation::RenameSchema { schema_name } => {
785                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
786            }
787            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
788                alter_owner::handle_alter_owner(
789                    handler_args,
790                    name,
791                    new_owner_name,
792                    StatementType::ALTER_SCHEMA,
793                )
794                .await
795            }
796            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
797                alter_swap_rename::handle_swap_rename(
798                    handler_args,
799                    name,
800                    target_schema,
801                    StatementType::ALTER_SCHEMA,
802                )
803                .await
804            }
805        },
806        Statement::AlterTable { name, operation } => match operation {
807            AlterTableOperation::AddColumn { .. }
808            | AlterTableOperation::DropColumn { .. }
809            | AlterTableOperation::AlterColumn { .. } => {
810                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
811            }
812            AlterTableOperation::RenameTable { table_name } => {
813                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
814                    .await
815            }
816            AlterTableOperation::ChangeOwner { new_owner_name } => {
817                alter_owner::handle_alter_owner(
818                    handler_args,
819                    name,
820                    new_owner_name,
821                    StatementType::ALTER_TABLE,
822                )
823                .await
824            }
825            AlterTableOperation::SetParallelism {
826                parallelism,
827                deferred,
828            } => {
829                alter_parallelism::handle_alter_parallelism(
830                    handler_args,
831                    name,
832                    parallelism,
833                    StatementType::ALTER_TABLE,
834                    deferred,
835                )
836                .await
837            }
838            AlterTableOperation::SetSchema { new_schema_name } => {
839                alter_set_schema::handle_alter_set_schema(
840                    handler_args,
841                    name,
842                    new_schema_name,
843                    StatementType::ALTER_TABLE,
844                    None,
845                )
846                .await
847            }
848            AlterTableOperation::RefreshSchema => {
849                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
850            }
851            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
852                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
853                    handler_args,
854                    PbThrottleTarget::TableWithSource,
855                    name,
856                    rate_limit,
857                )
858                .await
859            }
860            AlterTableOperation::DropConnector => {
861                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
862                    .await
863            }
864            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
865                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
866                    handler_args,
867                    PbThrottleTarget::TableDml,
868                    name,
869                    rate_limit,
870                )
871                .await
872            }
873            AlterTableOperation::SetConfig { entries } => {
874                alter_streaming_config::handle_alter_streaming_set_config(
875                    handler_args,
876                    name,
877                    entries,
878                    StatementType::ALTER_TABLE,
879                )
880                .await
881            }
882            AlterTableOperation::ResetConfig { keys } => {
883                alter_streaming_config::handle_alter_streaming_reset_config(
884                    handler_args,
885                    name,
886                    keys,
887                    StatementType::ALTER_TABLE,
888                )
889                .await
890            }
891            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
892                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
893                    handler_args,
894                    PbThrottleTarget::CdcTable,
895                    name,
896                    rate_limit,
897                )
898                .await
899            }
900            AlterTableOperation::SwapRenameTable { target_table } => {
901                alter_swap_rename::handle_swap_rename(
902                    handler_args,
903                    name,
904                    target_table,
905                    StatementType::ALTER_TABLE,
906                )
907                .await
908            }
909            AlterTableOperation::AlterConnectorProps { alter_props } => {
910                alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
911            }
912            AlterTableOperation::AddConstraint { .. }
913            | AlterTableOperation::DropConstraint { .. }
914            | AlterTableOperation::RenameColumn { .. }
915            | AlterTableOperation::ChangeColumn { .. }
916            | AlterTableOperation::RenameConstraint { .. } => {
917                bail_not_implemented!(
918                    "Unhandled statement: {}",
919                    Statement::AlterTable { name, operation }
920                )
921            }
922        },
923        Statement::AlterIndex { name, operation } => match operation {
924            AlterIndexOperation::RenameIndex { index_name } => {
925                alter_rename::handle_rename_index(handler_args, name, index_name).await
926            }
927            AlterIndexOperation::SetParallelism {
928                parallelism,
929                deferred,
930            } => {
931                alter_parallelism::handle_alter_parallelism(
932                    handler_args,
933                    name,
934                    parallelism,
935                    StatementType::ALTER_INDEX,
936                    deferred,
937                )
938                .await
939            }
940            AlterIndexOperation::SetConfig { entries } => {
941                alter_streaming_config::handle_alter_streaming_set_config(
942                    handler_args,
943                    name,
944                    entries,
945                    StatementType::ALTER_INDEX,
946                )
947                .await
948            }
949            AlterIndexOperation::ResetConfig { keys } => {
950                alter_streaming_config::handle_alter_streaming_reset_config(
951                    handler_args,
952                    name,
953                    keys,
954                    StatementType::ALTER_INDEX,
955                )
956                .await
957            }
958        },
959        Statement::AlterView {
960            materialized,
961            name,
962            operation,
963        } => {
964            let statement_type = if materialized {
965                StatementType::ALTER_MATERIALIZED_VIEW
966            } else {
967                StatementType::ALTER_VIEW
968            };
969            match operation {
970                AlterViewOperation::RenameView { view_name } => {
971                    if materialized {
972                        alter_rename::handle_rename_table(
973                            handler_args,
974                            TableType::MaterializedView,
975                            name,
976                            view_name,
977                        )
978                        .await
979                    } else {
980                        alter_rename::handle_rename_view(handler_args, name, view_name).await
981                    }
982                }
983                AlterViewOperation::SetParallelism {
984                    parallelism,
985                    deferred,
986                } => {
987                    if !materialized {
988                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
989                    }
990                    alter_parallelism::handle_alter_parallelism(
991                        handler_args,
992                        name,
993                        parallelism,
994                        statement_type,
995                        deferred,
996                    )
997                    .await
998                }
999                AlterViewOperation::SetResourceGroup {
1000                    resource_group,
1001                    deferred,
1002                } => {
1003                    if !materialized {
1004                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1005                    }
1006                    alter_resource_group::handle_alter_resource_group(
1007                        handler_args,
1008                        name,
1009                        resource_group,
1010                        statement_type,
1011                        deferred,
1012                    )
1013                    .await
1014                }
1015                AlterViewOperation::ChangeOwner { new_owner_name } => {
1016                    alter_owner::handle_alter_owner(
1017                        handler_args,
1018                        name,
1019                        new_owner_name,
1020                        statement_type,
1021                    )
1022                    .await
1023                }
1024                AlterViewOperation::SetSchema { new_schema_name } => {
1025                    alter_set_schema::handle_alter_set_schema(
1026                        handler_args,
1027                        name,
1028                        new_schema_name,
1029                        statement_type,
1030                        None,
1031                    )
1032                    .await
1033                }
1034                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1035                    if !materialized {
1036                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1037                    }
1038                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1039                        handler_args,
1040                        PbThrottleTarget::Mv,
1041                        name,
1042                        rate_limit,
1043                    )
1044                    .await
1045                }
1046                AlterViewOperation::SwapRenameView { target_view } => {
1047                    alter_swap_rename::handle_swap_rename(
1048                        handler_args,
1049                        name,
1050                        target_view,
1051                        statement_type,
1052                    )
1053                    .await
1054                }
1055                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1056                    if !materialized {
1057                        bail!(
1058                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1059                        );
1060                    }
1061                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1062                }
1063                AlterViewOperation::AsQuery { query } => {
1064                    if !materialized {
1065                        bail_not_implemented!("ALTER VIEW AS QUERY");
1066                    }
1067                    // `ALTER MATERIALIZED VIEW AS QUERY` is only available in development build now.
1068                    if !cfg!(debug_assertions) {
1069                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1070                    }
1071                    alter_mv::handle_alter_mv(handler_args, name, query).await
1072                }
1073                AlterViewOperation::SetConfig { entries } => {
1074                    if !materialized {
1075                        bail!("SET CONFIG is only supported for materialized views");
1076                    }
1077                    alter_streaming_config::handle_alter_streaming_set_config(
1078                        handler_args,
1079                        name,
1080                        entries,
1081                        statement_type,
1082                    )
1083                    .await
1084                }
1085                AlterViewOperation::ResetConfig { keys } => {
1086                    if !materialized {
1087                        bail!("RESET CONFIG is only supported for materialized views");
1088                    }
1089                    alter_streaming_config::handle_alter_streaming_reset_config(
1090                        handler_args,
1091                        name,
1092                        keys,
1093                        statement_type,
1094                    )
1095                    .await
1096                }
1097            }
1098        }
1099
1100        Statement::AlterSink { name, operation } => match operation {
1101            AlterSinkOperation::AlterConnectorProps {
1102                alter_props: changed_props,
1103            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1104            AlterSinkOperation::RenameSink { sink_name } => {
1105                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1106            }
1107            AlterSinkOperation::ChangeOwner { new_owner_name } => {
1108                alter_owner::handle_alter_owner(
1109                    handler_args,
1110                    name,
1111                    new_owner_name,
1112                    StatementType::ALTER_SINK,
1113                )
1114                .await
1115            }
1116            AlterSinkOperation::SetSchema { new_schema_name } => {
1117                alter_set_schema::handle_alter_set_schema(
1118                    handler_args,
1119                    name,
1120                    new_schema_name,
1121                    StatementType::ALTER_SINK,
1122                    None,
1123                )
1124                .await
1125            }
1126            AlterSinkOperation::SetParallelism {
1127                parallelism,
1128                deferred,
1129            } => {
1130                alter_parallelism::handle_alter_parallelism(
1131                    handler_args,
1132                    name,
1133                    parallelism,
1134                    StatementType::ALTER_SINK,
1135                    deferred,
1136                )
1137                .await
1138            }
1139            AlterSinkOperation::SetConfig { entries } => {
1140                alter_streaming_config::handle_alter_streaming_set_config(
1141                    handler_args,
1142                    name,
1143                    entries,
1144                    StatementType::ALTER_SINK,
1145                )
1146                .await
1147            }
1148            AlterSinkOperation::ResetConfig { keys } => {
1149                alter_streaming_config::handle_alter_streaming_reset_config(
1150                    handler_args,
1151                    name,
1152                    keys,
1153                    StatementType::ALTER_SINK,
1154                )
1155                .await
1156            }
1157            AlterSinkOperation::SwapRenameSink { target_sink } => {
1158                alter_swap_rename::handle_swap_rename(
1159                    handler_args,
1160                    name,
1161                    target_sink,
1162                    StatementType::ALTER_SINK,
1163                )
1164                .await
1165            }
1166            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1167                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1168                    handler_args,
1169                    PbThrottleTarget::Sink,
1170                    name,
1171                    rate_limit,
1172                )
1173                .await
1174            }
1175            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1176                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1177                    handler_args,
1178                    name,
1179                    enable,
1180                )
1181                .await
1182            }
1183        },
1184        Statement::AlterSubscription { name, operation } => match operation {
1185            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1186                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1187                    .await
1188            }
1189            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1190                alter_owner::handle_alter_owner(
1191                    handler_args,
1192                    name,
1193                    new_owner_name,
1194                    StatementType::ALTER_SUBSCRIPTION,
1195                )
1196                .await
1197            }
1198            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1199                alter_set_schema::handle_alter_set_schema(
1200                    handler_args,
1201                    name,
1202                    new_schema_name,
1203                    StatementType::ALTER_SUBSCRIPTION,
1204                    None,
1205                )
1206                .await
1207            }
1208            AlterSubscriptionOperation::SwapRenameSubscription {
1209                target_subscription,
1210            } => {
1211                alter_swap_rename::handle_swap_rename(
1212                    handler_args,
1213                    name,
1214                    target_subscription,
1215                    StatementType::ALTER_SUBSCRIPTION,
1216                )
1217                .await
1218            }
1219        },
1220        Statement::AlterSource { name, operation } => match operation {
1221            AlterSourceOperation::AlterConnectorProps { alter_props } => {
1222                alter_source_props::handle_alter_source_connector_props(
1223                    handler_args,
1224                    name,
1225                    alter_props,
1226                )
1227                .await
1228            }
1229            AlterSourceOperation::RenameSource { source_name } => {
1230                alter_rename::handle_rename_source(handler_args, name, source_name).await
1231            }
1232            AlterSourceOperation::AddColumn { .. } => {
1233                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1234            }
1235            AlterSourceOperation::ChangeOwner { new_owner_name } => {
1236                alter_owner::handle_alter_owner(
1237                    handler_args,
1238                    name,
1239                    new_owner_name,
1240                    StatementType::ALTER_SOURCE,
1241                )
1242                .await
1243            }
1244            AlterSourceOperation::SetSchema { new_schema_name } => {
1245                alter_set_schema::handle_alter_set_schema(
1246                    handler_args,
1247                    name,
1248                    new_schema_name,
1249                    StatementType::ALTER_SOURCE,
1250                    None,
1251                )
1252                .await
1253            }
1254            AlterSourceOperation::FormatEncode { format_encode } => {
1255                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1256                    .await
1257            }
1258            AlterSourceOperation::RefreshSchema => {
1259                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1260            }
1261            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1262                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1263                    handler_args,
1264                    PbThrottleTarget::Source,
1265                    name,
1266                    rate_limit,
1267                )
1268                .await
1269            }
1270            AlterSourceOperation::SwapRenameSource { target_source } => {
1271                alter_swap_rename::handle_swap_rename(
1272                    handler_args,
1273                    name,
1274                    target_source,
1275                    StatementType::ALTER_SOURCE,
1276                )
1277                .await
1278            }
1279            AlterSourceOperation::SetParallelism {
1280                parallelism,
1281                deferred,
1282            } => {
1283                alter_parallelism::handle_alter_parallelism(
1284                    handler_args,
1285                    name,
1286                    parallelism,
1287                    StatementType::ALTER_SOURCE,
1288                    deferred,
1289                )
1290                .await
1291            }
1292            AlterSourceOperation::SetConfig { entries } => {
1293                alter_streaming_config::handle_alter_streaming_set_config(
1294                    handler_args,
1295                    name,
1296                    entries,
1297                    StatementType::ALTER_SOURCE,
1298                )
1299                .await
1300            }
1301            AlterSourceOperation::ResetConfig { keys } => {
1302                alter_streaming_config::handle_alter_streaming_reset_config(
1303                    handler_args,
1304                    name,
1305                    keys,
1306                    StatementType::ALTER_SOURCE,
1307                )
1308                .await
1309            }
1310            AlterSourceOperation::ResetSource => {
1311                reset_source::handle_reset_source(handler_args, name).await
1312            }
1313        },
1314        Statement::AlterFunction {
1315            name,
1316            args,
1317            operation,
1318        } => match operation {
1319            AlterFunctionOperation::SetSchema { new_schema_name } => {
1320                alter_set_schema::handle_alter_set_schema(
1321                    handler_args,
1322                    name,
1323                    new_schema_name,
1324                    StatementType::ALTER_FUNCTION,
1325                    args,
1326                )
1327                .await
1328            }
1329        },
1330        Statement::AlterConnection { name, operation } => match operation {
1331            AlterConnectionOperation::SetSchema { new_schema_name } => {
1332                alter_set_schema::handle_alter_set_schema(
1333                    handler_args,
1334                    name,
1335                    new_schema_name,
1336                    StatementType::ALTER_CONNECTION,
1337                    None,
1338                )
1339                .await
1340            }
1341            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1342                alter_owner::handle_alter_owner(
1343                    handler_args,
1344                    name,
1345                    new_owner_name,
1346                    StatementType::ALTER_CONNECTION,
1347                )
1348                .await
1349            }
1350            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1351                alter_connection_props::handle_alter_connection_connector_props(
1352                    handler_args,
1353                    name,
1354                    alter_props,
1355                )
1356                .await
1357            }
1358        },
1359        Statement::AlterSystem { param, value } => {
1360            alter_system::handle_alter_system(handler_args, param, value).await
1361        }
1362        Statement::AlterSecret {
1363            name,
1364            with_options,
1365            operation,
1366        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1367        Statement::AlterFragment {
1368            fragment_ids,
1369            operation,
1370        } => match operation {
1371            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1372                let [fragment_id] = fragment_ids.as_slice() else {
1373                    return Err(ErrorCode::InvalidInputSyntax(
1374                        "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1375                            .to_owned(),
1376                    )
1377                    .into());
1378                };
1379                alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1380                    &handler_args.session,
1381                    PbThrottleTarget::Fragment,
1382                    *fragment_id,
1383                    rate_limit,
1384                    StatementType::SET_VARIABLE,
1385                )
1386                .await
1387            }
1388            AlterFragmentOperation::SetParallelism { parallelism } => {
1389                alter_parallelism::handle_alter_fragment_parallelism(
1390                    handler_args,
1391                    fragment_ids.into_iter().map_into().collect(),
1392                    parallelism,
1393                )
1394                .await
1395            }
1396        },
1397        Statement::AlterDefaultPrivileges { .. } => {
1398            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1399        }
1400        Statement::StartTransaction { modes } => {
1401            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1402        }
1403        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1404        Statement::Commit { chain } => {
1405            transaction::handle_commit(handler_args, COMMIT, chain).await
1406        }
1407        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1408        Statement::Rollback { chain } => {
1409            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1410        }
1411        Statement::SetTransaction {
1412            modes,
1413            snapshot,
1414            session,
1415        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1416        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1417        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1418        Statement::Comment {
1419            object_type,
1420            object_name,
1421            comment,
1422        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1423        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1424        Statement::Prepare {
1425            name,
1426            data_types,
1427            statement,
1428        } => prepared_statement::handle_prepare(name, data_types, statement).await,
1429        Statement::Deallocate { name, prepare } => {
1430            prepared_statement::handle_deallocate(name, prepare).await
1431        }
1432        Statement::Vacuum { object_name, full } => {
1433            vacuum::handle_vacuum(handler_args, object_name, full).await
1434        }
1435        Statement::Refresh { table_name } => {
1436            refresh::handle_refresh(handler_args, table_name).await
1437        }
1438        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1439    }
1440}
1441
1442fn check_ban_ddl_for_iceberg_engine_table(
1443    session: Arc<SessionImpl>,
1444    stmt: &Statement,
1445) -> Result<()> {
1446    match stmt {
1447        Statement::AlterTable {
1448            name,
1449            operation:
1450                operation @ (AlterTableOperation::AddColumn { .. }
1451                | AlterTableOperation::DropColumn { .. }),
1452        } => {
1453            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1454            if table.is_iceberg_engine_table() {
1455                bail!(
1456                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1457                    operation,
1458                    schema_name,
1459                    name
1460                );
1461            }
1462        }
1463
1464        Statement::AlterTable {
1465            name,
1466            operation: AlterTableOperation::RenameTable { .. },
1467        } => {
1468            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1469            if table.is_iceberg_engine_table() {
1470                bail!(
1471                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1472                    schema_name,
1473                    name
1474                );
1475            }
1476        }
1477
1478        Statement::AlterTable {
1479            name,
1480            operation: AlterTableOperation::SetParallelism { .. },
1481        } => {
1482            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1483            if table.is_iceberg_engine_table() {
1484                bail!(
1485                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1486                    schema_name,
1487                    name
1488                );
1489            }
1490        }
1491
1492        Statement::AlterTable {
1493            name,
1494            operation: AlterTableOperation::SetSchema { .. },
1495        } => {
1496            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1497            if table.is_iceberg_engine_table() {
1498                bail!(
1499                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1500                    schema_name,
1501                    name
1502                );
1503            }
1504        }
1505
1506        Statement::AlterTable {
1507            name,
1508            operation: AlterTableOperation::RefreshSchema,
1509        } => {
1510            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1511            if table.is_iceberg_engine_table() {
1512                bail!(
1513                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1514                    schema_name,
1515                    name
1516                );
1517            }
1518        }
1519
1520        Statement::AlterTable {
1521            name,
1522            operation: AlterTableOperation::SetSourceRateLimit { .. },
1523        } => {
1524            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1525            if table.is_iceberg_engine_table() {
1526                bail!(
1527                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1528                    schema_name,
1529                    name
1530                );
1531            }
1532        }
1533
1534        _ => {}
1535    }
1536
1537    Ok(())
1538}