1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_database_param;
46mod alter_mv;
47mod alter_owner;
48mod alter_parallelism;
49mod alter_rename;
50mod alter_resource_group;
51mod alter_secret;
52mod alter_set_schema;
53mod alter_sink_props;
54mod alter_source_column;
55mod alter_source_props;
56mod alter_source_with_sr;
57mod alter_streaming_enable_unaligned_join;
58mod alter_streaming_rate_limit;
59mod alter_swap_rename;
60mod alter_system;
61mod alter_table_column;
62pub mod alter_table_drop_connector;
63mod alter_table_with_sr;
64pub mod alter_user;
65pub mod cancel_job;
66pub mod close_cursor;
67mod comment;
68pub mod create_aggregate;
69pub mod create_connection;
70mod create_database;
71pub mod create_function;
72pub mod create_index;
73pub mod create_mv;
74pub mod create_schema;
75pub mod create_secret;
76pub mod create_sink;
77pub mod create_source;
78pub mod create_sql_function;
79pub mod create_subscription;
80pub mod create_table;
81pub mod create_table_as;
82pub mod create_user;
83pub mod create_view;
84pub mod declare_cursor;
85pub mod describe;
86pub mod discard;
87mod drop_connection;
88mod drop_database;
89pub mod drop_function;
90mod drop_index;
91pub mod drop_mv;
92mod drop_schema;
93pub mod drop_secret;
94pub mod drop_sink;
95pub mod drop_source;
96pub mod drop_subscription;
97pub mod drop_table;
98pub mod drop_user;
99mod drop_view;
100pub mod explain;
101pub mod explain_analyze_stream_job;
102pub mod extended_handle;
103pub mod fetch_cursor;
104mod flush;
105pub mod handle_privilege;
106pub mod kill_process;
107mod prepared_statement;
108pub mod privilege;
109pub mod query;
110mod recover;
111pub mod show;
112mod transaction;
113mod use_db;
114pub mod util;
115pub mod variable;
116mod wait;
117
118pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
119
/// [`PgResponseBuilder`] specialized to produce responses whose rows are delivered through a
/// [`PgResponseStream`].
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
122
/// [`PgResponse`] specialized to deliver its rows through a [`PgResponseStream`]; this is the
/// value every statement handler in this module returns.
pub type RwPgResponse = PgResponse<PgResponseStream>;
125
126#[easy_ext::ext(RwPgResponseBuilderExt)]
127impl RwPgResponseBuilder {
128 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
130 let fields = T::fields();
131 self.values(
132 rows.into_iter()
133 .map(|row| {
134 Row::new(
135 row.into_owned_row()
136 .into_iter()
137 .zip_eq_fast(&fields)
138 .map(|(datum, (_, ty))| {
139 datum.map(|scalar| {
140 scalar.as_scalar_ref_impl().text_format(ty).into()
141 })
142 })
143 .collect(),
144 )
145 })
146 .collect_vec()
147 .into(),
148 fields_to_descriptors(fields),
149 )
150 }
151}
152
153pub fn fields_to_descriptors(
154 fields: Vec<(&str, risingwave_common::types::DataType)>,
155) -> Vec<PgFieldDescriptor> {
156 fields
157 .iter()
158 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
159 .collect()
160}
161
/// Stream type carried by [`RwPgResponse`] and [`RwPgResponseBuilder`].
///
/// Every variant yields batches of rows (see the `Stream` implementation below).
pub enum PgResponseStream {
    /// A [`LocalQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// A [`DistributedQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// Any other boxed stream of row-set results, e.g. one built from an in-memory `Vec<Row>`.
    Rows(BoxStream<'static, RowSetResult>),
}
167
168impl Stream for PgResponseStream {
169 type Item = std::result::Result<Vec<Row>, BoxedError>;
170
171 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
172 match &mut *self {
173 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
174 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
175 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
176 }
177 }
178}
179
180impl From<Vec<Row>> for PgResponseStream {
181 fn from(rows: Vec<Row>) -> Self {
182 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
183 }
184}
185
/// Arguments handed to the individual statement handlers of this module.
#[derive(Clone)]
pub struct HandlerArgs {
    /// Session the statement is handled for.
    pub session: Arc<SessionImpl>,
    /// SQL text passed in alongside the parsed statement.
    pub sql: Arc<str>,
    /// The statement rendered back to text after its `or_replace` / `if_not_exists` flags have
    /// been cleared (see `HandlerArgs::normalize_sql`).
    pub normalized_sql: String,
    /// Options obtained from the statement through `WithOptions::try_from`.
    pub with_options: WithOptions,
}
193
194impl HandlerArgs {
195 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
196 Ok(Self {
197 session,
198 sql,
199 with_options: WithOptions::try_from(stmt)?,
200 normalized_sql: Self::normalize_sql(stmt),
201 })
202 }
203
204 fn normalize_sql(stmt: &Statement) -> String {
210 let mut stmt = stmt.clone();
211 match &mut stmt {
212 Statement::CreateView {
213 or_replace,
214 if_not_exists,
215 ..
216 } => {
217 *or_replace = false;
218 *if_not_exists = false;
219 }
220 Statement::CreateTable {
221 or_replace,
222 if_not_exists,
223 ..
224 } => {
225 *or_replace = false;
226 *if_not_exists = false;
227 }
228 Statement::CreateIndex { if_not_exists, .. } => {
229 *if_not_exists = false;
230 }
231 Statement::CreateSource {
232 stmt: CreateSourceStatement { if_not_exists, .. },
233 ..
234 } => {
235 *if_not_exists = false;
236 }
237 Statement::CreateSink {
238 stmt: CreateSinkStatement { if_not_exists, .. },
239 } => {
240 *if_not_exists = false;
241 }
242 Statement::CreateSubscription {
243 stmt: CreateSubscriptionStatement { if_not_exists, .. },
244 } => {
245 *if_not_exists = false;
246 }
247 Statement::CreateConnection {
248 stmt: CreateConnectionStatement { if_not_exists, .. },
249 } => {
250 *if_not_exists = false;
251 }
252 _ => {}
253 }
254 stmt.to_string()
255 }
256}
257
258pub async fn handle(
259 session: Arc<SessionImpl>,
260 stmt: Statement,
261 sql: Arc<str>,
262 formats: Vec<Format>,
263) -> Result<RwPgResponse> {
264 session.clear_cancel_query_flag();
265 let _guard = session.txn_begin_implicit();
266 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
267
268 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
269
270 match stmt {
271 Statement::Explain {
272 statement,
273 analyze,
274 options,
275 } => explain::handle_explain(handler_args, *statement, options, analyze).await,
276 Statement::ExplainAnalyzeStreamJob {
277 target,
278 duration_secs,
279 } => {
280 explain_analyze_stream_job::handle_explain_analyze_stream_job(
281 handler_args,
282 target,
283 duration_secs,
284 )
285 .await
286 }
287 Statement::CreateSource { stmt } => {
288 create_source::handle_create_source(handler_args, stmt).await
289 }
290 Statement::CreateSink { stmt } => {
291 create_sink::handle_create_sink(handler_args, stmt, false).await
292 }
293 Statement::CreateSubscription { stmt } => {
294 create_subscription::handle_create_subscription(handler_args, stmt).await
295 }
296 Statement::CreateConnection { stmt } => {
297 create_connection::handle_create_connection(handler_args, stmt).await
298 }
299 Statement::CreateSecret { stmt } => {
300 create_secret::handle_create_secret(handler_args, stmt).await
301 }
302 Statement::CreateFunction {
303 or_replace,
304 temporary,
305 if_not_exists,
306 name,
307 args,
308 returns,
309 params,
310 with_options,
311 } => {
312 if params.language.is_none()
315 || !params
316 .language
317 .as_ref()
318 .unwrap()
319 .real_value()
320 .eq_ignore_ascii_case("sql")
321 {
322 create_function::handle_create_function(
323 handler_args,
324 or_replace,
325 temporary,
326 if_not_exists,
327 name,
328 args,
329 returns,
330 params,
331 with_options,
332 )
333 .await
334 } else {
335 create_sql_function::handle_create_sql_function(
336 handler_args,
337 or_replace,
338 temporary,
339 if_not_exists,
340 name,
341 args,
342 returns,
343 params,
344 )
345 .await
346 }
347 }
348 Statement::CreateAggregate {
349 or_replace,
350 if_not_exists,
351 name,
352 args,
353 returns,
354 params,
355 ..
356 } => {
357 create_aggregate::handle_create_aggregate(
358 handler_args,
359 or_replace,
360 if_not_exists,
361 name,
362 args,
363 returns,
364 params,
365 )
366 .await
367 }
368 Statement::CreateTable {
369 name,
370 columns,
371 wildcard_idx,
372 constraints,
373 query,
374 with_options: _, or_replace,
377 temporary,
378 if_not_exists,
379 format_encode,
380 source_watermarks,
381 append_only,
382 on_conflict,
383 with_version_column,
384 cdc_table_info,
385 include_column_options,
386 webhook_info,
387 engine,
388 } => {
389 if or_replace {
390 bail_not_implemented!("CREATE OR REPLACE TABLE");
391 }
392 if temporary {
393 bail_not_implemented!("CREATE TEMPORARY TABLE");
394 }
395 if let Some(query) = query {
396 return create_table_as::handle_create_as(
397 handler_args,
398 name,
399 if_not_exists,
400 query,
401 columns,
402 append_only,
403 on_conflict,
404 with_version_column.map(|x| x.real_value()),
405 engine,
406 )
407 .await;
408 }
409 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
410 create_table::handle_create_table(
411 handler_args,
412 name,
413 columns,
414 wildcard_idx,
415 constraints,
416 if_not_exists,
417 format_encode,
418 source_watermarks,
419 append_only,
420 on_conflict,
421 with_version_column.map(|x| x.real_value()),
422 cdc_table_info,
423 include_column_options,
424 webhook_info,
425 engine,
426 )
427 .await
428 }
429 Statement::CreateDatabase {
430 db_name,
431 if_not_exists,
432 owner,
433 resource_group,
434 barrier_interval_ms,
435 checkpoint_frequency,
436 } => {
437 create_database::handle_create_database(
438 handler_args,
439 db_name,
440 if_not_exists,
441 owner,
442 resource_group,
443 barrier_interval_ms,
444 checkpoint_frequency,
445 )
446 .await
447 }
448 Statement::CreateSchema {
449 schema_name,
450 if_not_exists,
451 owner,
452 } => {
453 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
454 .await
455 }
456 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
457 Statement::DeclareCursor { stmt } => {
458 declare_cursor::handle_declare_cursor(handler_args, stmt).await
459 }
460 Statement::FetchCursor { stmt } => {
461 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
462 }
463 Statement::CloseCursor { stmt } => {
464 close_cursor::handle_close_cursor(handler_args, stmt).await
465 }
466 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
467 Statement::Grant { .. } => {
468 handle_privilege::handle_grant_privilege(handler_args, stmt).await
469 }
470 Statement::Revoke { .. } => {
471 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
472 }
473 Statement::Describe { name, kind } => match kind {
474 DescribeKind::Fragments => {
475 describe::handle_describe_fragments(handler_args, name).await
476 }
477 DescribeKind::Plain => describe::handle_describe(handler_args, name),
478 },
479 Statement::DescribeFragment { fragment_id } => {
480 describe::handle_describe_fragment(handler_args, fragment_id).await
481 }
482 Statement::Discard(..) => discard::handle_discard(handler_args),
483 Statement::ShowObjects {
484 object: show_object,
485 filter,
486 } => show::handle_show_object(handler_args, show_object, filter).await,
487 Statement::ShowCreateObject { create_type, name } => {
488 show::handle_show_create_object(handler_args, create_type, name)
489 }
490 Statement::ShowTransactionIsolationLevel => {
491 transaction::handle_show_isolation_level(handler_args)
492 }
493 Statement::Drop(DropStatement {
494 object_type,
495 object_name,
496 if_exists,
497 drop_mode,
498 }) => {
499 let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
500 match object_type {
501 ObjectType::MaterializedView
502 | ObjectType::View
503 | ObjectType::Sink
504 | ObjectType::Source
505 | ObjectType::Subscription
506 | ObjectType::Index
507 | ObjectType::Table
508 | ObjectType::Schema => true,
509 ObjectType::Database
510 | ObjectType::User
511 | ObjectType::Connection
512 | ObjectType::Secret => {
513 bail_not_implemented!("DROP CASCADE");
514 }
515 }
516 } else {
517 false
518 };
519 match object_type {
520 ObjectType::Table => {
521 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
522 .await
523 }
524 ObjectType::MaterializedView => {
525 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
526 }
527 ObjectType::Index => {
528 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
529 .await
530 }
531 ObjectType::Source => {
532 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
533 .await
534 }
535 ObjectType::Sink => {
536 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
537 }
538 ObjectType::Subscription => {
539 drop_subscription::handle_drop_subscription(
540 handler_args,
541 object_name,
542 if_exists,
543 cascade,
544 )
545 .await
546 }
547 ObjectType::Database => {
548 drop_database::handle_drop_database(handler_args, object_name, if_exists).await
549 }
550 ObjectType::Schema => {
551 drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
552 .await
553 }
554 ObjectType::User => {
555 drop_user::handle_drop_user(handler_args, object_name, if_exists).await
556 }
557 ObjectType::View => {
558 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
559 }
560 ObjectType::Connection => {
561 drop_connection::handle_drop_connection(handler_args, object_name, if_exists)
562 .await
563 }
564 ObjectType::Secret => {
565 drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
566 }
567 }
568 }
569 Statement::DropFunction {
571 if_exists,
572 func_desc,
573 option,
574 } => {
575 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
576 .await
577 }
578 Statement::DropAggregate {
579 if_exists,
580 func_desc,
581 option,
582 } => {
583 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
584 .await
585 }
586 Statement::Query(_)
587 | Statement::Insert { .. }
588 | Statement::Delete { .. }
589 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
590 Statement::CreateView {
591 materialized,
592 if_not_exists,
593 name,
594 columns,
595 query,
596 with_options: _, or_replace, emit_mode,
599 } => {
600 if or_replace {
601 bail_not_implemented!("CREATE OR REPLACE VIEW");
602 }
603 if materialized {
604 create_mv::handle_create_mv(
605 handler_args,
606 if_not_exists,
607 name,
608 *query,
609 columns,
610 emit_mode,
611 )
612 .await
613 } else {
614 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
615 .await
616 }
617 }
618 Statement::Flush => flush::handle_flush(handler_args).await,
619 Statement::Wait => wait::handle_wait(handler_args).await,
620 Statement::Recover => recover::handle_recover(handler_args).await,
621 Statement::SetVariable {
622 local: _,
623 variable,
624 value,
625 } => {
626 if variable.real_value().eq_ignore_ascii_case("database") {
628 let x = variable::set_var_to_param_str(&value);
629 let res = use_db::handle_use_db(
630 handler_args,
631 ObjectName::from(vec![Ident::from_real_value(
632 x.as_deref().unwrap_or("default"),
633 )]),
634 )?;
635 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
636 for notice in res.notices() {
637 builder = builder.notice(notice);
638 }
639 return Ok(builder.into());
640 }
641 variable::handle_set(handler_args, variable, value)
642 }
643 Statement::SetTimeZone { local: _, value } => {
644 variable::handle_set_time_zone(handler_args, value)
645 }
646 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
647 Statement::CreateIndex {
648 name,
649 table_name,
650 columns,
651 include,
652 distributed_by,
653 unique,
654 if_not_exists,
655 } => {
656 if unique {
657 bail_not_implemented!("create unique index");
658 }
659
660 create_index::handle_create_index(
661 handler_args,
662 if_not_exists,
663 name,
664 table_name,
665 columns.to_vec(),
666 include,
667 distributed_by,
668 )
669 .await
670 }
671 Statement::AlterDatabase { name, operation } => match operation {
672 AlterDatabaseOperation::RenameDatabase { database_name } => {
673 alter_rename::handle_rename_database(handler_args, name, database_name).await
674 }
675 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
676 alter_owner::handle_alter_owner(
677 handler_args,
678 name,
679 new_owner_name,
680 StatementType::ALTER_DATABASE,
681 )
682 .await
683 }
684 AlterDatabaseOperation::SetParam(config_param) => {
685 let ConfigParam { param, value } = config_param;
686
687 let database_param = match param.real_value().to_uppercase().as_str() {
688 "BARRIER_INTERVAL_MS" => {
689 let barrier_interval_ms = match value {
690 SetVariableValue::Default => None,
691 SetVariableValue::Single(SetVariableValueSingle::Literal(
692 Value::Number(num),
693 )) => {
694 let num = num.parse::<u32>().map_err(|e| {
695 ErrorCode::InvalidInputSyntax(format!(
696 "barrier_interval_ms must be a u32 integer: {}",
697 e.as_report()
698 ))
699 })?;
700 Some(num)
701 }
702 _ => {
703 return Err(ErrorCode::InvalidInputSyntax(
704 "barrier_interval_ms must be a u32 integer or DEFAULT"
705 .to_owned(),
706 )
707 .into());
708 }
709 };
710 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
711 }
712 "CHECKPOINT_FREQUENCY" => {
713 let checkpoint_frequency = match value {
714 SetVariableValue::Default => None,
715 SetVariableValue::Single(SetVariableValueSingle::Literal(
716 Value::Number(num),
717 )) => {
718 let num = num.parse::<u64>().map_err(|e| {
719 ErrorCode::InvalidInputSyntax(format!(
720 "checkpoint_frequency must be a u64 integer: {}",
721 e.as_report()
722 ))
723 })?;
724 Some(num)
725 }
726 _ => {
727 return Err(ErrorCode::InvalidInputSyntax(
728 "checkpoint_frequency must be a u64 integer or DEFAULT"
729 .to_owned(),
730 )
731 .into());
732 }
733 };
734 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
735 }
736 _ => {
737 return Err(ErrorCode::InvalidInputSyntax(format!(
738 "Unsupported database config parameter: {}",
739 param.real_value()
740 ))
741 .into());
742 }
743 };
744
745 alter_database_param::handle_alter_database_param(
746 handler_args,
747 name,
748 database_param,
749 )
750 .await
751 }
752 },
753 Statement::AlterSchema { name, operation } => match operation {
754 AlterSchemaOperation::RenameSchema { schema_name } => {
755 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
756 }
757 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
758 alter_owner::handle_alter_owner(
759 handler_args,
760 name,
761 new_owner_name,
762 StatementType::ALTER_SCHEMA,
763 )
764 .await
765 }
766 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
767 alter_swap_rename::handle_swap_rename(
768 handler_args,
769 name,
770 target_schema,
771 StatementType::ALTER_SCHEMA,
772 )
773 .await
774 }
775 },
776 Statement::AlterTable { name, operation } => match operation {
777 AlterTableOperation::AddColumn { .. }
778 | AlterTableOperation::DropColumn { .. }
779 | AlterTableOperation::AlterColumn { .. } => {
780 alter_table_column::handle_alter_table_column(handler_args, name, operation).await
781 }
782 AlterTableOperation::RenameTable { table_name } => {
783 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
784 .await
785 }
786 AlterTableOperation::ChangeOwner { new_owner_name } => {
787 alter_owner::handle_alter_owner(
788 handler_args,
789 name,
790 new_owner_name,
791 StatementType::ALTER_TABLE,
792 )
793 .await
794 }
795 AlterTableOperation::SetParallelism {
796 parallelism,
797 deferred,
798 } => {
799 alter_parallelism::handle_alter_parallelism(
800 handler_args,
801 name,
802 parallelism,
803 StatementType::ALTER_TABLE,
804 deferred,
805 )
806 .await
807 }
808 AlterTableOperation::SetSchema { new_schema_name } => {
809 alter_set_schema::handle_alter_set_schema(
810 handler_args,
811 name,
812 new_schema_name,
813 StatementType::ALTER_TABLE,
814 None,
815 )
816 .await
817 }
818 AlterTableOperation::RefreshSchema => {
819 alter_table_with_sr::handle_refresh_schema(handler_args, name).await
820 }
821 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
822 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
823 handler_args,
824 PbThrottleTarget::TableWithSource,
825 name,
826 rate_limit,
827 )
828 .await
829 }
830 AlterTableOperation::DropConnector => {
831 alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
832 .await
833 }
834 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
835 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
836 handler_args,
837 PbThrottleTarget::TableDml,
838 name,
839 rate_limit,
840 )
841 .await
842 }
843 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
844 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
845 handler_args,
846 PbThrottleTarget::CdcTable,
847 name,
848 rate_limit,
849 )
850 .await
851 }
852 AlterTableOperation::SwapRenameTable { target_table } => {
853 alter_swap_rename::handle_swap_rename(
854 handler_args,
855 name,
856 target_table,
857 StatementType::ALTER_TABLE,
858 )
859 .await
860 }
861 AlterTableOperation::AlterConnectorProps { alter_props } => {
862 crate::handler::alter_source_props::handle_alter_table_connector_props(
864 handler_args,
865 name,
866 alter_props,
867 )
868 .await
869 }
870 AlterTableOperation::AddConstraint { .. }
871 | AlterTableOperation::DropConstraint { .. }
872 | AlterTableOperation::RenameColumn { .. }
873 | AlterTableOperation::ChangeColumn { .. }
874 | AlterTableOperation::RenameConstraint { .. } => {
875 bail_not_implemented!(
876 "Unhandled statement: {}",
877 Statement::AlterTable { name, operation }
878 )
879 }
880 },
881 Statement::AlterIndex { name, operation } => match operation {
882 AlterIndexOperation::RenameIndex { index_name } => {
883 alter_rename::handle_rename_index(handler_args, name, index_name).await
884 }
885 AlterIndexOperation::SetParallelism {
886 parallelism,
887 deferred,
888 } => {
889 alter_parallelism::handle_alter_parallelism(
890 handler_args,
891 name,
892 parallelism,
893 StatementType::ALTER_INDEX,
894 deferred,
895 )
896 .await
897 }
898 },
899 Statement::AlterView {
900 materialized,
901 name,
902 operation,
903 } => {
904 let statement_type = if materialized {
905 StatementType::ALTER_MATERIALIZED_VIEW
906 } else {
907 StatementType::ALTER_VIEW
908 };
909 match operation {
910 AlterViewOperation::RenameView { view_name } => {
911 if materialized {
912 alter_rename::handle_rename_table(
913 handler_args,
914 TableType::MaterializedView,
915 name,
916 view_name,
917 )
918 .await
919 } else {
920 alter_rename::handle_rename_view(handler_args, name, view_name).await
921 }
922 }
923 AlterViewOperation::SetParallelism {
924 parallelism,
925 deferred,
926 } => {
927 if !materialized {
928 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
929 }
930 alter_parallelism::handle_alter_parallelism(
931 handler_args,
932 name,
933 parallelism,
934 statement_type,
935 deferred,
936 )
937 .await
938 }
939 AlterViewOperation::SetResourceGroup {
940 resource_group,
941 deferred,
942 } => {
943 if !materialized {
944 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
945 }
946 alter_resource_group::handle_alter_resource_group(
947 handler_args,
948 name,
949 resource_group,
950 statement_type,
951 deferred,
952 )
953 .await
954 }
955 AlterViewOperation::ChangeOwner { new_owner_name } => {
956 alter_owner::handle_alter_owner(
957 handler_args,
958 name,
959 new_owner_name,
960 statement_type,
961 )
962 .await
963 }
964 AlterViewOperation::SetSchema { new_schema_name } => {
965 alter_set_schema::handle_alter_set_schema(
966 handler_args,
967 name,
968 new_schema_name,
969 statement_type,
970 None,
971 )
972 .await
973 }
974 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
975 if !materialized {
976 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
977 }
978 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
979 handler_args,
980 PbThrottleTarget::Mv,
981 name,
982 rate_limit,
983 )
984 .await
985 }
986 AlterViewOperation::SwapRenameView { target_view } => {
987 alter_swap_rename::handle_swap_rename(
988 handler_args,
989 name,
990 target_view,
991 statement_type,
992 )
993 .await
994 }
995 AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
996 if !materialized {
997 bail!(
998 "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
999 );
1000 }
1001 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1002 }
1003 AlterViewOperation::AsQuery { query } => {
1004 if !materialized {
1005 bail_not_implemented!("ALTER VIEW AS QUERY");
1006 }
1007 if !cfg!(debug_assertions) {
1009 bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1010 }
1011 alter_mv::handle_alter_mv(handler_args, name, query).await
1012 }
1013 }
1014 }
1015
1016 Statement::AlterSink { name, operation } => match operation {
1017 AlterSinkOperation::AlterConnectorProps {
1018 alter_props: changed_props,
1019 } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1020 AlterSinkOperation::RenameSink { sink_name } => {
1021 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1022 }
1023 AlterSinkOperation::ChangeOwner { new_owner_name } => {
1024 alter_owner::handle_alter_owner(
1025 handler_args,
1026 name,
1027 new_owner_name,
1028 StatementType::ALTER_SINK,
1029 )
1030 .await
1031 }
1032 AlterSinkOperation::SetSchema { new_schema_name } => {
1033 alter_set_schema::handle_alter_set_schema(
1034 handler_args,
1035 name,
1036 new_schema_name,
1037 StatementType::ALTER_SINK,
1038 None,
1039 )
1040 .await
1041 }
1042 AlterSinkOperation::SetParallelism {
1043 parallelism,
1044 deferred,
1045 } => {
1046 alter_parallelism::handle_alter_parallelism(
1047 handler_args,
1048 name,
1049 parallelism,
1050 StatementType::ALTER_SINK,
1051 deferred,
1052 )
1053 .await
1054 }
1055 AlterSinkOperation::SwapRenameSink { target_sink } => {
1056 alter_swap_rename::handle_swap_rename(
1057 handler_args,
1058 name,
1059 target_sink,
1060 StatementType::ALTER_SINK,
1061 )
1062 .await
1063 }
1064 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1065 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1066 handler_args,
1067 PbThrottleTarget::Sink,
1068 name,
1069 rate_limit,
1070 )
1071 .await
1072 }
1073 AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1074 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1075 handler_args,
1076 name,
1077 enable,
1078 )
1079 .await
1080 }
1081 },
1082 Statement::AlterSubscription { name, operation } => match operation {
1083 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1084 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1085 .await
1086 }
1087 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1088 alter_owner::handle_alter_owner(
1089 handler_args,
1090 name,
1091 new_owner_name,
1092 StatementType::ALTER_SUBSCRIPTION,
1093 )
1094 .await
1095 }
1096 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1097 alter_set_schema::handle_alter_set_schema(
1098 handler_args,
1099 name,
1100 new_schema_name,
1101 StatementType::ALTER_SUBSCRIPTION,
1102 None,
1103 )
1104 .await
1105 }
1106 AlterSubscriptionOperation::SwapRenameSubscription {
1107 target_subscription,
1108 } => {
1109 alter_swap_rename::handle_swap_rename(
1110 handler_args,
1111 name,
1112 target_subscription,
1113 StatementType::ALTER_SUBSCRIPTION,
1114 )
1115 .await
1116 }
1117 },
1118 Statement::AlterSource { name, operation } => match operation {
1119 AlterSourceOperation::AlterConnectorProps { alter_props } => {
1120 alter_source_props::handle_alter_source_connector_props(
1121 handler_args,
1122 name,
1123 alter_props,
1124 )
1125 .await
1126 }
1127 AlterSourceOperation::RenameSource { source_name } => {
1128 alter_rename::handle_rename_source(handler_args, name, source_name).await
1129 }
1130 AlterSourceOperation::AddColumn { .. } => {
1131 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1132 }
1133 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1134 alter_owner::handle_alter_owner(
1135 handler_args,
1136 name,
1137 new_owner_name,
1138 StatementType::ALTER_SOURCE,
1139 )
1140 .await
1141 }
1142 AlterSourceOperation::SetSchema { new_schema_name } => {
1143 alter_set_schema::handle_alter_set_schema(
1144 handler_args,
1145 name,
1146 new_schema_name,
1147 StatementType::ALTER_SOURCE,
1148 None,
1149 )
1150 .await
1151 }
1152 AlterSourceOperation::FormatEncode { format_encode } => {
1153 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1154 .await
1155 }
1156 AlterSourceOperation::RefreshSchema => {
1157 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1158 }
1159 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1160 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1161 handler_args,
1162 PbThrottleTarget::Source,
1163 name,
1164 rate_limit,
1165 )
1166 .await
1167 }
1168 AlterSourceOperation::SwapRenameSource { target_source } => {
1169 alter_swap_rename::handle_swap_rename(
1170 handler_args,
1171 name,
1172 target_source,
1173 StatementType::ALTER_SOURCE,
1174 )
1175 .await
1176 }
1177 AlterSourceOperation::SetParallelism {
1178 parallelism,
1179 deferred,
1180 } => {
1181 alter_parallelism::handle_alter_parallelism(
1182 handler_args,
1183 name,
1184 parallelism,
1185 StatementType::ALTER_SOURCE,
1186 deferred,
1187 )
1188 .await
1189 }
1190 },
1191 Statement::AlterFunction {
1192 name,
1193 args,
1194 operation,
1195 } => match operation {
1196 AlterFunctionOperation::SetSchema { new_schema_name } => {
1197 alter_set_schema::handle_alter_set_schema(
1198 handler_args,
1199 name,
1200 new_schema_name,
1201 StatementType::ALTER_FUNCTION,
1202 args,
1203 )
1204 .await
1205 }
1206 },
1207 Statement::AlterConnection { name, operation } => match operation {
1208 AlterConnectionOperation::SetSchema { new_schema_name } => {
1209 alter_set_schema::handle_alter_set_schema(
1210 handler_args,
1211 name,
1212 new_schema_name,
1213 StatementType::ALTER_CONNECTION,
1214 None,
1215 )
1216 .await
1217 }
1218 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1219 alter_owner::handle_alter_owner(
1220 handler_args,
1221 name,
1222 new_owner_name,
1223 StatementType::ALTER_CONNECTION,
1224 )
1225 .await
1226 }
1227 },
1228 Statement::AlterSystem { param, value } => {
1229 alter_system::handle_alter_system(handler_args, param, value).await
1230 }
1231 Statement::AlterSecret {
1232 name,
1233 with_options,
1234 operation,
1235 } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1236 Statement::AlterFragment {
1237 fragment_id,
1238 operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1239 } => {
1240 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1241 &handler_args.session,
1242 PbThrottleTarget::Fragment,
1243 fragment_id,
1244 rate_limit,
1245 StatementType::SET_VARIABLE,
1246 )
1247 .await
1248 }
1249 Statement::AlterDefaultPrivileges { .. } => {
1250 handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1251 }
1252 Statement::StartTransaction { modes } => {
1253 transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1254 }
1255 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1256 Statement::Commit { chain } => {
1257 transaction::handle_commit(handler_args, COMMIT, chain).await
1258 }
1259 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1260 Statement::Rollback { chain } => {
1261 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1262 }
1263 Statement::SetTransaction {
1264 modes,
1265 snapshot,
1266 session,
1267 } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1268 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1269 Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1270 Statement::Comment {
1271 object_type,
1272 object_name,
1273 comment,
1274 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1275 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1276 Statement::Prepare {
1277 name,
1278 data_types,
1279 statement,
1280 } => prepared_statement::handle_prepare(name, data_types, statement).await,
1281 Statement::Deallocate { name, prepare } => {
1282 prepared_statement::handle_deallocate(name, prepare).await
1283 }
1284 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1285 }
1286}
1287
1288fn check_ban_ddl_for_iceberg_engine_table(
1289 session: Arc<SessionImpl>,
1290 stmt: &Statement,
1291) -> Result<()> {
1292 match stmt {
1293 Statement::AlterTable {
1294 name,
1295 operation:
1296 operation @ (AlterTableOperation::AddColumn { .. }
1297 | AlterTableOperation::DropColumn { .. }),
1298 } => {
1299 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1300 if table.is_iceberg_engine_table() {
1301 bail!(
1302 "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1303 operation,
1304 schema_name,
1305 name
1306 );
1307 }
1308 }
1309
1310 Statement::AlterTable {
1311 name,
1312 operation: AlterTableOperation::RenameTable { .. },
1313 } => {
1314 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1315 if table.is_iceberg_engine_table() {
1316 bail!(
1317 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1318 schema_name,
1319 name
1320 );
1321 }
1322 }
1323
1324 Statement::AlterTable {
1325 name,
1326 operation: AlterTableOperation::ChangeOwner { .. },
1327 } => {
1328 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1329 if table.is_iceberg_engine_table() {
1330 bail!(
1331 "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1332 schema_name,
1333 name
1334 );
1335 }
1336 }
1337
1338 Statement::AlterTable {
1339 name,
1340 operation: AlterTableOperation::SetParallelism { .. },
1341 } => {
1342 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1343 if table.is_iceberg_engine_table() {
1344 bail!(
1345 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1346 schema_name,
1347 name
1348 );
1349 }
1350 }
1351
1352 Statement::AlterTable {
1353 name,
1354 operation: AlterTableOperation::SetSchema { .. },
1355 } => {
1356 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1357 if table.is_iceberg_engine_table() {
1358 bail!(
1359 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1360 schema_name,
1361 name
1362 );
1363 }
1364 }
1365
1366 Statement::AlterTable {
1367 name,
1368 operation: AlterTableOperation::RefreshSchema,
1369 } => {
1370 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1371 if table.is_iceberg_engine_table() {
1372 bail!(
1373 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1374 schema_name,
1375 name
1376 );
1377 }
1378 }
1379
1380 Statement::AlterTable {
1381 name,
1382 operation: AlterTableOperation::SetSourceRateLimit { .. },
1383 } => {
1384 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1385 if table.is_iceberg_engine_table() {
1386 bail!(
1387 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1388 schema_name,
1389 name
1390 );
1391 }
1392 }
1393
1394 _ => {}
1395 }
1396
1397 Ok(())
1398}