1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::types::Fields;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_common::{bail, bail_not_implemented};
30use risingwave_pb::meta::PbThrottleTarget;
31use risingwave_sqlparser::ast::*;
32use util::get_table_catalog_by_table_name;
33
34use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
35use crate::catalog::table_catalog::TableType;
36use crate::error::{ErrorCode, Result};
37use crate::handler::cancel_job::handle_cancel;
38use crate::handler::kill_process::handle_kill;
39use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
40use crate::session::SessionImpl;
41use crate::utils::WithOptions;
42
43mod alter_owner;
44mod alter_parallelism;
45mod alter_rename;
46mod alter_resource_group;
47mod alter_secret;
48mod alter_set_schema;
49mod alter_sink_props;
50mod alter_source_column;
51mod alter_source_with_sr;
52mod alter_streaming_rate_limit;
53mod alter_swap_rename;
54mod alter_system;
55mod alter_table_column;
56pub mod alter_table_drop_connector;
57mod alter_table_with_sr;
58pub mod alter_user;
59pub mod cancel_job;
60pub mod close_cursor;
61mod comment;
62pub mod create_aggregate;
63pub mod create_connection;
64mod create_database;
65pub mod create_function;
66pub mod create_index;
67pub mod create_mv;
68pub mod create_schema;
69pub mod create_secret;
70pub mod create_sink;
71pub mod create_source;
72pub mod create_sql_function;
73pub mod create_subscription;
74pub mod create_table;
75pub mod create_table_as;
76pub mod create_user;
77pub mod create_view;
78pub mod declare_cursor;
79pub mod describe;
80pub mod discard;
81mod drop_connection;
82mod drop_database;
83pub mod drop_function;
84mod drop_index;
85pub mod drop_mv;
86mod drop_schema;
87pub mod drop_secret;
88pub mod drop_sink;
89pub mod drop_source;
90pub mod drop_subscription;
91pub mod drop_table;
92pub mod drop_user;
93mod drop_view;
94pub mod explain;
95pub mod explain_analyze_stream_job;
96pub mod extended_handle;
97pub mod fetch_cursor;
98mod flush;
99pub mod handle_privilege;
100mod kill_process;
101pub mod privilege;
102pub mod query;
103mod recover;
104pub mod show;
105mod transaction;
106mod use_db;
107pub mod util;
108pub mod variable;
109mod wait;
110
111pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
112
/// [`PgResponseBuilder`] specialized to [`PgResponseStream`] as its row-stream type.
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
115
/// [`PgResponse`] specialized to [`PgResponseStream`] as its row-stream type.
///
/// This is the response type returned by [`handle`].
pub type RwPgResponse = PgResponse<PgResponseStream>;
118
119#[easy_ext::ext(RwPgResponseBuilderExt)]
120impl RwPgResponseBuilder {
121 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
123 let fields = T::fields();
124 self.values(
125 rows.into_iter()
126 .map(|row| {
127 Row::new(
128 row.into_owned_row()
129 .into_iter()
130 .zip_eq_fast(&fields)
131 .map(|(datum, (_, ty))| {
132 datum.map(|scalar| {
133 scalar.as_scalar_ref_impl().text_format(ty).into()
134 })
135 })
136 .collect(),
137 )
138 })
139 .collect_vec()
140 .into(),
141 fields_to_descriptors(fields),
142 )
143 }
144}
145
146pub fn fields_to_descriptors(
147 fields: Vec<(&str, risingwave_common::types::DataType)>,
148) -> Vec<PgFieldDescriptor> {
149 fields
150 .iter()
151 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
152 .collect()
153}
154
/// The row stream carried by an [`RwPgResponse`].
///
/// Polling the stream simply forwards to whichever inner stream the variant holds.
pub enum PgResponseStream {
    /// Rows adapted from a [`LocalQueryStream`] via [`DataChunkToRowSetAdapter`].
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// Rows adapted from a [`DistributedQueryStream`] via [`DataChunkToRowSetAdapter`].
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// An arbitrary boxed stream of row sets, e.g. built from an in-memory `Vec<Row>`.
    Rows(BoxStream<'static, RowSetResult>),
}
160
161impl Stream for PgResponseStream {
162 type Item = std::result::Result<Vec<Row>, BoxedError>;
163
164 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
165 match &mut *self {
166 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
167 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
168 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
169 }
170 }
171}
172
173impl From<Vec<Row>> for PgResponseStream {
174 fn from(rows: Vec<Row>) -> Self {
175 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
176 }
177}
178
/// Per-statement arguments passed to the individual statement handlers.
///
/// Built by [`HandlerArgs::new`] from the session, the parsed statement and the
/// SQL text.
#[derive(Clone)]
pub struct HandlerArgs {
    /// The session the statement is executed in.
    pub session: Arc<SessionImpl>,
    /// The SQL text handed to [`HandlerArgs::new`].
    pub sql: Arc<str>,
    /// The statement rendered back to text after `OR REPLACE` / `IF NOT EXISTS`
    /// flags have been cleared on the `CREATE` variants that carry them.
    pub normalized_sql: String,
    /// Options extracted from the statement through `WithOptions::try_from`.
    pub with_options: WithOptions,
}
186
187impl HandlerArgs {
188 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
189 Ok(Self {
190 session,
191 sql,
192 with_options: WithOptions::try_from(stmt)?,
193 normalized_sql: Self::normalize_sql(stmt),
194 })
195 }
196
197 fn normalize_sql(stmt: &Statement) -> String {
203 let mut stmt = stmt.clone();
204 match &mut stmt {
205 Statement::CreateView {
206 or_replace,
207 if_not_exists,
208 ..
209 } => {
210 *or_replace = false;
211 *if_not_exists = false;
212 }
213 Statement::CreateTable {
214 or_replace,
215 if_not_exists,
216 ..
217 } => {
218 *or_replace = false;
219 *if_not_exists = false;
220 }
221 Statement::CreateIndex { if_not_exists, .. } => {
222 *if_not_exists = false;
223 }
224 Statement::CreateSource {
225 stmt: CreateSourceStatement { if_not_exists, .. },
226 ..
227 } => {
228 *if_not_exists = false;
229 }
230 Statement::CreateSink {
231 stmt: CreateSinkStatement { if_not_exists, .. },
232 } => {
233 *if_not_exists = false;
234 }
235 Statement::CreateSubscription {
236 stmt: CreateSubscriptionStatement { if_not_exists, .. },
237 } => {
238 *if_not_exists = false;
239 }
240 Statement::CreateConnection {
241 stmt: CreateConnectionStatement { if_not_exists, .. },
242 } => {
243 *if_not_exists = false;
244 }
245 _ => {}
246 }
247 stmt.to_string()
248 }
249}
250
251pub async fn handle(
252 session: Arc<SessionImpl>,
253 stmt: Statement,
254 sql: Arc<str>,
255 formats: Vec<Format>,
256) -> Result<RwPgResponse> {
257 session.clear_cancel_query_flag();
258 let _guard = session.txn_begin_implicit();
259 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
260
261 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
262
263 match stmt {
264 Statement::Explain {
265 statement,
266 analyze,
267 options,
268 } => explain::handle_explain(handler_args, *statement, options, analyze).await,
269 Statement::ExplainAnalyzeStreamJob {
270 target,
271 duration_secs,
272 } => {
273 explain_analyze_stream_job::handle_explain_analyze_stream_job(
274 handler_args,
275 target,
276 duration_secs,
277 )
278 .await
279 }
280 Statement::CreateSource { stmt } => {
281 create_source::handle_create_source(handler_args, stmt).await
282 }
283 Statement::CreateSink { stmt } => create_sink::handle_create_sink(handler_args, stmt).await,
284 Statement::CreateSubscription { stmt } => {
285 create_subscription::handle_create_subscription(handler_args, stmt).await
286 }
287 Statement::CreateConnection { stmt } => {
288 create_connection::handle_create_connection(handler_args, stmt).await
289 }
290 Statement::CreateSecret { stmt } => {
291 create_secret::handle_create_secret(handler_args, stmt).await
292 }
293 Statement::CreateFunction {
294 or_replace,
295 temporary,
296 if_not_exists,
297 name,
298 args,
299 returns,
300 params,
301 with_options,
302 } => {
303 if params.language.is_none()
306 || !params
307 .language
308 .as_ref()
309 .unwrap()
310 .real_value()
311 .eq_ignore_ascii_case("sql")
312 {
313 create_function::handle_create_function(
314 handler_args,
315 or_replace,
316 temporary,
317 if_not_exists,
318 name,
319 args,
320 returns,
321 params,
322 with_options,
323 )
324 .await
325 } else {
326 create_sql_function::handle_create_sql_function(
327 handler_args,
328 or_replace,
329 temporary,
330 if_not_exists,
331 name,
332 args,
333 returns,
334 params,
335 )
336 .await
337 }
338 }
339 Statement::CreateAggregate {
340 or_replace,
341 if_not_exists,
342 name,
343 args,
344 returns,
345 params,
346 ..
347 } => {
348 create_aggregate::handle_create_aggregate(
349 handler_args,
350 or_replace,
351 if_not_exists,
352 name,
353 args,
354 returns,
355 params,
356 )
357 .await
358 }
359 Statement::CreateTable {
360 name,
361 columns,
362 wildcard_idx,
363 constraints,
364 query,
365 with_options: _, or_replace,
368 temporary,
369 if_not_exists,
370 format_encode,
371 source_watermarks,
372 append_only,
373 on_conflict,
374 with_version_column,
375 cdc_table_info,
376 include_column_options,
377 webhook_info,
378 engine,
379 } => {
380 if or_replace {
381 bail_not_implemented!("CREATE OR REPLACE TABLE");
382 }
383 if temporary {
384 bail_not_implemented!("CREATE TEMPORARY TABLE");
385 }
386 if let Some(query) = query {
387 return create_table_as::handle_create_as(
388 handler_args,
389 name,
390 if_not_exists,
391 query,
392 columns,
393 append_only,
394 on_conflict,
395 with_version_column.map(|x| x.real_value()),
396 engine,
397 )
398 .await;
399 }
400 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
401 create_table::handle_create_table(
402 handler_args,
403 name,
404 columns,
405 wildcard_idx,
406 constraints,
407 if_not_exists,
408 format_encode,
409 source_watermarks,
410 append_only,
411 on_conflict,
412 with_version_column.map(|x| x.real_value()),
413 cdc_table_info,
414 include_column_options,
415 webhook_info,
416 engine,
417 )
418 .await
419 }
420 Statement::CreateDatabase {
421 db_name,
422 if_not_exists,
423 owner,
424 resource_group,
425 } => {
426 create_database::handle_create_database(
427 handler_args,
428 db_name,
429 if_not_exists,
430 owner,
431 resource_group,
432 )
433 .await
434 }
435 Statement::CreateSchema {
436 schema_name,
437 if_not_exists,
438 owner,
439 } => {
440 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
441 .await
442 }
443 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
444 Statement::DeclareCursor { stmt } => {
445 declare_cursor::handle_declare_cursor(handler_args, stmt).await
446 }
447 Statement::FetchCursor { stmt } => {
448 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
449 }
450 Statement::CloseCursor { stmt } => {
451 close_cursor::handle_close_cursor(handler_args, stmt).await
452 }
453 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
454 Statement::Grant { .. } => {
455 handle_privilege::handle_grant_privilege(handler_args, stmt).await
456 }
457 Statement::Revoke { .. } => {
458 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
459 }
460 Statement::Describe { name, kind } => match kind {
461 DescribeKind::Fragments => {
462 describe::handle_describe_fragments(handler_args, name).await
463 }
464 DescribeKind::Plain => describe::handle_describe(handler_args, name),
465 },
466 Statement::Discard(..) => discard::handle_discard(handler_args),
467 Statement::ShowObjects {
468 object: show_object,
469 filter,
470 } => show::handle_show_object(handler_args, show_object, filter).await,
471 Statement::ShowCreateObject { create_type, name } => {
472 show::handle_show_create_object(handler_args, create_type, name)
473 }
474 Statement::ShowTransactionIsolationLevel => {
475 transaction::handle_show_isolation_level(handler_args)
476 }
477 Statement::Drop(DropStatement {
478 object_type,
479 object_name,
480 if_exists,
481 drop_mode,
482 }) => {
483 let mut cascade = false;
484 if let AstOption::Some(DropMode::Cascade) = drop_mode {
485 match object_type {
486 ObjectType::MaterializedView
487 | ObjectType::View
488 | ObjectType::Sink
489 | ObjectType::Source
490 | ObjectType::Subscription
491 | ObjectType::Index
492 | ObjectType::Table
493 | ObjectType::Schema => {
494 cascade = true;
495 }
496 ObjectType::Database
497 | ObjectType::User
498 | ObjectType::Connection
499 | ObjectType::Secret => {
500 bail_not_implemented!("DROP CASCADE");
501 }
502 };
503 };
504 match object_type {
505 ObjectType::Table => {
506 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
507 .await
508 }
509 ObjectType::MaterializedView => {
510 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
511 }
512 ObjectType::Index => {
513 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
514 .await
515 }
516 ObjectType::Source => {
517 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
518 .await
519 }
520 ObjectType::Sink => {
521 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
522 }
523 ObjectType::Subscription => {
524 drop_subscription::handle_drop_subscription(
525 handler_args,
526 object_name,
527 if_exists,
528 cascade,
529 )
530 .await
531 }
532 ObjectType::Database => {
533 drop_database::handle_drop_database(
534 handler_args,
535 object_name,
536 if_exists,
537 drop_mode.into(),
538 )
539 .await
540 }
541 ObjectType::Schema => {
542 drop_schema::handle_drop_schema(
543 handler_args,
544 object_name,
545 if_exists,
546 drop_mode.into(),
547 )
548 .await
549 }
550 ObjectType::User => {
551 drop_user::handle_drop_user(
552 handler_args,
553 object_name,
554 if_exists,
555 drop_mode.into(),
556 )
557 .await
558 }
559 ObjectType::View => {
560 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
561 }
562 ObjectType::Connection => {
563 drop_connection::handle_drop_connection(handler_args, object_name, if_exists)
564 .await
565 }
566 ObjectType::Secret => {
567 drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
568 }
569 }
570 }
571 Statement::DropFunction {
573 if_exists,
574 func_desc,
575 option,
576 } => {
577 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
578 .await
579 }
580 Statement::DropAggregate {
581 if_exists,
582 func_desc,
583 option,
584 } => {
585 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
586 .await
587 }
588 Statement::Query(_)
589 | Statement::Insert { .. }
590 | Statement::Delete { .. }
591 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
592 Statement::CreateView {
593 materialized,
594 if_not_exists,
595 name,
596 columns,
597 query,
598 with_options: _, or_replace, emit_mode,
601 } => {
602 if or_replace {
603 bail_not_implemented!("CREATE OR REPLACE VIEW");
604 }
605 if materialized {
606 create_mv::handle_create_mv(
607 handler_args,
608 if_not_exists,
609 name,
610 *query,
611 columns,
612 emit_mode,
613 )
614 .await
615 } else {
616 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
617 .await
618 }
619 }
620 Statement::Flush => flush::handle_flush(handler_args).await,
621 Statement::Wait => wait::handle_wait(handler_args).await,
622 Statement::Recover => recover::handle_recover(handler_args).await,
623 Statement::SetVariable {
624 local: _,
625 variable,
626 value,
627 } => {
628 if variable.real_value().eq_ignore_ascii_case("database") {
630 let x = variable::set_var_to_param_str(&value);
631 let res = use_db::handle_use_db(
632 handler_args,
633 ObjectName::from(vec![Ident::new_unchecked(
634 x.unwrap_or("default".to_owned()),
635 )]),
636 )?;
637 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
638 for notice in res.notices() {
639 builder = builder.notice(notice);
640 }
641 return Ok(builder.into());
642 }
643 variable::handle_set(handler_args, variable, value)
644 }
645 Statement::SetTimeZone { local: _, value } => {
646 variable::handle_set_time_zone(handler_args, value)
647 }
648 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable).await,
649 Statement::CreateIndex {
650 name,
651 table_name,
652 columns,
653 include,
654 distributed_by,
655 unique,
656 if_not_exists,
657 } => {
658 if unique {
659 bail_not_implemented!("create unique index");
660 }
661
662 create_index::handle_create_index(
663 handler_args,
664 if_not_exists,
665 name,
666 table_name,
667 columns.to_vec(),
668 include,
669 distributed_by,
670 )
671 .await
672 }
673 Statement::AlterDatabase { name, operation } => match operation {
674 AlterDatabaseOperation::RenameDatabase { database_name } => {
675 alter_rename::handle_rename_database(handler_args, name, database_name).await
676 }
677 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
678 alter_owner::handle_alter_owner(
679 handler_args,
680 name,
681 new_owner_name,
682 StatementType::ALTER_DATABASE,
683 )
684 .await
685 }
686 },
687 Statement::AlterSchema { name, operation } => match operation {
688 AlterSchemaOperation::RenameSchema { schema_name } => {
689 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
690 }
691 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
692 alter_owner::handle_alter_owner(
693 handler_args,
694 name,
695 new_owner_name,
696 StatementType::ALTER_SCHEMA,
697 )
698 .await
699 }
700 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
701 alter_swap_rename::handle_swap_rename(
702 handler_args,
703 name,
704 target_schema,
705 StatementType::ALTER_SCHEMA,
706 )
707 .await
708 }
709 },
710 Statement::AlterTable { name, operation } => match operation {
711 AlterTableOperation::AddColumn { .. }
712 | AlterTableOperation::DropColumn { .. }
713 | AlterTableOperation::AlterColumn { .. } => {
714 alter_table_column::handle_alter_table_column(handler_args, name, operation).await
715 }
716 AlterTableOperation::RenameTable { table_name } => {
717 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
718 .await
719 }
720 AlterTableOperation::ChangeOwner { new_owner_name } => {
721 alter_owner::handle_alter_owner(
722 handler_args,
723 name,
724 new_owner_name,
725 StatementType::ALTER_TABLE,
726 )
727 .await
728 }
729 AlterTableOperation::SetParallelism {
730 parallelism,
731 deferred,
732 } => {
733 alter_parallelism::handle_alter_parallelism(
734 handler_args,
735 name,
736 parallelism,
737 StatementType::ALTER_TABLE,
738 deferred,
739 )
740 .await
741 }
742 AlterTableOperation::SetSchema { new_schema_name } => {
743 alter_set_schema::handle_alter_set_schema(
744 handler_args,
745 name,
746 new_schema_name,
747 StatementType::ALTER_TABLE,
748 None,
749 )
750 .await
751 }
752 AlterTableOperation::RefreshSchema => {
753 alter_table_with_sr::handle_refresh_schema(handler_args, name).await
754 }
755 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
756 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
757 handler_args,
758 PbThrottleTarget::TableWithSource,
759 name,
760 rate_limit,
761 )
762 .await
763 }
764 AlterTableOperation::DropConnector => {
765 alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
766 .await
767 }
768 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
769 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
770 handler_args,
771 PbThrottleTarget::TableDml,
772 name,
773 rate_limit,
774 )
775 .await
776 }
777 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
778 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
779 handler_args,
780 PbThrottleTarget::CdcTable,
781 name,
782 rate_limit,
783 )
784 .await
785 }
786 AlterTableOperation::SwapRenameTable { target_table } => {
787 alter_swap_rename::handle_swap_rename(
788 handler_args,
789 name,
790 target_table,
791 StatementType::ALTER_TABLE,
792 )
793 .await
794 }
795 AlterTableOperation::AddConstraint { .. }
796 | AlterTableOperation::DropConstraint { .. }
797 | AlterTableOperation::RenameColumn { .. }
798 | AlterTableOperation::ChangeColumn { .. }
799 | AlterTableOperation::RenameConstraint { .. } => {
800 bail_not_implemented!(
801 "Unhandled statement: {}",
802 Statement::AlterTable { name, operation }
803 )
804 }
805 },
806 Statement::AlterIndex { name, operation } => match operation {
807 AlterIndexOperation::RenameIndex { index_name } => {
808 alter_rename::handle_rename_index(handler_args, name, index_name).await
809 }
810 AlterIndexOperation::SetParallelism {
811 parallelism,
812 deferred,
813 } => {
814 alter_parallelism::handle_alter_parallelism(
815 handler_args,
816 name,
817 parallelism,
818 StatementType::ALTER_INDEX,
819 deferred,
820 )
821 .await
822 }
823 },
824 Statement::AlterView {
825 materialized,
826 name,
827 operation,
828 } => {
829 let statement_type = if materialized {
830 StatementType::ALTER_MATERIALIZED_VIEW
831 } else {
832 StatementType::ALTER_VIEW
833 };
834 match operation {
835 AlterViewOperation::RenameView { view_name } => {
836 if materialized {
837 alter_rename::handle_rename_table(
838 handler_args,
839 TableType::MaterializedView,
840 name,
841 view_name,
842 )
843 .await
844 } else {
845 alter_rename::handle_rename_view(handler_args, name, view_name).await
846 }
847 }
848 AlterViewOperation::SetParallelism {
849 parallelism,
850 deferred,
851 } => {
852 if !materialized {
853 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
854 }
855 alter_parallelism::handle_alter_parallelism(
856 handler_args,
857 name,
858 parallelism,
859 statement_type,
860 deferred,
861 )
862 .await
863 }
864 AlterViewOperation::SetResourceGroup {
865 resource_group,
866 deferred,
867 } => {
868 if !materialized {
869 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
870 }
871 alter_resource_group::handle_alter_resource_group(
872 handler_args,
873 name,
874 resource_group,
875 statement_type,
876 deferred,
877 )
878 .await
879 }
880 AlterViewOperation::ChangeOwner { new_owner_name } => {
881 alter_owner::handle_alter_owner(
882 handler_args,
883 name,
884 new_owner_name,
885 statement_type,
886 )
887 .await
888 }
889 AlterViewOperation::SetSchema { new_schema_name } => {
890 alter_set_schema::handle_alter_set_schema(
891 handler_args,
892 name,
893 new_schema_name,
894 statement_type,
895 None,
896 )
897 .await
898 }
899 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
900 if !materialized {
901 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
902 }
903 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
904 handler_args,
905 PbThrottleTarget::Mv,
906 name,
907 rate_limit,
908 )
909 .await
910 }
911 AlterViewOperation::SwapRenameView { target_view } => {
912 alter_swap_rename::handle_swap_rename(
913 handler_args,
914 name,
915 target_view,
916 statement_type,
917 )
918 .await
919 }
920 }
921 }
922
923 Statement::AlterSink { name, operation } => match operation {
924 AlterSinkOperation::SetSinkProps { changed_props } => {
925 alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await
926 }
927 AlterSinkOperation::RenameSink { sink_name } => {
928 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
929 }
930 AlterSinkOperation::ChangeOwner { new_owner_name } => {
931 alter_owner::handle_alter_owner(
932 handler_args,
933 name,
934 new_owner_name,
935 StatementType::ALTER_SINK,
936 )
937 .await
938 }
939 AlterSinkOperation::SetSchema { new_schema_name } => {
940 alter_set_schema::handle_alter_set_schema(
941 handler_args,
942 name,
943 new_schema_name,
944 StatementType::ALTER_SINK,
945 None,
946 )
947 .await
948 }
949 AlterSinkOperation::SetParallelism {
950 parallelism,
951 deferred,
952 } => {
953 alter_parallelism::handle_alter_parallelism(
954 handler_args,
955 name,
956 parallelism,
957 StatementType::ALTER_SINK,
958 deferred,
959 )
960 .await
961 }
962 AlterSinkOperation::SwapRenameSink { target_sink } => {
963 alter_swap_rename::handle_swap_rename(
964 handler_args,
965 name,
966 target_sink,
967 StatementType::ALTER_SINK,
968 )
969 .await
970 }
971 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
972 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
973 handler_args,
974 PbThrottleTarget::Sink,
975 name,
976 rate_limit,
977 )
978 .await
979 }
980 },
981 Statement::AlterSubscription { name, operation } => match operation {
982 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
983 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
984 .await
985 }
986 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
987 alter_owner::handle_alter_owner(
988 handler_args,
989 name,
990 new_owner_name,
991 StatementType::ALTER_SUBSCRIPTION,
992 )
993 .await
994 }
995 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
996 alter_set_schema::handle_alter_set_schema(
997 handler_args,
998 name,
999 new_schema_name,
1000 StatementType::ALTER_SUBSCRIPTION,
1001 None,
1002 )
1003 .await
1004 }
1005 AlterSubscriptionOperation::SwapRenameSubscription {
1006 target_subscription,
1007 } => {
1008 alter_swap_rename::handle_swap_rename(
1009 handler_args,
1010 name,
1011 target_subscription,
1012 StatementType::ALTER_SUBSCRIPTION,
1013 )
1014 .await
1015 }
1016 },
1017 Statement::AlterSource { name, operation } => match operation {
1018 AlterSourceOperation::RenameSource { source_name } => {
1019 alter_rename::handle_rename_source(handler_args, name, source_name).await
1020 }
1021 AlterSourceOperation::AddColumn { .. } => {
1022 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1023 }
1024 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1025 alter_owner::handle_alter_owner(
1026 handler_args,
1027 name,
1028 new_owner_name,
1029 StatementType::ALTER_SOURCE,
1030 )
1031 .await
1032 }
1033 AlterSourceOperation::SetSchema { new_schema_name } => {
1034 alter_set_schema::handle_alter_set_schema(
1035 handler_args,
1036 name,
1037 new_schema_name,
1038 StatementType::ALTER_SOURCE,
1039 None,
1040 )
1041 .await
1042 }
1043 AlterSourceOperation::FormatEncode { format_encode } => {
1044 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1045 .await
1046 }
1047 AlterSourceOperation::RefreshSchema => {
1048 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1049 }
1050 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1051 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1052 handler_args,
1053 PbThrottleTarget::Source,
1054 name,
1055 rate_limit,
1056 )
1057 .await
1058 }
1059 AlterSourceOperation::SwapRenameSource { target_source } => {
1060 alter_swap_rename::handle_swap_rename(
1061 handler_args,
1062 name,
1063 target_source,
1064 StatementType::ALTER_SOURCE,
1065 )
1066 .await
1067 }
1068 AlterSourceOperation::SetParallelism {
1069 parallelism,
1070 deferred,
1071 } => {
1072 alter_parallelism::handle_alter_parallelism(
1073 handler_args,
1074 name,
1075 parallelism,
1076 StatementType::ALTER_SOURCE,
1077 deferred,
1078 )
1079 .await
1080 }
1081 },
1082 Statement::AlterFunction {
1083 name,
1084 args,
1085 operation,
1086 } => match operation {
1087 AlterFunctionOperation::SetSchema { new_schema_name } => {
1088 alter_set_schema::handle_alter_set_schema(
1089 handler_args,
1090 name,
1091 new_schema_name,
1092 StatementType::ALTER_FUNCTION,
1093 args,
1094 )
1095 .await
1096 }
1097 },
1098 Statement::AlterConnection { name, operation } => match operation {
1099 AlterConnectionOperation::SetSchema { new_schema_name } => {
1100 alter_set_schema::handle_alter_set_schema(
1101 handler_args,
1102 name,
1103 new_schema_name,
1104 StatementType::ALTER_CONNECTION,
1105 None,
1106 )
1107 .await
1108 }
1109 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1110 alter_owner::handle_alter_owner(
1111 handler_args,
1112 name,
1113 new_owner_name,
1114 StatementType::ALTER_CONNECTION,
1115 )
1116 .await
1117 }
1118 },
1119 Statement::AlterSystem { param, value } => {
1120 alter_system::handle_alter_system(handler_args, param, value).await
1121 }
1122 Statement::AlterSecret {
1123 name,
1124 with_options,
1125 operation,
1126 } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1127 Statement::AlterFragment {
1128 fragment_id,
1129 operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1130 } => {
1131 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1132 &handler_args.session,
1133 PbThrottleTarget::Fragment,
1134 fragment_id,
1135 rate_limit,
1136 StatementType::SET_VARIABLE,
1137 )
1138 .await
1139 }
1140 Statement::StartTransaction { modes } => {
1141 transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1142 }
1143 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1144 Statement::Commit { chain } => {
1145 transaction::handle_commit(handler_args, COMMIT, chain).await
1146 }
1147 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1148 Statement::Rollback { chain } => {
1149 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1150 }
1151 Statement::SetTransaction {
1152 modes,
1153 snapshot,
1154 session,
1155 } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1156 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1157 Statement::Kill(process_id) => handle_kill(handler_args, process_id).await,
1158 Statement::Comment {
1159 object_type,
1160 object_name,
1161 comment,
1162 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1163 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1164 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1165 }
1166}
1167
1168fn check_ban_ddl_for_iceberg_engine_table(
1169 session: Arc<SessionImpl>,
1170 stmt: &Statement,
1171) -> Result<()> {
1172 match stmt {
1173 Statement::AlterTable {
1174 name,
1175 operation:
1176 operation @ (AlterTableOperation::AddColumn { .. }
1177 | AlterTableOperation::DropColumn { .. }),
1178 } => {
1179 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1180 if table.is_iceberg_engine_table() {
1181 bail!(
1182 "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1183 operation,
1184 schema_name,
1185 name
1186 );
1187 }
1188 }
1189
1190 Statement::AlterTable {
1191 name,
1192 operation: AlterTableOperation::RenameTable { .. },
1193 } => {
1194 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1195 if table.is_iceberg_engine_table() {
1196 bail!(
1197 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1198 schema_name,
1199 name
1200 );
1201 }
1202 }
1203
1204 Statement::AlterTable {
1205 name,
1206 operation: AlterTableOperation::ChangeOwner { .. },
1207 } => {
1208 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1209 if table.is_iceberg_engine_table() {
1210 bail!(
1211 "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1212 schema_name,
1213 name
1214 );
1215 }
1216 }
1217
1218 Statement::AlterTable {
1219 name,
1220 operation: AlterTableOperation::SetParallelism { .. },
1221 } => {
1222 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1223 if table.is_iceberg_engine_table() {
1224 bail!(
1225 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1226 schema_name,
1227 name
1228 );
1229 }
1230 }
1231
1232 Statement::AlterTable {
1233 name,
1234 operation: AlterTableOperation::SetSchema { .. },
1235 } => {
1236 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1237 if table.is_iceberg_engine_table() {
1238 bail!(
1239 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1240 schema_name,
1241 name
1242 );
1243 }
1244 }
1245
1246 Statement::AlterTable {
1247 name,
1248 operation: AlterTableOperation::RefreshSchema,
1249 } => {
1250 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1251 if table.is_iceberg_engine_table() {
1252 bail!(
1253 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1254 schema_name,
1255 name
1256 );
1257 }
1258 }
1259
1260 Statement::AlterTable {
1261 name,
1262 operation: AlterTableOperation::SetSourceRateLimit { .. },
1263 } => {
1264 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1265 if table.is_iceberg_engine_table() {
1266 bail!(
1267 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1268 schema_name,
1269 name
1270 );
1271 }
1272 }
1273
1274 _ => {}
1275 }
1276
1277 Ok(())
1278}