1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::types::Fields;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_common::{bail, bail_not_implemented};
30use risingwave_pb::meta::PbThrottleTarget;
31use risingwave_sqlparser::ast::*;
32use util::get_table_catalog_by_table_name;
33
34use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
35use crate::catalog::table_catalog::TableType;
36use crate::error::{ErrorCode, Result};
37use crate::handler::cancel_job::handle_cancel;
38use crate::handler::kill_process::handle_kill;
39use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
40use crate::session::SessionImpl;
41use crate::utils::WithOptions;
42
43mod alter_owner;
44mod alter_parallelism;
45mod alter_rename;
46mod alter_resource_group;
47mod alter_secret;
48mod alter_set_schema;
49mod alter_source_column;
50mod alter_source_with_sr;
51mod alter_streaming_rate_limit;
52mod alter_swap_rename;
53mod alter_system;
54mod alter_table_column;
55pub mod alter_table_drop_connector;
56mod alter_table_with_sr;
57pub mod alter_user;
58pub mod cancel_job;
59pub mod close_cursor;
60mod comment;
61pub mod create_aggregate;
62pub mod create_connection;
63mod create_database;
64pub mod create_function;
65pub mod create_index;
66pub mod create_mv;
67pub mod create_schema;
68pub mod create_secret;
69pub mod create_sink;
70pub mod create_source;
71pub mod create_sql_function;
72pub mod create_subscription;
73pub mod create_table;
74pub mod create_table_as;
75pub mod create_user;
76pub mod create_view;
77pub mod declare_cursor;
78pub mod describe;
79pub mod discard;
80mod drop_connection;
81mod drop_database;
82pub mod drop_function;
83mod drop_index;
84pub mod drop_mv;
85mod drop_schema;
86pub mod drop_secret;
87pub mod drop_sink;
88pub mod drop_source;
89pub mod drop_subscription;
90pub mod drop_table;
91pub mod drop_user;
92mod drop_view;
93pub mod explain;
94pub mod explain_analyze_stream_job;
95pub mod extended_handle;
96pub mod fetch_cursor;
97mod flush;
98pub mod handle_privilege;
99mod kill_process;
100pub mod privilege;
101pub mod query;
102mod recover;
103pub mod show;
104mod transaction;
105mod use_db;
106pub mod util;
107pub mod variable;
108mod wait;
109
110pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
111
/// [`PgResponseBuilder`] specialised to carry a [`PgResponseStream`] as its row stream.
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
114
/// [`PgResponse`] specialised to carry a [`PgResponseStream`] as its row stream.
pub type RwPgResponse = PgResponse<PgResponseStream>;
117
118#[easy_ext::ext(RwPgResponseBuilderExt)]
119impl RwPgResponseBuilder {
120 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
122 let fields = T::fields();
123 self.values(
124 rows.into_iter()
125 .map(|row| {
126 Row::new(
127 row.into_owned_row()
128 .into_iter()
129 .zip_eq_fast(&fields)
130 .map(|(datum, (_, ty))| {
131 datum.map(|scalar| {
132 scalar.as_scalar_ref_impl().text_format(ty).into()
133 })
134 })
135 .collect(),
136 )
137 })
138 .collect_vec()
139 .into(),
140 fields_to_descriptors(fields),
141 )
142 }
143}
144
145pub fn fields_to_descriptors(
146 fields: Vec<(&str, risingwave_common::types::DataType)>,
147) -> Vec<PgFieldDescriptor> {
148 fields
149 .iter()
150 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
151 .collect()
152}
153
/// Row stream carried by [`RwPgResponse`]; it is polled through its [`Stream`] implementation,
/// which forwards to whichever inner stream the variant holds.
pub enum PgResponseStream {
    /// A [`LocalQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// A [`DistributedQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// Any boxed stream of row-set results, e.g. the one built by `From<Vec<Row>>`.
    Rows(BoxStream<'static, RowSetResult>),
}
159
160impl Stream for PgResponseStream {
161 type Item = std::result::Result<Vec<Row>, BoxedError>;
162
163 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
164 match &mut *self {
165 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
166 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
167 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
168 }
169 }
170}
171
172impl From<Vec<Row>> for PgResponseStream {
173 fn from(rows: Vec<Row>) -> Self {
174 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
175 }
176}
177
/// Arguments handed to the individual statement handlers; built by `HandlerArgs::new`.
#[derive(Clone)]
pub struct HandlerArgs {
    /// Session the statement is handled for.
    pub session: Arc<SessionImpl>,
    /// SQL text exactly as it was passed to `HandlerArgs::new`.
    pub sql: Arc<str>,
    /// The statement rendered back to text by `normalize_sql`, i.e. with the
    /// `or_replace` / `if_not_exists` flags of `CREATE` statements cleared.
    pub normalized_sql: String,
    /// Options obtained from the statement via `WithOptions::try_from`.
    pub with_options: WithOptions,
}
185
186impl HandlerArgs {
187 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
188 Ok(Self {
189 session,
190 sql,
191 with_options: WithOptions::try_from(stmt)?,
192 normalized_sql: Self::normalize_sql(stmt),
193 })
194 }
195
196 fn normalize_sql(stmt: &Statement) -> String {
202 let mut stmt = stmt.clone();
203 match &mut stmt {
204 Statement::CreateView {
205 or_replace,
206 if_not_exists,
207 ..
208 } => {
209 *or_replace = false;
210 *if_not_exists = false;
211 }
212 Statement::CreateTable {
213 or_replace,
214 if_not_exists,
215 ..
216 } => {
217 *or_replace = false;
218 *if_not_exists = false;
219 }
220 Statement::CreateIndex { if_not_exists, .. } => {
221 *if_not_exists = false;
222 }
223 Statement::CreateSource {
224 stmt: CreateSourceStatement { if_not_exists, .. },
225 ..
226 } => {
227 *if_not_exists = false;
228 }
229 Statement::CreateSink {
230 stmt: CreateSinkStatement { if_not_exists, .. },
231 } => {
232 *if_not_exists = false;
233 }
234 Statement::CreateSubscription {
235 stmt: CreateSubscriptionStatement { if_not_exists, .. },
236 } => {
237 *if_not_exists = false;
238 }
239 Statement::CreateConnection {
240 stmt: CreateConnectionStatement { if_not_exists, .. },
241 } => {
242 *if_not_exists = false;
243 }
244 _ => {}
245 }
246 stmt.to_string()
247 }
248}
249
250pub async fn handle(
251 session: Arc<SessionImpl>,
252 stmt: Statement,
253 sql: Arc<str>,
254 formats: Vec<Format>,
255) -> Result<RwPgResponse> {
256 session.clear_cancel_query_flag();
257 let _guard = session.txn_begin_implicit();
258 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
259
260 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
261
262 match stmt {
263 Statement::Explain {
264 statement,
265 analyze,
266 options,
267 } => explain::handle_explain(handler_args, *statement, options, analyze).await,
268 Statement::ExplainAnalyzeStreamJob {
269 target,
270 duration_secs,
271 } => {
272 explain_analyze_stream_job::handle_explain_analyze_stream_job(
273 handler_args,
274 target,
275 duration_secs,
276 )
277 .await
278 }
279 Statement::CreateSource { stmt } => {
280 create_source::handle_create_source(handler_args, stmt).await
281 }
282 Statement::CreateSink { stmt } => create_sink::handle_create_sink(handler_args, stmt).await,
283 Statement::CreateSubscription { stmt } => {
284 create_subscription::handle_create_subscription(handler_args, stmt).await
285 }
286 Statement::CreateConnection { stmt } => {
287 create_connection::handle_create_connection(handler_args, stmt).await
288 }
289 Statement::CreateSecret { stmt } => {
290 create_secret::handle_create_secret(handler_args, stmt).await
291 }
292 Statement::CreateFunction {
293 or_replace,
294 temporary,
295 if_not_exists,
296 name,
297 args,
298 returns,
299 params,
300 with_options,
301 } => {
302 if params.language.is_none()
305 || !params
306 .language
307 .as_ref()
308 .unwrap()
309 .real_value()
310 .eq_ignore_ascii_case("sql")
311 {
312 create_function::handle_create_function(
313 handler_args,
314 or_replace,
315 temporary,
316 if_not_exists,
317 name,
318 args,
319 returns,
320 params,
321 with_options,
322 )
323 .await
324 } else {
325 create_sql_function::handle_create_sql_function(
326 handler_args,
327 or_replace,
328 temporary,
329 if_not_exists,
330 name,
331 args,
332 returns,
333 params,
334 )
335 .await
336 }
337 }
338 Statement::CreateAggregate {
339 or_replace,
340 if_not_exists,
341 name,
342 args,
343 returns,
344 params,
345 ..
346 } => {
347 create_aggregate::handle_create_aggregate(
348 handler_args,
349 or_replace,
350 if_not_exists,
351 name,
352 args,
353 returns,
354 params,
355 )
356 .await
357 }
358 Statement::CreateTable {
359 name,
360 columns,
361 wildcard_idx,
362 constraints,
363 query,
364 with_options: _, or_replace,
367 temporary,
368 if_not_exists,
369 format_encode,
370 source_watermarks,
371 append_only,
372 on_conflict,
373 with_version_column,
374 cdc_table_info,
375 include_column_options,
376 webhook_info,
377 engine,
378 } => {
379 if or_replace {
380 bail_not_implemented!("CREATE OR REPLACE TABLE");
381 }
382 if temporary {
383 bail_not_implemented!("CREATE TEMPORARY TABLE");
384 }
385 if let Some(query) = query {
386 return create_table_as::handle_create_as(
387 handler_args,
388 name,
389 if_not_exists,
390 query,
391 columns,
392 append_only,
393 on_conflict,
394 with_version_column.map(|x| x.real_value()),
395 engine,
396 )
397 .await;
398 }
399 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
400 create_table::handle_create_table(
401 handler_args,
402 name,
403 columns,
404 wildcard_idx,
405 constraints,
406 if_not_exists,
407 format_encode,
408 source_watermarks,
409 append_only,
410 on_conflict,
411 with_version_column.map(|x| x.real_value()),
412 cdc_table_info,
413 include_column_options,
414 webhook_info,
415 engine,
416 )
417 .await
418 }
419 Statement::CreateDatabase {
420 db_name,
421 if_not_exists,
422 owner,
423 resource_group,
424 } => {
425 create_database::handle_create_database(
426 handler_args,
427 db_name,
428 if_not_exists,
429 owner,
430 resource_group,
431 )
432 .await
433 }
434 Statement::CreateSchema {
435 schema_name,
436 if_not_exists,
437 owner,
438 } => {
439 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
440 .await
441 }
442 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
443 Statement::DeclareCursor { stmt } => {
444 declare_cursor::handle_declare_cursor(handler_args, stmt).await
445 }
446 Statement::FetchCursor { stmt } => {
447 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
448 }
449 Statement::CloseCursor { stmt } => {
450 close_cursor::handle_close_cursor(handler_args, stmt).await
451 }
452 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
453 Statement::Grant { .. } => {
454 handle_privilege::handle_grant_privilege(handler_args, stmt).await
455 }
456 Statement::Revoke { .. } => {
457 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
458 }
459 Statement::Describe { name } => describe::handle_describe(handler_args, name),
460 Statement::Discard(..) => discard::handle_discard(handler_args),
461 Statement::ShowObjects {
462 object: show_object,
463 filter,
464 } => show::handle_show_object(handler_args, show_object, filter).await,
465 Statement::ShowCreateObject { create_type, name } => {
466 show::handle_show_create_object(handler_args, create_type, name)
467 }
468 Statement::ShowTransactionIsolationLevel => {
469 transaction::handle_show_isolation_level(handler_args)
470 }
471 Statement::Drop(DropStatement {
472 object_type,
473 object_name,
474 if_exists,
475 drop_mode,
476 }) => {
477 let mut cascade = false;
478 if let AstOption::Some(DropMode::Cascade) = drop_mode {
479 match object_type {
480 ObjectType::MaterializedView
481 | ObjectType::View
482 | ObjectType::Sink
483 | ObjectType::Source
484 | ObjectType::Subscription
485 | ObjectType::Index
486 | ObjectType::Table
487 | ObjectType::Schema => {
488 cascade = true;
489 }
490 ObjectType::Database
491 | ObjectType::User
492 | ObjectType::Connection
493 | ObjectType::Secret => {
494 bail_not_implemented!("DROP CASCADE");
495 }
496 };
497 };
498 match object_type {
499 ObjectType::Table => {
500 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
501 .await
502 }
503 ObjectType::MaterializedView => {
504 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
505 }
506 ObjectType::Index => {
507 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
508 .await
509 }
510 ObjectType::Source => {
511 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
512 .await
513 }
514 ObjectType::Sink => {
515 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
516 }
517 ObjectType::Subscription => {
518 drop_subscription::handle_drop_subscription(
519 handler_args,
520 object_name,
521 if_exists,
522 cascade,
523 )
524 .await
525 }
526 ObjectType::Database => {
527 drop_database::handle_drop_database(
528 handler_args,
529 object_name,
530 if_exists,
531 drop_mode.into(),
532 )
533 .await
534 }
535 ObjectType::Schema => {
536 drop_schema::handle_drop_schema(
537 handler_args,
538 object_name,
539 if_exists,
540 drop_mode.into(),
541 )
542 .await
543 }
544 ObjectType::User => {
545 drop_user::handle_drop_user(
546 handler_args,
547 object_name,
548 if_exists,
549 drop_mode.into(),
550 )
551 .await
552 }
553 ObjectType::View => {
554 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
555 }
556 ObjectType::Connection => {
557 drop_connection::handle_drop_connection(handler_args, object_name, if_exists)
558 .await
559 }
560 ObjectType::Secret => {
561 drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
562 }
563 }
564 }
565 Statement::DropFunction {
567 if_exists,
568 func_desc,
569 option,
570 } => {
571 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
572 .await
573 }
574 Statement::DropAggregate {
575 if_exists,
576 func_desc,
577 option,
578 } => {
579 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
580 .await
581 }
582 Statement::Query(_)
583 | Statement::Insert { .. }
584 | Statement::Delete { .. }
585 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
586 Statement::CreateView {
587 materialized,
588 if_not_exists,
589 name,
590 columns,
591 query,
592 with_options: _, or_replace, emit_mode,
595 } => {
596 if or_replace {
597 bail_not_implemented!("CREATE OR REPLACE VIEW");
598 }
599 if materialized {
600 create_mv::handle_create_mv(
601 handler_args,
602 if_not_exists,
603 name,
604 *query,
605 columns,
606 emit_mode,
607 )
608 .await
609 } else {
610 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
611 .await
612 }
613 }
614 Statement::Flush => flush::handle_flush(handler_args).await,
615 Statement::Wait => wait::handle_wait(handler_args).await,
616 Statement::Recover => recover::handle_recover(handler_args).await,
617 Statement::SetVariable {
618 local: _,
619 variable,
620 value,
621 } => {
622 if variable.real_value().eq_ignore_ascii_case("database") {
624 let x = variable::set_var_to_param_str(&value);
625 let res = use_db::handle_use_db(
626 handler_args,
627 ObjectName::from(vec![Ident::new_unchecked(
628 x.unwrap_or("default".to_owned()),
629 )]),
630 )?;
631 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
632 for notice in res.notices() {
633 builder = builder.notice(notice);
634 }
635 return Ok(builder.into());
636 }
637 variable::handle_set(handler_args, variable, value)
638 }
639 Statement::SetTimeZone { local: _, value } => {
640 variable::handle_set_time_zone(handler_args, value)
641 }
642 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable).await,
643 Statement::CreateIndex {
644 name,
645 table_name,
646 columns,
647 include,
648 distributed_by,
649 unique,
650 if_not_exists,
651 } => {
652 if unique {
653 bail_not_implemented!("create unique index");
654 }
655
656 create_index::handle_create_index(
657 handler_args,
658 if_not_exists,
659 name,
660 table_name,
661 columns.to_vec(),
662 include,
663 distributed_by,
664 )
665 .await
666 }
667 Statement::AlterDatabase { name, operation } => match operation {
668 AlterDatabaseOperation::RenameDatabase { database_name } => {
669 alter_rename::handle_rename_database(handler_args, name, database_name).await
670 }
671 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
672 alter_owner::handle_alter_owner(
673 handler_args,
674 name,
675 new_owner_name,
676 StatementType::ALTER_DATABASE,
677 )
678 .await
679 }
680 },
681 Statement::AlterSchema { name, operation } => match operation {
682 AlterSchemaOperation::RenameSchema { schema_name } => {
683 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
684 }
685 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
686 alter_owner::handle_alter_owner(
687 handler_args,
688 name,
689 new_owner_name,
690 StatementType::ALTER_SCHEMA,
691 )
692 .await
693 }
694 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
695 alter_swap_rename::handle_swap_rename(
696 handler_args,
697 name,
698 target_schema,
699 StatementType::ALTER_SCHEMA,
700 )
701 .await
702 }
703 },
704 Statement::AlterTable { name, operation } => match operation {
705 AlterTableOperation::AddColumn { .. }
706 | AlterTableOperation::DropColumn { .. }
707 | AlterTableOperation::AlterColumn { .. } => {
708 alter_table_column::handle_alter_table_column(handler_args, name, operation).await
709 }
710 AlterTableOperation::RenameTable { table_name } => {
711 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
712 .await
713 }
714 AlterTableOperation::ChangeOwner { new_owner_name } => {
715 alter_owner::handle_alter_owner(
716 handler_args,
717 name,
718 new_owner_name,
719 StatementType::ALTER_TABLE,
720 )
721 .await
722 }
723 AlterTableOperation::SetParallelism {
724 parallelism,
725 deferred,
726 } => {
727 alter_parallelism::handle_alter_parallelism(
728 handler_args,
729 name,
730 parallelism,
731 StatementType::ALTER_TABLE,
732 deferred,
733 )
734 .await
735 }
736 AlterTableOperation::SetSchema { new_schema_name } => {
737 alter_set_schema::handle_alter_set_schema(
738 handler_args,
739 name,
740 new_schema_name,
741 StatementType::ALTER_TABLE,
742 None,
743 )
744 .await
745 }
746 AlterTableOperation::RefreshSchema => {
747 alter_table_with_sr::handle_refresh_schema(handler_args, name).await
748 }
749 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
750 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
751 handler_args,
752 PbThrottleTarget::TableWithSource,
753 name,
754 rate_limit,
755 )
756 .await
757 }
758 AlterTableOperation::DropConnector => {
759 alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
760 .await
761 }
762 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
763 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
764 handler_args,
765 PbThrottleTarget::TableDml,
766 name,
767 rate_limit,
768 )
769 .await
770 }
771 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
772 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
773 handler_args,
774 PbThrottleTarget::CdcTable,
775 name,
776 rate_limit,
777 )
778 .await
779 }
780 AlterTableOperation::SwapRenameTable { target_table } => {
781 alter_swap_rename::handle_swap_rename(
782 handler_args,
783 name,
784 target_table,
785 StatementType::ALTER_TABLE,
786 )
787 .await
788 }
789 AlterTableOperation::AddConstraint { .. }
790 | AlterTableOperation::DropConstraint { .. }
791 | AlterTableOperation::RenameColumn { .. }
792 | AlterTableOperation::ChangeColumn { .. }
793 | AlterTableOperation::RenameConstraint { .. } => {
794 bail_not_implemented!(
795 "Unhandled statement: {}",
796 Statement::AlterTable { name, operation }
797 )
798 }
799 },
800 Statement::AlterIndex { name, operation } => match operation {
801 AlterIndexOperation::RenameIndex { index_name } => {
802 alter_rename::handle_rename_index(handler_args, name, index_name).await
803 }
804 AlterIndexOperation::SetParallelism {
805 parallelism,
806 deferred,
807 } => {
808 alter_parallelism::handle_alter_parallelism(
809 handler_args,
810 name,
811 parallelism,
812 StatementType::ALTER_INDEX,
813 deferred,
814 )
815 .await
816 }
817 },
818 Statement::AlterView {
819 materialized,
820 name,
821 operation,
822 } => {
823 let statement_type = if materialized {
824 StatementType::ALTER_MATERIALIZED_VIEW
825 } else {
826 StatementType::ALTER_VIEW
827 };
828 match operation {
829 AlterViewOperation::RenameView { view_name } => {
830 if materialized {
831 alter_rename::handle_rename_table(
832 handler_args,
833 TableType::MaterializedView,
834 name,
835 view_name,
836 )
837 .await
838 } else {
839 alter_rename::handle_rename_view(handler_args, name, view_name).await
840 }
841 }
842 AlterViewOperation::SetParallelism {
843 parallelism,
844 deferred,
845 } => {
846 if !materialized {
847 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
848 }
849 alter_parallelism::handle_alter_parallelism(
850 handler_args,
851 name,
852 parallelism,
853 statement_type,
854 deferred,
855 )
856 .await
857 }
858 AlterViewOperation::SetResourceGroup {
859 resource_group,
860 deferred,
861 } => {
862 if !materialized {
863 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
864 }
865 alter_resource_group::handle_alter_resource_group(
866 handler_args,
867 name,
868 resource_group,
869 statement_type,
870 deferred,
871 )
872 .await
873 }
874 AlterViewOperation::ChangeOwner { new_owner_name } => {
875 alter_owner::handle_alter_owner(
876 handler_args,
877 name,
878 new_owner_name,
879 statement_type,
880 )
881 .await
882 }
883 AlterViewOperation::SetSchema { new_schema_name } => {
884 alter_set_schema::handle_alter_set_schema(
885 handler_args,
886 name,
887 new_schema_name,
888 statement_type,
889 None,
890 )
891 .await
892 }
893 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
894 if !materialized {
895 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
896 }
897 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
898 handler_args,
899 PbThrottleTarget::Mv,
900 name,
901 rate_limit,
902 )
903 .await
904 }
905 AlterViewOperation::SwapRenameView { target_view } => {
906 alter_swap_rename::handle_swap_rename(
907 handler_args,
908 name,
909 target_view,
910 statement_type,
911 )
912 .await
913 }
914 }
915 }
916 Statement::AlterSink { name, operation } => match operation {
917 AlterSinkOperation::RenameSink { sink_name } => {
918 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
919 }
920 AlterSinkOperation::ChangeOwner { new_owner_name } => {
921 alter_owner::handle_alter_owner(
922 handler_args,
923 name,
924 new_owner_name,
925 StatementType::ALTER_SINK,
926 )
927 .await
928 }
929 AlterSinkOperation::SetSchema { new_schema_name } => {
930 alter_set_schema::handle_alter_set_schema(
931 handler_args,
932 name,
933 new_schema_name,
934 StatementType::ALTER_SINK,
935 None,
936 )
937 .await
938 }
939 AlterSinkOperation::SetParallelism {
940 parallelism,
941 deferred,
942 } => {
943 alter_parallelism::handle_alter_parallelism(
944 handler_args,
945 name,
946 parallelism,
947 StatementType::ALTER_SINK,
948 deferred,
949 )
950 .await
951 }
952 AlterSinkOperation::SwapRenameSink { target_sink } => {
953 alter_swap_rename::handle_swap_rename(
954 handler_args,
955 name,
956 target_sink,
957 StatementType::ALTER_SINK,
958 )
959 .await
960 }
961 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
962 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
963 handler_args,
964 PbThrottleTarget::Sink,
965 name,
966 rate_limit,
967 )
968 .await
969 }
970 },
971 Statement::AlterSubscription { name, operation } => match operation {
972 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
973 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
974 .await
975 }
976 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
977 alter_owner::handle_alter_owner(
978 handler_args,
979 name,
980 new_owner_name,
981 StatementType::ALTER_SUBSCRIPTION,
982 )
983 .await
984 }
985 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
986 alter_set_schema::handle_alter_set_schema(
987 handler_args,
988 name,
989 new_schema_name,
990 StatementType::ALTER_SUBSCRIPTION,
991 None,
992 )
993 .await
994 }
995 AlterSubscriptionOperation::SwapRenameSubscription {
996 target_subscription,
997 } => {
998 alter_swap_rename::handle_swap_rename(
999 handler_args,
1000 name,
1001 target_subscription,
1002 StatementType::ALTER_SUBSCRIPTION,
1003 )
1004 .await
1005 }
1006 },
1007 Statement::AlterSource { name, operation } => match operation {
1008 AlterSourceOperation::RenameSource { source_name } => {
1009 alter_rename::handle_rename_source(handler_args, name, source_name).await
1010 }
1011 AlterSourceOperation::AddColumn { .. } => {
1012 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1013 }
1014 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1015 alter_owner::handle_alter_owner(
1016 handler_args,
1017 name,
1018 new_owner_name,
1019 StatementType::ALTER_SOURCE,
1020 )
1021 .await
1022 }
1023 AlterSourceOperation::SetSchema { new_schema_name } => {
1024 alter_set_schema::handle_alter_set_schema(
1025 handler_args,
1026 name,
1027 new_schema_name,
1028 StatementType::ALTER_SOURCE,
1029 None,
1030 )
1031 .await
1032 }
1033 AlterSourceOperation::FormatEncode { format_encode } => {
1034 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1035 .await
1036 }
1037 AlterSourceOperation::RefreshSchema => {
1038 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1039 }
1040 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1041 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1042 handler_args,
1043 PbThrottleTarget::Source,
1044 name,
1045 rate_limit,
1046 )
1047 .await
1048 }
1049 AlterSourceOperation::SwapRenameSource { target_source } => {
1050 alter_swap_rename::handle_swap_rename(
1051 handler_args,
1052 name,
1053 target_source,
1054 StatementType::ALTER_SOURCE,
1055 )
1056 .await
1057 }
1058 AlterSourceOperation::SetParallelism {
1059 parallelism,
1060 deferred,
1061 } => {
1062 alter_parallelism::handle_alter_parallelism(
1063 handler_args,
1064 name,
1065 parallelism,
1066 StatementType::ALTER_SOURCE,
1067 deferred,
1068 )
1069 .await
1070 }
1071 },
1072 Statement::AlterFunction {
1073 name,
1074 args,
1075 operation,
1076 } => match operation {
1077 AlterFunctionOperation::SetSchema { new_schema_name } => {
1078 alter_set_schema::handle_alter_set_schema(
1079 handler_args,
1080 name,
1081 new_schema_name,
1082 StatementType::ALTER_FUNCTION,
1083 args,
1084 )
1085 .await
1086 }
1087 },
1088 Statement::AlterConnection { name, operation } => match operation {
1089 AlterConnectionOperation::SetSchema { new_schema_name } => {
1090 alter_set_schema::handle_alter_set_schema(
1091 handler_args,
1092 name,
1093 new_schema_name,
1094 StatementType::ALTER_CONNECTION,
1095 None,
1096 )
1097 .await
1098 }
1099 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1100 alter_owner::handle_alter_owner(
1101 handler_args,
1102 name,
1103 new_owner_name,
1104 StatementType::ALTER_CONNECTION,
1105 )
1106 .await
1107 }
1108 },
1109 Statement::AlterSystem { param, value } => {
1110 alter_system::handle_alter_system(handler_args, param, value).await
1111 }
1112 Statement::AlterSecret {
1113 name,
1114 with_options,
1115 operation,
1116 } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1117 Statement::AlterFragment {
1118 fragment_id,
1119 operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1120 } => {
1121 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1122 &handler_args.session,
1123 PbThrottleTarget::Fragment,
1124 fragment_id,
1125 rate_limit,
1126 StatementType::SET_VARIABLE,
1127 )
1128 .await
1129 }
1130 Statement::StartTransaction { modes } => {
1131 transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1132 }
1133 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1134 Statement::Commit { chain } => {
1135 transaction::handle_commit(handler_args, COMMIT, chain).await
1136 }
1137 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1138 Statement::Rollback { chain } => {
1139 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1140 }
1141 Statement::SetTransaction {
1142 modes,
1143 snapshot,
1144 session,
1145 } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1146 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1147 Statement::Kill(process_id) => handle_kill(handler_args, process_id).await,
1148 Statement::Comment {
1149 object_type,
1150 object_name,
1151 comment,
1152 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1153 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1154 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1155 }
1156}
1157
1158fn check_ban_ddl_for_iceberg_engine_table(
1159 session: Arc<SessionImpl>,
1160 stmt: &Statement,
1161) -> Result<()> {
1162 match stmt {
1163 Statement::AlterTable {
1164 name,
1165 operation:
1166 operation @ (AlterTableOperation::AddColumn { .. }
1167 | AlterTableOperation::DropColumn { .. }),
1168 } => {
1169 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1170 if table.is_iceberg_engine_table() {
1171 bail!(
1172 "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1173 operation,
1174 schema_name,
1175 name
1176 );
1177 }
1178 }
1179
1180 Statement::AlterTable {
1181 name,
1182 operation: AlterTableOperation::RenameTable { .. },
1183 } => {
1184 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1185 if table.is_iceberg_engine_table() {
1186 bail!(
1187 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1188 schema_name,
1189 name
1190 );
1191 }
1192 }
1193
1194 Statement::AlterTable {
1195 name,
1196 operation: AlterTableOperation::ChangeOwner { .. },
1197 } => {
1198 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1199 if table.is_iceberg_engine_table() {
1200 bail!(
1201 "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1202 schema_name,
1203 name
1204 );
1205 }
1206 }
1207
1208 Statement::AlterTable {
1209 name,
1210 operation: AlterTableOperation::SetParallelism { .. },
1211 } => {
1212 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1213 if table.is_iceberg_engine_table() {
1214 bail!(
1215 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1216 schema_name,
1217 name
1218 );
1219 }
1220 }
1221
1222 Statement::AlterTable {
1223 name,
1224 operation: AlterTableOperation::SetSchema { .. },
1225 } => {
1226 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1227 if table.is_iceberg_engine_table() {
1228 bail!(
1229 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1230 schema_name,
1231 name
1232 );
1233 }
1234 }
1235
1236 Statement::AlterTable {
1237 name,
1238 operation: AlterTableOperation::RefreshSchema,
1239 } => {
1240 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1241 if table.is_iceberg_engine_table() {
1242 bail!(
1243 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1244 schema_name,
1245 name
1246 );
1247 }
1248 }
1249
1250 Statement::AlterTable {
1251 name,
1252 operation: AlterTableOperation::SetSourceRateLimit { .. },
1253 } => {
1254 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1255 if table.is_iceberg_engine_table() {
1256 bail!(
1257 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1258 schema_name,
1259 name
1260 );
1261 }
1262 }
1263
1264 _ => {}
1265 }
1266
1267 Ok(())
1268}