1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_database_param;
46mod alter_mv;
47mod alter_owner;
48mod alter_parallelism;
49mod alter_rename;
50mod alter_resource_group;
51mod alter_secret;
52mod alter_set_schema;
53mod alter_sink_props;
54mod alter_source_column;
55mod alter_source_props;
56mod alter_source_with_sr;
57mod alter_streaming_enable_unaligned_join;
58mod alter_streaming_rate_limit;
59mod alter_swap_rename;
60mod alter_system;
61mod alter_table_column;
62pub mod alter_table_drop_connector;
63pub mod alter_table_props;
64mod alter_table_with_sr;
65pub mod alter_user;
66pub mod cancel_job;
67pub mod close_cursor;
68mod comment;
69pub mod create_aggregate;
70pub mod create_connection;
71mod create_database;
72pub mod create_function;
73pub mod create_index;
74pub mod create_mv;
75pub mod create_schema;
76pub mod create_secret;
77pub mod create_sink;
78pub mod create_source;
79pub mod create_sql_function;
80pub mod create_subscription;
81pub mod create_table;
82pub mod create_table_as;
83pub mod create_user;
84pub mod create_view;
85pub mod declare_cursor;
86pub mod describe;
87pub mod discard;
88mod drop_connection;
89mod drop_database;
90pub mod drop_function;
91mod drop_index;
92pub mod drop_mv;
93mod drop_schema;
94pub mod drop_secret;
95pub mod drop_sink;
96pub mod drop_source;
97pub mod drop_subscription;
98pub mod drop_table;
99pub mod drop_user;
100mod drop_view;
101pub mod explain;
102pub mod explain_analyze_stream_job;
103pub mod extended_handle;
104pub mod fetch_cursor;
105mod flush;
106pub mod handle_privilege;
107pub mod kill_process;
108mod prepared_statement;
109pub mod privilege;
110pub mod query;
111mod recover;
112mod refresh;
113pub mod show;
114mod transaction;
115mod use_db;
116pub mod util;
117pub mod vacuum;
118pub mod variable;
119mod wait;
120
121pub use alter_table_column::{
122 fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
123};
124
/// Builder for a pgwire response whose row payload is delivered as a [`PgResponseStream`].
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;

/// A pgwire response carrying rows via a [`PgResponseStream`].
pub type RwPgResponse = PgResponse<PgResponseStream>;
130
131#[easy_ext::ext(RwPgResponseBuilderExt)]
132impl RwPgResponseBuilder {
133 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
135 let fields = T::fields();
136 self.values(
137 rows.into_iter()
138 .map(|row| {
139 Row::new(
140 row.into_owned_row()
141 .into_iter()
142 .zip_eq_fast(&fields)
143 .map(|(datum, (_, ty))| {
144 datum.map(|scalar| {
145 scalar.as_scalar_ref_impl().text_format(ty).into()
146 })
147 })
148 .collect(),
149 )
150 })
151 .collect_vec()
152 .into(),
153 fields_to_descriptors(fields),
154 )
155 }
156}
157
158pub fn fields_to_descriptors(
159 fields: Vec<(&str, risingwave_common::types::DataType)>,
160) -> Vec<PgFieldDescriptor> {
161 fields
162 .iter()
163 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
164 .collect()
165}
166
/// Source of the row sets streamed back to the client.
pub enum PgResponseStream {
    /// Rows produced by a query executed in local (frontend) mode.
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// Rows produced by a query executed on the distributed batch engine.
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// Pre-materialized rows wrapped as a stream (e.g. from `Vec<Row>`).
    Rows(BoxStream<'static, RowSetResult>),
}
172
173impl Stream for PgResponseStream {
174 type Item = std::result::Result<Vec<Row>, BoxedError>;
175
176 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
177 match &mut *self {
178 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
179 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
180 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
181 }
182 }
183}
184
185impl From<Vec<Row>> for PgResponseStream {
186 fn from(rows: Vec<Row>) -> Self {
187 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
188 }
189}
190
/// Per-statement context threaded through every statement handler.
#[derive(Clone)]
pub struct HandlerArgs {
    // Session the statement runs in.
    pub session: Arc<SessionImpl>,
    // Original SQL text as received from the client.
    pub sql: Arc<str>,
    // SQL re-rendered with idempotency flags (OR REPLACE / IF NOT EXISTS) stripped;
    // see `HandlerArgs::normalize_sql`.
    pub normalized_sql: String,
    // WITH options extracted from the statement.
    pub with_options: WithOptions,
}
198
199impl HandlerArgs {
200 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
201 Ok(Self {
202 session,
203 sql,
204 with_options: WithOptions::try_from(stmt)?,
205 normalized_sql: Self::normalize_sql(stmt),
206 })
207 }
208
209 fn normalize_sql(stmt: &Statement) -> String {
215 let mut stmt = stmt.clone();
216 match &mut stmt {
217 Statement::CreateView {
218 or_replace,
219 if_not_exists,
220 ..
221 } => {
222 *or_replace = false;
223 *if_not_exists = false;
224 }
225 Statement::CreateTable {
226 or_replace,
227 if_not_exists,
228 ..
229 } => {
230 *or_replace = false;
231 *if_not_exists = false;
232 }
233 Statement::CreateIndex { if_not_exists, .. } => {
234 *if_not_exists = false;
235 }
236 Statement::CreateSource {
237 stmt: CreateSourceStatement { if_not_exists, .. },
238 ..
239 } => {
240 *if_not_exists = false;
241 }
242 Statement::CreateSink {
243 stmt: CreateSinkStatement { if_not_exists, .. },
244 } => {
245 *if_not_exists = false;
246 }
247 Statement::CreateSubscription {
248 stmt: CreateSubscriptionStatement { if_not_exists, .. },
249 } => {
250 *if_not_exists = false;
251 }
252 Statement::CreateConnection {
253 stmt: CreateConnectionStatement { if_not_exists, .. },
254 } => {
255 *if_not_exists = false;
256 }
257 _ => {}
258 }
259 stmt.to_string()
260 }
261}
262
263pub async fn handle(
264 session: Arc<SessionImpl>,
265 stmt: Statement,
266 sql: Arc<str>,
267 formats: Vec<Format>,
268) -> Result<RwPgResponse> {
269 session.clear_cancel_query_flag();
270 let _guard = session.txn_begin_implicit();
271 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
272
273 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
274
275 match stmt {
276 Statement::Explain {
277 statement,
278 analyze,
279 options,
280 } => explain::handle_explain(handler_args, *statement, options, analyze).await,
281 Statement::ExplainAnalyzeStreamJob {
282 target,
283 duration_secs,
284 } => {
285 explain_analyze_stream_job::handle_explain_analyze_stream_job(
286 handler_args,
287 target,
288 duration_secs,
289 )
290 .await
291 }
292 Statement::CreateSource { stmt } => {
293 create_source::handle_create_source(handler_args, stmt).await
294 }
295 Statement::CreateSink { stmt } => {
296 create_sink::handle_create_sink(handler_args, stmt, false).await
297 }
298 Statement::CreateSubscription { stmt } => {
299 create_subscription::handle_create_subscription(handler_args, stmt).await
300 }
301 Statement::CreateConnection { stmt } => {
302 create_connection::handle_create_connection(handler_args, stmt).await
303 }
304 Statement::CreateSecret { stmt } => {
305 create_secret::handle_create_secret(handler_args, stmt).await
306 }
307 Statement::CreateFunction {
308 or_replace,
309 temporary,
310 if_not_exists,
311 name,
312 args,
313 returns,
314 params,
315 with_options,
316 } => {
317 if params.language.is_none()
320 || !params
321 .language
322 .as_ref()
323 .unwrap()
324 .real_value()
325 .eq_ignore_ascii_case("sql")
326 {
327 create_function::handle_create_function(
328 handler_args,
329 or_replace,
330 temporary,
331 if_not_exists,
332 name,
333 args,
334 returns,
335 params,
336 with_options,
337 )
338 .await
339 } else {
340 create_sql_function::handle_create_sql_function(
341 handler_args,
342 or_replace,
343 temporary,
344 if_not_exists,
345 name,
346 args,
347 returns,
348 params,
349 )
350 .await
351 }
352 }
353 Statement::CreateAggregate {
354 or_replace,
355 if_not_exists,
356 name,
357 args,
358 returns,
359 params,
360 ..
361 } => {
362 create_aggregate::handle_create_aggregate(
363 handler_args,
364 or_replace,
365 if_not_exists,
366 name,
367 args,
368 returns,
369 params,
370 )
371 .await
372 }
373 Statement::CreateTable {
374 name,
375 columns,
376 wildcard_idx,
377 constraints,
378 query,
379 with_options: _, or_replace,
382 temporary,
383 if_not_exists,
384 format_encode,
385 source_watermarks,
386 append_only,
387 on_conflict,
388 with_version_columns,
389 cdc_table_info,
390 include_column_options,
391 webhook_info,
392 engine,
393 } => {
394 if or_replace {
395 bail_not_implemented!("CREATE OR REPLACE TABLE");
396 }
397 if temporary {
398 bail_not_implemented!("CREATE TEMPORARY TABLE");
399 }
400 if let Some(query) = query {
401 return create_table_as::handle_create_as(
402 handler_args,
403 name,
404 if_not_exists,
405 query,
406 columns,
407 append_only,
408 on_conflict,
409 with_version_columns
410 .iter()
411 .map(|col| col.real_value())
412 .collect(),
413 engine,
414 )
415 .await;
416 }
417 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
418 create_table::handle_create_table(
419 handler_args,
420 name,
421 columns,
422 wildcard_idx,
423 constraints,
424 if_not_exists,
425 format_encode,
426 source_watermarks,
427 append_only,
428 on_conflict,
429 with_version_columns
430 .iter()
431 .map(|col| col.real_value())
432 .collect(),
433 cdc_table_info,
434 include_column_options,
435 webhook_info,
436 engine,
437 )
438 .await
439 }
440 Statement::CreateDatabase {
441 db_name,
442 if_not_exists,
443 owner,
444 resource_group,
445 barrier_interval_ms,
446 checkpoint_frequency,
447 } => {
448 create_database::handle_create_database(
449 handler_args,
450 db_name,
451 if_not_exists,
452 owner,
453 resource_group,
454 barrier_interval_ms,
455 checkpoint_frequency,
456 )
457 .await
458 }
459 Statement::CreateSchema {
460 schema_name,
461 if_not_exists,
462 owner,
463 } => {
464 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
465 .await
466 }
467 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
468 Statement::DeclareCursor { stmt } => {
469 declare_cursor::handle_declare_cursor(handler_args, stmt).await
470 }
471 Statement::FetchCursor { stmt } => {
472 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
473 }
474 Statement::CloseCursor { stmt } => {
475 close_cursor::handle_close_cursor(handler_args, stmt).await
476 }
477 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
478 Statement::Grant { .. } => {
479 handle_privilege::handle_grant_privilege(handler_args, stmt).await
480 }
481 Statement::Revoke { .. } => {
482 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
483 }
484 Statement::Describe { name, kind } => match kind {
485 DescribeKind::Fragments => {
486 describe::handle_describe_fragments(handler_args, name).await
487 }
488 DescribeKind::Plain => describe::handle_describe(handler_args, name),
489 },
490 Statement::DescribeFragment { fragment_id } => {
491 describe::handle_describe_fragment(handler_args, fragment_id.into()).await
492 }
493 Statement::Discard(..) => discard::handle_discard(handler_args),
494 Statement::ShowObjects {
495 object: show_object,
496 filter,
497 } => show::handle_show_object(handler_args, show_object, filter).await,
498 Statement::ShowCreateObject { create_type, name } => {
499 show::handle_show_create_object(handler_args, create_type, name)
500 }
501 Statement::ShowTransactionIsolationLevel => {
502 transaction::handle_show_isolation_level(handler_args)
503 }
504 Statement::Drop(DropStatement {
505 object_type,
506 object_name,
507 if_exists,
508 drop_mode,
509 }) => {
510 let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
511 match object_type {
512 ObjectType::MaterializedView
513 | ObjectType::View
514 | ObjectType::Sink
515 | ObjectType::Source
516 | ObjectType::Subscription
517 | ObjectType::Index
518 | ObjectType::Table
519 | ObjectType::Schema
520 | ObjectType::Connection => true,
521 ObjectType::Database | ObjectType::User | ObjectType::Secret => {
522 bail_not_implemented!("DROP CASCADE");
523 }
524 }
525 } else {
526 false
527 };
528 match object_type {
529 ObjectType::Table => {
530 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
531 .await
532 }
533 ObjectType::MaterializedView => {
534 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
535 }
536 ObjectType::Index => {
537 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
538 .await
539 }
540 ObjectType::Source => {
541 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
542 .await
543 }
544 ObjectType::Sink => {
545 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
546 }
547 ObjectType::Subscription => {
548 drop_subscription::handle_drop_subscription(
549 handler_args,
550 object_name,
551 if_exists,
552 cascade,
553 )
554 .await
555 }
556 ObjectType::Database => {
557 drop_database::handle_drop_database(handler_args, object_name, if_exists).await
558 }
559 ObjectType::Schema => {
560 drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
561 .await
562 }
563 ObjectType::User => {
564 drop_user::handle_drop_user(handler_args, object_name, if_exists).await
565 }
566 ObjectType::View => {
567 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
568 }
569 ObjectType::Connection => {
570 drop_connection::handle_drop_connection(
571 handler_args,
572 object_name,
573 if_exists,
574 cascade,
575 )
576 .await
577 }
578 ObjectType::Secret => {
579 drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
580 }
581 }
582 }
583 Statement::DropFunction {
585 if_exists,
586 func_desc,
587 option,
588 } => {
589 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
590 .await
591 }
592 Statement::DropAggregate {
593 if_exists,
594 func_desc,
595 option,
596 } => {
597 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
598 .await
599 }
600 Statement::Query(_)
601 | Statement::Insert { .. }
602 | Statement::Delete { .. }
603 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
604 Statement::Copy {
605 entity: CopyEntity::Query(query),
606 target: CopyTarget::Stdout,
607 } => {
608 let response =
609 query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
610 .await?;
611 Ok(response.into_copy_query_to_stdout())
612 }
613 Statement::CreateView {
614 materialized,
615 if_not_exists,
616 name,
617 columns,
618 query,
619 with_options: _, or_replace, emit_mode,
622 } => {
623 if or_replace {
624 bail_not_implemented!("CREATE OR REPLACE VIEW");
625 }
626 if materialized {
627 create_mv::handle_create_mv(
628 handler_args,
629 if_not_exists,
630 name,
631 *query,
632 columns,
633 emit_mode,
634 )
635 .await
636 } else {
637 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
638 .await
639 }
640 }
641 Statement::Flush => flush::handle_flush(handler_args).await,
642 Statement::Wait => wait::handle_wait(handler_args).await,
643 Statement::Recover => recover::handle_recover(handler_args).await,
644 Statement::SetVariable {
645 local: _,
646 variable,
647 value,
648 } => {
649 if variable.real_value().eq_ignore_ascii_case("database") {
651 let x = variable::set_var_to_param_str(&value);
652 let res = use_db::handle_use_db(
653 handler_args,
654 ObjectName::from(vec![Ident::from_real_value(
655 x.as_deref().unwrap_or("default"),
656 )]),
657 )?;
658 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
659 for notice in res.notices() {
660 builder = builder.notice(notice);
661 }
662 return Ok(builder.into());
663 }
664 variable::handle_set(handler_args, variable, value)
665 }
666 Statement::SetTimeZone { local: _, value } => {
667 variable::handle_set_time_zone(handler_args, value)
668 }
669 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
670 Statement::CreateIndex {
671 name,
672 table_name,
673 method,
674 columns,
675 include,
676 distributed_by,
677 unique,
678 if_not_exists,
679 with_properties: _,
680 } => {
681 if unique {
682 bail_not_implemented!("create unique index");
683 }
684
685 create_index::handle_create_index(
686 handler_args,
687 if_not_exists,
688 name,
689 table_name,
690 method,
691 columns.clone(),
692 include,
693 distributed_by,
694 )
695 .await
696 }
697 Statement::AlterDatabase { name, operation } => match operation {
698 AlterDatabaseOperation::RenameDatabase { database_name } => {
699 alter_rename::handle_rename_database(handler_args, name, database_name).await
700 }
701 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
702 alter_owner::handle_alter_owner(
703 handler_args,
704 name,
705 new_owner_name,
706 StatementType::ALTER_DATABASE,
707 )
708 .await
709 }
710 AlterDatabaseOperation::SetParam(config_param) => {
711 let ConfigParam { param, value } = config_param;
712
713 let database_param = match param.real_value().to_uppercase().as_str() {
714 "BARRIER_INTERVAL_MS" => {
715 let barrier_interval_ms = match value {
716 SetVariableValue::Default => None,
717 SetVariableValue::Single(SetVariableValueSingle::Literal(
718 Value::Number(num),
719 )) => {
720 let num = num.parse::<u32>().map_err(|e| {
721 ErrorCode::InvalidInputSyntax(format!(
722 "barrier_interval_ms must be a u32 integer: {}",
723 e.as_report()
724 ))
725 })?;
726 Some(num)
727 }
728 _ => {
729 return Err(ErrorCode::InvalidInputSyntax(
730 "barrier_interval_ms must be a u32 integer or DEFAULT"
731 .to_owned(),
732 )
733 .into());
734 }
735 };
736 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
737 }
738 "CHECKPOINT_FREQUENCY" => {
739 let checkpoint_frequency = match value {
740 SetVariableValue::Default => None,
741 SetVariableValue::Single(SetVariableValueSingle::Literal(
742 Value::Number(num),
743 )) => {
744 let num = num.parse::<u64>().map_err(|e| {
745 ErrorCode::InvalidInputSyntax(format!(
746 "checkpoint_frequency must be a u64 integer: {}",
747 e.as_report()
748 ))
749 })?;
750 Some(num)
751 }
752 _ => {
753 return Err(ErrorCode::InvalidInputSyntax(
754 "checkpoint_frequency must be a u64 integer or DEFAULT"
755 .to_owned(),
756 )
757 .into());
758 }
759 };
760 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
761 }
762 _ => {
763 return Err(ErrorCode::InvalidInputSyntax(format!(
764 "Unsupported database config parameter: {}",
765 param.real_value()
766 ))
767 .into());
768 }
769 };
770
771 alter_database_param::handle_alter_database_param(
772 handler_args,
773 name,
774 database_param,
775 )
776 .await
777 }
778 },
779 Statement::AlterSchema { name, operation } => match operation {
780 AlterSchemaOperation::RenameSchema { schema_name } => {
781 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
782 }
783 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
784 alter_owner::handle_alter_owner(
785 handler_args,
786 name,
787 new_owner_name,
788 StatementType::ALTER_SCHEMA,
789 )
790 .await
791 }
792 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
793 alter_swap_rename::handle_swap_rename(
794 handler_args,
795 name,
796 target_schema,
797 StatementType::ALTER_SCHEMA,
798 )
799 .await
800 }
801 },
802 Statement::AlterTable { name, operation } => match operation {
803 AlterTableOperation::AddColumn { .. }
804 | AlterTableOperation::DropColumn { .. }
805 | AlterTableOperation::AlterColumn { .. } => {
806 alter_table_column::handle_alter_table_column(handler_args, name, operation).await
807 }
808 AlterTableOperation::RenameTable { table_name } => {
809 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
810 .await
811 }
812 AlterTableOperation::ChangeOwner { new_owner_name } => {
813 alter_owner::handle_alter_owner(
814 handler_args,
815 name,
816 new_owner_name,
817 StatementType::ALTER_TABLE,
818 )
819 .await
820 }
821 AlterTableOperation::SetParallelism {
822 parallelism,
823 deferred,
824 } => {
825 alter_parallelism::handle_alter_parallelism(
826 handler_args,
827 name,
828 parallelism,
829 StatementType::ALTER_TABLE,
830 deferred,
831 )
832 .await
833 }
834 AlterTableOperation::SetSchema { new_schema_name } => {
835 alter_set_schema::handle_alter_set_schema(
836 handler_args,
837 name,
838 new_schema_name,
839 StatementType::ALTER_TABLE,
840 None,
841 )
842 .await
843 }
844 AlterTableOperation::RefreshSchema => {
845 alter_table_with_sr::handle_refresh_schema(handler_args, name).await
846 }
847 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
848 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
849 handler_args,
850 PbThrottleTarget::TableWithSource,
851 name,
852 rate_limit,
853 )
854 .await
855 }
856 AlterTableOperation::DropConnector => {
857 alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
858 .await
859 }
860 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
861 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
862 handler_args,
863 PbThrottleTarget::TableDml,
864 name,
865 rate_limit,
866 )
867 .await
868 }
869 AlterTableOperation::SetConfig { .. } => {
870 bail_not_implemented!("ALTER TABLE SET CONFIG")
871 }
872 AlterTableOperation::ResetConfig { .. } => {
873 bail_not_implemented!("ALTER TABLE RESET CONFIG")
874 }
875 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
876 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
877 handler_args,
878 PbThrottleTarget::CdcTable,
879 name,
880 rate_limit,
881 )
882 .await
883 }
884 AlterTableOperation::SwapRenameTable { target_table } => {
885 alter_swap_rename::handle_swap_rename(
886 handler_args,
887 name,
888 target_table,
889 StatementType::ALTER_TABLE,
890 )
891 .await
892 }
893 AlterTableOperation::AlterConnectorProps { alter_props } => {
894 alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
895 }
896 AlterTableOperation::AddConstraint { .. }
897 | AlterTableOperation::DropConstraint { .. }
898 | AlterTableOperation::RenameColumn { .. }
899 | AlterTableOperation::ChangeColumn { .. }
900 | AlterTableOperation::RenameConstraint { .. } => {
901 bail_not_implemented!(
902 "Unhandled statement: {}",
903 Statement::AlterTable { name, operation }
904 )
905 }
906 },
907 Statement::AlterIndex { name, operation } => match operation {
908 AlterIndexOperation::RenameIndex { index_name } => {
909 alter_rename::handle_rename_index(handler_args, name, index_name).await
910 }
911 AlterIndexOperation::SetParallelism {
912 parallelism,
913 deferred,
914 } => {
915 alter_parallelism::handle_alter_parallelism(
916 handler_args,
917 name,
918 parallelism,
919 StatementType::ALTER_INDEX,
920 deferred,
921 )
922 .await
923 }
924 AlterIndexOperation::SetConfig { .. } => {
925 bail_not_implemented!("ALTER INDEX SET CONFIG")
926 }
927 AlterIndexOperation::ResetConfig { .. } => {
928 bail_not_implemented!("ALTER INDEX RESET CONFIG")
929 }
930 },
931 Statement::AlterView {
932 materialized,
933 name,
934 operation,
935 } => {
936 let statement_type = if materialized {
937 StatementType::ALTER_MATERIALIZED_VIEW
938 } else {
939 StatementType::ALTER_VIEW
940 };
941 match operation {
942 AlterViewOperation::RenameView { view_name } => {
943 if materialized {
944 alter_rename::handle_rename_table(
945 handler_args,
946 TableType::MaterializedView,
947 name,
948 view_name,
949 )
950 .await
951 } else {
952 alter_rename::handle_rename_view(handler_args, name, view_name).await
953 }
954 }
955 AlterViewOperation::SetParallelism {
956 parallelism,
957 deferred,
958 } => {
959 if !materialized {
960 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
961 }
962 alter_parallelism::handle_alter_parallelism(
963 handler_args,
964 name,
965 parallelism,
966 statement_type,
967 deferred,
968 )
969 .await
970 }
971 AlterViewOperation::SetResourceGroup {
972 resource_group,
973 deferred,
974 } => {
975 if !materialized {
976 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
977 }
978 alter_resource_group::handle_alter_resource_group(
979 handler_args,
980 name,
981 resource_group,
982 statement_type,
983 deferred,
984 )
985 .await
986 }
987 AlterViewOperation::ChangeOwner { new_owner_name } => {
988 alter_owner::handle_alter_owner(
989 handler_args,
990 name,
991 new_owner_name,
992 statement_type,
993 )
994 .await
995 }
996 AlterViewOperation::SetSchema { new_schema_name } => {
997 alter_set_schema::handle_alter_set_schema(
998 handler_args,
999 name,
1000 new_schema_name,
1001 statement_type,
1002 None,
1003 )
1004 .await
1005 }
1006 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1007 if !materialized {
1008 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1009 }
1010 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1011 handler_args,
1012 PbThrottleTarget::Mv,
1013 name,
1014 rate_limit,
1015 )
1016 .await
1017 }
1018 AlterViewOperation::SwapRenameView { target_view } => {
1019 alter_swap_rename::handle_swap_rename(
1020 handler_args,
1021 name,
1022 target_view,
1023 statement_type,
1024 )
1025 .await
1026 }
1027 AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1028 if !materialized {
1029 bail!(
1030 "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1031 );
1032 }
1033 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1034 }
1035 AlterViewOperation::AsQuery { query } => {
1036 if !materialized {
1037 bail_not_implemented!("ALTER VIEW AS QUERY");
1038 }
1039 if !cfg!(debug_assertions) {
1041 bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1042 }
1043 alter_mv::handle_alter_mv(handler_args, name, query).await
1044 }
1045 AlterViewOperation::SetConfig { .. } => {
1046 if !materialized {
1047 bail!("SET CONFIG is only supported for materialized views");
1048 }
1049 bail_not_implemented!("ALTER MATERIALIZED VIEW SET CONFIG")
1050 }
1051 AlterViewOperation::ResetConfig { .. } => {
1052 if !materialized {
1053 bail!("RESET CONFIG is only supported for materialized views");
1054 }
1055 bail_not_implemented!("ALTER MATERIALIZED VIEW RESET CONFIG")
1056 }
1057 }
1058 }
1059
1060 Statement::AlterSink { name, operation } => match operation {
1061 AlterSinkOperation::AlterConnectorProps {
1062 alter_props: changed_props,
1063 } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1064 AlterSinkOperation::RenameSink { sink_name } => {
1065 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1066 }
1067 AlterSinkOperation::ChangeOwner { new_owner_name } => {
1068 alter_owner::handle_alter_owner(
1069 handler_args,
1070 name,
1071 new_owner_name,
1072 StatementType::ALTER_SINK,
1073 )
1074 .await
1075 }
1076 AlterSinkOperation::SetSchema { new_schema_name } => {
1077 alter_set_schema::handle_alter_set_schema(
1078 handler_args,
1079 name,
1080 new_schema_name,
1081 StatementType::ALTER_SINK,
1082 None,
1083 )
1084 .await
1085 }
1086 AlterSinkOperation::SetParallelism {
1087 parallelism,
1088 deferred,
1089 } => {
1090 alter_parallelism::handle_alter_parallelism(
1091 handler_args,
1092 name,
1093 parallelism,
1094 StatementType::ALTER_SINK,
1095 deferred,
1096 )
1097 .await
1098 }
1099 AlterSinkOperation::SetConfig { .. } => {
1100 bail_not_implemented!("ALTER SINK SET CONFIG")
1101 }
1102 AlterSinkOperation::ResetConfig { .. } => {
1103 bail_not_implemented!("ALTER SINK RESET CONFIG")
1104 }
1105 AlterSinkOperation::SwapRenameSink { target_sink } => {
1106 alter_swap_rename::handle_swap_rename(
1107 handler_args,
1108 name,
1109 target_sink,
1110 StatementType::ALTER_SINK,
1111 )
1112 .await
1113 }
1114 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1115 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1116 handler_args,
1117 PbThrottleTarget::Sink,
1118 name,
1119 rate_limit,
1120 )
1121 .await
1122 }
1123 AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1124 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1125 handler_args,
1126 name,
1127 enable,
1128 )
1129 .await
1130 }
1131 },
1132 Statement::AlterSubscription { name, operation } => match operation {
1133 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1134 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1135 .await
1136 }
1137 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1138 alter_owner::handle_alter_owner(
1139 handler_args,
1140 name,
1141 new_owner_name,
1142 StatementType::ALTER_SUBSCRIPTION,
1143 )
1144 .await
1145 }
1146 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1147 alter_set_schema::handle_alter_set_schema(
1148 handler_args,
1149 name,
1150 new_schema_name,
1151 StatementType::ALTER_SUBSCRIPTION,
1152 None,
1153 )
1154 .await
1155 }
1156 AlterSubscriptionOperation::SwapRenameSubscription {
1157 target_subscription,
1158 } => {
1159 alter_swap_rename::handle_swap_rename(
1160 handler_args,
1161 name,
1162 target_subscription,
1163 StatementType::ALTER_SUBSCRIPTION,
1164 )
1165 .await
1166 }
1167 },
1168 Statement::AlterSource { name, operation } => match operation {
1169 AlterSourceOperation::AlterConnectorProps { alter_props } => {
1170 alter_source_props::handle_alter_source_connector_props(
1171 handler_args,
1172 name,
1173 alter_props,
1174 )
1175 .await
1176 }
1177 AlterSourceOperation::RenameSource { source_name } => {
1178 alter_rename::handle_rename_source(handler_args, name, source_name).await
1179 }
1180 AlterSourceOperation::AddColumn { .. } => {
1181 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1182 }
1183 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1184 alter_owner::handle_alter_owner(
1185 handler_args,
1186 name,
1187 new_owner_name,
1188 StatementType::ALTER_SOURCE,
1189 )
1190 .await
1191 }
1192 AlterSourceOperation::SetSchema { new_schema_name } => {
1193 alter_set_schema::handle_alter_set_schema(
1194 handler_args,
1195 name,
1196 new_schema_name,
1197 StatementType::ALTER_SOURCE,
1198 None,
1199 )
1200 .await
1201 }
1202 AlterSourceOperation::FormatEncode { format_encode } => {
1203 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1204 .await
1205 }
1206 AlterSourceOperation::RefreshSchema => {
1207 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1208 }
1209 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1210 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1211 handler_args,
1212 PbThrottleTarget::Source,
1213 name,
1214 rate_limit,
1215 )
1216 .await
1217 }
1218 AlterSourceOperation::SwapRenameSource { target_source } => {
1219 alter_swap_rename::handle_swap_rename(
1220 handler_args,
1221 name,
1222 target_source,
1223 StatementType::ALTER_SOURCE,
1224 )
1225 .await
1226 }
1227 AlterSourceOperation::SetParallelism {
1228 parallelism,
1229 deferred,
1230 } => {
1231 alter_parallelism::handle_alter_parallelism(
1232 handler_args,
1233 name,
1234 parallelism,
1235 StatementType::ALTER_SOURCE,
1236 deferred,
1237 )
1238 .await
1239 }
1240 AlterSourceOperation::SetConfig { .. } => {
1241 bail_not_implemented!("ALTER SOURCE SET CONFIG")
1242 }
1243 AlterSourceOperation::ResetConfig { .. } => {
1244 bail_not_implemented!("ALTER SOURCE RESET CONFIG")
1245 }
1246 },
1247 Statement::AlterFunction {
1248 name,
1249 args,
1250 operation,
1251 } => match operation {
1252 AlterFunctionOperation::SetSchema { new_schema_name } => {
1253 alter_set_schema::handle_alter_set_schema(
1254 handler_args,
1255 name,
1256 new_schema_name,
1257 StatementType::ALTER_FUNCTION,
1258 args,
1259 )
1260 .await
1261 }
1262 },
1263 Statement::AlterConnection { name, operation } => match operation {
1264 AlterConnectionOperation::SetSchema { new_schema_name } => {
1265 alter_set_schema::handle_alter_set_schema(
1266 handler_args,
1267 name,
1268 new_schema_name,
1269 StatementType::ALTER_CONNECTION,
1270 None,
1271 )
1272 .await
1273 }
1274 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1275 alter_owner::handle_alter_owner(
1276 handler_args,
1277 name,
1278 new_owner_name,
1279 StatementType::ALTER_CONNECTION,
1280 )
1281 .await
1282 }
1283 },
1284 Statement::AlterSystem { param, value } => {
1285 alter_system::handle_alter_system(handler_args, param, value).await
1286 }
1287 Statement::AlterSecret {
1288 name,
1289 with_options,
1290 operation,
1291 } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1292 Statement::AlterFragment {
1293 fragment_ids,
1294 operation,
1295 } => match operation {
1296 AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1297 let [fragment_id] = fragment_ids.as_slice() else {
1298 return Err(ErrorCode::InvalidInputSyntax(
1299 "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1300 .to_owned(),
1301 )
1302 .into());
1303 };
1304 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1305 &handler_args.session,
1306 PbThrottleTarget::Fragment,
1307 *fragment_id,
1308 rate_limit,
1309 StatementType::SET_VARIABLE,
1310 )
1311 .await
1312 }
1313 AlterFragmentOperation::SetParallelism { parallelism } => {
1314 alter_parallelism::handle_alter_fragment_parallelism(
1315 handler_args,
1316 fragment_ids.into_iter().map_into().collect(),
1317 parallelism,
1318 )
1319 .await
1320 }
1321 },
1322 Statement::AlterDefaultPrivileges { .. } => {
1323 handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1324 }
1325 Statement::StartTransaction { modes } => {
1326 transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1327 }
1328 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1329 Statement::Commit { chain } => {
1330 transaction::handle_commit(handler_args, COMMIT, chain).await
1331 }
1332 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1333 Statement::Rollback { chain } => {
1334 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1335 }
1336 Statement::SetTransaction {
1337 modes,
1338 snapshot,
1339 session,
1340 } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1341 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1342 Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1343 Statement::Comment {
1344 object_type,
1345 object_name,
1346 comment,
1347 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1348 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1349 Statement::Prepare {
1350 name,
1351 data_types,
1352 statement,
1353 } => prepared_statement::handle_prepare(name, data_types, statement).await,
1354 Statement::Deallocate { name, prepare } => {
1355 prepared_statement::handle_deallocate(name, prepare).await
1356 }
1357 Statement::Vacuum { object_name, full } => {
1358 vacuum::handle_vacuum(handler_args, object_name, full).await
1359 }
1360 Statement::Refresh { table_name } => {
1361 refresh::handle_refresh(handler_args, table_name).await
1362 }
1363 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1364 }
1365}
1366
1367fn check_ban_ddl_for_iceberg_engine_table(
1368 session: Arc<SessionImpl>,
1369 stmt: &Statement,
1370) -> Result<()> {
1371 match stmt {
1372 Statement::AlterTable {
1373 name,
1374 operation:
1375 operation @ (AlterTableOperation::AddColumn { .. }
1376 | AlterTableOperation::DropColumn { .. }),
1377 } => {
1378 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1379 if table.is_iceberg_engine_table() {
1380 bail!(
1381 "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1382 operation,
1383 schema_name,
1384 name
1385 );
1386 }
1387 }
1388
1389 Statement::AlterTable {
1390 name,
1391 operation: AlterTableOperation::RenameTable { .. },
1392 } => {
1393 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1394 if table.is_iceberg_engine_table() {
1395 bail!(
1396 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1397 schema_name,
1398 name
1399 );
1400 }
1401 }
1402
1403 Statement::AlterTable {
1404 name,
1405 operation: AlterTableOperation::ChangeOwner { .. },
1406 } => {
1407 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1408 if table.is_iceberg_engine_table() {
1409 bail!(
1410 "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1411 schema_name,
1412 name
1413 );
1414 }
1415 }
1416
1417 Statement::AlterTable {
1418 name,
1419 operation: AlterTableOperation::SetParallelism { .. },
1420 } => {
1421 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1422 if table.is_iceberg_engine_table() {
1423 bail!(
1424 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1425 schema_name,
1426 name
1427 );
1428 }
1429 }
1430
1431 Statement::AlterTable {
1432 name,
1433 operation: AlterTableOperation::SetSchema { .. },
1434 } => {
1435 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1436 if table.is_iceberg_engine_table() {
1437 bail!(
1438 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1439 schema_name,
1440 name
1441 );
1442 }
1443 }
1444
1445 Statement::AlterTable {
1446 name,
1447 operation: AlterTableOperation::RefreshSchema,
1448 } => {
1449 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1450 if table.is_iceberg_engine_table() {
1451 bail!(
1452 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1453 schema_name,
1454 name
1455 );
1456 }
1457 }
1458
1459 Statement::AlterTable {
1460 name,
1461 operation: AlterTableOperation::SetSourceRateLimit { .. },
1462 } => {
1463 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1464 if table.is_iceberg_engine_table() {
1465 bail!(
1466 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1467 schema_name,
1468 name
1469 );
1470 }
1471 }
1472
1473 _ => {}
1474 }
1475
1476 Ok(())
1477}