1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_database_param;
46mod alter_owner;
47mod alter_parallelism;
48mod alter_rename;
49mod alter_resource_group;
50mod alter_secret;
51mod alter_set_schema;
52mod alter_sink_props;
53mod alter_source_column;
54mod alter_source_with_sr;
55mod alter_streaming_rate_limit;
56mod alter_swap_rename;
57mod alter_system;
58mod alter_table_column;
59pub mod alter_table_drop_connector;
60mod alter_table_with_sr;
61pub mod alter_user;
62pub mod cancel_job;
63pub mod close_cursor;
64mod comment;
65pub mod create_aggregate;
66pub mod create_connection;
67mod create_database;
68pub mod create_function;
69pub mod create_index;
70pub mod create_mv;
71pub mod create_schema;
72pub mod create_secret;
73pub mod create_sink;
74pub mod create_source;
75pub mod create_sql_function;
76pub mod create_subscription;
77pub mod create_table;
78pub mod create_table_as;
79pub mod create_user;
80pub mod create_view;
81pub mod declare_cursor;
82pub mod describe;
83pub mod discard;
84mod drop_connection;
85mod drop_database;
86pub mod drop_function;
87mod drop_index;
88pub mod drop_mv;
89mod drop_schema;
90pub mod drop_secret;
91pub mod drop_sink;
92pub mod drop_source;
93pub mod drop_subscription;
94pub mod drop_table;
95pub mod drop_user;
96mod drop_view;
97pub mod explain;
98pub mod explain_analyze_stream_job;
99pub mod extended_handle;
100pub mod fetch_cursor;
101mod flush;
102pub mod handle_privilege;
103pub mod kill_process;
104pub mod privilege;
105pub mod query;
106mod recover;
107pub mod show;
108mod transaction;
109mod use_db;
110pub mod util;
111pub mod variable;
112mod wait;
113
114pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
115
/// [`PgResponseBuilder`] specialized to this module's [`PgResponseStream`] row stream.
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
118
/// [`PgResponse`] specialized to this module's [`PgResponseStream`] row stream.
pub type RwPgResponse = PgResponse<PgResponseStream>;
121
122#[easy_ext::ext(RwPgResponseBuilderExt)]
123impl RwPgResponseBuilder {
124 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
126 let fields = T::fields();
127 self.values(
128 rows.into_iter()
129 .map(|row| {
130 Row::new(
131 row.into_owned_row()
132 .into_iter()
133 .zip_eq_fast(&fields)
134 .map(|(datum, (_, ty))| {
135 datum.map(|scalar| {
136 scalar.as_scalar_ref_impl().text_format(ty).into()
137 })
138 })
139 .collect(),
140 )
141 })
142 .collect_vec()
143 .into(),
144 fields_to_descriptors(fields),
145 )
146 }
147}
148
149pub fn fields_to_descriptors(
150 fields: Vec<(&str, risingwave_common::types::DataType)>,
151) -> Vec<PgFieldDescriptor> {
152 fields
153 .iter()
154 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
155 .collect()
156}
157
/// Row stream carried by an [`RwPgResponse`].
///
/// Each variant wraps a different row producer; all of them are polled uniformly through the
/// `Stream` implementation of this type.
pub enum PgResponseStream {
    /// Rows adapted from a [`LocalQueryStream`] through [`DataChunkToRowSetAdapter`].
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// Rows adapted from a [`DistributedQueryStream`] through [`DataChunkToRowSetAdapter`].
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// Any other boxed stream of row sets, e.g. one built from an in-memory `Vec<Row>`.
    Rows(BoxStream<'static, RowSetResult>),
}
163
164impl Stream for PgResponseStream {
165 type Item = std::result::Result<Vec<Row>, BoxedError>;
166
167 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
168 match &mut *self {
169 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
170 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
171 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
172 }
173 }
174}
175
176impl From<Vec<Row>> for PgResponseStream {
177 fn from(rows: Vec<Row>) -> Self {
178 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
179 }
180}
181
/// Arguments shared by the statement handlers, built once per statement by `HandlerArgs::new`.
#[derive(Clone)]
pub struct HandlerArgs {
    /// Session the statement is executed in.
    pub session: Arc<SessionImpl>,
    /// SQL text supplied by the caller alongside the parsed statement.
    pub sql: Arc<str>,
    /// The statement rendered back to a string after its `OR REPLACE` / `IF NOT EXISTS` flags
    /// have been cleared.
    pub normalized_sql: String,
    /// Options extracted from the statement via `WithOptions::try_from`.
    pub with_options: WithOptions,
}
189
190impl HandlerArgs {
191 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
192 Ok(Self {
193 session,
194 sql,
195 with_options: WithOptions::try_from(stmt)?,
196 normalized_sql: Self::normalize_sql(stmt),
197 })
198 }
199
200 fn normalize_sql(stmt: &Statement) -> String {
206 let mut stmt = stmt.clone();
207 match &mut stmt {
208 Statement::CreateView {
209 or_replace,
210 if_not_exists,
211 ..
212 } => {
213 *or_replace = false;
214 *if_not_exists = false;
215 }
216 Statement::CreateTable {
217 or_replace,
218 if_not_exists,
219 ..
220 } => {
221 *or_replace = false;
222 *if_not_exists = false;
223 }
224 Statement::CreateIndex { if_not_exists, .. } => {
225 *if_not_exists = false;
226 }
227 Statement::CreateSource {
228 stmt: CreateSourceStatement { if_not_exists, .. },
229 ..
230 } => {
231 *if_not_exists = false;
232 }
233 Statement::CreateSink {
234 stmt: CreateSinkStatement { if_not_exists, .. },
235 } => {
236 *if_not_exists = false;
237 }
238 Statement::CreateSubscription {
239 stmt: CreateSubscriptionStatement { if_not_exists, .. },
240 } => {
241 *if_not_exists = false;
242 }
243 Statement::CreateConnection {
244 stmt: CreateConnectionStatement { if_not_exists, .. },
245 } => {
246 *if_not_exists = false;
247 }
248 _ => {}
249 }
250 stmt.to_string()
251 }
252}
253
254pub async fn handle(
255 session: Arc<SessionImpl>,
256 stmt: Statement,
257 sql: Arc<str>,
258 formats: Vec<Format>,
259) -> Result<RwPgResponse> {
260 session.clear_cancel_query_flag();
261 let _guard = session.txn_begin_implicit();
262 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
263
264 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
265
266 match stmt {
267 Statement::Explain {
268 statement,
269 analyze,
270 options,
271 } => explain::handle_explain(handler_args, *statement, options, analyze).await,
272 Statement::ExplainAnalyzeStreamJob {
273 target,
274 duration_secs,
275 } => {
276 explain_analyze_stream_job::handle_explain_analyze_stream_job(
277 handler_args,
278 target,
279 duration_secs,
280 )
281 .await
282 }
283 Statement::CreateSource { stmt } => {
284 create_source::handle_create_source(handler_args, stmt).await
285 }
286 Statement::CreateSink { stmt } => {
287 create_sink::handle_create_sink(handler_args, stmt, false).await
288 }
289 Statement::CreateSubscription { stmt } => {
290 create_subscription::handle_create_subscription(handler_args, stmt).await
291 }
292 Statement::CreateConnection { stmt } => {
293 create_connection::handle_create_connection(handler_args, stmt).await
294 }
295 Statement::CreateSecret { stmt } => {
296 create_secret::handle_create_secret(handler_args, stmt).await
297 }
298 Statement::CreateFunction {
299 or_replace,
300 temporary,
301 if_not_exists,
302 name,
303 args,
304 returns,
305 params,
306 with_options,
307 } => {
308 if params.language.is_none()
311 || !params
312 .language
313 .as_ref()
314 .unwrap()
315 .real_value()
316 .eq_ignore_ascii_case("sql")
317 {
318 create_function::handle_create_function(
319 handler_args,
320 or_replace,
321 temporary,
322 if_not_exists,
323 name,
324 args,
325 returns,
326 params,
327 with_options,
328 )
329 .await
330 } else {
331 create_sql_function::handle_create_sql_function(
332 handler_args,
333 or_replace,
334 temporary,
335 if_not_exists,
336 name,
337 args,
338 returns,
339 params,
340 )
341 .await
342 }
343 }
344 Statement::CreateAggregate {
345 or_replace,
346 if_not_exists,
347 name,
348 args,
349 returns,
350 params,
351 ..
352 } => {
353 create_aggregate::handle_create_aggregate(
354 handler_args,
355 or_replace,
356 if_not_exists,
357 name,
358 args,
359 returns,
360 params,
361 )
362 .await
363 }
364 Statement::CreateTable {
365 name,
366 columns,
367 wildcard_idx,
368 constraints,
369 query,
370 with_options: _, or_replace,
373 temporary,
374 if_not_exists,
375 format_encode,
376 source_watermarks,
377 append_only,
378 on_conflict,
379 with_version_column,
380 cdc_table_info,
381 include_column_options,
382 webhook_info,
383 engine,
384 } => {
385 if or_replace {
386 bail_not_implemented!("CREATE OR REPLACE TABLE");
387 }
388 if temporary {
389 bail_not_implemented!("CREATE TEMPORARY TABLE");
390 }
391 if let Some(query) = query {
392 return create_table_as::handle_create_as(
393 handler_args,
394 name,
395 if_not_exists,
396 query,
397 columns,
398 append_only,
399 on_conflict,
400 with_version_column.map(|x| x.real_value()),
401 engine,
402 )
403 .await;
404 }
405 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
406 create_table::handle_create_table(
407 handler_args,
408 name,
409 columns,
410 wildcard_idx,
411 constraints,
412 if_not_exists,
413 format_encode,
414 source_watermarks,
415 append_only,
416 on_conflict,
417 with_version_column.map(|x| x.real_value()),
418 cdc_table_info,
419 include_column_options,
420 webhook_info,
421 engine,
422 )
423 .await
424 }
425 Statement::CreateDatabase {
426 db_name,
427 if_not_exists,
428 owner,
429 resource_group,
430 barrier_interval_ms,
431 checkpoint_frequency,
432 } => {
433 create_database::handle_create_database(
434 handler_args,
435 db_name,
436 if_not_exists,
437 owner,
438 resource_group,
439 barrier_interval_ms,
440 checkpoint_frequency,
441 )
442 .await
443 }
444 Statement::CreateSchema {
445 schema_name,
446 if_not_exists,
447 owner,
448 } => {
449 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
450 .await
451 }
452 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
453 Statement::DeclareCursor { stmt } => {
454 declare_cursor::handle_declare_cursor(handler_args, stmt).await
455 }
456 Statement::FetchCursor { stmt } => {
457 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
458 }
459 Statement::CloseCursor { stmt } => {
460 close_cursor::handle_close_cursor(handler_args, stmt).await
461 }
462 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
463 Statement::Grant { .. } => {
464 handle_privilege::handle_grant_privilege(handler_args, stmt).await
465 }
466 Statement::Revoke { .. } => {
467 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
468 }
469 Statement::Describe { name, kind } => match kind {
470 DescribeKind::Fragments => {
471 describe::handle_describe_fragments(handler_args, name).await
472 }
473 DescribeKind::Plain => describe::handle_describe(handler_args, name),
474 },
475 Statement::DescribeFragment { fragment_id } => {
476 describe::handle_describe_fragment(handler_args, fragment_id).await
477 }
478 Statement::Discard(..) => discard::handle_discard(handler_args),
479 Statement::ShowObjects {
480 object: show_object,
481 filter,
482 } => show::handle_show_object(handler_args, show_object, filter).await,
483 Statement::ShowCreateObject { create_type, name } => {
484 show::handle_show_create_object(handler_args, create_type, name)
485 }
486 Statement::ShowTransactionIsolationLevel => {
487 transaction::handle_show_isolation_level(handler_args)
488 }
489 Statement::Drop(DropStatement {
490 object_type,
491 object_name,
492 if_exists,
493 drop_mode,
494 }) => {
495 let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
496 match object_type {
497 ObjectType::MaterializedView
498 | ObjectType::View
499 | ObjectType::Sink
500 | ObjectType::Source
501 | ObjectType::Subscription
502 | ObjectType::Index
503 | ObjectType::Table
504 | ObjectType::Schema => true,
505 ObjectType::Database
506 | ObjectType::User
507 | ObjectType::Connection
508 | ObjectType::Secret => {
509 bail_not_implemented!("DROP CASCADE");
510 }
511 }
512 } else {
513 false
514 };
515 match object_type {
516 ObjectType::Table => {
517 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
518 .await
519 }
520 ObjectType::MaterializedView => {
521 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
522 }
523 ObjectType::Index => {
524 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
525 .await
526 }
527 ObjectType::Source => {
528 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
529 .await
530 }
531 ObjectType::Sink => {
532 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
533 }
534 ObjectType::Subscription => {
535 drop_subscription::handle_drop_subscription(
536 handler_args,
537 object_name,
538 if_exists,
539 cascade,
540 )
541 .await
542 }
543 ObjectType::Database => {
544 drop_database::handle_drop_database(handler_args, object_name, if_exists).await
545 }
546 ObjectType::Schema => {
547 drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
548 .await
549 }
550 ObjectType::User => {
551 drop_user::handle_drop_user(handler_args, object_name, if_exists).await
552 }
553 ObjectType::View => {
554 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
555 }
556 ObjectType::Connection => {
557 drop_connection::handle_drop_connection(handler_args, object_name, if_exists)
558 .await
559 }
560 ObjectType::Secret => {
561 drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
562 }
563 }
564 }
565 Statement::DropFunction {
567 if_exists,
568 func_desc,
569 option,
570 } => {
571 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
572 .await
573 }
574 Statement::DropAggregate {
575 if_exists,
576 func_desc,
577 option,
578 } => {
579 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
580 .await
581 }
582 Statement::Query(_)
583 | Statement::Insert { .. }
584 | Statement::Delete { .. }
585 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
586 Statement::CreateView {
587 materialized,
588 if_not_exists,
589 name,
590 columns,
591 query,
592 with_options: _, or_replace, emit_mode,
595 } => {
596 if or_replace {
597 bail_not_implemented!("CREATE OR REPLACE VIEW");
598 }
599 if materialized {
600 create_mv::handle_create_mv(
601 handler_args,
602 if_not_exists,
603 name,
604 *query,
605 columns,
606 emit_mode,
607 )
608 .await
609 } else {
610 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
611 .await
612 }
613 }
614 Statement::Flush => flush::handle_flush(handler_args).await,
615 Statement::Wait => wait::handle_wait(handler_args).await,
616 Statement::Recover => recover::handle_recover(handler_args).await,
617 Statement::SetVariable {
618 local: _,
619 variable,
620 value,
621 } => {
622 if variable.real_value().eq_ignore_ascii_case("database") {
624 let x = variable::set_var_to_param_str(&value);
625 let res = use_db::handle_use_db(
626 handler_args,
627 ObjectName::from(vec![Ident::from_real_value(
628 x.as_deref().unwrap_or("default"),
629 )]),
630 )?;
631 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
632 for notice in res.notices() {
633 builder = builder.notice(notice);
634 }
635 return Ok(builder.into());
636 }
637 variable::handle_set(handler_args, variable, value)
638 }
639 Statement::SetTimeZone { local: _, value } => {
640 variable::handle_set_time_zone(handler_args, value)
641 }
642 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable).await,
643 Statement::CreateIndex {
644 name,
645 table_name,
646 columns,
647 include,
648 distributed_by,
649 unique,
650 if_not_exists,
651 } => {
652 if unique {
653 bail_not_implemented!("create unique index");
654 }
655
656 create_index::handle_create_index(
657 handler_args,
658 if_not_exists,
659 name,
660 table_name,
661 columns.to_vec(),
662 include,
663 distributed_by,
664 )
665 .await
666 }
667 Statement::AlterDatabase { name, operation } => match operation {
668 AlterDatabaseOperation::RenameDatabase { database_name } => {
669 alter_rename::handle_rename_database(handler_args, name, database_name).await
670 }
671 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
672 alter_owner::handle_alter_owner(
673 handler_args,
674 name,
675 new_owner_name,
676 StatementType::ALTER_DATABASE,
677 )
678 .await
679 }
680 AlterDatabaseOperation::SetParam(config_param) => {
681 let ConfigParam { param, value } = config_param;
682
683 let database_param = match param.real_value().to_uppercase().as_str() {
684 "BARRIER_INTERVAL_MS" => {
685 let barrier_interval_ms = match value {
686 SetVariableValue::Default => None,
687 SetVariableValue::Single(SetVariableValueSingle::Literal(
688 Value::Number(num),
689 )) => {
690 let num = num.parse::<u32>().map_err(|e| {
691 ErrorCode::InvalidInputSyntax(format!(
692 "barrier_interval_ms must be a u32 integer: {}",
693 e.as_report()
694 ))
695 })?;
696 Some(num)
697 }
698 _ => {
699 return Err(ErrorCode::InvalidInputSyntax(
700 "barrier_interval_ms must be a u32 integer or DEFAULT"
701 .to_owned(),
702 )
703 .into());
704 }
705 };
706 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
707 }
708 "CHECKPOINT_FREQUENCY" => {
709 let checkpoint_frequency = match value {
710 SetVariableValue::Default => None,
711 SetVariableValue::Single(SetVariableValueSingle::Literal(
712 Value::Number(num),
713 )) => {
714 let num = num.parse::<u64>().map_err(|e| {
715 ErrorCode::InvalidInputSyntax(format!(
716 "checkpoint_frequency must be a u64 integer: {}",
717 e.as_report()
718 ))
719 })?;
720 Some(num)
721 }
722 _ => {
723 return Err(ErrorCode::InvalidInputSyntax(
724 "checkpoint_frequency must be a u64 integer or DEFAULT"
725 .to_owned(),
726 )
727 .into());
728 }
729 };
730 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
731 }
732 _ => {
733 return Err(ErrorCode::InvalidInputSyntax(format!(
734 "Unsupported database config parameter: {}",
735 param.real_value()
736 ))
737 .into());
738 }
739 };
740
741 alter_database_param::handle_alter_database_param(
742 handler_args,
743 name,
744 database_param,
745 )
746 .await
747 }
748 },
749 Statement::AlterSchema { name, operation } => match operation {
750 AlterSchemaOperation::RenameSchema { schema_name } => {
751 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
752 }
753 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
754 alter_owner::handle_alter_owner(
755 handler_args,
756 name,
757 new_owner_name,
758 StatementType::ALTER_SCHEMA,
759 )
760 .await
761 }
762 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
763 alter_swap_rename::handle_swap_rename(
764 handler_args,
765 name,
766 target_schema,
767 StatementType::ALTER_SCHEMA,
768 )
769 .await
770 }
771 },
772 Statement::AlterTable { name, operation } => match operation {
773 AlterTableOperation::AddColumn { .. }
774 | AlterTableOperation::DropColumn { .. }
775 | AlterTableOperation::AlterColumn { .. } => {
776 alter_table_column::handle_alter_table_column(handler_args, name, operation).await
777 }
778 AlterTableOperation::RenameTable { table_name } => {
779 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
780 .await
781 }
782 AlterTableOperation::ChangeOwner { new_owner_name } => {
783 alter_owner::handle_alter_owner(
784 handler_args,
785 name,
786 new_owner_name,
787 StatementType::ALTER_TABLE,
788 )
789 .await
790 }
791 AlterTableOperation::SetParallelism {
792 parallelism,
793 deferred,
794 } => {
795 alter_parallelism::handle_alter_parallelism(
796 handler_args,
797 name,
798 parallelism,
799 StatementType::ALTER_TABLE,
800 deferred,
801 )
802 .await
803 }
804 AlterTableOperation::SetSchema { new_schema_name } => {
805 alter_set_schema::handle_alter_set_schema(
806 handler_args,
807 name,
808 new_schema_name,
809 StatementType::ALTER_TABLE,
810 None,
811 )
812 .await
813 }
814 AlterTableOperation::RefreshSchema => {
815 alter_table_with_sr::handle_refresh_schema(handler_args, name).await
816 }
817 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
818 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
819 handler_args,
820 PbThrottleTarget::TableWithSource,
821 name,
822 rate_limit,
823 )
824 .await
825 }
826 AlterTableOperation::DropConnector => {
827 alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
828 .await
829 }
830 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
831 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
832 handler_args,
833 PbThrottleTarget::TableDml,
834 name,
835 rate_limit,
836 )
837 .await
838 }
839 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
840 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
841 handler_args,
842 PbThrottleTarget::CdcTable,
843 name,
844 rate_limit,
845 )
846 .await
847 }
848 AlterTableOperation::SwapRenameTable { target_table } => {
849 alter_swap_rename::handle_swap_rename(
850 handler_args,
851 name,
852 target_table,
853 StatementType::ALTER_TABLE,
854 )
855 .await
856 }
857 AlterTableOperation::AddConstraint { .. }
858 | AlterTableOperation::DropConstraint { .. }
859 | AlterTableOperation::RenameColumn { .. }
860 | AlterTableOperation::ChangeColumn { .. }
861 | AlterTableOperation::RenameConstraint { .. } => {
862 bail_not_implemented!(
863 "Unhandled statement: {}",
864 Statement::AlterTable { name, operation }
865 )
866 }
867 },
868 Statement::AlterIndex { name, operation } => match operation {
869 AlterIndexOperation::RenameIndex { index_name } => {
870 alter_rename::handle_rename_index(handler_args, name, index_name).await
871 }
872 AlterIndexOperation::SetParallelism {
873 parallelism,
874 deferred,
875 } => {
876 alter_parallelism::handle_alter_parallelism(
877 handler_args,
878 name,
879 parallelism,
880 StatementType::ALTER_INDEX,
881 deferred,
882 )
883 .await
884 }
885 },
886 Statement::AlterView {
887 materialized,
888 name,
889 operation,
890 } => {
891 let statement_type = if materialized {
892 StatementType::ALTER_MATERIALIZED_VIEW
893 } else {
894 StatementType::ALTER_VIEW
895 };
896 match operation {
897 AlterViewOperation::RenameView { view_name } => {
898 if materialized {
899 alter_rename::handle_rename_table(
900 handler_args,
901 TableType::MaterializedView,
902 name,
903 view_name,
904 )
905 .await
906 } else {
907 alter_rename::handle_rename_view(handler_args, name, view_name).await
908 }
909 }
910 AlterViewOperation::SetParallelism {
911 parallelism,
912 deferred,
913 } => {
914 if !materialized {
915 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
916 }
917 alter_parallelism::handle_alter_parallelism(
918 handler_args,
919 name,
920 parallelism,
921 statement_type,
922 deferred,
923 )
924 .await
925 }
926 AlterViewOperation::SetResourceGroup {
927 resource_group,
928 deferred,
929 } => {
930 if !materialized {
931 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
932 }
933 alter_resource_group::handle_alter_resource_group(
934 handler_args,
935 name,
936 resource_group,
937 statement_type,
938 deferred,
939 )
940 .await
941 }
942 AlterViewOperation::ChangeOwner { new_owner_name } => {
943 alter_owner::handle_alter_owner(
944 handler_args,
945 name,
946 new_owner_name,
947 statement_type,
948 )
949 .await
950 }
951 AlterViewOperation::SetSchema { new_schema_name } => {
952 alter_set_schema::handle_alter_set_schema(
953 handler_args,
954 name,
955 new_schema_name,
956 statement_type,
957 None,
958 )
959 .await
960 }
961 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
962 if !materialized {
963 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
964 }
965 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
966 handler_args,
967 PbThrottleTarget::Mv,
968 name,
969 rate_limit,
970 )
971 .await
972 }
973 AlterViewOperation::SwapRenameView { target_view } => {
974 alter_swap_rename::handle_swap_rename(
975 handler_args,
976 name,
977 target_view,
978 statement_type,
979 )
980 .await
981 }
982 }
983 }
984
985 Statement::AlterSink { name, operation } => match operation {
986 AlterSinkOperation::SetSinkProps { changed_props } => {
987 alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await
988 }
989 AlterSinkOperation::RenameSink { sink_name } => {
990 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
991 }
992 AlterSinkOperation::ChangeOwner { new_owner_name } => {
993 alter_owner::handle_alter_owner(
994 handler_args,
995 name,
996 new_owner_name,
997 StatementType::ALTER_SINK,
998 )
999 .await
1000 }
1001 AlterSinkOperation::SetSchema { new_schema_name } => {
1002 alter_set_schema::handle_alter_set_schema(
1003 handler_args,
1004 name,
1005 new_schema_name,
1006 StatementType::ALTER_SINK,
1007 None,
1008 )
1009 .await
1010 }
1011 AlterSinkOperation::SetParallelism {
1012 parallelism,
1013 deferred,
1014 } => {
1015 alter_parallelism::handle_alter_parallelism(
1016 handler_args,
1017 name,
1018 parallelism,
1019 StatementType::ALTER_SINK,
1020 deferred,
1021 )
1022 .await
1023 }
1024 AlterSinkOperation::SwapRenameSink { target_sink } => {
1025 alter_swap_rename::handle_swap_rename(
1026 handler_args,
1027 name,
1028 target_sink,
1029 StatementType::ALTER_SINK,
1030 )
1031 .await
1032 }
1033 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1034 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1035 handler_args,
1036 PbThrottleTarget::Sink,
1037 name,
1038 rate_limit,
1039 )
1040 .await
1041 }
1042 },
1043 Statement::AlterSubscription { name, operation } => match operation {
1044 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1045 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1046 .await
1047 }
1048 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1049 alter_owner::handle_alter_owner(
1050 handler_args,
1051 name,
1052 new_owner_name,
1053 StatementType::ALTER_SUBSCRIPTION,
1054 )
1055 .await
1056 }
1057 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1058 alter_set_schema::handle_alter_set_schema(
1059 handler_args,
1060 name,
1061 new_schema_name,
1062 StatementType::ALTER_SUBSCRIPTION,
1063 None,
1064 )
1065 .await
1066 }
1067 AlterSubscriptionOperation::SwapRenameSubscription {
1068 target_subscription,
1069 } => {
1070 alter_swap_rename::handle_swap_rename(
1071 handler_args,
1072 name,
1073 target_subscription,
1074 StatementType::ALTER_SUBSCRIPTION,
1075 )
1076 .await
1077 }
1078 },
1079 Statement::AlterSource { name, operation } => match operation {
1080 AlterSourceOperation::RenameSource { source_name } => {
1081 alter_rename::handle_rename_source(handler_args, name, source_name).await
1082 }
1083 AlterSourceOperation::AddColumn { .. } => {
1084 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1085 }
1086 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1087 alter_owner::handle_alter_owner(
1088 handler_args,
1089 name,
1090 new_owner_name,
1091 StatementType::ALTER_SOURCE,
1092 )
1093 .await
1094 }
1095 AlterSourceOperation::SetSchema { new_schema_name } => {
1096 alter_set_schema::handle_alter_set_schema(
1097 handler_args,
1098 name,
1099 new_schema_name,
1100 StatementType::ALTER_SOURCE,
1101 None,
1102 )
1103 .await
1104 }
1105 AlterSourceOperation::FormatEncode { format_encode } => {
1106 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1107 .await
1108 }
1109 AlterSourceOperation::RefreshSchema => {
1110 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1111 }
1112 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1113 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1114 handler_args,
1115 PbThrottleTarget::Source,
1116 name,
1117 rate_limit,
1118 )
1119 .await
1120 }
1121 AlterSourceOperation::SwapRenameSource { target_source } => {
1122 alter_swap_rename::handle_swap_rename(
1123 handler_args,
1124 name,
1125 target_source,
1126 StatementType::ALTER_SOURCE,
1127 )
1128 .await
1129 }
1130 AlterSourceOperation::SetParallelism {
1131 parallelism,
1132 deferred,
1133 } => {
1134 alter_parallelism::handle_alter_parallelism(
1135 handler_args,
1136 name,
1137 parallelism,
1138 StatementType::ALTER_SOURCE,
1139 deferred,
1140 )
1141 .await
1142 }
1143 },
1144 Statement::AlterFunction {
1145 name,
1146 args,
1147 operation,
1148 } => match operation {
1149 AlterFunctionOperation::SetSchema { new_schema_name } => {
1150 alter_set_schema::handle_alter_set_schema(
1151 handler_args,
1152 name,
1153 new_schema_name,
1154 StatementType::ALTER_FUNCTION,
1155 args,
1156 )
1157 .await
1158 }
1159 },
1160 Statement::AlterConnection { name, operation } => match operation {
1161 AlterConnectionOperation::SetSchema { new_schema_name } => {
1162 alter_set_schema::handle_alter_set_schema(
1163 handler_args,
1164 name,
1165 new_schema_name,
1166 StatementType::ALTER_CONNECTION,
1167 None,
1168 )
1169 .await
1170 }
1171 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1172 alter_owner::handle_alter_owner(
1173 handler_args,
1174 name,
1175 new_owner_name,
1176 StatementType::ALTER_CONNECTION,
1177 )
1178 .await
1179 }
1180 },
1181 Statement::AlterSystem { param, value } => {
1182 alter_system::handle_alter_system(handler_args, param, value).await
1183 }
1184 Statement::AlterSecret {
1185 name,
1186 with_options,
1187 operation,
1188 } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1189 Statement::AlterFragment {
1190 fragment_id,
1191 operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1192 } => {
1193 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1194 &handler_args.session,
1195 PbThrottleTarget::Fragment,
1196 fragment_id,
1197 rate_limit,
1198 StatementType::SET_VARIABLE,
1199 )
1200 .await
1201 }
1202 Statement::StartTransaction { modes } => {
1203 transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1204 }
1205 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1206 Statement::Commit { chain } => {
1207 transaction::handle_commit(handler_args, COMMIT, chain).await
1208 }
1209 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1210 Statement::Rollback { chain } => {
1211 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1212 }
1213 Statement::SetTransaction {
1214 modes,
1215 snapshot,
1216 session,
1217 } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1218 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1219 Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1220 Statement::Comment {
1221 object_type,
1222 object_name,
1223 comment,
1224 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1225 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1226 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1227 }
1228}
1229
1230fn check_ban_ddl_for_iceberg_engine_table(
1231 session: Arc<SessionImpl>,
1232 stmt: &Statement,
1233) -> Result<()> {
1234 match stmt {
1235 Statement::AlterTable {
1236 name,
1237 operation:
1238 operation @ (AlterTableOperation::AddColumn { .. }
1239 | AlterTableOperation::DropColumn { .. }),
1240 } => {
1241 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1242 if table.is_iceberg_engine_table() {
1243 bail!(
1244 "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1245 operation,
1246 schema_name,
1247 name
1248 );
1249 }
1250 }
1251
1252 Statement::AlterTable {
1253 name,
1254 operation: AlterTableOperation::RenameTable { .. },
1255 } => {
1256 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1257 if table.is_iceberg_engine_table() {
1258 bail!(
1259 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1260 schema_name,
1261 name
1262 );
1263 }
1264 }
1265
1266 Statement::AlterTable {
1267 name,
1268 operation: AlterTableOperation::ChangeOwner { .. },
1269 } => {
1270 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1271 if table.is_iceberg_engine_table() {
1272 bail!(
1273 "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1274 schema_name,
1275 name
1276 );
1277 }
1278 }
1279
1280 Statement::AlterTable {
1281 name,
1282 operation: AlterTableOperation::SetParallelism { .. },
1283 } => {
1284 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1285 if table.is_iceberg_engine_table() {
1286 bail!(
1287 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1288 schema_name,
1289 name
1290 );
1291 }
1292 }
1293
1294 Statement::AlterTable {
1295 name,
1296 operation: AlterTableOperation::SetSchema { .. },
1297 } => {
1298 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1299 if table.is_iceberg_engine_table() {
1300 bail!(
1301 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1302 schema_name,
1303 name
1304 );
1305 }
1306 }
1307
1308 Statement::AlterTable {
1309 name,
1310 operation: AlterTableOperation::RefreshSchema,
1311 } => {
1312 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1313 if table.is_iceberg_engine_table() {
1314 bail!(
1315 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1316 schema_name,
1317 name
1318 );
1319 }
1320 }
1321
1322 Statement::AlterTable {
1323 name,
1324 operation: AlterTableOperation::SetSourceRateLimit { .. },
1325 } => {
1326 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1327 if table.is_iceberg_engine_table() {
1328 bail!(
1329 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1330 schema_name,
1331 name
1332 );
1333 }
1334 }
1335
1336 _ => {}
1337 }
1338
1339 Ok(())
1340}