1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_database_param;
46mod alter_mv;
47mod alter_owner;
48mod alter_parallelism;
49mod alter_rename;
50mod alter_resource_group;
51mod alter_secret;
52mod alter_set_schema;
53mod alter_sink_props;
54mod alter_source_column;
55mod alter_source_props;
56mod alter_source_with_sr;
57mod alter_streaming_enable_unaligned_join;
58mod alter_streaming_rate_limit;
59mod alter_swap_rename;
60mod alter_system;
61mod alter_table_column;
62pub mod alter_table_drop_connector;
63pub mod alter_table_props;
64mod alter_table_with_sr;
65pub mod alter_user;
66pub mod cancel_job;
67pub mod close_cursor;
68mod comment;
69pub mod create_aggregate;
70pub mod create_connection;
71mod create_database;
72pub mod create_function;
73pub mod create_index;
74pub mod create_mv;
75pub mod create_schema;
76pub mod create_secret;
77pub mod create_sink;
78pub mod create_source;
79pub mod create_sql_function;
80pub mod create_subscription;
81pub mod create_table;
82pub mod create_table_as;
83pub mod create_user;
84pub mod create_view;
85pub mod declare_cursor;
86pub mod describe;
87pub mod discard;
88mod drop_connection;
89mod drop_database;
90pub mod drop_function;
91mod drop_index;
92pub mod drop_mv;
93mod drop_schema;
94pub mod drop_secret;
95pub mod drop_sink;
96pub mod drop_source;
97pub mod drop_subscription;
98pub mod drop_table;
99pub mod drop_user;
100mod drop_view;
101pub mod explain;
102pub mod explain_analyze_stream_job;
103pub mod extended_handle;
104pub mod fetch_cursor;
105mod flush;
106pub mod handle_privilege;
107pub mod kill_process;
108mod prepared_statement;
109pub mod privilege;
110pub mod query;
111mod recover;
112mod refresh;
113pub mod show;
114mod transaction;
115mod use_db;
116pub mod util;
117pub mod vacuum;
118pub mod variable;
119mod wait;
120
121pub use alter_table_column::{
122 fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
123};
124
/// Shorthand for a [`PgResponseBuilder`] whose row stream type is [`PgResponseStream`].
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
127
/// Shorthand for a [`PgResponse`] whose row stream type is [`PgResponseStream`].
pub type RwPgResponse = PgResponse<PgResponseStream>;
130
131#[easy_ext::ext(RwPgResponseBuilderExt)]
132impl RwPgResponseBuilder {
133 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
135 let fields = T::fields();
136 self.values(
137 rows.into_iter()
138 .map(|row| {
139 Row::new(
140 row.into_owned_row()
141 .into_iter()
142 .zip_eq_fast(&fields)
143 .map(|(datum, (_, ty))| {
144 datum.map(|scalar| {
145 scalar.as_scalar_ref_impl().text_format(ty).into()
146 })
147 })
148 .collect(),
149 )
150 })
151 .collect_vec()
152 .into(),
153 fields_to_descriptors(fields),
154 )
155 }
156}
157
158pub fn fields_to_descriptors(
159 fields: Vec<(&str, risingwave_common::types::DataType)>,
160) -> Vec<PgFieldDescriptor> {
161 fields
162 .iter()
163 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
164 .collect()
165}
166
/// Row stream carried by a response; one variant per source of rows.
pub enum PgResponseStream {
    /// Rows adapted from a [`LocalQueryStream`].
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// Rows adapted from a [`DistributedQueryStream`].
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// Any boxed stream of row-set results, e.g. one built from an in-memory
    /// `Vec<Row>` through the `From<Vec<Row>>` impl.
    Rows(BoxStream<'static, RowSetResult>),
}
172
173impl Stream for PgResponseStream {
174 type Item = std::result::Result<Vec<Row>, BoxedError>;
175
176 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
177 match &mut *self {
178 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
179 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
180 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
181 }
182 }
183}
184
185impl From<Vec<Row>> for PgResponseStream {
186 fn from(rows: Vec<Row>) -> Self {
187 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
188 }
189}
190
/// Arguments bundled together and handed to the individual statement handlers.
#[derive(Clone)]
pub struct HandlerArgs {
    /// Session the statement is executed in.
    pub session: Arc<SessionImpl>,
    /// SQL text supplied by the caller of [`HandlerArgs::new`].
    pub sql: Arc<str>,
    /// The statement rendered back to a string after `normalize_sql` has
    /// cleared its `OR REPLACE` / `IF NOT EXISTS` flags.
    pub normalized_sql: String,
    /// Options obtained from the statement via `WithOptions::try_from`.
    pub with_options: WithOptions,
}
198
199impl HandlerArgs {
200 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
201 Ok(Self {
202 session,
203 sql,
204 with_options: WithOptions::try_from(stmt)?,
205 normalized_sql: Self::normalize_sql(stmt),
206 })
207 }
208
209 fn normalize_sql(stmt: &Statement) -> String {
215 let mut stmt = stmt.clone();
216 match &mut stmt {
217 Statement::CreateView {
218 or_replace,
219 if_not_exists,
220 ..
221 } => {
222 *or_replace = false;
223 *if_not_exists = false;
224 }
225 Statement::CreateTable {
226 or_replace,
227 if_not_exists,
228 ..
229 } => {
230 *or_replace = false;
231 *if_not_exists = false;
232 }
233 Statement::CreateIndex { if_not_exists, .. } => {
234 *if_not_exists = false;
235 }
236 Statement::CreateSource {
237 stmt: CreateSourceStatement { if_not_exists, .. },
238 ..
239 } => {
240 *if_not_exists = false;
241 }
242 Statement::CreateSink {
243 stmt: CreateSinkStatement { if_not_exists, .. },
244 } => {
245 *if_not_exists = false;
246 }
247 Statement::CreateSubscription {
248 stmt: CreateSubscriptionStatement { if_not_exists, .. },
249 } => {
250 *if_not_exists = false;
251 }
252 Statement::CreateConnection {
253 stmt: CreateConnectionStatement { if_not_exists, .. },
254 } => {
255 *if_not_exists = false;
256 }
257 _ => {}
258 }
259 stmt.to_string()
260 }
261}
262
263pub async fn handle(
264 session: Arc<SessionImpl>,
265 stmt: Statement,
266 sql: Arc<str>,
267 formats: Vec<Format>,
268) -> Result<RwPgResponse> {
269 session.clear_cancel_query_flag();
270 let _guard = session.txn_begin_implicit();
271 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
272
273 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
274
275 match stmt {
276 Statement::Explain {
277 statement,
278 analyze,
279 options,
280 } => explain::handle_explain(handler_args, *statement, options, analyze).await,
281 Statement::ExplainAnalyzeStreamJob {
282 target,
283 duration_secs,
284 } => {
285 explain_analyze_stream_job::handle_explain_analyze_stream_job(
286 handler_args,
287 target,
288 duration_secs,
289 )
290 .await
291 }
292 Statement::CreateSource { stmt } => {
293 create_source::handle_create_source(handler_args, stmt).await
294 }
295 Statement::CreateSink { stmt } => {
296 create_sink::handle_create_sink(handler_args, stmt, false).await
297 }
298 Statement::CreateSubscription { stmt } => {
299 create_subscription::handle_create_subscription(handler_args, stmt).await
300 }
301 Statement::CreateConnection { stmt } => {
302 create_connection::handle_create_connection(handler_args, stmt).await
303 }
304 Statement::CreateSecret { stmt } => {
305 create_secret::handle_create_secret(handler_args, stmt).await
306 }
307 Statement::CreateFunction {
308 or_replace,
309 temporary,
310 if_not_exists,
311 name,
312 args,
313 returns,
314 params,
315 with_options,
316 } => {
317 if params.language.is_none()
320 || !params
321 .language
322 .as_ref()
323 .unwrap()
324 .real_value()
325 .eq_ignore_ascii_case("sql")
326 {
327 create_function::handle_create_function(
328 handler_args,
329 or_replace,
330 temporary,
331 if_not_exists,
332 name,
333 args,
334 returns,
335 params,
336 with_options,
337 )
338 .await
339 } else {
340 create_sql_function::handle_create_sql_function(
341 handler_args,
342 or_replace,
343 temporary,
344 if_not_exists,
345 name,
346 args,
347 returns,
348 params,
349 )
350 .await
351 }
352 }
353 Statement::CreateAggregate {
354 or_replace,
355 if_not_exists,
356 name,
357 args,
358 returns,
359 params,
360 ..
361 } => {
362 create_aggregate::handle_create_aggregate(
363 handler_args,
364 or_replace,
365 if_not_exists,
366 name,
367 args,
368 returns,
369 params,
370 )
371 .await
372 }
373 Statement::CreateTable {
374 name,
375 columns,
376 wildcard_idx,
377 constraints,
378 query,
379 with_options: _, or_replace,
382 temporary,
383 if_not_exists,
384 format_encode,
385 source_watermarks,
386 append_only,
387 on_conflict,
388 with_version_columns,
389 cdc_table_info,
390 include_column_options,
391 webhook_info,
392 engine,
393 } => {
394 if or_replace {
395 bail_not_implemented!("CREATE OR REPLACE TABLE");
396 }
397 if temporary {
398 bail_not_implemented!("CREATE TEMPORARY TABLE");
399 }
400 if let Some(query) = query {
401 return create_table_as::handle_create_as(
402 handler_args,
403 name,
404 if_not_exists,
405 query,
406 columns,
407 append_only,
408 on_conflict,
409 with_version_columns
410 .iter()
411 .map(|col| col.real_value())
412 .collect(),
413 engine,
414 )
415 .await;
416 }
417 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
418 create_table::handle_create_table(
419 handler_args,
420 name,
421 columns,
422 wildcard_idx,
423 constraints,
424 if_not_exists,
425 format_encode,
426 source_watermarks,
427 append_only,
428 on_conflict,
429 with_version_columns
430 .iter()
431 .map(|col| col.real_value())
432 .collect(),
433 cdc_table_info,
434 include_column_options,
435 webhook_info,
436 engine,
437 )
438 .await
439 }
440 Statement::CreateDatabase {
441 db_name,
442 if_not_exists,
443 owner,
444 resource_group,
445 barrier_interval_ms,
446 checkpoint_frequency,
447 } => {
448 create_database::handle_create_database(
449 handler_args,
450 db_name,
451 if_not_exists,
452 owner,
453 resource_group,
454 barrier_interval_ms,
455 checkpoint_frequency,
456 )
457 .await
458 }
459 Statement::CreateSchema {
460 schema_name,
461 if_not_exists,
462 owner,
463 } => {
464 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
465 .await
466 }
467 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
468 Statement::DeclareCursor { stmt } => {
469 declare_cursor::handle_declare_cursor(handler_args, stmt).await
470 }
471 Statement::FetchCursor { stmt } => {
472 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
473 }
474 Statement::CloseCursor { stmt } => {
475 close_cursor::handle_close_cursor(handler_args, stmt).await
476 }
477 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
478 Statement::Grant { .. } => {
479 handle_privilege::handle_grant_privilege(handler_args, stmt).await
480 }
481 Statement::Revoke { .. } => {
482 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
483 }
484 Statement::Describe { name, kind } => match kind {
485 DescribeKind::Fragments => {
486 describe::handle_describe_fragments(handler_args, name).await
487 }
488 DescribeKind::Plain => describe::handle_describe(handler_args, name),
489 },
490 Statement::DescribeFragment { fragment_id } => {
491 describe::handle_describe_fragment(handler_args, fragment_id).await
492 }
493 Statement::Discard(..) => discard::handle_discard(handler_args),
494 Statement::ShowObjects {
495 object: show_object,
496 filter,
497 } => show::handle_show_object(handler_args, show_object, filter).await,
498 Statement::ShowCreateObject { create_type, name } => {
499 show::handle_show_create_object(handler_args, create_type, name)
500 }
501 Statement::ShowTransactionIsolationLevel => {
502 transaction::handle_show_isolation_level(handler_args)
503 }
504 Statement::Drop(DropStatement {
505 object_type,
506 object_name,
507 if_exists,
508 drop_mode,
509 }) => {
510 let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
511 match object_type {
512 ObjectType::MaterializedView
513 | ObjectType::View
514 | ObjectType::Sink
515 | ObjectType::Source
516 | ObjectType::Subscription
517 | ObjectType::Index
518 | ObjectType::Table
519 | ObjectType::Schema
520 | ObjectType::Connection => true,
521 ObjectType::Database | ObjectType::User | ObjectType::Secret => {
522 bail_not_implemented!("DROP CASCADE");
523 }
524 }
525 } else {
526 false
527 };
528 match object_type {
529 ObjectType::Table => {
530 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
531 .await
532 }
533 ObjectType::MaterializedView => {
534 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
535 }
536 ObjectType::Index => {
537 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
538 .await
539 }
540 ObjectType::Source => {
541 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
542 .await
543 }
544 ObjectType::Sink => {
545 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
546 }
547 ObjectType::Subscription => {
548 drop_subscription::handle_drop_subscription(
549 handler_args,
550 object_name,
551 if_exists,
552 cascade,
553 )
554 .await
555 }
556 ObjectType::Database => {
557 drop_database::handle_drop_database(handler_args, object_name, if_exists).await
558 }
559 ObjectType::Schema => {
560 drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
561 .await
562 }
563 ObjectType::User => {
564 drop_user::handle_drop_user(handler_args, object_name, if_exists).await
565 }
566 ObjectType::View => {
567 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
568 }
569 ObjectType::Connection => {
570 drop_connection::handle_drop_connection(
571 handler_args,
572 object_name,
573 if_exists,
574 cascade,
575 )
576 .await
577 }
578 ObjectType::Secret => {
579 drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
580 }
581 }
582 }
583 Statement::DropFunction {
585 if_exists,
586 func_desc,
587 option,
588 } => {
589 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
590 .await
591 }
592 Statement::DropAggregate {
593 if_exists,
594 func_desc,
595 option,
596 } => {
597 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
598 .await
599 }
600 Statement::Query(_)
601 | Statement::Insert { .. }
602 | Statement::Delete { .. }
603 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
604 Statement::Copy {
605 entity: CopyEntity::Query(query),
606 target: CopyTarget::Stdout,
607 } => {
608 let response =
609 query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
610 .await?;
611 Ok(response.into_copy_query_to_stdout())
612 }
613 Statement::CreateView {
614 materialized,
615 if_not_exists,
616 name,
617 columns,
618 query,
619 with_options: _, or_replace, emit_mode,
622 } => {
623 if or_replace {
624 bail_not_implemented!("CREATE OR REPLACE VIEW");
625 }
626 if materialized {
627 create_mv::handle_create_mv(
628 handler_args,
629 if_not_exists,
630 name,
631 *query,
632 columns,
633 emit_mode,
634 )
635 .await
636 } else {
637 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
638 .await
639 }
640 }
641 Statement::Flush => flush::handle_flush(handler_args).await,
642 Statement::Wait => wait::handle_wait(handler_args).await,
643 Statement::Recover => recover::handle_recover(handler_args).await,
644 Statement::SetVariable {
645 local: _,
646 variable,
647 value,
648 } => {
649 if variable.real_value().eq_ignore_ascii_case("database") {
651 let x = variable::set_var_to_param_str(&value);
652 let res = use_db::handle_use_db(
653 handler_args,
654 ObjectName::from(vec![Ident::from_real_value(
655 x.as_deref().unwrap_or("default"),
656 )]),
657 )?;
658 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
659 for notice in res.notices() {
660 builder = builder.notice(notice);
661 }
662 return Ok(builder.into());
663 }
664 variable::handle_set(handler_args, variable, value)
665 }
666 Statement::SetTimeZone { local: _, value } => {
667 variable::handle_set_time_zone(handler_args, value)
668 }
669 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
670 Statement::CreateIndex {
671 name,
672 table_name,
673 method,
674 columns,
675 include,
676 distributed_by,
677 unique,
678 if_not_exists,
679 with_properties: _,
680 } => {
681 if unique {
682 bail_not_implemented!("create unique index");
683 }
684
685 create_index::handle_create_index(
686 handler_args,
687 if_not_exists,
688 name,
689 table_name,
690 method,
691 columns.to_vec(),
692 include,
693 distributed_by,
694 )
695 .await
696 }
697 Statement::AlterDatabase { name, operation } => match operation {
698 AlterDatabaseOperation::RenameDatabase { database_name } => {
699 alter_rename::handle_rename_database(handler_args, name, database_name).await
700 }
701 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
702 alter_owner::handle_alter_owner(
703 handler_args,
704 name,
705 new_owner_name,
706 StatementType::ALTER_DATABASE,
707 )
708 .await
709 }
710 AlterDatabaseOperation::SetParam(config_param) => {
711 let ConfigParam { param, value } = config_param;
712
713 let database_param = match param.real_value().to_uppercase().as_str() {
714 "BARRIER_INTERVAL_MS" => {
715 let barrier_interval_ms = match value {
716 SetVariableValue::Default => None,
717 SetVariableValue::Single(SetVariableValueSingle::Literal(
718 Value::Number(num),
719 )) => {
720 let num = num.parse::<u32>().map_err(|e| {
721 ErrorCode::InvalidInputSyntax(format!(
722 "barrier_interval_ms must be a u32 integer: {}",
723 e.as_report()
724 ))
725 })?;
726 Some(num)
727 }
728 _ => {
729 return Err(ErrorCode::InvalidInputSyntax(
730 "barrier_interval_ms must be a u32 integer or DEFAULT"
731 .to_owned(),
732 )
733 .into());
734 }
735 };
736 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
737 }
738 "CHECKPOINT_FREQUENCY" => {
739 let checkpoint_frequency = match value {
740 SetVariableValue::Default => None,
741 SetVariableValue::Single(SetVariableValueSingle::Literal(
742 Value::Number(num),
743 )) => {
744 let num = num.parse::<u64>().map_err(|e| {
745 ErrorCode::InvalidInputSyntax(format!(
746 "checkpoint_frequency must be a u64 integer: {}",
747 e.as_report()
748 ))
749 })?;
750 Some(num)
751 }
752 _ => {
753 return Err(ErrorCode::InvalidInputSyntax(
754 "checkpoint_frequency must be a u64 integer or DEFAULT"
755 .to_owned(),
756 )
757 .into());
758 }
759 };
760 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
761 }
762 _ => {
763 return Err(ErrorCode::InvalidInputSyntax(format!(
764 "Unsupported database config parameter: {}",
765 param.real_value()
766 ))
767 .into());
768 }
769 };
770
771 alter_database_param::handle_alter_database_param(
772 handler_args,
773 name,
774 database_param,
775 )
776 .await
777 }
778 },
779 Statement::AlterSchema { name, operation } => match operation {
780 AlterSchemaOperation::RenameSchema { schema_name } => {
781 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
782 }
783 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
784 alter_owner::handle_alter_owner(
785 handler_args,
786 name,
787 new_owner_name,
788 StatementType::ALTER_SCHEMA,
789 )
790 .await
791 }
792 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
793 alter_swap_rename::handle_swap_rename(
794 handler_args,
795 name,
796 target_schema,
797 StatementType::ALTER_SCHEMA,
798 )
799 .await
800 }
801 },
802 Statement::AlterTable { name, operation } => match operation {
803 AlterTableOperation::AddColumn { .. }
804 | AlterTableOperation::DropColumn { .. }
805 | AlterTableOperation::AlterColumn { .. } => {
806 alter_table_column::handle_alter_table_column(handler_args, name, operation).await
807 }
808 AlterTableOperation::RenameTable { table_name } => {
809 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
810 .await
811 }
812 AlterTableOperation::ChangeOwner { new_owner_name } => {
813 alter_owner::handle_alter_owner(
814 handler_args,
815 name,
816 new_owner_name,
817 StatementType::ALTER_TABLE,
818 )
819 .await
820 }
821 AlterTableOperation::SetParallelism {
822 parallelism,
823 deferred,
824 } => {
825 alter_parallelism::handle_alter_parallelism(
826 handler_args,
827 name,
828 parallelism,
829 StatementType::ALTER_TABLE,
830 deferred,
831 )
832 .await
833 }
834 AlterTableOperation::SetSchema { new_schema_name } => {
835 alter_set_schema::handle_alter_set_schema(
836 handler_args,
837 name,
838 new_schema_name,
839 StatementType::ALTER_TABLE,
840 None,
841 )
842 .await
843 }
844 AlterTableOperation::RefreshSchema => {
845 alter_table_with_sr::handle_refresh_schema(handler_args, name).await
846 }
847 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
848 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
849 handler_args,
850 PbThrottleTarget::TableWithSource,
851 name,
852 rate_limit,
853 )
854 .await
855 }
856 AlterTableOperation::DropConnector => {
857 alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
858 .await
859 }
860 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
861 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
862 handler_args,
863 PbThrottleTarget::TableDml,
864 name,
865 rate_limit,
866 )
867 .await
868 }
869 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
870 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
871 handler_args,
872 PbThrottleTarget::CdcTable,
873 name,
874 rate_limit,
875 )
876 .await
877 }
878 AlterTableOperation::SwapRenameTable { target_table } => {
879 alter_swap_rename::handle_swap_rename(
880 handler_args,
881 name,
882 target_table,
883 StatementType::ALTER_TABLE,
884 )
885 .await
886 }
887 AlterTableOperation::AlterConnectorProps { alter_props } => {
888 alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
889 }
890 AlterTableOperation::AddConstraint { .. }
891 | AlterTableOperation::DropConstraint { .. }
892 | AlterTableOperation::RenameColumn { .. }
893 | AlterTableOperation::ChangeColumn { .. }
894 | AlterTableOperation::RenameConstraint { .. } => {
895 bail_not_implemented!(
896 "Unhandled statement: {}",
897 Statement::AlterTable { name, operation }
898 )
899 }
900 },
901 Statement::AlterIndex { name, operation } => match operation {
902 AlterIndexOperation::RenameIndex { index_name } => {
903 alter_rename::handle_rename_index(handler_args, name, index_name).await
904 }
905 AlterIndexOperation::SetParallelism {
906 parallelism,
907 deferred,
908 } => {
909 alter_parallelism::handle_alter_parallelism(
910 handler_args,
911 name,
912 parallelism,
913 StatementType::ALTER_INDEX,
914 deferred,
915 )
916 .await
917 }
918 },
919 Statement::AlterView {
920 materialized,
921 name,
922 operation,
923 } => {
924 let statement_type = if materialized {
925 StatementType::ALTER_MATERIALIZED_VIEW
926 } else {
927 StatementType::ALTER_VIEW
928 };
929 match operation {
930 AlterViewOperation::RenameView { view_name } => {
931 if materialized {
932 alter_rename::handle_rename_table(
933 handler_args,
934 TableType::MaterializedView,
935 name,
936 view_name,
937 )
938 .await
939 } else {
940 alter_rename::handle_rename_view(handler_args, name, view_name).await
941 }
942 }
943 AlterViewOperation::SetParallelism {
944 parallelism,
945 deferred,
946 } => {
947 if !materialized {
948 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
949 }
950 alter_parallelism::handle_alter_parallelism(
951 handler_args,
952 name,
953 parallelism,
954 statement_type,
955 deferred,
956 )
957 .await
958 }
959 AlterViewOperation::SetResourceGroup {
960 resource_group,
961 deferred,
962 } => {
963 if !materialized {
964 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
965 }
966 alter_resource_group::handle_alter_resource_group(
967 handler_args,
968 name,
969 resource_group,
970 statement_type,
971 deferred,
972 )
973 .await
974 }
975 AlterViewOperation::ChangeOwner { new_owner_name } => {
976 alter_owner::handle_alter_owner(
977 handler_args,
978 name,
979 new_owner_name,
980 statement_type,
981 )
982 .await
983 }
984 AlterViewOperation::SetSchema { new_schema_name } => {
985 alter_set_schema::handle_alter_set_schema(
986 handler_args,
987 name,
988 new_schema_name,
989 statement_type,
990 None,
991 )
992 .await
993 }
994 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
995 if !materialized {
996 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
997 }
998 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
999 handler_args,
1000 PbThrottleTarget::Mv,
1001 name,
1002 rate_limit,
1003 )
1004 .await
1005 }
1006 AlterViewOperation::SwapRenameView { target_view } => {
1007 alter_swap_rename::handle_swap_rename(
1008 handler_args,
1009 name,
1010 target_view,
1011 statement_type,
1012 )
1013 .await
1014 }
1015 AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1016 if !materialized {
1017 bail!(
1018 "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1019 );
1020 }
1021 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1022 }
1023 AlterViewOperation::AsQuery { query } => {
1024 if !materialized {
1025 bail_not_implemented!("ALTER VIEW AS QUERY");
1026 }
1027 if !cfg!(debug_assertions) {
1029 bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1030 }
1031 alter_mv::handle_alter_mv(handler_args, name, query).await
1032 }
1033 }
1034 }
1035
1036 Statement::AlterSink { name, operation } => match operation {
1037 AlterSinkOperation::AlterConnectorProps {
1038 alter_props: changed_props,
1039 } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1040 AlterSinkOperation::RenameSink { sink_name } => {
1041 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1042 }
1043 AlterSinkOperation::ChangeOwner { new_owner_name } => {
1044 alter_owner::handle_alter_owner(
1045 handler_args,
1046 name,
1047 new_owner_name,
1048 StatementType::ALTER_SINK,
1049 )
1050 .await
1051 }
1052 AlterSinkOperation::SetSchema { new_schema_name } => {
1053 alter_set_schema::handle_alter_set_schema(
1054 handler_args,
1055 name,
1056 new_schema_name,
1057 StatementType::ALTER_SINK,
1058 None,
1059 )
1060 .await
1061 }
1062 AlterSinkOperation::SetParallelism {
1063 parallelism,
1064 deferred,
1065 } => {
1066 alter_parallelism::handle_alter_parallelism(
1067 handler_args,
1068 name,
1069 parallelism,
1070 StatementType::ALTER_SINK,
1071 deferred,
1072 )
1073 .await
1074 }
1075 AlterSinkOperation::SwapRenameSink { target_sink } => {
1076 alter_swap_rename::handle_swap_rename(
1077 handler_args,
1078 name,
1079 target_sink,
1080 StatementType::ALTER_SINK,
1081 )
1082 .await
1083 }
1084 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1085 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1086 handler_args,
1087 PbThrottleTarget::Sink,
1088 name,
1089 rate_limit,
1090 )
1091 .await
1092 }
1093 AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1094 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1095 handler_args,
1096 name,
1097 enable,
1098 )
1099 .await
1100 }
1101 },
1102 Statement::AlterSubscription { name, operation } => match operation {
1103 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1104 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1105 .await
1106 }
1107 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1108 alter_owner::handle_alter_owner(
1109 handler_args,
1110 name,
1111 new_owner_name,
1112 StatementType::ALTER_SUBSCRIPTION,
1113 )
1114 .await
1115 }
1116 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1117 alter_set_schema::handle_alter_set_schema(
1118 handler_args,
1119 name,
1120 new_schema_name,
1121 StatementType::ALTER_SUBSCRIPTION,
1122 None,
1123 )
1124 .await
1125 }
1126 AlterSubscriptionOperation::SwapRenameSubscription {
1127 target_subscription,
1128 } => {
1129 alter_swap_rename::handle_swap_rename(
1130 handler_args,
1131 name,
1132 target_subscription,
1133 StatementType::ALTER_SUBSCRIPTION,
1134 )
1135 .await
1136 }
1137 },
1138 Statement::AlterSource { name, operation } => match operation {
1139 AlterSourceOperation::AlterConnectorProps { alter_props } => {
1140 alter_source_props::handle_alter_source_connector_props(
1141 handler_args,
1142 name,
1143 alter_props,
1144 )
1145 .await
1146 }
1147 AlterSourceOperation::RenameSource { source_name } => {
1148 alter_rename::handle_rename_source(handler_args, name, source_name).await
1149 }
1150 AlterSourceOperation::AddColumn { .. } => {
1151 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1152 }
1153 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1154 alter_owner::handle_alter_owner(
1155 handler_args,
1156 name,
1157 new_owner_name,
1158 StatementType::ALTER_SOURCE,
1159 )
1160 .await
1161 }
1162 AlterSourceOperation::SetSchema { new_schema_name } => {
1163 alter_set_schema::handle_alter_set_schema(
1164 handler_args,
1165 name,
1166 new_schema_name,
1167 StatementType::ALTER_SOURCE,
1168 None,
1169 )
1170 .await
1171 }
1172 AlterSourceOperation::FormatEncode { format_encode } => {
1173 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1174 .await
1175 }
1176 AlterSourceOperation::RefreshSchema => {
1177 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1178 }
1179 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1180 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1181 handler_args,
1182 PbThrottleTarget::Source,
1183 name,
1184 rate_limit,
1185 )
1186 .await
1187 }
1188 AlterSourceOperation::SwapRenameSource { target_source } => {
1189 alter_swap_rename::handle_swap_rename(
1190 handler_args,
1191 name,
1192 target_source,
1193 StatementType::ALTER_SOURCE,
1194 )
1195 .await
1196 }
1197 AlterSourceOperation::SetParallelism {
1198 parallelism,
1199 deferred,
1200 } => {
1201 alter_parallelism::handle_alter_parallelism(
1202 handler_args,
1203 name,
1204 parallelism,
1205 StatementType::ALTER_SOURCE,
1206 deferred,
1207 )
1208 .await
1209 }
1210 },
1211 Statement::AlterFunction {
1212 name,
1213 args,
1214 operation,
1215 } => match operation {
1216 AlterFunctionOperation::SetSchema { new_schema_name } => {
1217 alter_set_schema::handle_alter_set_schema(
1218 handler_args,
1219 name,
1220 new_schema_name,
1221 StatementType::ALTER_FUNCTION,
1222 args,
1223 )
1224 .await
1225 }
1226 },
1227 Statement::AlterConnection { name, operation } => match operation {
1228 AlterConnectionOperation::SetSchema { new_schema_name } => {
1229 alter_set_schema::handle_alter_set_schema(
1230 handler_args,
1231 name,
1232 new_schema_name,
1233 StatementType::ALTER_CONNECTION,
1234 None,
1235 )
1236 .await
1237 }
1238 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1239 alter_owner::handle_alter_owner(
1240 handler_args,
1241 name,
1242 new_owner_name,
1243 StatementType::ALTER_CONNECTION,
1244 )
1245 .await
1246 }
1247 },
1248 Statement::AlterSystem { param, value } => {
1249 alter_system::handle_alter_system(handler_args, param, value).await
1250 }
1251 Statement::AlterSecret {
1252 name,
1253 with_options,
1254 operation,
1255 } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1256 Statement::AlterFragment {
1257 fragment_id,
1258 operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1259 } => {
1260 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1261 &handler_args.session,
1262 PbThrottleTarget::Fragment,
1263 fragment_id,
1264 rate_limit,
1265 StatementType::SET_VARIABLE,
1266 )
1267 .await
1268 }
1269 Statement::AlterDefaultPrivileges { .. } => {
1270 handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1271 }
1272 Statement::StartTransaction { modes } => {
1273 transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1274 }
1275 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1276 Statement::Commit { chain } => {
1277 transaction::handle_commit(handler_args, COMMIT, chain).await
1278 }
1279 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1280 Statement::Rollback { chain } => {
1281 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1282 }
1283 Statement::SetTransaction {
1284 modes,
1285 snapshot,
1286 session,
1287 } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1288 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1289 Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1290 Statement::Comment {
1291 object_type,
1292 object_name,
1293 comment,
1294 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1295 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1296 Statement::Prepare {
1297 name,
1298 data_types,
1299 statement,
1300 } => prepared_statement::handle_prepare(name, data_types, statement).await,
1301 Statement::Deallocate { name, prepare } => {
1302 prepared_statement::handle_deallocate(name, prepare).await
1303 }
1304 Statement::Vacuum { object_name, full } => {
1305 vacuum::handle_vacuum(handler_args, object_name, full).await
1306 }
1307 Statement::Refresh { table_name } => {
1308 refresh::handle_refresh(handler_args, table_name).await
1309 }
1310 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1311 }
1312}
1313
1314fn check_ban_ddl_for_iceberg_engine_table(
1315 session: Arc<SessionImpl>,
1316 stmt: &Statement,
1317) -> Result<()> {
1318 match stmt {
1319 Statement::AlterTable {
1320 name,
1321 operation:
1322 operation @ (AlterTableOperation::AddColumn { .. }
1323 | AlterTableOperation::DropColumn { .. }),
1324 } => {
1325 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1326 if table.is_iceberg_engine_table() {
1327 bail!(
1328 "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1329 operation,
1330 schema_name,
1331 name
1332 );
1333 }
1334 }
1335
1336 Statement::AlterTable {
1337 name,
1338 operation: AlterTableOperation::RenameTable { .. },
1339 } => {
1340 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1341 if table.is_iceberg_engine_table() {
1342 bail!(
1343 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1344 schema_name,
1345 name
1346 );
1347 }
1348 }
1349
1350 Statement::AlterTable {
1351 name,
1352 operation: AlterTableOperation::ChangeOwner { .. },
1353 } => {
1354 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1355 if table.is_iceberg_engine_table() {
1356 bail!(
1357 "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1358 schema_name,
1359 name
1360 );
1361 }
1362 }
1363
1364 Statement::AlterTable {
1365 name,
1366 operation: AlterTableOperation::SetParallelism { .. },
1367 } => {
1368 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1369 if table.is_iceberg_engine_table() {
1370 bail!(
1371 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1372 schema_name,
1373 name
1374 );
1375 }
1376 }
1377
1378 Statement::AlterTable {
1379 name,
1380 operation: AlterTableOperation::SetSchema { .. },
1381 } => {
1382 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1383 if table.is_iceberg_engine_table() {
1384 bail!(
1385 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1386 schema_name,
1387 name
1388 );
1389 }
1390 }
1391
1392 Statement::AlterTable {
1393 name,
1394 operation: AlterTableOperation::RefreshSchema,
1395 } => {
1396 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1397 if table.is_iceberg_engine_table() {
1398 bail!(
1399 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1400 schema_name,
1401 name
1402 );
1403 }
1404 }
1405
1406 Statement::AlterTable {
1407 name,
1408 operation: AlterTableOperation::SetSourceRateLimit { .. },
1409 } => {
1410 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1411 if table.is_iceberg_engine_table() {
1412 bail!(
1413 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1414 schema_name,
1415 name
1416 );
1417 }
1418 }
1419
1420 _ => {}
1421 }
1422
1423 Ok(())
1424}