1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_database_param;
46mod alter_mv;
47mod alter_owner;
48mod alter_parallelism;
49mod alter_rename;
50mod alter_resource_group;
51mod alter_secret;
52mod alter_set_schema;
53mod alter_sink_props;
54mod alter_source_column;
55mod alter_source_props;
56mod alter_source_with_sr;
57mod alter_streaming_enable_unaligned_join;
58mod alter_streaming_rate_limit;
59mod alter_swap_rename;
60mod alter_system;
61mod alter_table_column;
62pub mod alter_table_drop_connector;
63mod alter_table_with_sr;
64pub mod alter_user;
65pub mod cancel_job;
66pub mod close_cursor;
67mod comment;
68pub mod create_aggregate;
69pub mod create_connection;
70mod create_database;
71pub mod create_function;
72pub mod create_index;
73pub mod create_mv;
74pub mod create_schema;
75pub mod create_secret;
76pub mod create_sink;
77pub mod create_source;
78pub mod create_sql_function;
79pub mod create_subscription;
80pub mod create_table;
81pub mod create_table_as;
82pub mod create_user;
83pub mod create_view;
84pub mod declare_cursor;
85pub mod describe;
86pub mod discard;
87mod drop_connection;
88mod drop_database;
89pub mod drop_function;
90mod drop_index;
91pub mod drop_mv;
92mod drop_schema;
93pub mod drop_secret;
94pub mod drop_sink;
95pub mod drop_source;
96pub mod drop_subscription;
97pub mod drop_table;
98pub mod drop_user;
99mod drop_view;
100pub mod explain;
101pub mod explain_analyze_stream_job;
102pub mod extended_handle;
103pub mod fetch_cursor;
104mod flush;
105pub mod handle_privilege;
106pub mod kill_process;
107mod prepared_statement;
108pub mod privilege;
109pub mod query;
110mod recover;
111mod refresh;
112pub mod show;
113mod transaction;
114mod use_db;
115pub mod util;
116pub mod vacuum;
117pub mod variable;
118mod wait;
119
120pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
121
/// A [`PgResponseBuilder`] whose row stream type is fixed to [`PgResponseStream`].
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;

/// A [`PgResponse`] whose row stream type is fixed to [`PgResponseStream`].
pub type RwPgResponse = PgResponse<PgResponseStream>;
127
128#[easy_ext::ext(RwPgResponseBuilderExt)]
129impl RwPgResponseBuilder {
130 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
132 let fields = T::fields();
133 self.values(
134 rows.into_iter()
135 .map(|row| {
136 Row::new(
137 row.into_owned_row()
138 .into_iter()
139 .zip_eq_fast(&fields)
140 .map(|(datum, (_, ty))| {
141 datum.map(|scalar| {
142 scalar.as_scalar_ref_impl().text_format(ty).into()
143 })
144 })
145 .collect(),
146 )
147 })
148 .collect_vec()
149 .into(),
150 fields_to_descriptors(fields),
151 )
152 }
153}
154
155pub fn fields_to_descriptors(
156 fields: Vec<(&str, risingwave_common::types::DataType)>,
157) -> Vec<PgFieldDescriptor> {
158 fields
159 .iter()
160 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
161 .collect()
162}
163
/// The row stream type used by [`RwPgResponse`] and [`RwPgResponseBuilder`].
///
/// Every variant yields `Result<Vec<Row>, BoxedError>` items when polled; see
/// the `Stream` implementation for this type.
pub enum PgResponseStream {
    /// A [`LocalQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// A [`DistributedQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// Any boxed stream of row sets. This is also the variant produced by
    /// `From<Vec<Row>>`.
    Rows(BoxStream<'static, RowSetResult>),
}
169
170impl Stream for PgResponseStream {
171 type Item = std::result::Result<Vec<Row>, BoxedError>;
172
173 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
174 match &mut *self {
175 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
176 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
177 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
178 }
179 }
180}
181
182impl From<Vec<Row>> for PgResponseStream {
183 fn from(rows: Vec<Row>) -> Self {
184 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
185 }
186}
187
/// The bundle of per-statement inputs handed to the individual statement handlers.
#[derive(Clone)]
pub struct HandlerArgs {
    /// The session the statement is executed in.
    pub session: Arc<SessionImpl>,
    /// The SQL text as passed to `HandlerArgs::new`.
    pub sql: Arc<str>,
    /// The statement rendered back to a string by `HandlerArgs::normalize_sql`,
    /// i.e. with `or_replace` / `if_not_exists` cleared on the `CREATE`
    /// statements that function covers.
    pub normalized_sql: String,
    /// The options obtained from the statement via `WithOptions::try_from`.
    pub with_options: WithOptions,
}
195
196impl HandlerArgs {
197 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
198 Ok(Self {
199 session,
200 sql,
201 with_options: WithOptions::try_from(stmt)?,
202 normalized_sql: Self::normalize_sql(stmt),
203 })
204 }
205
206 fn normalize_sql(stmt: &Statement) -> String {
212 let mut stmt = stmt.clone();
213 match &mut stmt {
214 Statement::CreateView {
215 or_replace,
216 if_not_exists,
217 ..
218 } => {
219 *or_replace = false;
220 *if_not_exists = false;
221 }
222 Statement::CreateTable {
223 or_replace,
224 if_not_exists,
225 ..
226 } => {
227 *or_replace = false;
228 *if_not_exists = false;
229 }
230 Statement::CreateIndex { if_not_exists, .. } => {
231 *if_not_exists = false;
232 }
233 Statement::CreateSource {
234 stmt: CreateSourceStatement { if_not_exists, .. },
235 ..
236 } => {
237 *if_not_exists = false;
238 }
239 Statement::CreateSink {
240 stmt: CreateSinkStatement { if_not_exists, .. },
241 } => {
242 *if_not_exists = false;
243 }
244 Statement::CreateSubscription {
245 stmt: CreateSubscriptionStatement { if_not_exists, .. },
246 } => {
247 *if_not_exists = false;
248 }
249 Statement::CreateConnection {
250 stmt: CreateConnectionStatement { if_not_exists, .. },
251 } => {
252 *if_not_exists = false;
253 }
254 _ => {}
255 }
256 stmt.to_string()
257 }
258}
259
/// Entry point for executing a single parsed statement in a session.
///
/// Before dispatching, this function:
/// 1. clears the session's cancel-query flag,
/// 2. obtains a guard from `session.txn_begin_implicit()` that is held until
///    this function returns,
/// 3. builds the [`HandlerArgs`] for the statement, and
/// 4. runs `check_ban_ddl_for_iceberg_engine_table` on the statement.
///
/// It then matches on the [`Statement`] variant and forwards to the
/// corresponding handler module.
///
/// # Parameters
///
/// - `session`: the session the statement runs in.
/// - `stmt`: the parsed statement to execute.
/// - `sql`: the SQL text, stored in [`HandlerArgs`].
/// - `formats`: result formats, forwarded only to the query and `FETCH` cursor
///   handlers.
///
/// # Errors
///
/// Returns an error if [`HandlerArgs::new`] or the iceberg DDL check fails, if
/// the selected handler fails, or if the statement (or a specific option of
/// it) is rejected here as not implemented / invalid.
pub async fn handle(
    session: Arc<SessionImpl>,
    stmt: Statement,
    sql: Arc<str>,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    session.clear_cancel_query_flag();
    // Bound to a named variable so the guard lives until the end of this function.
    let _guard = session.txn_begin_implicit();
    let handler_args = HandlerArgs::new(session, &stmt, sql)?;

    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;

    match stmt {
        Statement::Explain {
            statement,
            analyze,
            options,
        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
        Statement::ExplainAnalyzeStreamJob {
            target,
            duration_secs,
        } => {
            explain_analyze_stream_job::handle_explain_analyze_stream_job(
                handler_args,
                target,
                duration_secs,
            )
            .await
        }
        Statement::CreateSource { stmt } => {
            create_source::handle_create_source(handler_args, stmt).await
        }
        Statement::CreateSink { stmt } => {
            create_sink::handle_create_sink(handler_args, stmt, false).await
        }
        Statement::CreateSubscription { stmt } => {
            create_subscription::handle_create_subscription(handler_args, stmt).await
        }
        Statement::CreateConnection { stmt } => {
            create_connection::handle_create_connection(handler_args, stmt).await
        }
        Statement::CreateSecret { stmt } => {
            create_secret::handle_create_secret(handler_args, stmt).await
        }
        Statement::CreateFunction {
            or_replace,
            temporary,
            if_not_exists,
            name,
            args,
            returns,
            params,
            with_options,
        } => {
            // The general handler is used when no language is given or when the
            // language is anything other than `sql` (compared ASCII
            // case-insensitively). Only an explicit `sql` language goes to the
            // SQL-function handler, which does not receive `with_options`.
            if params.language.is_none()
                || !params
                    .language
                    .as_ref()
                    .unwrap()
                    .real_value()
                    .eq_ignore_ascii_case("sql")
            {
                create_function::handle_create_function(
                    handler_args,
                    or_replace,
                    temporary,
                    if_not_exists,
                    name,
                    args,
                    returns,
                    params,
                    with_options,
                )
                .await
            } else {
                create_sql_function::handle_create_sql_function(
                    handler_args,
                    or_replace,
                    temporary,
                    if_not_exists,
                    name,
                    args,
                    returns,
                    params,
                )
                .await
            }
        }
        Statement::CreateAggregate {
            or_replace,
            if_not_exists,
            name,
            args,
            returns,
            params,
            ..
        } => {
            create_aggregate::handle_create_aggregate(
                handler_args,
                or_replace,
                if_not_exists,
                name,
                args,
                returns,
                params,
            )
            .await
        }
        Statement::CreateTable {
            name,
            columns,
            wildcard_idx,
            constraints,
            query,
            // `with_options` is discarded here.
            // NOTE(review): presumably it is already captured in
            // `handler_args.with_options` by `HandlerArgs::new` - confirm.
            with_options: _, or_replace,
            temporary,
            if_not_exists,
            format_encode,
            source_watermarks,
            append_only,
            on_conflict,
            with_version_column,
            cdc_table_info,
            include_column_options,
            webhook_info,
            engine,
        } => {
            // Options that are parsed but rejected.
            if or_replace {
                bail_not_implemented!("CREATE OR REPLACE TABLE");
            }
            if temporary {
                bail_not_implemented!("CREATE TEMPORARY TABLE");
            }
            // A statement that carries a query (`CREATE TABLE ... AS`) is
            // handled by a dedicated handler and returns early.
            if let Some(query) = query {
                return create_table_as::handle_create_as(
                    handler_args,
                    name,
                    if_not_exists,
                    query,
                    columns,
                    append_only,
                    on_conflict,
                    with_version_column.map(|x| x.real_value()),
                    engine,
                )
                .await;
            }
            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
            create_table::handle_create_table(
                handler_args,
                name,
                columns,
                wildcard_idx,
                constraints,
                if_not_exists,
                format_encode,
                source_watermarks,
                append_only,
                on_conflict,
                with_version_column.map(|x| x.real_value()),
                cdc_table_info,
                include_column_options,
                webhook_info,
                engine,
            )
            .await
        }
        Statement::CreateDatabase {
            db_name,
            if_not_exists,
            owner,
            resource_group,
            barrier_interval_ms,
            checkpoint_frequency,
        } => {
            create_database::handle_create_database(
                handler_args,
                db_name,
                if_not_exists,
                owner,
                resource_group,
                barrier_interval_ms,
                checkpoint_frequency,
            )
            .await
        }
        Statement::CreateSchema {
            schema_name,
            if_not_exists,
            owner,
        } => {
            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
                .await
        }
        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
        Statement::DeclareCursor { stmt } => {
            declare_cursor::handle_declare_cursor(handler_args, stmt).await
        }
        Statement::FetchCursor { stmt } => {
            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
        }
        Statement::CloseCursor { stmt } => {
            close_cursor::handle_close_cursor(handler_args, stmt).await
        }
        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
        // The `{ .. }` patterns below bind nothing, so the outer `stmt` is still
        // intact and is passed to the privilege handlers as a whole.
        Statement::Grant { .. } => {
            handle_privilege::handle_grant_privilege(handler_args, stmt).await
        }
        Statement::Revoke { .. } => {
            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
        }
        Statement::Describe { name, kind } => match kind {
            DescribeKind::Fragments => {
                describe::handle_describe_fragments(handler_args, name).await
            }
            DescribeKind::Plain => describe::handle_describe(handler_args, name),
        },
        Statement::DescribeFragment { fragment_id } => {
            describe::handle_describe_fragment(handler_args, fragment_id).await
        }
        Statement::Discard(..) => discard::handle_discard(handler_args),
        Statement::ShowObjects {
            object: show_object,
            filter,
        } => show::handle_show_object(handler_args, show_object, filter).await,
        Statement::ShowCreateObject { create_type, name } => {
            show::handle_show_create_object(handler_args, create_type, name)
        }
        Statement::ShowTransactionIsolationLevel => {
            transaction::handle_show_isolation_level(handler_args)
        }
        Statement::Drop(DropStatement {
            object_type,
            object_name,
            if_exists,
            drop_mode,
        }) => {
            // `CASCADE` is accepted only for the object types listed in the
            // first arm; for databases, users and secrets it is rejected. Any
            // other drop mode (or none) means "not cascading".
            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
                match object_type {
                    ObjectType::MaterializedView
                    | ObjectType::View
                    | ObjectType::Sink
                    | ObjectType::Source
                    | ObjectType::Subscription
                    | ObjectType::Index
                    | ObjectType::Table
                    | ObjectType::Schema
                    | ObjectType::Connection => true,
                    ObjectType::Database | ObjectType::User | ObjectType::Secret => {
                        bail_not_implemented!("DROP CASCADE");
                    }
                }
            } else {
                false
            };
            // Dispatch to the per-object-type drop handler. The handlers for
            // databases, users and secrets take no `cascade` argument.
            match object_type {
                ObjectType::Table => {
                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
                        .await
                }
                ObjectType::MaterializedView => {
                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
                }
                ObjectType::Index => {
                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
                        .await
                }
                ObjectType::Source => {
                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
                        .await
                }
                ObjectType::Sink => {
                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
                }
                ObjectType::Subscription => {
                    drop_subscription::handle_drop_subscription(
                        handler_args,
                        object_name,
                        if_exists,
                        cascade,
                    )
                    .await
                }
                ObjectType::Database => {
                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
                }
                ObjectType::Schema => {
                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
                        .await
                }
                ObjectType::User => {
                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
                }
                ObjectType::View => {
                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
                }
                ObjectType::Connection => {
                    drop_connection::handle_drop_connection(
                        handler_args,
                        object_name,
                        if_exists,
                        cascade,
                    )
                    .await
                }
                ObjectType::Secret => {
                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
                }
            }
        }
        // `DROP FUNCTION` and `DROP AGGREGATE` share one handler; the trailing
        // boolean is `false` for functions and `true` for aggregates.
        Statement::DropFunction {
            if_exists,
            func_desc,
            option,
        } => {
            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
                .await
        }
        Statement::DropAggregate {
            if_exists,
            func_desc,
            option,
        } => {
            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
                .await
        }
        // Queries and DML all go through the query handler with the whole
        // statement; none of these patterns move anything out of `stmt`.
        Statement::Query(_)
        | Statement::Insert { .. }
        | Statement::Delete { .. }
        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
        Statement::CreateView {
            materialized,
            if_not_exists,
            name,
            columns,
            query,
            // `with_options` is discarded here.
            // NOTE(review): presumably it is already captured in
            // `handler_args.with_options` by `HandlerArgs::new` - confirm.
            with_options: _, or_replace, emit_mode,
        } => {
            if or_replace {
                bail_not_implemented!("CREATE OR REPLACE VIEW");
            }
            // `emit_mode` is only forwarded for materialized views.
            if materialized {
                create_mv::handle_create_mv(
                    handler_args,
                    if_not_exists,
                    name,
                    *query,
                    columns,
                    emit_mode,
                )
                .await
            } else {
                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
                    .await
            }
        }
        Statement::Flush => flush::handle_flush(handler_args).await,
        Statement::Wait => wait::handle_wait(handler_args).await,
        Statement::Recover => recover::handle_recover(handler_args).await,
        Statement::SetVariable {
            local: _,
            variable,
            value,
        } => {
            // Setting the variable named `database` (ASCII case-insensitive) is
            // treated as switching the database via the `use_db` handler.
            if variable.real_value().eq_ignore_ascii_case("database") {
                let x = variable::set_var_to_param_str(&value);
                // Falls back to the name "default" when no string is obtained
                // from the value.
                let res = use_db::handle_use_db(
                    handler_args,
                    ObjectName::from(vec![Ident::from_real_value(
                        x.as_deref().unwrap_or("default"),
                    )]),
                )?;
                // Build a fresh response tagged as `SET_VARIABLE`, carrying over
                // only the notices of the `use_db` response.
                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
                for notice in res.notices() {
                    builder = builder.notice(notice);
                }
                return Ok(builder.into());
            }
            variable::handle_set(handler_args, variable, value)
        }
        Statement::SetTimeZone { local: _, value } => {
            variable::handle_set_time_zone(handler_args, value)
        }
        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
        Statement::CreateIndex {
            name,
            table_name,
            columns,
            include,
            distributed_by,
            unique,
            if_not_exists,
        } => {
            if unique {
                bail_not_implemented!("create unique index");
            }

            create_index::handle_create_index(
                handler_args,
                if_not_exists,
                name,
                table_name,
                columns.to_vec(),
                include,
                distributed_by,
            )
            .await
        }
        Statement::AlterDatabase { name, operation } => match operation {
            AlterDatabaseOperation::RenameDatabase { database_name } => {
                alter_rename::handle_rename_database(handler_args, name, database_name).await
            }
            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_DATABASE,
                )
                .await
            }
            AlterDatabaseOperation::SetParam(config_param) => {
                let ConfigParam { param, value } = config_param;

                // The parameter name is matched case-insensitively by
                // upper-casing it. Each supported parameter accepts either
                // `DEFAULT` (mapped to `None`) or a numeric literal that must
                // parse into the parameter's integer type.
                let database_param = match param.real_value().to_uppercase().as_str() {
                    "BARRIER_INTERVAL_MS" => {
                        let barrier_interval_ms = match value {
                            SetVariableValue::Default => None,
                            SetVariableValue::Single(SetVariableValueSingle::Literal(
                                Value::Number(num),
                            )) => {
                                let num = num.parse::<u32>().map_err(|e| {
                                    ErrorCode::InvalidInputSyntax(format!(
                                        "barrier_interval_ms must be a u32 integer: {}",
                                        e.as_report()
                                    ))
                                })?;
                                Some(num)
                            }
                            _ => {
                                return Err(ErrorCode::InvalidInputSyntax(
                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
                                        .to_owned(),
                                )
                                .into());
                            }
                        };
                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
                    }
                    "CHECKPOINT_FREQUENCY" => {
                        let checkpoint_frequency = match value {
                            SetVariableValue::Default => None,
                            SetVariableValue::Single(SetVariableValueSingle::Literal(
                                Value::Number(num),
                            )) => {
                                let num = num.parse::<u64>().map_err(|e| {
                                    ErrorCode::InvalidInputSyntax(format!(
                                        "checkpoint_frequency must be a u64 integer: {}",
                                        e.as_report()
                                    ))
                                })?;
                                Some(num)
                            }
                            _ => {
                                return Err(ErrorCode::InvalidInputSyntax(
                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
                                        .to_owned(),
                                )
                                .into());
                            }
                        };
                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
                    }
                    _ => {
                        return Err(ErrorCode::InvalidInputSyntax(format!(
                            "Unsupported database config parameter: {}",
                            param.real_value()
                        ))
                        .into());
                    }
                };

                alter_database_param::handle_alter_database_param(
                    handler_args,
                    name,
                    database_param,
                )
                .await
            }
        },
        Statement::AlterSchema { name, operation } => match operation {
            AlterSchemaOperation::RenameSchema { schema_name } => {
                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
            }
            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_SCHEMA,
                )
                .await
            }
            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
                alter_swap_rename::handle_swap_rename(
                    handler_args,
                    name,
                    target_schema,
                    StatementType::ALTER_SCHEMA,
                )
                .await
            }
        },
        Statement::AlterTable { name, operation } => match operation {
            // Column changes forward the whole `operation`; the `{ .. }`
            // patterns do not move anything out of it.
            AlterTableOperation::AddColumn { .. }
            | AlterTableOperation::DropColumn { .. }
            | AlterTableOperation::AlterColumn { .. } => {
                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
            }
            AlterTableOperation::RenameTable { table_name } => {
                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
                    .await
            }
            AlterTableOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_TABLE,
                )
                .await
            }
            AlterTableOperation::SetParallelism {
                parallelism,
                deferred,
            } => {
                alter_parallelism::handle_alter_parallelism(
                    handler_args,
                    name,
                    parallelism,
                    StatementType::ALTER_TABLE,
                    deferred,
                )
                .await
            }
            AlterTableOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_TABLE,
                    None,
                )
                .await
            }
            AlterTableOperation::RefreshSchema => {
                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
            }
            // The three rate-limit operations share one handler and differ only
            // in the throttle target they pass.
            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                    handler_args,
                    PbThrottleTarget::TableWithSource,
                    name,
                    rate_limit,
                )
                .await
            }
            AlterTableOperation::DropConnector => {
                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
                    .await
            }
            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                    handler_args,
                    PbThrottleTarget::TableDml,
                    name,
                    rate_limit,
                )
                .await
            }
            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                    handler_args,
                    PbThrottleTarget::CdcTable,
                    name,
                    rate_limit,
                )
                .await
            }
            AlterTableOperation::SwapRenameTable { target_table } => {
                alter_swap_rename::handle_swap_rename(
                    handler_args,
                    name,
                    target_table,
                    StatementType::ALTER_TABLE,
                )
                .await
            }
            AlterTableOperation::AlterConnectorProps { alter_props } => {
                // The table variant of this handler lives in the
                // `alter_source_props` module.
                crate::handler::alter_source_props::handle_alter_table_connector_props(
                    handler_args,
                    name,
                    alter_props,
                )
                .await
            }
            // Operations that are parsed but not supported: the statement is
            // rebuilt only to render it into the error message.
            AlterTableOperation::AddConstraint { .. }
            | AlterTableOperation::DropConstraint { .. }
            | AlterTableOperation::RenameColumn { .. }
            | AlterTableOperation::ChangeColumn { .. }
            | AlterTableOperation::RenameConstraint { .. } => {
                bail_not_implemented!(
                    "Unhandled statement: {}",
                    Statement::AlterTable { name, operation }
                )
            }
        },
        Statement::AlterIndex { name, operation } => match operation {
            AlterIndexOperation::RenameIndex { index_name } => {
                alter_rename::handle_rename_index(handler_args, name, index_name).await
            }
            AlterIndexOperation::SetParallelism {
                parallelism,
                deferred,
            } => {
                alter_parallelism::handle_alter_parallelism(
                    handler_args,
                    name,
                    parallelism,
                    StatementType::ALTER_INDEX,
                    deferred,
                )
                .await
            }
        },
        Statement::AlterView {
            materialized,
            name,
            operation,
        } => {
            // The statement type passed to the shared handlers depends on
            // whether the target is a materialized view or a plain view.
            let statement_type = if materialized {
                StatementType::ALTER_MATERIALIZED_VIEW
            } else {
                StatementType::ALTER_VIEW
            };
            match operation {
                AlterViewOperation::RenameView { view_name } => {
                    // Materialized views are renamed through the table rename
                    // handler; plain views have their own.
                    if materialized {
                        alter_rename::handle_rename_table(
                            handler_args,
                            TableType::MaterializedView,
                            name,
                            view_name,
                        )
                        .await
                    } else {
                        alter_rename::handle_rename_view(handler_args, name, view_name).await
                    }
                }
                AlterViewOperation::SetParallelism {
                    parallelism,
                    deferred,
                } => {
                    if !materialized {
                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
                    }
                    alter_parallelism::handle_alter_parallelism(
                        handler_args,
                        name,
                        parallelism,
                        statement_type,
                        deferred,
                    )
                    .await
                }
                AlterViewOperation::SetResourceGroup {
                    resource_group,
                    deferred,
                } => {
                    if !materialized {
                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
                    }
                    alter_resource_group::handle_alter_resource_group(
                        handler_args,
                        name,
                        resource_group,
                        statement_type,
                        deferred,
                    )
                    .await
                }
                AlterViewOperation::ChangeOwner { new_owner_name } => {
                    alter_owner::handle_alter_owner(
                        handler_args,
                        name,
                        new_owner_name,
                        statement_type,
                    )
                    .await
                }
                AlterViewOperation::SetSchema { new_schema_name } => {
                    alter_set_schema::handle_alter_set_schema(
                        handler_args,
                        name,
                        new_schema_name,
                        statement_type,
                        None,
                    )
                    .await
                }
                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
                    if !materialized {
                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
                    }
                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                        handler_args,
                        PbThrottleTarget::Mv,
                        name,
                        rate_limit,
                    )
                    .await
                }
                AlterViewOperation::SwapRenameView { target_view } => {
                    alter_swap_rename::handle_swap_rename(
                        handler_args,
                        name,
                        target_view,
                        statement_type,
                    )
                    .await
                }
                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
                    // Unlike the other view-only rejections, this one uses
                    // `bail!` rather than `bail_not_implemented!`.
                    if !materialized {
                        bail!(
                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
                        );
                    }
                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
                }
                AlterViewOperation::AsQuery { query } => {
                    if !materialized {
                        bail_not_implemented!("ALTER VIEW AS QUERY");
                    }
                    // Only reachable in builds with `debug_assertions` enabled;
                    // other builds reject the statement.
                    if !cfg!(debug_assertions) {
                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
                    }
                    alter_mv::handle_alter_mv(handler_args, name, query).await
                }
            }
        }

        Statement::AlterSink { name, operation } => match operation {
            AlterSinkOperation::AlterConnectorProps {
                alter_props: changed_props,
            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
            AlterSinkOperation::RenameSink { sink_name } => {
                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
            }
            AlterSinkOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_SINK,
                )
                .await
            }
            AlterSinkOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_SINK,
                    None,
                )
                .await
            }
            AlterSinkOperation::SetParallelism {
                parallelism,
                deferred,
            } => {
                alter_parallelism::handle_alter_parallelism(
                    handler_args,
                    name,
                    parallelism,
                    StatementType::ALTER_SINK,
                    deferred,
                )
                .await
            }
            AlterSinkOperation::SwapRenameSink { target_sink } => {
                alter_swap_rename::handle_swap_rename(
                    handler_args,
                    name,
                    target_sink,
                    StatementType::ALTER_SINK,
                )
                .await
            }
            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                    handler_args,
                    PbThrottleTarget::Sink,
                    name,
                    rate_limit,
                )
                .await
            }
            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
                    handler_args,
                    name,
                    enable,
                )
                .await
            }
        },
        Statement::AlterSubscription { name, operation } => match operation {
            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
                    .await
            }
            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_SUBSCRIPTION,
                )
                .await
            }
            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_SUBSCRIPTION,
                    None,
                )
                .await
            }
            AlterSubscriptionOperation::SwapRenameSubscription {
                target_subscription,
            } => {
                alter_swap_rename::handle_swap_rename(
                    handler_args,
                    name,
                    target_subscription,
                    StatementType::ALTER_SUBSCRIPTION,
                )
                .await
            }
        },
        Statement::AlterSource { name, operation } => match operation {
            AlterSourceOperation::AlterConnectorProps { alter_props } => {
                alter_source_props::handle_alter_source_connector_props(
                    handler_args,
                    name,
                    alter_props,
                )
                .await
            }
            AlterSourceOperation::RenameSource { source_name } => {
                alter_rename::handle_rename_source(handler_args, name, source_name).await
            }
            // Forwards the whole `operation`; the `{ .. }` pattern moves nothing.
            AlterSourceOperation::AddColumn { .. } => {
                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
            }
            AlterSourceOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_SOURCE,
                )
                .await
            }
            AlterSourceOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_SOURCE,
                    None,
                )
                .await
            }
            AlterSourceOperation::FormatEncode { format_encode } => {
                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
                    .await
            }
            AlterSourceOperation::RefreshSchema => {
                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
            }
            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                    handler_args,
                    PbThrottleTarget::Source,
                    name,
                    rate_limit,
                )
                .await
            }
            AlterSourceOperation::SwapRenameSource { target_source } => {
                alter_swap_rename::handle_swap_rename(
                    handler_args,
                    name,
                    target_source,
                    StatementType::ALTER_SOURCE,
                )
                .await
            }
            AlterSourceOperation::SetParallelism {
                parallelism,
                deferred,
            } => {
                alter_parallelism::handle_alter_parallelism(
                    handler_args,
                    name,
                    parallelism,
                    StatementType::ALTER_SOURCE,
                    deferred,
                )
                .await
            }
        },
        Statement::AlterFunction {
            name,
            args,
            operation,
        } => match operation {
            AlterFunctionOperation::SetSchema { new_schema_name } => {
                // Functions are the only callers here that pass `args` as the
                // last argument; the other `SET SCHEMA` arms pass `None`.
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_FUNCTION,
                    args,
                )
                .await
            }
        },
        Statement::AlterConnection { name, operation } => match operation {
            AlterConnectionOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_CONNECTION,
                    None,
                )
                .await
            }
            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_CONNECTION,
                )
                .await
            }
        },
        Statement::AlterSystem { param, value } => {
            alter_system::handle_alter_system(handler_args, param, value).await
        }
        Statement::AlterSecret {
            name,
            with_options,
            operation,
        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
        Statement::AlterFragment {
            fragment_id,
            operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
        } => {
            // This handler takes only the session, the fragment id, and an
            // explicit statement type.
            // NOTE(review): the statement type passed is `SET_VARIABLE` rather
            // than an ALTER-specific one - confirm this is intended.
            alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
                &handler_args.session,
                PbThrottleTarget::Fragment,
                fragment_id,
                rate_limit,
                StatementType::SET_VARIABLE,
            )
            .await
        }
        Statement::AlterDefaultPrivileges { .. } => {
            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
        }
        // Transaction control: `START TRANSACTION` and `BEGIN` share a handler,
        // as do `ABORT` and `ROLLBACK`; the statement type tells them apart.
        Statement::StartTransaction { modes } => {
            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
        }
        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
        Statement::Commit { chain } => {
            transaction::handle_commit(handler_args, COMMIT, chain).await
        }
        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
        Statement::Rollback { chain } => {
            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
        }
        Statement::SetTransaction {
            modes,
            snapshot,
            session,
        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
        Statement::Comment {
            object_type,
            object_name,
            comment,
        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
        // The prepared-statement handlers do not take `handler_args`.
        Statement::Prepare {
            name,
            data_types,
            statement,
        } => prepared_statement::handle_prepare(name, data_types, statement).await,
        Statement::Deallocate { name, prepare } => {
            prepared_statement::handle_deallocate(name, prepare).await
        }
        Statement::Vacuum { object_name } => vacuum::handle_vacuum(handler_args, object_name).await,
        Statement::Refresh { table_name } => {
            refresh::handle_refresh(handler_args, table_name).await
        }
        // Every statement without a dedicated arm above is rejected.
        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
    }
}
1296
1297fn check_ban_ddl_for_iceberg_engine_table(
1298 session: Arc<SessionImpl>,
1299 stmt: &Statement,
1300) -> Result<()> {
1301 match stmt {
1302 Statement::AlterTable {
1303 name,
1304 operation:
1305 operation @ (AlterTableOperation::AddColumn { .. }
1306 | AlterTableOperation::DropColumn { .. }),
1307 } => {
1308 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1309 if table.is_iceberg_engine_table() {
1310 bail!(
1311 "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1312 operation,
1313 schema_name,
1314 name
1315 );
1316 }
1317 }
1318
1319 Statement::AlterTable {
1320 name,
1321 operation: AlterTableOperation::RenameTable { .. },
1322 } => {
1323 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1324 if table.is_iceberg_engine_table() {
1325 bail!(
1326 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1327 schema_name,
1328 name
1329 );
1330 }
1331 }
1332
1333 Statement::AlterTable {
1334 name,
1335 operation: AlterTableOperation::ChangeOwner { .. },
1336 } => {
1337 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1338 if table.is_iceberg_engine_table() {
1339 bail!(
1340 "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1341 schema_name,
1342 name
1343 );
1344 }
1345 }
1346
1347 Statement::AlterTable {
1348 name,
1349 operation: AlterTableOperation::SetParallelism { .. },
1350 } => {
1351 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1352 if table.is_iceberg_engine_table() {
1353 bail!(
1354 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1355 schema_name,
1356 name
1357 );
1358 }
1359 }
1360
1361 Statement::AlterTable {
1362 name,
1363 operation: AlterTableOperation::SetSchema { .. },
1364 } => {
1365 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1366 if table.is_iceberg_engine_table() {
1367 bail!(
1368 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1369 schema_name,
1370 name
1371 );
1372 }
1373 }
1374
1375 Statement::AlterTable {
1376 name,
1377 operation: AlterTableOperation::RefreshSchema,
1378 } => {
1379 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1380 if table.is_iceberg_engine_table() {
1381 bail!(
1382 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1383 schema_name,
1384 name
1385 );
1386 }
1387 }
1388
1389 Statement::AlterTable {
1390 name,
1391 operation: AlterTableOperation::SetSourceRateLimit { .. },
1392 } => {
1393 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1394 if table.is_iceberg_engine_table() {
1395 bail!(
1396 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1397 schema_name,
1398 name
1399 );
1400 }
1401 }
1402
1403 _ => {}
1404 }
1405
1406 Ok(())
1407}