use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::stream::{self, BoxStream};
use futures::{Stream, StreamExt};
use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
use pgwire::pg_server::BoxedError;
use pgwire::types::{Format, Row};
use risingwave_common::catalog::AlterDatabaseParam;
use risingwave_common::types::Fields;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_pb::meta::PbThrottleTarget;
use risingwave_sqlparser::ast::*;
use thiserror_ext::AsReport;
use util::get_table_catalog_by_table_name;

use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result};
use crate::handler::cancel_job::handle_cancel;
use crate::handler::kill_process::handle_kill;
use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
use crate::session::SessionImpl;
use crate::utils::WithOptions;

mod alter_database_param;
mod alter_mv;
mod alter_owner;
mod alter_parallelism;
mod alter_rename;
mod alter_resource_group;
mod alter_secret;
mod alter_set_schema;
mod alter_sink_props;
mod alter_source_column;
mod alter_source_props;
mod alter_source_with_sr;
mod alter_streaming_enable_unaligned_join;
mod alter_streaming_rate_limit;
mod alter_swap_rename;
mod alter_system;
mod alter_table_column;
pub mod alter_table_drop_connector;
mod alter_table_with_sr;
pub mod alter_user;
pub mod cancel_job;
pub mod close_cursor;
mod comment;
pub mod create_aggregate;
pub mod create_connection;
mod create_database;
pub mod create_function;
pub mod create_index;
pub mod create_mv;
pub mod create_schema;
pub mod create_secret;
pub mod create_sink;
pub mod create_source;
pub mod create_sql_function;
pub mod create_subscription;
pub mod create_table;
pub mod create_table_as;
pub mod create_user;
pub mod create_view;
pub mod declare_cursor;
pub mod describe;
pub mod discard;
mod drop_connection;
mod drop_database;
pub mod drop_function;
mod drop_index;
pub mod drop_mv;
mod drop_schema;
pub mod drop_secret;
pub mod drop_sink;
pub mod drop_source;
pub mod drop_subscription;
pub mod drop_table;
pub mod drop_user;
mod drop_view;
pub mod explain;
pub mod explain_analyze_stream_job;
pub mod extended_handle;
pub mod fetch_cursor;
mod flush;
pub mod handle_privilege;
pub mod kill_process;
mod prepared_statement;
pub mod privilege;
pub mod query;
mod recover;
mod refresh;
pub mod show;
mod transaction;
mod use_db;
pub mod util;
pub mod vacuum;
pub mod variable;
mod wait;

pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};

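/// Builder for [`RwPgResponse`], with [`PgResponseStream`] as the row stream type.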
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;

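/// The [`PgResponse`] type produced by the statement handlers in this module.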
pub type RwPgResponse = PgResponse<PgResponseStream>;

#[easy_ext::ext(RwPgResponseBuilderExt)]
impl RwPgResponseBuilder {
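    /// Convert an iterator of [`Fields`] values into text-formatted rows and set them as
    /// the response values.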
    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
        let fields = T::fields();
        self.values(
            rows.into_iter()
                .map(|row| {
                    Row::new(
                        row.into_owned_row()
                            .into_iter()
                            .zip_eq_fast(&fields)
                            .map(|(datum, (_, ty))| {
                                datum.map(|scalar| {
                                    scalar.as_scalar_ref_impl().text_format(ty).into()
                                })
                            })
                            .collect(),
                    )
                })
                .collect_vec()
                .into(),
            fields_to_descriptors(fields),
        )
    }
}

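/// Convert `(name, data type)` pairs into pgwire [`PgFieldDescriptor`]s.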
pub fn fields_to_descriptors(
    fields: Vec<(&str, risingwave_common::types::DataType)>,
) -> Vec<PgFieldDescriptor> {
    fields
        .iter()
        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
        .collect()
}

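/// Row stream carried by [`RwPgResponse`]: either an adapted local/distributed query
/// stream or a pre-materialized set of rows.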
pub enum PgResponseStream {
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    Rows(BoxStream<'static, RowSetResult>),
}

impl Stream for PgResponseStream {
    type Item = std::result::Result<Vec<Row>, BoxedError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match &mut *self {
            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
        }
    }
}

impl From<Vec<Row>> for PgResponseStream {
    fn from(rows: Vec<Row>) -> Self {
        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
    }
}

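/// Arguments passed to every statement handler: the session, the original and normalized
/// SQL text, and the parsed `WITH` options.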
#[derive(Clone)]
pub struct HandlerArgs {
    pub session: Arc<SessionImpl>,
    pub sql: Arc<str>,
    pub normalized_sql: String,
    pub with_options: WithOptions,
}

impl HandlerArgs {
    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
        Ok(Self {
            session,
            sql,
            with_options: WithOptions::try_from(stmt)?,
            normalized_sql: Self::normalize_sql(stmt),
        })
    }

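    /// Normalize the SQL text of a `CREATE` statement for use as the stored definition:
    /// `OR REPLACE` and `IF NOT EXISTS` clauses are cleared so the definition does not
    /// depend on how the statement was originally issued.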
    fn normalize_sql(stmt: &Statement) -> String {
        let mut stmt = stmt.clone();
        match &mut stmt {
            Statement::CreateView {
                or_replace,
                if_not_exists,
                ..
            } => {
                *or_replace = false;
                *if_not_exists = false;
            }
            Statement::CreateTable {
                or_replace,
                if_not_exists,
                ..
            } => {
                *or_replace = false;
                *if_not_exists = false;
            }
            Statement::CreateIndex { if_not_exists, .. } => {
                *if_not_exists = false;
            }
            Statement::CreateSource {
                stmt: CreateSourceStatement { if_not_exists, .. },
                ..
            } => {
                *if_not_exists = false;
            }
            Statement::CreateSink {
                stmt: CreateSinkStatement { if_not_exists, .. },
            } => {
                *if_not_exists = false;
            }
            Statement::CreateSubscription {
                stmt: CreateSubscriptionStatement { if_not_exists, .. },
            } => {
                *if_not_exists = false;
            }
            Statement::CreateConnection {
                stmt: CreateConnectionStatement { if_not_exists, .. },
            } => {
                *if_not_exists = false;
            }
            _ => {}
        }
        stmt.to_string()
    }
}

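/// Dispatch a parsed [`Statement`] to its handler and return the response to send back to
/// the client.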
pub async fn handle(
    session: Arc<SessionImpl>,
    stmt: Statement,
    sql: Arc<str>,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    session.clear_cancel_query_flag();
    let _guard = session.txn_begin_implicit();
    let handler_args = HandlerArgs::new(session, &stmt, sql)?;

    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;

    match stmt {
        Statement::Explain {
            statement,
            analyze,
            options,
        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
        Statement::ExplainAnalyzeStreamJob {
            target,
            duration_secs,
        } => {
            explain_analyze_stream_job::handle_explain_analyze_stream_job(
                handler_args,
                target,
                duration_secs,
            )
            .await
        }
        Statement::CreateSource { stmt } => {
            create_source::handle_create_source(handler_args, stmt).await
        }
        Statement::CreateSink { stmt } => {
            create_sink::handle_create_sink(handler_args, stmt, false).await
        }
        Statement::CreateSubscription { stmt } => {
            create_subscription::handle_create_subscription(handler_args, stmt).await
        }
        Statement::CreateConnection { stmt } => {
            create_connection::handle_create_connection(handler_args, stmt).await
        }
        Statement::CreateSecret { stmt } => {
            create_secret::handle_create_secret(handler_args, stmt).await
        }
        Statement::CreateFunction {
            or_replace,
            temporary,
            if_not_exists,
            name,
            args,
            returns,
            params,
            with_options,
        } => {
            if params.language.is_none()
                || !params
                    .language
                    .as_ref()
                    .unwrap()
                    .real_value()
                    .eq_ignore_ascii_case("sql")
            {
                create_function::handle_create_function(
                    handler_args,
                    or_replace,
                    temporary,
                    if_not_exists,
                    name,
                    args,
                    returns,
                    params,
                    with_options,
                )
                .await
            } else {
                create_sql_function::handle_create_sql_function(
                    handler_args,
                    or_replace,
                    temporary,
                    if_not_exists,
                    name,
                    args,
                    returns,
                    params,
                )
                .await
            }
        }
        Statement::CreateAggregate {
            or_replace,
            if_not_exists,
            name,
            args,
            returns,
            params,
            ..
        } => {
            create_aggregate::handle_create_aggregate(
                handler_args,
                or_replace,
                if_not_exists,
                name,
                args,
                returns,
                params,
            )
            .await
        }
        Statement::CreateTable {
            name,
            columns,
            wildcard_idx,
            constraints,
            query,
            with_options: _,
            or_replace,
            temporary,
            if_not_exists,
            format_encode,
            source_watermarks,
            append_only,
            on_conflict,
            with_version_columns,
            cdc_table_info,
            include_column_options,
            webhook_info,
            engine,
        } => {
            if or_replace {
                bail_not_implemented!("CREATE OR REPLACE TABLE");
            }
            if temporary {
                bail_not_implemented!("CREATE TEMPORARY TABLE");
            }
            if let Some(query) = query {
                return create_table_as::handle_create_as(
                    handler_args,
                    name,
                    if_not_exists,
                    query,
                    columns,
                    append_only,
                    on_conflict,
                    with_version_columns
                        .iter()
                        .map(|col| col.real_value())
                        .collect(),
                    engine,
                )
                .await;
            }
            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
            create_table::handle_create_table(
                handler_args,
                name,
                columns,
                wildcard_idx,
                constraints,
                if_not_exists,
                format_encode,
                source_watermarks,
                append_only,
                on_conflict,
                with_version_columns
                    .iter()
                    .map(|col| col.real_value())
                    .collect(),
                cdc_table_info,
                include_column_options,
                webhook_info,
                engine,
            )
            .await
        }
        Statement::CreateDatabase {
            db_name,
            if_not_exists,
            owner,
            resource_group,
            barrier_interval_ms,
            checkpoint_frequency,
        } => {
            create_database::handle_create_database(
                handler_args,
                db_name,
                if_not_exists,
                owner,
                resource_group,
                barrier_interval_ms,
                checkpoint_frequency,
            )
            .await
        }
        Statement::CreateSchema {
            schema_name,
            if_not_exists,
            owner,
        } => {
            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
                .await
        }
        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
        Statement::DeclareCursor { stmt } => {
            declare_cursor::handle_declare_cursor(handler_args, stmt).await
        }
        Statement::FetchCursor { stmt } => {
            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
        }
        Statement::CloseCursor { stmt } => {
            close_cursor::handle_close_cursor(handler_args, stmt).await
        }
        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
        Statement::Grant { .. } => {
            handle_privilege::handle_grant_privilege(handler_args, stmt).await
        }
        Statement::Revoke { .. } => {
            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
        }
        Statement::Describe { name, kind } => match kind {
            DescribeKind::Fragments => {
                describe::handle_describe_fragments(handler_args, name).await
            }
            DescribeKind::Plain => describe::handle_describe(handler_args, name),
        },
        Statement::DescribeFragment { fragment_id } => {
            describe::handle_describe_fragment(handler_args, fragment_id).await
        }
        Statement::Discard(..) => discard::handle_discard(handler_args),
        Statement::ShowObjects {
            object: show_object,
            filter,
        } => show::handle_show_object(handler_args, show_object, filter).await,
        Statement::ShowCreateObject { create_type, name } => {
            show::handle_show_create_object(handler_args, create_type, name)
        }
        Statement::ShowTransactionIsolationLevel => {
            transaction::handle_show_isolation_level(handler_args)
        }
        Statement::Drop(DropStatement {
            object_type,
            object_name,
            if_exists,
            drop_mode,
        }) => {
            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
                match object_type {
                    ObjectType::MaterializedView
                    | ObjectType::View
                    | ObjectType::Sink
                    | ObjectType::Source
                    | ObjectType::Subscription
                    | ObjectType::Index
                    | ObjectType::Table
                    | ObjectType::Schema
                    | ObjectType::Connection => true,
                    ObjectType::Database | ObjectType::User | ObjectType::Secret => {
                        bail_not_implemented!("DROP CASCADE");
                    }
                }
            } else {
                false
            };
            match object_type {
                ObjectType::Table => {
                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
                        .await
                }
                ObjectType::MaterializedView => {
                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
                }
                ObjectType::Index => {
                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
                        .await
                }
                ObjectType::Source => {
                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
                        .await
                }
                ObjectType::Sink => {
                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
                }
                ObjectType::Subscription => {
                    drop_subscription::handle_drop_subscription(
                        handler_args,
                        object_name,
                        if_exists,
                        cascade,
                    )
                    .await
                }
                ObjectType::Database => {
                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
                }
                ObjectType::Schema => {
                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
                        .await
                }
                ObjectType::User => {
                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
                }
                ObjectType::View => {
                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
                }
                ObjectType::Connection => {
                    drop_connection::handle_drop_connection(
                        handler_args,
                        object_name,
                        if_exists,
                        cascade,
                    )
                    .await
                }
                ObjectType::Secret => {
                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
                }
            }
        }
        Statement::DropFunction {
            if_exists,
            func_desc,
            option,
        } => {
            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
                .await
        }
        Statement::DropAggregate {
            if_exists,
            func_desc,
            option,
        } => {
            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
                .await
        }
        Statement::Query(_)
        | Statement::Insert { .. }
        | Statement::Delete { .. }
        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
        Statement::CreateView {
            materialized,
            if_not_exists,
            name,
            columns,
            query,
            with_options: _,
            or_replace,
            emit_mode,
        } => {
            if or_replace {
                bail_not_implemented!("CREATE OR REPLACE VIEW");
            }
            if materialized {
                create_mv::handle_create_mv(
                    handler_args,
                    if_not_exists,
                    name,
                    *query,
                    columns,
                    emit_mode,
                )
                .await
            } else {
                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
                    .await
            }
        }
        Statement::Flush => flush::handle_flush(handler_args).await,
        Statement::Wait => wait::handle_wait(handler_args).await,
        Statement::Recover => recover::handle_recover(handler_args).await,
        Statement::SetVariable {
            local: _,
            variable,
            value,
        } => {
            if variable.real_value().eq_ignore_ascii_case("database") {
                let x = variable::set_var_to_param_str(&value);
                let res = use_db::handle_use_db(
                    handler_args,
                    ObjectName::from(vec![Ident::from_real_value(
                        x.as_deref().unwrap_or("default"),
                    )]),
                )?;
                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
                for notice in res.notices() {
                    builder = builder.notice(notice);
                }
                return Ok(builder.into());
            }
            variable::handle_set(handler_args, variable, value)
        }
        Statement::SetTimeZone { local: _, value } => {
            variable::handle_set_time_zone(handler_args, value)
        }
        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
        Statement::CreateIndex {
            name,
            table_name,
            method,
            columns,
            include,
            distributed_by,
            unique,
            if_not_exists,
            with_properties: _,
        } => {
            if unique {
                bail_not_implemented!("create unique index");
            }

            create_index::handle_create_index(
                handler_args,
                if_not_exists,
                name,
                table_name,
                method,
                columns.to_vec(),
                include,
                distributed_by,
            )
            .await
        }
        Statement::AlterDatabase { name, operation } => match operation {
            AlterDatabaseOperation::RenameDatabase { database_name } => {
                alter_rename::handle_rename_database(handler_args, name, database_name).await
            }
            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_DATABASE,
                )
                .await
            }
            AlterDatabaseOperation::SetParam(config_param) => {
                let ConfigParam { param, value } = config_param;

                let database_param = match param.real_value().to_uppercase().as_str() {
                    "BARRIER_INTERVAL_MS" => {
                        let barrier_interval_ms = match value {
                            SetVariableValue::Default => None,
                            SetVariableValue::Single(SetVariableValueSingle::Literal(
                                Value::Number(num),
                            )) => {
                                let num = num.parse::<u32>().map_err(|e| {
                                    ErrorCode::InvalidInputSyntax(format!(
                                        "barrier_interval_ms must be a u32 integer: {}",
                                        e.as_report()
                                    ))
                                })?;
                                Some(num)
                            }
                            _ => {
                                return Err(ErrorCode::InvalidInputSyntax(
                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
                                        .to_owned(),
                                )
                                .into());
                            }
                        };
                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
                    }
                    "CHECKPOINT_FREQUENCY" => {
                        let checkpoint_frequency = match value {
                            SetVariableValue::Default => None,
                            SetVariableValue::Single(SetVariableValueSingle::Literal(
                                Value::Number(num),
                            )) => {
                                let num = num.parse::<u64>().map_err(|e| {
                                    ErrorCode::InvalidInputSyntax(format!(
                                        "checkpoint_frequency must be a u64 integer: {}",
                                        e.as_report()
                                    ))
                                })?;
                                Some(num)
                            }
                            _ => {
                                return Err(ErrorCode::InvalidInputSyntax(
                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
                                        .to_owned(),
                                )
                                .into());
                            }
                        };
                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
                    }
                    _ => {
                        return Err(ErrorCode::InvalidInputSyntax(format!(
                            "Unsupported database config parameter: {}",
                            param.real_value()
                        ))
                        .into());
                    }
                };

                alter_database_param::handle_alter_database_param(
                    handler_args,
                    name,
                    database_param,
                )
                .await
            }
        },
        Statement::AlterSchema { name, operation } => match operation {
            AlterSchemaOperation::RenameSchema { schema_name } => {
                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
            }
            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_SCHEMA,
                )
                .await
            }
            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
                alter_swap_rename::handle_swap_rename(
                    handler_args,
                    name,
                    target_schema,
                    StatementType::ALTER_SCHEMA,
                )
                .await
            }
        },
        Statement::AlterTable { name, operation } => match operation {
            AlterTableOperation::AddColumn { .. }
            | AlterTableOperation::DropColumn { .. }
            | AlterTableOperation::AlterColumn { .. } => {
                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
            }
            AlterTableOperation::RenameTable { table_name } => {
                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
                    .await
            }
            AlterTableOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_TABLE,
                )
                .await
            }
            AlterTableOperation::SetParallelism {
                parallelism,
                deferred,
            } => {
                alter_parallelism::handle_alter_parallelism(
                    handler_args,
                    name,
                    parallelism,
                    StatementType::ALTER_TABLE,
                    deferred,
                )
                .await
            }
            AlterTableOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_TABLE,
                    None,
                )
                .await
            }
            AlterTableOperation::RefreshSchema => {
                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
            }
            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                    handler_args,
                    PbThrottleTarget::TableWithSource,
                    name,
                    rate_limit,
                )
                .await
            }
            AlterTableOperation::DropConnector => {
                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
                    .await
            }
            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                    handler_args,
                    PbThrottleTarget::TableDml,
                    name,
                    rate_limit,
                )
                .await
            }
            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                    handler_args,
                    PbThrottleTarget::CdcTable,
                    name,
                    rate_limit,
                )
                .await
            }
            AlterTableOperation::SwapRenameTable { target_table } => {
                alter_swap_rename::handle_swap_rename(
                    handler_args,
                    name,
                    target_table,
                    StatementType::ALTER_TABLE,
                )
                .await
            }
            AlterTableOperation::AlterConnectorProps { alter_props } => {
                crate::handler::alter_source_props::handle_alter_table_connector_props(
                    handler_args,
                    name,
                    alter_props,
                )
                .await
            }
            AlterTableOperation::AddConstraint { .. }
            | AlterTableOperation::DropConstraint { .. }
            | AlterTableOperation::RenameColumn { .. }
            | AlterTableOperation::ChangeColumn { .. }
            | AlterTableOperation::RenameConstraint { .. } => {
                bail_not_implemented!(
                    "Unhandled statement: {}",
                    Statement::AlterTable { name, operation }
                )
            }
        },
        Statement::AlterIndex { name, operation } => match operation {
            AlterIndexOperation::RenameIndex { index_name } => {
                alter_rename::handle_rename_index(handler_args, name, index_name).await
            }
            AlterIndexOperation::SetParallelism {
                parallelism,
                deferred,
            } => {
                alter_parallelism::handle_alter_parallelism(
                    handler_args,
                    name,
                    parallelism,
                    StatementType::ALTER_INDEX,
                    deferred,
                )
                .await
            }
        },
        Statement::AlterView {
            materialized,
            name,
            operation,
        } => {
            let statement_type = if materialized {
                StatementType::ALTER_MATERIALIZED_VIEW
            } else {
                StatementType::ALTER_VIEW
            };
            match operation {
                AlterViewOperation::RenameView { view_name } => {
                    if materialized {
                        alter_rename::handle_rename_table(
                            handler_args,
                            TableType::MaterializedView,
                            name,
                            view_name,
                        )
                        .await
                    } else {
                        alter_rename::handle_rename_view(handler_args, name, view_name).await
                    }
                }
                AlterViewOperation::SetParallelism {
                    parallelism,
                    deferred,
                } => {
                    if !materialized {
                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
                    }
                    alter_parallelism::handle_alter_parallelism(
                        handler_args,
                        name,
                        parallelism,
                        statement_type,
                        deferred,
                    )
                    .await
                }
                AlterViewOperation::SetResourceGroup {
                    resource_group,
                    deferred,
                } => {
                    if !materialized {
                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
                    }
                    alter_resource_group::handle_alter_resource_group(
                        handler_args,
                        name,
                        resource_group,
                        statement_type,
                        deferred,
                    )
                    .await
                }
                AlterViewOperation::ChangeOwner { new_owner_name } => {
                    alter_owner::handle_alter_owner(
                        handler_args,
                        name,
                        new_owner_name,
                        statement_type,
                    )
                    .await
                }
                AlterViewOperation::SetSchema { new_schema_name } => {
                    alter_set_schema::handle_alter_set_schema(
                        handler_args,
                        name,
                        new_schema_name,
                        statement_type,
                        None,
                    )
                    .await
                }
                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
                    if !materialized {
                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
                    }
                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                        handler_args,
                        PbThrottleTarget::Mv,
                        name,
                        rate_limit,
                    )
                    .await
                }
                AlterViewOperation::SwapRenameView { target_view } => {
                    alter_swap_rename::handle_swap_rename(
                        handler_args,
                        name,
                        target_view,
                        statement_type,
                    )
                    .await
                }
                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
                    if !materialized {
                        bail!(
                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
                        );
                    }
                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
                }
                AlterViewOperation::AsQuery { query } => {
                    if !materialized {
                        bail_not_implemented!("ALTER VIEW AS QUERY");
                    }
                    if !cfg!(debug_assertions) {
                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
                    }
                    alter_mv::handle_alter_mv(handler_args, name, query).await
                }
            }
        }

        Statement::AlterSink { name, operation } => match operation {
            AlterSinkOperation::AlterConnectorProps {
                alter_props: changed_props,
            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
            AlterSinkOperation::RenameSink { sink_name } => {
                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
            }
            AlterSinkOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_SINK,
                )
                .await
            }
            AlterSinkOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_SINK,
                    None,
                )
                .await
            }
            AlterSinkOperation::SetParallelism {
                parallelism,
                deferred,
            } => {
                alter_parallelism::handle_alter_parallelism(
                    handler_args,
                    name,
                    parallelism,
                    StatementType::ALTER_SINK,
                    deferred,
                )
                .await
            }
            AlterSinkOperation::SwapRenameSink { target_sink } => {
                alter_swap_rename::handle_swap_rename(
                    handler_args,
                    name,
                    target_sink,
                    StatementType::ALTER_SINK,
                )
                .await
            }
            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                    handler_args,
                    PbThrottleTarget::Sink,
                    name,
                    rate_limit,
                )
                .await
            }
            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
                    handler_args,
                    name,
                    enable,
                )
                .await
            }
        },
        Statement::AlterSubscription { name, operation } => match operation {
            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
                    .await
            }
            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_SUBSCRIPTION,
                )
                .await
            }
            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_SUBSCRIPTION,
                    None,
                )
                .await
            }
            AlterSubscriptionOperation::SwapRenameSubscription {
                target_subscription,
            } => {
                alter_swap_rename::handle_swap_rename(
                    handler_args,
                    name,
                    target_subscription,
                    StatementType::ALTER_SUBSCRIPTION,
                )
                .await
            }
        },
        Statement::AlterSource { name, operation } => match operation {
            AlterSourceOperation::AlterConnectorProps { alter_props } => {
                alter_source_props::handle_alter_source_connector_props(
                    handler_args,
                    name,
                    alter_props,
                )
                .await
            }
            AlterSourceOperation::RenameSource { source_name } => {
                alter_rename::handle_rename_source(handler_args, name, source_name).await
            }
            AlterSourceOperation::AddColumn { .. } => {
                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
            }
            AlterSourceOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_SOURCE,
                )
                .await
            }
            AlterSourceOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_SOURCE,
                    None,
                )
                .await
            }
            AlterSourceOperation::FormatEncode { format_encode } => {
                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
                    .await
            }
            AlterSourceOperation::RefreshSchema => {
                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
            }
            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                    handler_args,
                    PbThrottleTarget::Source,
                    name,
                    rate_limit,
                )
                .await
            }
            AlterSourceOperation::SwapRenameSource { target_source } => {
                alter_swap_rename::handle_swap_rename(
                    handler_args,
                    name,
                    target_source,
                    StatementType::ALTER_SOURCE,
                )
                .await
            }
            AlterSourceOperation::SetParallelism {
                parallelism,
                deferred,
            } => {
                alter_parallelism::handle_alter_parallelism(
                    handler_args,
                    name,
                    parallelism,
                    StatementType::ALTER_SOURCE,
                    deferred,
                )
                .await
            }
        },
        Statement::AlterFunction {
            name,
            args,
            operation,
        } => match operation {
            AlterFunctionOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_FUNCTION,
                    args,
                )
                .await
            }
        },
        Statement::AlterConnection { name, operation } => match operation {
            AlterConnectionOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_CONNECTION,
                    None,
                )
                .await
            }
            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_CONNECTION,
                )
                .await
            }
        },
        Statement::AlterSystem { param, value } => {
            alter_system::handle_alter_system(handler_args, param, value).await
        }
        Statement::AlterSecret {
            name,
            with_options,
            operation,
        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
        Statement::AlterFragment {
            fragment_id,
            operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
        } => {
            alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
                &handler_args.session,
                PbThrottleTarget::Fragment,
                fragment_id,
                rate_limit,
                StatementType::SET_VARIABLE,
            )
            .await
        }
        Statement::AlterDefaultPrivileges { .. } => {
            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
        }
        Statement::StartTransaction { modes } => {
            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
        }
        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
        Statement::Commit { chain } => {
            transaction::handle_commit(handler_args, COMMIT, chain).await
        }
        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
        Statement::Rollback { chain } => {
            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
        }
        Statement::SetTransaction {
            modes,
            snapshot,
            session,
        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
        Statement::Comment {
            object_type,
            object_name,
            comment,
        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
        Statement::Prepare {
            name,
            data_types,
            statement,
        } => prepared_statement::handle_prepare(name, data_types, statement).await,
        Statement::Deallocate { name, prepare } => {
            prepared_statement::handle_deallocate(name, prepare).await
        }
        Statement::Vacuum { object_name, full } => {
            vacuum::handle_vacuum(handler_args, object_name, full).await
        }
        Statement::Refresh { table_name } => {
            refresh::handle_refresh(handler_args, table_name).await
        }
        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
    }
}

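/// Reject `ALTER TABLE` DDLs that are not supported on tables created with the Iceberg
/// engine.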
fn check_ban_ddl_for_iceberg_engine_table(
    session: Arc<SessionImpl>,
    stmt: &Statement,
) -> Result<()> {
    match stmt {
        Statement::AlterTable {
            name,
            operation:
                operation @ (AlterTableOperation::AddColumn { .. }
                | AlterTableOperation::DropColumn { .. }),
        } => {
            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
            if table.is_iceberg_engine_table() {
                bail!(
                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
                    operation,
                    schema_name,
                    name
                );
            }
        }

        Statement::AlterTable {
            name,
            operation: AlterTableOperation::RenameTable { .. },
        } => {
            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
            if table.is_iceberg_engine_table() {
                bail!(
                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
                    schema_name,
                    name
                );
            }
        }

        Statement::AlterTable {
            name,
            operation: AlterTableOperation::ChangeOwner { .. },
        } => {
            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
            if table.is_iceberg_engine_table() {
                bail!(
                    "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
                    schema_name,
                    name
                );
            }
        }

        Statement::AlterTable {
            name,
            operation: AlterTableOperation::SetParallelism { .. },
        } => {
            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
            if table.is_iceberg_engine_table() {
                bail!(
                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
                    schema_name,
                    name
                );
            }
        }

        Statement::AlterTable {
            name,
            operation: AlterTableOperation::SetSchema { .. },
        } => {
            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
            if table.is_iceberg_engine_table() {
                bail!(
                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
                    schema_name,
                    name
                );
            }
        }

        Statement::AlterTable {
            name,
            operation: AlterTableOperation::RefreshSchema,
        } => {
            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
            if table.is_iceberg_engine_table() {
                bail!(
                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
                    schema_name,
                    name
                );
            }
        }

        Statement::AlterTable {
            name,
            operation: AlterTableOperation::SetSourceRateLimit { .. },
        } => {
            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
            if table.is_iceberg_engine_table() {
                bail!(
                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
                    schema_name,
                    name
                );
            }
        }

        _ => {}
    }

    Ok(())
}