use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::stream::{self, BoxStream};
use futures::{Stream, StreamExt};
use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
use pgwire::pg_server::BoxedError;
use pgwire::types::{Format, Row};
use risingwave_common::catalog::AlterDatabaseParam;
use risingwave_common::types::Fields;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_pb::meta::PbThrottleTarget;
use risingwave_sqlparser::ast::*;
use thiserror_ext::AsReport;
use util::get_table_catalog_by_table_name;

use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result};
use crate::handler::cancel_job::handle_cancel;
use crate::handler::kill_process::handle_kill;
use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
use crate::session::SessionImpl;
use crate::utils::WithOptions;

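// Handler submodules, one per statement family.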
mod alter_database_param;
mod alter_mv;
mod alter_owner;
mod alter_parallelism;
mod alter_rename;
mod alter_resource_group;
mod alter_secret;
mod alter_set_schema;
mod alter_sink_props;
mod alter_source_column;
mod alter_source_props;
mod alter_source_with_sr;
mod alter_streaming_enable_unaligned_join;
mod alter_streaming_rate_limit;
mod alter_swap_rename;
mod alter_system;
mod alter_table_column;
pub mod alter_table_drop_connector;
pub mod alter_table_props;
mod alter_table_with_sr;
pub mod alter_user;
pub mod cancel_job;
pub mod close_cursor;
mod comment;
pub mod create_aggregate;
pub mod create_connection;
mod create_database;
pub mod create_function;
pub mod create_index;
pub mod create_mv;
pub mod create_schema;
pub mod create_secret;
pub mod create_sink;
pub mod create_source;
pub mod create_sql_function;
pub mod create_subscription;
pub mod create_table;
pub mod create_table_as;
pub mod create_user;
pub mod create_view;
pub mod declare_cursor;
pub mod describe;
pub mod discard;
mod drop_connection;
mod drop_database;
pub mod drop_function;
mod drop_index;
pub mod drop_mv;
mod drop_schema;
pub mod drop_secret;
pub mod drop_sink;
pub mod drop_source;
pub mod drop_subscription;
pub mod drop_table;
pub mod drop_user;
mod drop_view;
pub mod explain;
pub mod explain_analyze_stream_job;
pub mod extended_handle;
pub mod fetch_cursor;
mod flush;
pub mod handle_privilege;
pub mod kill_process;
mod prepared_statement;
pub mod privilege;
pub mod query;
mod recover;
mod refresh;
pub mod show;
mod transaction;
mod use_db;
pub mod util;
pub mod vacuum;
pub mod variable;
mod wait;

pub use alter_table_column::{get_new_table_definition_for_cdc_table, get_replace_table_plan};

pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;

pub type RwPgResponse = PgResponse<PgResponseStream>;

#[easy_ext::ext(RwPgResponseBuilderExt)]
impl RwPgResponseBuilder {
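    /// Appends rows to the response, converting each [`Fields`] struct into its
    /// text-format [`Row`] and deriving the field descriptors from `T::fields()`.
    ///
    /// Illustrative usage (a sketch assuming a hypothetical `#[derive(Fields)]` struct
    /// `ShowRow` with a single `name: String` field):
    ///
    /// ```ignore
    /// let builder = RwPgResponse::builder(StatementType::SHOW_COMMAND)
    ///     .rows([ShowRow { name: "t".into() }]);
    /// ```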
    pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
        let fields = T::fields();
        self.values(
            rows.into_iter()
                .map(|row| {
                    Row::new(
                        row.into_owned_row()
                            .into_iter()
                            .zip_eq_fast(&fields)
                            .map(|(datum, (_, ty))| {
                                datum.map(|scalar| {
                                    scalar.as_scalar_ref_impl().text_format(ty).into()
                                })
                            })
                            .collect(),
                    )
                })
                .collect_vec()
                .into(),
            fields_to_descriptors(fields),
        )
    }
}

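/// Converts `(name, data type)` pairs into pgwire [`PgFieldDescriptor`]s using each
/// type's OID and type length.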
pub fn fields_to_descriptors(
    fields: Vec<(&str, risingwave_common::types::DataType)>,
) -> Vec<PgFieldDescriptor> {
    fields
        .iter()
        .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
        .collect()
}

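/// The row stream backing a [`RwPgResponse`]: an adapter over a local or distributed
/// query stream, or a pre-materialized row set.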
pub enum PgResponseStream {
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    Rows(BoxStream<'static, RowSetResult>),
}

impl Stream for PgResponseStream {
    type Item = std::result::Result<Vec<Row>, BoxedError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match &mut *self {
            PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
            PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
            PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
        }
    }
}

impl From<Vec<Row>> for PgResponseStream {
    fn from(rows: Vec<Row>) -> Self {
        Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
    }
}

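/// Arguments passed to every statement handler: the session, the original and
/// normalized SQL text, and the parsed `WITH` options.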
#[derive(Clone)]
pub struct HandlerArgs {
    pub session: Arc<SessionImpl>,
    pub sql: Arc<str>,
    pub normalized_sql: String,
    pub with_options: WithOptions,
}

impl HandlerArgs {
    pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
        Ok(Self {
            session,
            sql,
            with_options: WithOptions::try_from(stmt)?,
            normalized_sql: Self::normalize_sql(stmt),
        })
    }

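    /// Normalizes a statement by clearing `OR REPLACE` and `IF NOT EXISTS` flags before
    /// serializing it back to SQL, so logically equivalent `CREATE` statements produce
    /// the same normalized string.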
    fn normalize_sql(stmt: &Statement) -> String {
        let mut stmt = stmt.clone();
        match &mut stmt {
            Statement::CreateView {
                or_replace,
                if_not_exists,
                ..
            } => {
                *or_replace = false;
                *if_not_exists = false;
            }
            Statement::CreateTable {
                or_replace,
                if_not_exists,
                ..
            } => {
                *or_replace = false;
                *if_not_exists = false;
            }
            Statement::CreateIndex { if_not_exists, .. } => {
                *if_not_exists = false;
            }
            Statement::CreateSource {
                stmt: CreateSourceStatement { if_not_exists, .. },
                ..
            } => {
                *if_not_exists = false;
            }
            Statement::CreateSink {
                stmt: CreateSinkStatement { if_not_exists, .. },
            } => {
                *if_not_exists = false;
            }
            Statement::CreateSubscription {
                stmt: CreateSubscriptionStatement { if_not_exists, .. },
            } => {
                *if_not_exists = false;
            }
            Statement::CreateConnection {
                stmt: CreateConnectionStatement { if_not_exists, .. },
            } => {
                *if_not_exists = false;
            }
            _ => {}
        }
        stmt.to_string()
    }
}

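/// Dispatches a parsed [`Statement`] to its handler and returns the resulting
/// [`RwPgResponse`]. An implicit transaction guard is held for the duration of the call.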
pub async fn handle(
    session: Arc<SessionImpl>,
    stmt: Statement,
    sql: Arc<str>,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    session.clear_cancel_query_flag();
    let _guard = session.txn_begin_implicit();
    let handler_args = HandlerArgs::new(session, &stmt, sql)?;

    check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;

    match stmt {
        Statement::Explain {
            statement,
            analyze,
            options,
        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
        Statement::ExplainAnalyzeStreamJob {
            target,
            duration_secs,
        } => {
            explain_analyze_stream_job::handle_explain_analyze_stream_job(
                handler_args,
                target,
                duration_secs,
            )
            .await
        }
        Statement::CreateSource { stmt } => {
            create_source::handle_create_source(handler_args, stmt).await
        }
        Statement::CreateSink { stmt } => {
            create_sink::handle_create_sink(handler_args, stmt, false).await
        }
        Statement::CreateSubscription { stmt } => {
            create_subscription::handle_create_subscription(handler_args, stmt).await
        }
        Statement::CreateConnection { stmt } => {
            create_connection::handle_create_connection(handler_args, stmt).await
        }
        Statement::CreateSecret { stmt } => {
            create_secret::handle_create_secret(handler_args, stmt).await
        }
        Statement::CreateFunction {
            or_replace,
            temporary,
            if_not_exists,
            name,
            args,
            returns,
            params,
            with_options,
        } => {
            if params.language.is_none()
                || !params
                    .language
                    .as_ref()
                    .unwrap()
                    .real_value()
                    .eq_ignore_ascii_case("sql")
            {
                create_function::handle_create_function(
                    handler_args,
                    or_replace,
                    temporary,
                    if_not_exists,
                    name,
                    args,
                    returns,
                    params,
                    with_options,
                )
                .await
            } else {
                create_sql_function::handle_create_sql_function(
                    handler_args,
                    or_replace,
                    temporary,
                    if_not_exists,
                    name,
                    args,
                    returns,
                    params,
                )
                .await
            }
        }
        Statement::CreateAggregate {
            or_replace,
            if_not_exists,
            name,
            args,
            returns,
            params,
            ..
        } => {
            create_aggregate::handle_create_aggregate(
                handler_args,
                or_replace,
                if_not_exists,
                name,
                args,
                returns,
                params,
            )
            .await
        }
        Statement::CreateTable {
            name,
            columns,
            wildcard_idx,
            constraints,
            query,
            with_options: _,
            or_replace,
            temporary,
            if_not_exists,
            format_encode,
            source_watermarks,
            append_only,
            on_conflict,
            with_version_columns,
            cdc_table_info,
            include_column_options,
            webhook_info,
            engine,
        } => {
            if or_replace {
                bail_not_implemented!("CREATE OR REPLACE TABLE");
            }
            if temporary {
                bail_not_implemented!("CREATE TEMPORARY TABLE");
            }
            if let Some(query) = query {
                return create_table_as::handle_create_as(
                    handler_args,
                    name,
                    if_not_exists,
                    query,
                    columns,
                    append_only,
                    on_conflict,
                    with_version_columns
                        .iter()
                        .map(|col| col.real_value())
                        .collect(),
                    engine,
                )
                .await;
            }
            let format_encode = format_encode.map(|s| s.into_v2_with_warning());
            create_table::handle_create_table(
                handler_args,
                name,
                columns,
                wildcard_idx,
                constraints,
                if_not_exists,
                format_encode,
                source_watermarks,
                append_only,
                on_conflict,
                with_version_columns
                    .iter()
                    .map(|col| col.real_value())
                    .collect(),
                cdc_table_info,
                include_column_options,
                webhook_info,
                engine,
            )
            .await
        }
        Statement::CreateDatabase {
            db_name,
            if_not_exists,
            owner,
            resource_group,
            barrier_interval_ms,
            checkpoint_frequency,
        } => {
            create_database::handle_create_database(
                handler_args,
                db_name,
                if_not_exists,
                owner,
                resource_group,
                barrier_interval_ms,
                checkpoint_frequency,
            )
            .await
        }
        Statement::CreateSchema {
            schema_name,
            if_not_exists,
            owner,
        } => {
            create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
                .await
        }
        Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
        Statement::DeclareCursor { stmt } => {
            declare_cursor::handle_declare_cursor(handler_args, stmt).await
        }
        Statement::FetchCursor { stmt } => {
            fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
        }
        Statement::CloseCursor { stmt } => {
            close_cursor::handle_close_cursor(handler_args, stmt).await
        }
        Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
        Statement::Grant { .. } => {
            handle_privilege::handle_grant_privilege(handler_args, stmt).await
        }
        Statement::Revoke { .. } => {
            handle_privilege::handle_revoke_privilege(handler_args, stmt).await
        }
        Statement::Describe { name, kind } => match kind {
            DescribeKind::Fragments => {
                describe::handle_describe_fragments(handler_args, name).await
            }
            DescribeKind::Plain => describe::handle_describe(handler_args, name),
        },
        Statement::DescribeFragment { fragment_id } => {
            describe::handle_describe_fragment(handler_args, fragment_id).await
        }
        Statement::Discard(..) => discard::handle_discard(handler_args),
        Statement::ShowObjects {
            object: show_object,
            filter,
        } => show::handle_show_object(handler_args, show_object, filter).await,
        Statement::ShowCreateObject { create_type, name } => {
            show::handle_show_create_object(handler_args, create_type, name)
        }
        Statement::ShowTransactionIsolationLevel => {
            transaction::handle_show_isolation_level(handler_args)
        }
        Statement::Drop(DropStatement {
            object_type,
            object_name,
            if_exists,
            drop_mode,
        }) => {
            let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
                match object_type {
                    ObjectType::MaterializedView
                    | ObjectType::View
                    | ObjectType::Sink
                    | ObjectType::Source
                    | ObjectType::Subscription
                    | ObjectType::Index
                    | ObjectType::Table
                    | ObjectType::Schema
                    | ObjectType::Connection => true,
                    ObjectType::Database | ObjectType::User | ObjectType::Secret => {
                        bail_not_implemented!("DROP CASCADE");
                    }
                }
            } else {
                false
            };
            match object_type {
                ObjectType::Table => {
                    drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
                        .await
                }
                ObjectType::MaterializedView => {
                    drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
                }
                ObjectType::Index => {
                    drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
                        .await
                }
                ObjectType::Source => {
                    drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
                        .await
                }
                ObjectType::Sink => {
                    drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
                }
                ObjectType::Subscription => {
                    drop_subscription::handle_drop_subscription(
                        handler_args,
                        object_name,
                        if_exists,
                        cascade,
                    )
                    .await
                }
                ObjectType::Database => {
                    drop_database::handle_drop_database(handler_args, object_name, if_exists).await
                }
                ObjectType::Schema => {
                    drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
                        .await
                }
                ObjectType::User => {
                    drop_user::handle_drop_user(handler_args, object_name, if_exists).await
                }
                ObjectType::View => {
                    drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
                }
                ObjectType::Connection => {
                    drop_connection::handle_drop_connection(
                        handler_args,
                        object_name,
                        if_exists,
                        cascade,
                    )
                    .await
                }
                ObjectType::Secret => {
                    drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
                }
            }
        }
        Statement::DropFunction {
            if_exists,
            func_desc,
            option,
        } => {
            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
                .await
        }
        Statement::DropAggregate {
            if_exists,
            func_desc,
            option,
        } => {
            drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
                .await
        }
        Statement::Query(_)
        | Statement::Insert { .. }
        | Statement::Delete { .. }
        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
        Statement::CreateView {
            materialized,
            if_not_exists,
            name,
            columns,
            query,
            with_options: _,
            or_replace,
            emit_mode,
        } => {
            if or_replace {
                bail_not_implemented!("CREATE OR REPLACE VIEW");
            }
            if materialized {
                create_mv::handle_create_mv(
                    handler_args,
                    if_not_exists,
                    name,
                    *query,
                    columns,
                    emit_mode,
                )
                .await
            } else {
                create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
                    .await
            }
        }
        Statement::Flush => flush::handle_flush(handler_args).await,
        Statement::Wait => wait::handle_wait(handler_args).await,
        Statement::Recover => recover::handle_recover(handler_args).await,
        Statement::SetVariable {
            local: _,
            variable,
            value,
        } => {
            if variable.real_value().eq_ignore_ascii_case("database") {
                let x = variable::set_var_to_param_str(&value);
                let res = use_db::handle_use_db(
                    handler_args,
                    ObjectName::from(vec![Ident::from_real_value(
                        x.as_deref().unwrap_or("default"),
                    )]),
                )?;
                let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
                for notice in res.notices() {
                    builder = builder.notice(notice);
                }
                return Ok(builder.into());
            }
            variable::handle_set(handler_args, variable, value)
        }
        Statement::SetTimeZone { local: _, value } => {
            variable::handle_set_time_zone(handler_args, value)
        }
        Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
        Statement::CreateIndex {
            name,
            table_name,
            method,
            columns,
            include,
            distributed_by,
            unique,
            if_not_exists,
            with_properties: _,
        } => {
            if unique {
                bail_not_implemented!("create unique index");
            }

            create_index::handle_create_index(
                handler_args,
                if_not_exists,
                name,
                table_name,
                method,
                columns.to_vec(),
                include,
                distributed_by,
            )
            .await
        }
        Statement::AlterDatabase { name, operation } => match operation {
            AlterDatabaseOperation::RenameDatabase { database_name } => {
                alter_rename::handle_rename_database(handler_args, name, database_name).await
            }
            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_DATABASE,
                )
                .await
            }
            AlterDatabaseOperation::SetParam(config_param) => {
                let ConfigParam { param, value } = config_param;

                let database_param = match param.real_value().to_uppercase().as_str() {
                    "BARRIER_INTERVAL_MS" => {
                        let barrier_interval_ms = match value {
                            SetVariableValue::Default => None,
                            SetVariableValue::Single(SetVariableValueSingle::Literal(
                                Value::Number(num),
                            )) => {
                                let num = num.parse::<u32>().map_err(|e| {
                                    ErrorCode::InvalidInputSyntax(format!(
                                        "barrier_interval_ms must be a u32 integer: {}",
                                        e.as_report()
                                    ))
                                })?;
                                Some(num)
                            }
                            _ => {
                                return Err(ErrorCode::InvalidInputSyntax(
                                    "barrier_interval_ms must be a u32 integer or DEFAULT"
                                        .to_owned(),
                                )
                                .into());
                            }
                        };
                        AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
                    }
                    "CHECKPOINT_FREQUENCY" => {
                        let checkpoint_frequency = match value {
                            SetVariableValue::Default => None,
                            SetVariableValue::Single(SetVariableValueSingle::Literal(
                                Value::Number(num),
                            )) => {
                                let num = num.parse::<u64>().map_err(|e| {
                                    ErrorCode::InvalidInputSyntax(format!(
                                        "checkpoint_frequency must be a u64 integer: {}",
                                        e.as_report()
                                    ))
                                })?;
                                Some(num)
                            }
                            _ => {
                                return Err(ErrorCode::InvalidInputSyntax(
                                    "checkpoint_frequency must be a u64 integer or DEFAULT"
                                        .to_owned(),
                                )
                                .into());
                            }
                        };
                        AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
                    }
                    _ => {
                        return Err(ErrorCode::InvalidInputSyntax(format!(
                            "Unsupported database config parameter: {}",
                            param.real_value()
                        ))
                        .into());
                    }
                };

                alter_database_param::handle_alter_database_param(
                    handler_args,
                    name,
                    database_param,
                )
                .await
            }
        },
        Statement::AlterSchema { name, operation } => match operation {
            AlterSchemaOperation::RenameSchema { schema_name } => {
                alter_rename::handle_rename_schema(handler_args, name, schema_name).await
            }
            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_SCHEMA,
                )
                .await
            }
            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
                alter_swap_rename::handle_swap_rename(
                    handler_args,
                    name,
                    target_schema,
                    StatementType::ALTER_SCHEMA,
                )
                .await
            }
        },
        Statement::AlterTable { name, operation } => match operation {
            AlterTableOperation::AddColumn { .. }
            | AlterTableOperation::DropColumn { .. }
            | AlterTableOperation::AlterColumn { .. } => {
                alter_table_column::handle_alter_table_column(handler_args, name, operation).await
            }
            AlterTableOperation::RenameTable { table_name } => {
                alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
                    .await
            }
            AlterTableOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_TABLE,
                )
                .await
            }
            AlterTableOperation::SetParallelism {
                parallelism,
                deferred,
            } => {
                alter_parallelism::handle_alter_parallelism(
                    handler_args,
                    name,
                    parallelism,
                    StatementType::ALTER_TABLE,
                    deferred,
                )
                .await
            }
            AlterTableOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_TABLE,
                    None,
                )
                .await
            }
            AlterTableOperation::RefreshSchema => {
                alter_table_with_sr::handle_refresh_schema(handler_args, name).await
            }
            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                    handler_args,
                    PbThrottleTarget::TableWithSource,
                    name,
                    rate_limit,
                )
                .await
            }
            AlterTableOperation::DropConnector => {
                alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
                    .await
            }
            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                    handler_args,
                    PbThrottleTarget::TableDml,
                    name,
                    rate_limit,
                )
                .await
            }
            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                    handler_args,
                    PbThrottleTarget::CdcTable,
                    name,
                    rate_limit,
                )
                .await
            }
            AlterTableOperation::SwapRenameTable { target_table } => {
                alter_swap_rename::handle_swap_rename(
                    handler_args,
                    name,
                    target_table,
                    StatementType::ALTER_TABLE,
                )
                .await
            }
            AlterTableOperation::AlterConnectorProps { alter_props } => {
                alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
            }
            AlterTableOperation::AddConstraint { .. }
            | AlterTableOperation::DropConstraint { .. }
            | AlterTableOperation::RenameColumn { .. }
            | AlterTableOperation::ChangeColumn { .. }
            | AlterTableOperation::RenameConstraint { .. } => {
                bail_not_implemented!(
                    "Unhandled statement: {}",
                    Statement::AlterTable { name, operation }
                )
            }
        },
        Statement::AlterIndex { name, operation } => match operation {
            AlterIndexOperation::RenameIndex { index_name } => {
                alter_rename::handle_rename_index(handler_args, name, index_name).await
            }
            AlterIndexOperation::SetParallelism {
                parallelism,
                deferred,
            } => {
                alter_parallelism::handle_alter_parallelism(
                    handler_args,
                    name,
                    parallelism,
                    StatementType::ALTER_INDEX,
                    deferred,
                )
                .await
            }
        },
        Statement::AlterView {
            materialized,
            name,
            operation,
        } => {
            let statement_type = if materialized {
                StatementType::ALTER_MATERIALIZED_VIEW
            } else {
                StatementType::ALTER_VIEW
            };
            match operation {
                AlterViewOperation::RenameView { view_name } => {
                    if materialized {
                        alter_rename::handle_rename_table(
                            handler_args,
                            TableType::MaterializedView,
                            name,
                            view_name,
                        )
                        .await
                    } else {
                        alter_rename::handle_rename_view(handler_args, name, view_name).await
                    }
                }
                AlterViewOperation::SetParallelism {
                    parallelism,
                    deferred,
                } => {
                    if !materialized {
                        bail_not_implemented!("ALTER VIEW SET PARALLELISM");
                    }
                    alter_parallelism::handle_alter_parallelism(
                        handler_args,
                        name,
                        parallelism,
                        statement_type,
                        deferred,
                    )
                    .await
                }
                AlterViewOperation::SetResourceGroup {
                    resource_group,
                    deferred,
                } => {
                    if !materialized {
                        bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
                    }
                    alter_resource_group::handle_alter_resource_group(
                        handler_args,
                        name,
                        resource_group,
                        statement_type,
                        deferred,
                    )
                    .await
                }
                AlterViewOperation::ChangeOwner { new_owner_name } => {
                    alter_owner::handle_alter_owner(
                        handler_args,
                        name,
                        new_owner_name,
                        statement_type,
                    )
                    .await
                }
                AlterViewOperation::SetSchema { new_schema_name } => {
                    alter_set_schema::handle_alter_set_schema(
                        handler_args,
                        name,
                        new_schema_name,
                        statement_type,
                        None,
                    )
                    .await
                }
                AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
                    if !materialized {
                        bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
                    }
                    alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                        handler_args,
                        PbThrottleTarget::Mv,
                        name,
                        rate_limit,
                    )
                    .await
                }
                AlterViewOperation::SwapRenameView { target_view } => {
                    alter_swap_rename::handle_swap_rename(
                        handler_args,
                        name,
                        target_view,
                        statement_type,
                    )
                    .await
                }
                AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
                    if !materialized {
                        bail!(
                            "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
                        );
                    }
                    alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
                }
                AlterViewOperation::AsQuery { query } => {
                    if !materialized {
                        bail_not_implemented!("ALTER VIEW AS QUERY");
                    }
                    if !cfg!(debug_assertions) {
                        bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
                    }
                    alter_mv::handle_alter_mv(handler_args, name, query).await
                }
            }
        }

        Statement::AlterSink { name, operation } => match operation {
            AlterSinkOperation::AlterConnectorProps {
                alter_props: changed_props,
            } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
            AlterSinkOperation::RenameSink { sink_name } => {
                alter_rename::handle_rename_sink(handler_args, name, sink_name).await
            }
            AlterSinkOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_SINK,
                )
                .await
            }
            AlterSinkOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_SINK,
                    None,
                )
                .await
            }
            AlterSinkOperation::SetParallelism {
                parallelism,
                deferred,
            } => {
                alter_parallelism::handle_alter_parallelism(
                    handler_args,
                    name,
                    parallelism,
                    StatementType::ALTER_SINK,
                    deferred,
                )
                .await
            }
            AlterSinkOperation::SwapRenameSink { target_sink } => {
                alter_swap_rename::handle_swap_rename(
                    handler_args,
                    name,
                    target_sink,
                    StatementType::ALTER_SINK,
                )
                .await
            }
            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                    handler_args,
                    PbThrottleTarget::Sink,
                    name,
                    rate_limit,
                )
                .await
            }
            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
                alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
                    handler_args,
                    name,
                    enable,
                )
                .await
            }
        },
        Statement::AlterSubscription { name, operation } => match operation {
            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
                alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
                    .await
            }
            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_SUBSCRIPTION,
                )
                .await
            }
            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_SUBSCRIPTION,
                    None,
                )
                .await
            }
            AlterSubscriptionOperation::SwapRenameSubscription {
                target_subscription,
            } => {
                alter_swap_rename::handle_swap_rename(
                    handler_args,
                    name,
                    target_subscription,
                    StatementType::ALTER_SUBSCRIPTION,
                )
                .await
            }
        },
        Statement::AlterSource { name, operation } => match operation {
            AlterSourceOperation::AlterConnectorProps { alter_props } => {
                alter_source_props::handle_alter_source_connector_props(
                    handler_args,
                    name,
                    alter_props,
                )
                .await
            }
            AlterSourceOperation::RenameSource { source_name } => {
                alter_rename::handle_rename_source(handler_args, name, source_name).await
            }
            AlterSourceOperation::AddColumn { .. } => {
                alter_source_column::handle_alter_source_column(handler_args, name, operation).await
            }
            AlterSourceOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_SOURCE,
                )
                .await
            }
            AlterSourceOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_SOURCE,
                    None,
                )
                .await
            }
            AlterSourceOperation::FormatEncode { format_encode } => {
                alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
                    .await
            }
            AlterSourceOperation::RefreshSchema => {
                alter_source_with_sr::handler_refresh_schema(handler_args, name).await
            }
            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
                alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                    handler_args,
                    PbThrottleTarget::Source,
                    name,
                    rate_limit,
                )
                .await
            }
            AlterSourceOperation::SwapRenameSource { target_source } => {
                alter_swap_rename::handle_swap_rename(
                    handler_args,
                    name,
                    target_source,
                    StatementType::ALTER_SOURCE,
                )
                .await
            }
            AlterSourceOperation::SetParallelism {
                parallelism,
                deferred,
            } => {
                alter_parallelism::handle_alter_parallelism(
                    handler_args,
                    name,
                    parallelism,
                    StatementType::ALTER_SOURCE,
                    deferred,
                )
                .await
            }
        },
        Statement::AlterFunction {
            name,
            args,
            operation,
        } => match operation {
            AlterFunctionOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_FUNCTION,
                    args,
                )
                .await
            }
        },
        Statement::AlterConnection { name, operation } => match operation {
            AlterConnectionOperation::SetSchema { new_schema_name } => {
                alter_set_schema::handle_alter_set_schema(
                    handler_args,
                    name,
                    new_schema_name,
                    StatementType::ALTER_CONNECTION,
                    None,
                )
                .await
            }
            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
                alter_owner::handle_alter_owner(
                    handler_args,
                    name,
                    new_owner_name,
                    StatementType::ALTER_CONNECTION,
                )
                .await
            }
        },
        Statement::AlterSystem { param, value } => {
            alter_system::handle_alter_system(handler_args, param, value).await
        }
        Statement::AlterSecret {
            name,
            with_options,
            operation,
        } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
        Statement::AlterFragment {
            fragment_id,
            operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
        } => {
            alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
                &handler_args.session,
                PbThrottleTarget::Fragment,
                fragment_id,
                rate_limit,
                StatementType::SET_VARIABLE,
            )
            .await
        }
        Statement::AlterDefaultPrivileges { .. } => {
            handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
        }
        Statement::StartTransaction { modes } => {
            transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
        }
        Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
        Statement::Commit { chain } => {
            transaction::handle_commit(handler_args, COMMIT, chain).await
        }
        Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
        Statement::Rollback { chain } => {
            transaction::handle_rollback(handler_args, ROLLBACK, chain).await
        }
        Statement::SetTransaction {
            modes,
            snapshot,
            session,
        } => transaction::handle_set(handler_args, modes, snapshot, session).await,
        Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
        Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
        Statement::Comment {
            object_type,
            object_name,
            comment,
        } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
        Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
        Statement::Prepare {
            name,
            data_types,
            statement,
        } => prepared_statement::handle_prepare(name, data_types, statement).await,
        Statement::Deallocate { name, prepare } => {
            prepared_statement::handle_deallocate(name, prepare).await
        }
        Statement::Vacuum { object_name, full } => {
            vacuum::handle_vacuum(handler_args, object_name, full).await
        }
        Statement::Refresh { table_name } => {
            refresh::handle_refresh(handler_args, table_name).await
        }
        _ => bail_not_implemented!("Unhandled statement: {}", stmt),
    }
}

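/// Rejects `ALTER TABLE` operations (column changes, rename, owner/schema changes,
/// parallelism, schema refresh, and source rate limits) on tables backed by the
/// Iceberg engine.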
fn check_ban_ddl_for_iceberg_engine_table(
    session: Arc<SessionImpl>,
    stmt: &Statement,
) -> Result<()> {
    match stmt {
        Statement::AlterTable {
            name,
            operation:
                operation @ (AlterTableOperation::AddColumn { .. }
                | AlterTableOperation::DropColumn { .. }),
        } => {
            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
            if table.is_iceberg_engine_table() {
                bail!(
                    "ALTER TABLE {} is not supported for iceberg table: {}.{}",
                    operation,
                    schema_name,
                    name
                );
            }
        }

        Statement::AlterTable {
            name,
            operation: AlterTableOperation::RenameTable { .. },
        } => {
            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
            if table.is_iceberg_engine_table() {
                bail!(
                    "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
                    schema_name,
                    name
                );
            }
        }

        Statement::AlterTable {
            name,
            operation: AlterTableOperation::ChangeOwner { .. },
        } => {
            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
            if table.is_iceberg_engine_table() {
                bail!(
                    "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
                    schema_name,
                    name
                );
            }
        }

        Statement::AlterTable {
            name,
            operation: AlterTableOperation::SetParallelism { .. },
        } => {
            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
            if table.is_iceberg_engine_table() {
                bail!(
                    "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
                    schema_name,
                    name
                );
            }
        }

        Statement::AlterTable {
            name,
            operation: AlterTableOperation::SetSchema { .. },
        } => {
            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
            if table.is_iceberg_engine_table() {
                bail!(
                    "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
                    schema_name,
                    name
                );
            }
        }

        Statement::AlterTable {
            name,
            operation: AlterTableOperation::RefreshSchema,
        } => {
            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
            if table.is_iceberg_engine_table() {
                bail!(
                    "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
                    schema_name,
                    name
                );
            }
        }

        Statement::AlterTable {
            name,
            operation: AlterTableOperation::SetSourceRateLimit { .. },
        } => {
            let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
            if table.is_iceberg_engine_table() {
                bail!(
                    "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
                    schema_name,
                    name
                );
            }
        }

        _ => {}
    }

    Ok(())
}