1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::types::Fields;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_common::{bail, bail_not_implemented};
30use risingwave_pb::meta::PbThrottleTarget;
31use risingwave_sqlparser::ast::*;
32use util::get_table_catalog_by_table_name;
33
34use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
35use crate::catalog::table_catalog::TableType;
36use crate::error::{ErrorCode, Result};
37use crate::handler::cancel_job::handle_cancel;
38use crate::handler::kill_process::handle_kill;
39use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
40use crate::session::SessionImpl;
41use crate::utils::WithOptions;
42
43mod alter_owner;
44mod alter_parallelism;
45mod alter_rename;
46mod alter_resource_group;
47mod alter_secret;
48mod alter_set_schema;
49mod alter_sink_props;
50mod alter_source_column;
51mod alter_source_with_sr;
52mod alter_streaming_rate_limit;
53mod alter_swap_rename;
54mod alter_system;
55mod alter_table_column;
56pub mod alter_table_drop_connector;
57mod alter_table_with_sr;
58pub mod alter_user;
59pub mod cancel_job;
60pub mod close_cursor;
61mod comment;
62pub mod create_aggregate;
63pub mod create_connection;
64mod create_database;
65pub mod create_function;
66pub mod create_index;
67pub mod create_mv;
68pub mod create_schema;
69pub mod create_secret;
70pub mod create_sink;
71pub mod create_source;
72pub mod create_sql_function;
73pub mod create_subscription;
74pub mod create_table;
75pub mod create_table_as;
76pub mod create_user;
77pub mod create_view;
78pub mod declare_cursor;
79pub mod describe;
80pub mod discard;
81mod drop_connection;
82mod drop_database;
83pub mod drop_function;
84mod drop_index;
85pub mod drop_mv;
86mod drop_schema;
87pub mod drop_secret;
88pub mod drop_sink;
89pub mod drop_source;
90pub mod drop_subscription;
91pub mod drop_table;
92pub mod drop_user;
93mod drop_view;
94pub mod explain;
95pub mod explain_analyze_stream_job;
96pub mod extended_handle;
97pub mod fetch_cursor;
98mod flush;
99pub mod handle_privilege;
100mod kill_process;
101pub mod privilege;
102pub mod query;
103mod recover;
104pub mod show;
105mod transaction;
106mod use_db;
107pub mod util;
108pub mod variable;
109mod wait;
110
111pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
112
113pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
115
116pub type RwPgResponse = PgResponse<PgResponseStream>;
118
119#[easy_ext::ext(RwPgResponseBuilderExt)]
120impl RwPgResponseBuilder {
121 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
123 let fields = T::fields();
124 self.values(
125 rows.into_iter()
126 .map(|row| {
127 Row::new(
128 row.into_owned_row()
129 .into_iter()
130 .zip_eq_fast(&fields)
131 .map(|(datum, (_, ty))| {
132 datum.map(|scalar| {
133 scalar.as_scalar_ref_impl().text_format(ty).into()
134 })
135 })
136 .collect(),
137 )
138 })
139 .collect_vec()
140 .into(),
141 fields_to_descriptors(fields),
142 )
143 }
144}
145
146pub fn fields_to_descriptors(
147 fields: Vec<(&str, risingwave_common::types::DataType)>,
148) -> Vec<PgFieldDescriptor> {
149 fields
150 .iter()
151 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
152 .collect()
153}
154
155pub enum PgResponseStream {
156 LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
157 DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
158 Rows(BoxStream<'static, RowSetResult>),
159}
160
161impl Stream for PgResponseStream {
162 type Item = std::result::Result<Vec<Row>, BoxedError>;
163
164 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
165 match &mut *self {
166 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
167 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
168 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
169 }
170 }
171}
172
173impl From<Vec<Row>> for PgResponseStream {
174 fn from(rows: Vec<Row>) -> Self {
175 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
176 }
177}
178
179#[derive(Clone)]
180pub struct HandlerArgs {
181 pub session: Arc<SessionImpl>,
182 pub sql: Arc<str>,
183 pub normalized_sql: String,
184 pub with_options: WithOptions,
185}
186
187impl HandlerArgs {
188 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
189 Ok(Self {
190 session,
191 sql,
192 with_options: WithOptions::try_from(stmt)?,
193 normalized_sql: Self::normalize_sql(stmt),
194 })
195 }
196
197 fn normalize_sql(stmt: &Statement) -> String {
203 let mut stmt = stmt.clone();
204 match &mut stmt {
205 Statement::CreateView {
206 or_replace,
207 if_not_exists,
208 ..
209 } => {
210 *or_replace = false;
211 *if_not_exists = false;
212 }
213 Statement::CreateTable {
214 or_replace,
215 if_not_exists,
216 ..
217 } => {
218 *or_replace = false;
219 *if_not_exists = false;
220 }
221 Statement::CreateIndex { if_not_exists, .. } => {
222 *if_not_exists = false;
223 }
224 Statement::CreateSource {
225 stmt: CreateSourceStatement { if_not_exists, .. },
226 ..
227 } => {
228 *if_not_exists = false;
229 }
230 Statement::CreateSink {
231 stmt: CreateSinkStatement { if_not_exists, .. },
232 } => {
233 *if_not_exists = false;
234 }
235 Statement::CreateSubscription {
236 stmt: CreateSubscriptionStatement { if_not_exists, .. },
237 } => {
238 *if_not_exists = false;
239 }
240 Statement::CreateConnection {
241 stmt: CreateConnectionStatement { if_not_exists, .. },
242 } => {
243 *if_not_exists = false;
244 }
245 _ => {}
246 }
247 stmt.to_string()
248 }
249}
250
251pub async fn handle(
252 session: Arc<SessionImpl>,
253 stmt: Statement,
254 sql: Arc<str>,
255 formats: Vec<Format>,
256) -> Result<RwPgResponse> {
257 session.clear_cancel_query_flag();
258 let _guard = session.txn_begin_implicit();
259 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
260
261 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
262
263 match stmt {
264 Statement::Explain {
265 statement,
266 analyze,
267 options,
268 } => explain::handle_explain(handler_args, *statement, options, analyze).await,
269 Statement::ExplainAnalyzeStreamJob {
270 target,
271 duration_secs,
272 } => {
273 explain_analyze_stream_job::handle_explain_analyze_stream_job(
274 handler_args,
275 target,
276 duration_secs,
277 )
278 .await
279 }
280 Statement::CreateSource { stmt } => {
281 create_source::handle_create_source(handler_args, stmt).await
282 }
283 Statement::CreateSink { stmt } => {
284 create_sink::handle_create_sink(handler_args, stmt, false).await
285 }
286 Statement::CreateSubscription { stmt } => {
287 create_subscription::handle_create_subscription(handler_args, stmt).await
288 }
289 Statement::CreateConnection { stmt } => {
290 create_connection::handle_create_connection(handler_args, stmt).await
291 }
292 Statement::CreateSecret { stmt } => {
293 create_secret::handle_create_secret(handler_args, stmt).await
294 }
295 Statement::CreateFunction {
296 or_replace,
297 temporary,
298 if_not_exists,
299 name,
300 args,
301 returns,
302 params,
303 with_options,
304 } => {
305 if params.language.is_none()
308 || !params
309 .language
310 .as_ref()
311 .unwrap()
312 .real_value()
313 .eq_ignore_ascii_case("sql")
314 {
315 create_function::handle_create_function(
316 handler_args,
317 or_replace,
318 temporary,
319 if_not_exists,
320 name,
321 args,
322 returns,
323 params,
324 with_options,
325 )
326 .await
327 } else {
328 create_sql_function::handle_create_sql_function(
329 handler_args,
330 or_replace,
331 temporary,
332 if_not_exists,
333 name,
334 args,
335 returns,
336 params,
337 )
338 .await
339 }
340 }
341 Statement::CreateAggregate {
342 or_replace,
343 if_not_exists,
344 name,
345 args,
346 returns,
347 params,
348 ..
349 } => {
350 create_aggregate::handle_create_aggregate(
351 handler_args,
352 or_replace,
353 if_not_exists,
354 name,
355 args,
356 returns,
357 params,
358 )
359 .await
360 }
361 Statement::CreateTable {
362 name,
363 columns,
364 wildcard_idx,
365 constraints,
366 query,
367 with_options: _, or_replace,
370 temporary,
371 if_not_exists,
372 format_encode,
373 source_watermarks,
374 append_only,
375 on_conflict,
376 with_version_column,
377 cdc_table_info,
378 include_column_options,
379 webhook_info,
380 engine,
381 } => {
382 if or_replace {
383 bail_not_implemented!("CREATE OR REPLACE TABLE");
384 }
385 if temporary {
386 bail_not_implemented!("CREATE TEMPORARY TABLE");
387 }
388 if let Some(query) = query {
389 return create_table_as::handle_create_as(
390 handler_args,
391 name,
392 if_not_exists,
393 query,
394 columns,
395 append_only,
396 on_conflict,
397 with_version_column.map(|x| x.real_value()),
398 engine,
399 )
400 .await;
401 }
402 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
403 create_table::handle_create_table(
404 handler_args,
405 name,
406 columns,
407 wildcard_idx,
408 constraints,
409 if_not_exists,
410 format_encode,
411 source_watermarks,
412 append_only,
413 on_conflict,
414 with_version_column.map(|x| x.real_value()),
415 cdc_table_info,
416 include_column_options,
417 webhook_info,
418 engine,
419 )
420 .await
421 }
422 Statement::CreateDatabase {
423 db_name,
424 if_not_exists,
425 owner,
426 resource_group,
427 } => {
428 create_database::handle_create_database(
429 handler_args,
430 db_name,
431 if_not_exists,
432 owner,
433 resource_group,
434 )
435 .await
436 }
437 Statement::CreateSchema {
438 schema_name,
439 if_not_exists,
440 owner,
441 } => {
442 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
443 .await
444 }
445 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
446 Statement::DeclareCursor { stmt } => {
447 declare_cursor::handle_declare_cursor(handler_args, stmt).await
448 }
449 Statement::FetchCursor { stmt } => {
450 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
451 }
452 Statement::CloseCursor { stmt } => {
453 close_cursor::handle_close_cursor(handler_args, stmt).await
454 }
455 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
456 Statement::Grant { .. } => {
457 handle_privilege::handle_grant_privilege(handler_args, stmt).await
458 }
459 Statement::Revoke { .. } => {
460 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
461 }
462 Statement::Describe { name, kind } => match kind {
463 DescribeKind::Fragments => {
464 describe::handle_describe_fragments(handler_args, name).await
465 }
466 DescribeKind::Plain => describe::handle_describe(handler_args, name),
467 },
468 Statement::DescribeFragment { fragment_id } => {
469 describe::handle_describe_fragment(handler_args, fragment_id).await
470 }
471 Statement::Discard(..) => discard::handle_discard(handler_args),
472 Statement::ShowObjects {
473 object: show_object,
474 filter,
475 } => show::handle_show_object(handler_args, show_object, filter).await,
476 Statement::ShowCreateObject { create_type, name } => {
477 show::handle_show_create_object(handler_args, create_type, name)
478 }
479 Statement::ShowTransactionIsolationLevel => {
480 transaction::handle_show_isolation_level(handler_args)
481 }
482 Statement::Drop(DropStatement {
483 object_type,
484 object_name,
485 if_exists,
486 drop_mode,
487 }) => {
488 let mut cascade = false;
489 if let AstOption::Some(DropMode::Cascade) = drop_mode {
490 match object_type {
491 ObjectType::MaterializedView
492 | ObjectType::View
493 | ObjectType::Sink
494 | ObjectType::Source
495 | ObjectType::Subscription
496 | ObjectType::Index
497 | ObjectType::Table
498 | ObjectType::Schema => {
499 cascade = true;
500 }
501 ObjectType::Database
502 | ObjectType::User
503 | ObjectType::Connection
504 | ObjectType::Secret => {
505 bail_not_implemented!("DROP CASCADE");
506 }
507 };
508 };
509 match object_type {
510 ObjectType::Table => {
511 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
512 .await
513 }
514 ObjectType::MaterializedView => {
515 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
516 }
517 ObjectType::Index => {
518 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
519 .await
520 }
521 ObjectType::Source => {
522 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
523 .await
524 }
525 ObjectType::Sink => {
526 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
527 }
528 ObjectType::Subscription => {
529 drop_subscription::handle_drop_subscription(
530 handler_args,
531 object_name,
532 if_exists,
533 cascade,
534 )
535 .await
536 }
537 ObjectType::Database => {
538 drop_database::handle_drop_database(
539 handler_args,
540 object_name,
541 if_exists,
542 drop_mode.into(),
543 )
544 .await
545 }
546 ObjectType::Schema => {
547 drop_schema::handle_drop_schema(
548 handler_args,
549 object_name,
550 if_exists,
551 drop_mode.into(),
552 )
553 .await
554 }
555 ObjectType::User => {
556 drop_user::handle_drop_user(
557 handler_args,
558 object_name,
559 if_exists,
560 drop_mode.into(),
561 )
562 .await
563 }
564 ObjectType::View => {
565 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
566 }
567 ObjectType::Connection => {
568 drop_connection::handle_drop_connection(handler_args, object_name, if_exists)
569 .await
570 }
571 ObjectType::Secret => {
572 drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
573 }
574 }
575 }
576 Statement::DropFunction {
578 if_exists,
579 func_desc,
580 option,
581 } => {
582 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
583 .await
584 }
585 Statement::DropAggregate {
586 if_exists,
587 func_desc,
588 option,
589 } => {
590 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
591 .await
592 }
593 Statement::Query(_)
594 | Statement::Insert { .. }
595 | Statement::Delete { .. }
596 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
597 Statement::CreateView {
598 materialized,
599 if_not_exists,
600 name,
601 columns,
602 query,
603 with_options: _, or_replace, emit_mode,
606 } => {
607 if or_replace {
608 bail_not_implemented!("CREATE OR REPLACE VIEW");
609 }
610 if materialized {
611 create_mv::handle_create_mv(
612 handler_args,
613 if_not_exists,
614 name,
615 *query,
616 columns,
617 emit_mode,
618 )
619 .await
620 } else {
621 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
622 .await
623 }
624 }
625 Statement::Flush => flush::handle_flush(handler_args).await,
626 Statement::Wait => wait::handle_wait(handler_args).await,
627 Statement::Recover => recover::handle_recover(handler_args).await,
628 Statement::SetVariable {
629 local: _,
630 variable,
631 value,
632 } => {
633 if variable.real_value().eq_ignore_ascii_case("database") {
635 let x = variable::set_var_to_param_str(&value);
636 let res = use_db::handle_use_db(
637 handler_args,
638 ObjectName::from(vec![Ident::new_unchecked(
639 x.unwrap_or("default".to_owned()),
640 )]),
641 )?;
642 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
643 for notice in res.notices() {
644 builder = builder.notice(notice);
645 }
646 return Ok(builder.into());
647 }
648 variable::handle_set(handler_args, variable, value)
649 }
650 Statement::SetTimeZone { local: _, value } => {
651 variable::handle_set_time_zone(handler_args, value)
652 }
653 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable).await,
654 Statement::CreateIndex {
655 name,
656 table_name,
657 columns,
658 include,
659 distributed_by,
660 unique,
661 if_not_exists,
662 } => {
663 if unique {
664 bail_not_implemented!("create unique index");
665 }
666
667 create_index::handle_create_index(
668 handler_args,
669 if_not_exists,
670 name,
671 table_name,
672 columns.to_vec(),
673 include,
674 distributed_by,
675 )
676 .await
677 }
678 Statement::AlterDatabase { name, operation } => match operation {
679 AlterDatabaseOperation::RenameDatabase { database_name } => {
680 alter_rename::handle_rename_database(handler_args, name, database_name).await
681 }
682 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
683 alter_owner::handle_alter_owner(
684 handler_args,
685 name,
686 new_owner_name,
687 StatementType::ALTER_DATABASE,
688 )
689 .await
690 }
691 },
692 Statement::AlterSchema { name, operation } => match operation {
693 AlterSchemaOperation::RenameSchema { schema_name } => {
694 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
695 }
696 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
697 alter_owner::handle_alter_owner(
698 handler_args,
699 name,
700 new_owner_name,
701 StatementType::ALTER_SCHEMA,
702 )
703 .await
704 }
705 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
706 alter_swap_rename::handle_swap_rename(
707 handler_args,
708 name,
709 target_schema,
710 StatementType::ALTER_SCHEMA,
711 )
712 .await
713 }
714 },
715 Statement::AlterTable { name, operation } => match operation {
716 AlterTableOperation::AddColumn { .. }
717 | AlterTableOperation::DropColumn { .. }
718 | AlterTableOperation::AlterColumn { .. } => {
719 alter_table_column::handle_alter_table_column(handler_args, name, operation).await
720 }
721 AlterTableOperation::RenameTable { table_name } => {
722 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
723 .await
724 }
725 AlterTableOperation::ChangeOwner { new_owner_name } => {
726 alter_owner::handle_alter_owner(
727 handler_args,
728 name,
729 new_owner_name,
730 StatementType::ALTER_TABLE,
731 )
732 .await
733 }
734 AlterTableOperation::SetParallelism {
735 parallelism,
736 deferred,
737 } => {
738 alter_parallelism::handle_alter_parallelism(
739 handler_args,
740 name,
741 parallelism,
742 StatementType::ALTER_TABLE,
743 deferred,
744 )
745 .await
746 }
747 AlterTableOperation::SetSchema { new_schema_name } => {
748 alter_set_schema::handle_alter_set_schema(
749 handler_args,
750 name,
751 new_schema_name,
752 StatementType::ALTER_TABLE,
753 None,
754 )
755 .await
756 }
757 AlterTableOperation::RefreshSchema => {
758 alter_table_with_sr::handle_refresh_schema(handler_args, name).await
759 }
760 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
761 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
762 handler_args,
763 PbThrottleTarget::TableWithSource,
764 name,
765 rate_limit,
766 )
767 .await
768 }
769 AlterTableOperation::DropConnector => {
770 alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
771 .await
772 }
773 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
774 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
775 handler_args,
776 PbThrottleTarget::TableDml,
777 name,
778 rate_limit,
779 )
780 .await
781 }
782 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
783 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
784 handler_args,
785 PbThrottleTarget::CdcTable,
786 name,
787 rate_limit,
788 )
789 .await
790 }
791 AlterTableOperation::SwapRenameTable { target_table } => {
792 alter_swap_rename::handle_swap_rename(
793 handler_args,
794 name,
795 target_table,
796 StatementType::ALTER_TABLE,
797 )
798 .await
799 }
800 AlterTableOperation::AddConstraint { .. }
801 | AlterTableOperation::DropConstraint { .. }
802 | AlterTableOperation::RenameColumn { .. }
803 | AlterTableOperation::ChangeColumn { .. }
804 | AlterTableOperation::RenameConstraint { .. } => {
805 bail_not_implemented!(
806 "Unhandled statement: {}",
807 Statement::AlterTable { name, operation }
808 )
809 }
810 },
811 Statement::AlterIndex { name, operation } => match operation {
812 AlterIndexOperation::RenameIndex { index_name } => {
813 alter_rename::handle_rename_index(handler_args, name, index_name).await
814 }
815 AlterIndexOperation::SetParallelism {
816 parallelism,
817 deferred,
818 } => {
819 alter_parallelism::handle_alter_parallelism(
820 handler_args,
821 name,
822 parallelism,
823 StatementType::ALTER_INDEX,
824 deferred,
825 )
826 .await
827 }
828 },
829 Statement::AlterView {
830 materialized,
831 name,
832 operation,
833 } => {
834 let statement_type = if materialized {
835 StatementType::ALTER_MATERIALIZED_VIEW
836 } else {
837 StatementType::ALTER_VIEW
838 };
839 match operation {
840 AlterViewOperation::RenameView { view_name } => {
841 if materialized {
842 alter_rename::handle_rename_table(
843 handler_args,
844 TableType::MaterializedView,
845 name,
846 view_name,
847 )
848 .await
849 } else {
850 alter_rename::handle_rename_view(handler_args, name, view_name).await
851 }
852 }
853 AlterViewOperation::SetParallelism {
854 parallelism,
855 deferred,
856 } => {
857 if !materialized {
858 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
859 }
860 alter_parallelism::handle_alter_parallelism(
861 handler_args,
862 name,
863 parallelism,
864 statement_type,
865 deferred,
866 )
867 .await
868 }
869 AlterViewOperation::SetResourceGroup {
870 resource_group,
871 deferred,
872 } => {
873 if !materialized {
874 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
875 }
876 alter_resource_group::handle_alter_resource_group(
877 handler_args,
878 name,
879 resource_group,
880 statement_type,
881 deferred,
882 )
883 .await
884 }
885 AlterViewOperation::ChangeOwner { new_owner_name } => {
886 alter_owner::handle_alter_owner(
887 handler_args,
888 name,
889 new_owner_name,
890 statement_type,
891 )
892 .await
893 }
894 AlterViewOperation::SetSchema { new_schema_name } => {
895 alter_set_schema::handle_alter_set_schema(
896 handler_args,
897 name,
898 new_schema_name,
899 statement_type,
900 None,
901 )
902 .await
903 }
904 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
905 if !materialized {
906 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
907 }
908 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
909 handler_args,
910 PbThrottleTarget::Mv,
911 name,
912 rate_limit,
913 )
914 .await
915 }
916 AlterViewOperation::SwapRenameView { target_view } => {
917 alter_swap_rename::handle_swap_rename(
918 handler_args,
919 name,
920 target_view,
921 statement_type,
922 )
923 .await
924 }
925 }
926 }
927
928 Statement::AlterSink { name, operation } => match operation {
929 AlterSinkOperation::SetSinkProps { changed_props } => {
930 alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await
931 }
932 AlterSinkOperation::RenameSink { sink_name } => {
933 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
934 }
935 AlterSinkOperation::ChangeOwner { new_owner_name } => {
936 alter_owner::handle_alter_owner(
937 handler_args,
938 name,
939 new_owner_name,
940 StatementType::ALTER_SINK,
941 )
942 .await
943 }
944 AlterSinkOperation::SetSchema { new_schema_name } => {
945 alter_set_schema::handle_alter_set_schema(
946 handler_args,
947 name,
948 new_schema_name,
949 StatementType::ALTER_SINK,
950 None,
951 )
952 .await
953 }
954 AlterSinkOperation::SetParallelism {
955 parallelism,
956 deferred,
957 } => {
958 alter_parallelism::handle_alter_parallelism(
959 handler_args,
960 name,
961 parallelism,
962 StatementType::ALTER_SINK,
963 deferred,
964 )
965 .await
966 }
967 AlterSinkOperation::SwapRenameSink { target_sink } => {
968 alter_swap_rename::handle_swap_rename(
969 handler_args,
970 name,
971 target_sink,
972 StatementType::ALTER_SINK,
973 )
974 .await
975 }
976 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
977 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
978 handler_args,
979 PbThrottleTarget::Sink,
980 name,
981 rate_limit,
982 )
983 .await
984 }
985 },
986 Statement::AlterSubscription { name, operation } => match operation {
987 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
988 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
989 .await
990 }
991 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
992 alter_owner::handle_alter_owner(
993 handler_args,
994 name,
995 new_owner_name,
996 StatementType::ALTER_SUBSCRIPTION,
997 )
998 .await
999 }
1000 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1001 alter_set_schema::handle_alter_set_schema(
1002 handler_args,
1003 name,
1004 new_schema_name,
1005 StatementType::ALTER_SUBSCRIPTION,
1006 None,
1007 )
1008 .await
1009 }
1010 AlterSubscriptionOperation::SwapRenameSubscription {
1011 target_subscription,
1012 } => {
1013 alter_swap_rename::handle_swap_rename(
1014 handler_args,
1015 name,
1016 target_subscription,
1017 StatementType::ALTER_SUBSCRIPTION,
1018 )
1019 .await
1020 }
1021 },
1022 Statement::AlterSource { name, operation } => match operation {
1023 AlterSourceOperation::RenameSource { source_name } => {
1024 alter_rename::handle_rename_source(handler_args, name, source_name).await
1025 }
1026 AlterSourceOperation::AddColumn { .. } => {
1027 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1028 }
1029 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1030 alter_owner::handle_alter_owner(
1031 handler_args,
1032 name,
1033 new_owner_name,
1034 StatementType::ALTER_SOURCE,
1035 )
1036 .await
1037 }
1038 AlterSourceOperation::SetSchema { new_schema_name } => {
1039 alter_set_schema::handle_alter_set_schema(
1040 handler_args,
1041 name,
1042 new_schema_name,
1043 StatementType::ALTER_SOURCE,
1044 None,
1045 )
1046 .await
1047 }
1048 AlterSourceOperation::FormatEncode { format_encode } => {
1049 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1050 .await
1051 }
1052 AlterSourceOperation::RefreshSchema => {
1053 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1054 }
1055 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1056 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1057 handler_args,
1058 PbThrottleTarget::Source,
1059 name,
1060 rate_limit,
1061 )
1062 .await
1063 }
1064 AlterSourceOperation::SwapRenameSource { target_source } => {
1065 alter_swap_rename::handle_swap_rename(
1066 handler_args,
1067 name,
1068 target_source,
1069 StatementType::ALTER_SOURCE,
1070 )
1071 .await
1072 }
1073 AlterSourceOperation::SetParallelism {
1074 parallelism,
1075 deferred,
1076 } => {
1077 alter_parallelism::handle_alter_parallelism(
1078 handler_args,
1079 name,
1080 parallelism,
1081 StatementType::ALTER_SOURCE,
1082 deferred,
1083 )
1084 .await
1085 }
1086 },
1087 Statement::AlterFunction {
1088 name,
1089 args,
1090 operation,
1091 } => match operation {
1092 AlterFunctionOperation::SetSchema { new_schema_name } => {
1093 alter_set_schema::handle_alter_set_schema(
1094 handler_args,
1095 name,
1096 new_schema_name,
1097 StatementType::ALTER_FUNCTION,
1098 args,
1099 )
1100 .await
1101 }
1102 },
1103 Statement::AlterConnection { name, operation } => match operation {
1104 AlterConnectionOperation::SetSchema { new_schema_name } => {
1105 alter_set_schema::handle_alter_set_schema(
1106 handler_args,
1107 name,
1108 new_schema_name,
1109 StatementType::ALTER_CONNECTION,
1110 None,
1111 )
1112 .await
1113 }
1114 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1115 alter_owner::handle_alter_owner(
1116 handler_args,
1117 name,
1118 new_owner_name,
1119 StatementType::ALTER_CONNECTION,
1120 )
1121 .await
1122 }
1123 },
1124 Statement::AlterSystem { param, value } => {
1125 alter_system::handle_alter_system(handler_args, param, value).await
1126 }
1127 Statement::AlterSecret {
1128 name,
1129 with_options,
1130 operation,
1131 } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1132 Statement::AlterFragment {
1133 fragment_id,
1134 operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1135 } => {
1136 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1137 &handler_args.session,
1138 PbThrottleTarget::Fragment,
1139 fragment_id,
1140 rate_limit,
1141 StatementType::SET_VARIABLE,
1142 )
1143 .await
1144 }
1145 Statement::StartTransaction { modes } => {
1146 transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1147 }
1148 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1149 Statement::Commit { chain } => {
1150 transaction::handle_commit(handler_args, COMMIT, chain).await
1151 }
1152 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1153 Statement::Rollback { chain } => {
1154 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1155 }
1156 Statement::SetTransaction {
1157 modes,
1158 snapshot,
1159 session,
1160 } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1161 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1162 Statement::Kill(process_id) => handle_kill(handler_args, process_id).await,
1163 Statement::Comment {
1164 object_type,
1165 object_name,
1166 comment,
1167 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1168 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1169 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1170 }
1171}
1172
1173fn check_ban_ddl_for_iceberg_engine_table(
1174 session: Arc<SessionImpl>,
1175 stmt: &Statement,
1176) -> Result<()> {
1177 match stmt {
1178 Statement::AlterTable {
1179 name,
1180 operation:
1181 operation @ (AlterTableOperation::AddColumn { .. }
1182 | AlterTableOperation::DropColumn { .. }),
1183 } => {
1184 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1185 if table.is_iceberg_engine_table() {
1186 bail!(
1187 "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1188 operation,
1189 schema_name,
1190 name
1191 );
1192 }
1193 }
1194
1195 Statement::AlterTable {
1196 name,
1197 operation: AlterTableOperation::RenameTable { .. },
1198 } => {
1199 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1200 if table.is_iceberg_engine_table() {
1201 bail!(
1202 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1203 schema_name,
1204 name
1205 );
1206 }
1207 }
1208
1209 Statement::AlterTable {
1210 name,
1211 operation: AlterTableOperation::ChangeOwner { .. },
1212 } => {
1213 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1214 if table.is_iceberg_engine_table() {
1215 bail!(
1216 "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1217 schema_name,
1218 name
1219 );
1220 }
1221 }
1222
1223 Statement::AlterTable {
1224 name,
1225 operation: AlterTableOperation::SetParallelism { .. },
1226 } => {
1227 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1228 if table.is_iceberg_engine_table() {
1229 bail!(
1230 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1231 schema_name,
1232 name
1233 );
1234 }
1235 }
1236
1237 Statement::AlterTable {
1238 name,
1239 operation: AlterTableOperation::SetSchema { .. },
1240 } => {
1241 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1242 if table.is_iceberg_engine_table() {
1243 bail!(
1244 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1245 schema_name,
1246 name
1247 );
1248 }
1249 }
1250
1251 Statement::AlterTable {
1252 name,
1253 operation: AlterTableOperation::RefreshSchema,
1254 } => {
1255 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1256 if table.is_iceberg_engine_table() {
1257 bail!(
1258 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1259 schema_name,
1260 name
1261 );
1262 }
1263 }
1264
1265 Statement::AlterTable {
1266 name,
1267 operation: AlterTableOperation::SetSourceRateLimit { .. },
1268 } => {
1269 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1270 if table.is_iceberg_engine_table() {
1271 bail!(
1272 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1273 schema_name,
1274 name
1275 );
1276 }
1277 }
1278
1279 _ => {}
1280 }
1281
1282 Ok(())
1283}