1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_connection_props;
46mod alter_database_param;
47mod alter_mv;
48mod alter_owner;
49mod alter_parallelism;
50mod alter_rename;
51mod alter_resource_group;
52mod alter_secret;
53mod alter_set_schema;
54mod alter_sink_props;
55mod alter_source_column;
56mod alter_source_props;
57mod alter_source_with_sr;
58mod alter_streaming_config;
59mod alter_streaming_enable_unaligned_join;
60mod alter_streaming_rate_limit;
61mod alter_swap_rename;
62mod alter_system;
63mod alter_table_column;
64pub mod alter_table_drop_connector;
65pub mod alter_table_props;
66mod alter_table_with_sr;
67pub mod alter_user;
68mod alter_utils;
69pub mod cancel_job;
70pub mod close_cursor;
71mod comment;
72pub mod create_aggregate;
73pub mod create_connection;
74mod create_database;
75pub mod create_function;
76pub mod create_index;
77pub mod create_mv;
78pub mod create_schema;
79pub mod create_secret;
80pub mod create_sink;
81pub mod create_source;
82pub mod create_sql_function;
83pub mod create_subscription;
84pub mod create_table;
85pub mod create_table_as;
86pub mod create_user;
87pub mod create_view;
88pub mod declare_cursor;
89pub mod describe;
90pub mod discard;
91mod drop_connection;
92mod drop_database;
93pub mod drop_function;
94mod drop_index;
95pub mod drop_mv;
96mod drop_schema;
97pub mod drop_secret;
98pub mod drop_sink;
99pub mod drop_source;
100pub mod drop_subscription;
101pub mod drop_table;
102pub mod drop_user;
103mod drop_view;
104pub mod explain;
105pub mod explain_analyze_stream_job;
106pub mod extended_handle;
107pub mod fetch_cursor;
108mod flush;
109pub mod handle_privilege;
110pub mod kill_process;
111mod prepared_statement;
112pub mod privilege;
113pub mod query;
114mod recover;
115mod refresh;
116mod reset_source;
117pub mod show;
118mod transaction;
119mod use_db;
120pub mod util;
121pub mod vacuum;
122pub mod variable;
123mod wait;
124
125pub use alter_table_column::{
126 fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
127};
128
129pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
131
132pub type RwPgResponse = PgResponse<PgResponseStream>;
134
135#[easy_ext::ext(RwPgResponseBuilderExt)]
136impl RwPgResponseBuilder {
137 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
139 let fields = T::fields();
140 self.values(
141 rows.into_iter()
142 .map(|row| {
143 Row::new(
144 row.into_owned_row()
145 .into_iter()
146 .zip_eq_fast(&fields)
147 .map(|(datum, (_, ty))| {
148 datum.map(|scalar| {
149 scalar.as_scalar_ref_impl().text_format(ty).into()
150 })
151 })
152 .collect(),
153 )
154 })
155 .collect_vec()
156 .into(),
157 fields_to_descriptors(fields),
158 )
159 }
160}
161
162pub fn fields_to_descriptors(
163 fields: Vec<(&str, risingwave_common::types::DataType)>,
164) -> Vec<PgFieldDescriptor> {
165 fields
166 .iter()
167 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
168 .collect()
169}
170
171pub enum PgResponseStream {
172 LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
173 DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
174 Rows(BoxStream<'static, RowSetResult>),
175}
176
177impl Stream for PgResponseStream {
178 type Item = std::result::Result<Vec<Row>, BoxedError>;
179
180 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
181 match &mut *self {
182 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
183 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
184 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
185 }
186 }
187}
188
189impl From<Vec<Row>> for PgResponseStream {
190 fn from(rows: Vec<Row>) -> Self {
191 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
192 }
193}
194
195#[derive(Clone)]
196pub struct HandlerArgs {
197 pub session: Arc<SessionImpl>,
198 pub sql: Arc<str>,
199 pub normalized_sql: String,
200 pub with_options: WithOptions,
201}
202
203impl HandlerArgs {
204 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
205 Ok(Self {
206 session,
207 sql,
208 with_options: WithOptions::try_from(stmt)?,
209 normalized_sql: Self::normalize_sql(stmt),
210 })
211 }
212
213 fn normalize_sql(stmt: &Statement) -> String {
219 let mut stmt = stmt.clone();
220 match &mut stmt {
221 Statement::CreateView {
222 or_replace,
223 if_not_exists,
224 ..
225 } => {
226 *or_replace = false;
227 *if_not_exists = false;
228 }
229 Statement::CreateTable {
230 or_replace,
231 if_not_exists,
232 ..
233 } => {
234 *or_replace = false;
235 *if_not_exists = false;
236 }
237 Statement::CreateIndex { if_not_exists, .. } => {
238 *if_not_exists = false;
239 }
240 Statement::CreateSource {
241 stmt: CreateSourceStatement { if_not_exists, .. },
242 ..
243 } => {
244 *if_not_exists = false;
245 }
246 Statement::CreateSink {
247 stmt: CreateSinkStatement { if_not_exists, .. },
248 } => {
249 *if_not_exists = false;
250 }
251 Statement::CreateSubscription {
252 stmt: CreateSubscriptionStatement { if_not_exists, .. },
253 } => {
254 *if_not_exists = false;
255 }
256 Statement::CreateConnection {
257 stmt: CreateConnectionStatement { if_not_exists, .. },
258 } => {
259 *if_not_exists = false;
260 }
261 _ => {}
262 }
263 stmt.to_string()
264 }
265}
266
267pub async fn handle(
268 session: Arc<SessionImpl>,
269 stmt: Statement,
270 sql: Arc<str>,
271 formats: Vec<Format>,
272) -> Result<RwPgResponse> {
273 session.clear_cancel_query_flag();
274 let _guard = session.txn_begin_implicit();
275 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
276
277 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
278
279 match stmt {
280 Statement::Explain {
281 statement,
282 analyze,
283 options,
284 } => explain::handle_explain(handler_args, *statement, options, analyze).await,
285 Statement::ExplainAnalyzeStreamJob {
286 target,
287 duration_secs,
288 } => {
289 explain_analyze_stream_job::handle_explain_analyze_stream_job(
290 handler_args,
291 target,
292 duration_secs,
293 )
294 .await
295 }
296 Statement::CreateSource { stmt } => {
297 create_source::handle_create_source(handler_args, stmt).await
298 }
299 Statement::CreateSink { stmt } => {
300 create_sink::handle_create_sink(handler_args, stmt, false).await
301 }
302 Statement::CreateSubscription { stmt } => {
303 create_subscription::handle_create_subscription(handler_args, stmt).await
304 }
305 Statement::CreateConnection { stmt } => {
306 create_connection::handle_create_connection(handler_args, stmt).await
307 }
308 Statement::CreateSecret { stmt } => {
309 create_secret::handle_create_secret(handler_args, stmt).await
310 }
311 Statement::CreateFunction {
312 or_replace,
313 temporary,
314 if_not_exists,
315 name,
316 args,
317 returns,
318 params,
319 with_options,
320 } => {
321 if params.language.is_none()
324 || !params
325 .language
326 .as_ref()
327 .unwrap()
328 .real_value()
329 .eq_ignore_ascii_case("sql")
330 {
331 create_function::handle_create_function(
332 handler_args,
333 or_replace,
334 temporary,
335 if_not_exists,
336 name,
337 args,
338 returns,
339 params,
340 with_options,
341 )
342 .await
343 } else {
344 create_sql_function::handle_create_sql_function(
345 handler_args,
346 or_replace,
347 temporary,
348 if_not_exists,
349 name,
350 args,
351 returns,
352 params,
353 )
354 .await
355 }
356 }
357 Statement::CreateAggregate {
358 or_replace,
359 if_not_exists,
360 name,
361 args,
362 returns,
363 params,
364 ..
365 } => {
366 create_aggregate::handle_create_aggregate(
367 handler_args,
368 or_replace,
369 if_not_exists,
370 name,
371 args,
372 returns,
373 params,
374 )
375 .await
376 }
377 Statement::CreateTable {
378 name,
379 columns,
380 wildcard_idx,
381 constraints,
382 query,
383 with_options: _, or_replace,
386 temporary,
387 if_not_exists,
388 format_encode,
389 source_watermarks,
390 append_only,
391 on_conflict,
392 with_version_columns,
393 cdc_table_info,
394 include_column_options,
395 webhook_info,
396 engine,
397 } => {
398 if or_replace {
399 bail_not_implemented!("CREATE OR REPLACE TABLE");
400 }
401 if temporary {
402 bail_not_implemented!("CREATE TEMPORARY TABLE");
403 }
404 if let Some(query) = query {
405 return create_table_as::handle_create_as(
406 handler_args,
407 name,
408 if_not_exists,
409 query,
410 columns,
411 append_only,
412 on_conflict,
413 with_version_columns
414 .iter()
415 .map(|col| col.real_value())
416 .collect(),
417 engine,
418 )
419 .await;
420 }
421 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
422 create_table::handle_create_table(
423 handler_args,
424 name,
425 columns,
426 wildcard_idx,
427 constraints,
428 if_not_exists,
429 format_encode,
430 source_watermarks,
431 append_only,
432 on_conflict,
433 with_version_columns
434 .iter()
435 .map(|col| col.real_value())
436 .collect(),
437 cdc_table_info,
438 include_column_options,
439 webhook_info,
440 engine,
441 )
442 .await
443 }
444 Statement::CreateDatabase {
445 db_name,
446 if_not_exists,
447 owner,
448 resource_group,
449 barrier_interval_ms,
450 checkpoint_frequency,
451 } => {
452 create_database::handle_create_database(
453 handler_args,
454 db_name,
455 if_not_exists,
456 owner,
457 resource_group,
458 barrier_interval_ms,
459 checkpoint_frequency,
460 )
461 .await
462 }
463 Statement::CreateSchema {
464 schema_name,
465 if_not_exists,
466 owner,
467 } => {
468 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
469 .await
470 }
471 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
472 Statement::DeclareCursor { stmt } => {
473 declare_cursor::handle_declare_cursor(handler_args, stmt).await
474 }
475 Statement::FetchCursor { stmt } => {
476 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
477 }
478 Statement::CloseCursor { stmt } => {
479 close_cursor::handle_close_cursor(handler_args, stmt).await
480 }
481 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
482 Statement::Grant { .. } => {
483 handle_privilege::handle_grant_privilege(handler_args, stmt).await
484 }
485 Statement::Revoke { .. } => {
486 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
487 }
488 Statement::Describe { name, kind } => match kind {
489 DescribeKind::Fragments => {
490 describe::handle_describe_fragments(handler_args, name).await
491 }
492 DescribeKind::Plain => describe::handle_describe(handler_args, name),
493 },
494 Statement::DescribeFragment { fragment_id } => {
495 describe::handle_describe_fragment(handler_args, fragment_id.into()).await
496 }
497 Statement::Discard(..) => discard::handle_discard(handler_args),
498 Statement::ShowObjects {
499 object: show_object,
500 filter,
501 } => show::handle_show_object(handler_args, show_object, filter).await,
502 Statement::ShowCreateObject { create_type, name } => {
503 show::handle_show_create_object(handler_args, create_type, name)
504 }
505 Statement::ShowTransactionIsolationLevel => {
506 transaction::handle_show_isolation_level(handler_args)
507 }
508 Statement::Drop(DropStatement {
509 object_type,
510 object_name,
511 if_exists,
512 drop_mode,
513 }) => {
514 let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
515 match object_type {
516 ObjectType::MaterializedView
517 | ObjectType::View
518 | ObjectType::Sink
519 | ObjectType::Source
520 | ObjectType::Subscription
521 | ObjectType::Index
522 | ObjectType::Table
523 | ObjectType::Schema
524 | ObjectType::Connection => true,
525 ObjectType::Database | ObjectType::User | ObjectType::Secret => {
526 bail_not_implemented!("DROP CASCADE");
527 }
528 }
529 } else {
530 false
531 };
532 match object_type {
533 ObjectType::Table => {
534 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
535 .await
536 }
537 ObjectType::MaterializedView => {
538 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
539 }
540 ObjectType::Index => {
541 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
542 .await
543 }
544 ObjectType::Source => {
545 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
546 .await
547 }
548 ObjectType::Sink => {
549 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
550 }
551 ObjectType::Subscription => {
552 drop_subscription::handle_drop_subscription(
553 handler_args,
554 object_name,
555 if_exists,
556 cascade,
557 )
558 .await
559 }
560 ObjectType::Database => {
561 drop_database::handle_drop_database(handler_args, object_name, if_exists).await
562 }
563 ObjectType::Schema => {
564 drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
565 .await
566 }
567 ObjectType::User => {
568 drop_user::handle_drop_user(handler_args, object_name, if_exists).await
569 }
570 ObjectType::View => {
571 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
572 }
573 ObjectType::Connection => {
574 drop_connection::handle_drop_connection(
575 handler_args,
576 object_name,
577 if_exists,
578 cascade,
579 )
580 .await
581 }
582 ObjectType::Secret => {
583 drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
584 }
585 }
586 }
587 Statement::DropFunction {
589 if_exists,
590 func_desc,
591 option,
592 } => {
593 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
594 .await
595 }
596 Statement::DropAggregate {
597 if_exists,
598 func_desc,
599 option,
600 } => {
601 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
602 .await
603 }
604 Statement::Query(_)
605 | Statement::Insert { .. }
606 | Statement::Delete { .. }
607 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
608 Statement::Copy {
609 entity: CopyEntity::Query(query),
610 target: CopyTarget::Stdout,
611 } => {
612 let response =
613 query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
614 .await?;
615 Ok(response.into_copy_query_to_stdout())
616 }
617 Statement::CreateView {
618 materialized,
619 if_not_exists,
620 name,
621 columns,
622 query,
623 with_options: _, or_replace, emit_mode,
626 } => {
627 if or_replace {
628 bail_not_implemented!("CREATE OR REPLACE VIEW");
629 }
630 if materialized {
631 create_mv::handle_create_mv(
632 handler_args,
633 if_not_exists,
634 name,
635 *query,
636 columns,
637 emit_mode,
638 )
639 .await
640 } else {
641 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
642 .await
643 }
644 }
645 Statement::Flush => flush::handle_flush(handler_args).await,
646 Statement::Wait => wait::handle_wait(handler_args).await,
647 Statement::Recover => recover::handle_recover(handler_args).await,
648 Statement::SetVariable {
649 local: _,
650 variable,
651 value,
652 } => {
653 if variable.real_value().eq_ignore_ascii_case("database") {
655 let x = variable::set_var_to_param_str(&value);
656 let res = use_db::handle_use_db(
657 handler_args,
658 ObjectName::from(vec![Ident::from_real_value(
659 x.as_deref().unwrap_or("default"),
660 )]),
661 )?;
662 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
663 for notice in res.notices() {
664 builder = builder.notice(notice);
665 }
666 return Ok(builder.into());
667 }
668 variable::handle_set(handler_args, variable, value)
669 }
670 Statement::SetTimeZone { local: _, value } => {
671 variable::handle_set_time_zone(handler_args, value)
672 }
673 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
674 Statement::CreateIndex {
675 name,
676 table_name,
677 method,
678 columns,
679 include,
680 distributed_by,
681 unique,
682 if_not_exists,
683 with_properties: _,
684 } => {
685 if unique {
686 bail_not_implemented!("create unique index");
687 }
688
689 create_index::handle_create_index(
690 handler_args,
691 if_not_exists,
692 name,
693 table_name,
694 method,
695 columns.clone(),
696 include,
697 distributed_by,
698 )
699 .await
700 }
701 Statement::AlterDatabase { name, operation } => match operation {
702 AlterDatabaseOperation::RenameDatabase { database_name } => {
703 alter_rename::handle_rename_database(handler_args, name, database_name).await
704 }
705 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
706 alter_owner::handle_alter_owner(
707 handler_args,
708 name,
709 new_owner_name,
710 StatementType::ALTER_DATABASE,
711 )
712 .await
713 }
714 AlterDatabaseOperation::SetParam(config_param) => {
715 let ConfigParam { param, value } = config_param;
716
717 let database_param = match param.real_value().to_uppercase().as_str() {
718 "BARRIER_INTERVAL_MS" => {
719 let barrier_interval_ms = match value {
720 SetVariableValue::Default => None,
721 SetVariableValue::Single(SetVariableValueSingle::Literal(
722 Value::Number(num),
723 )) => {
724 let num = num.parse::<u32>().map_err(|e| {
725 ErrorCode::InvalidInputSyntax(format!(
726 "barrier_interval_ms must be a u32 integer: {}",
727 e.as_report()
728 ))
729 })?;
730 Some(num)
731 }
732 _ => {
733 return Err(ErrorCode::InvalidInputSyntax(
734 "barrier_interval_ms must be a u32 integer or DEFAULT"
735 .to_owned(),
736 )
737 .into());
738 }
739 };
740 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
741 }
742 "CHECKPOINT_FREQUENCY" => {
743 let checkpoint_frequency = match value {
744 SetVariableValue::Default => None,
745 SetVariableValue::Single(SetVariableValueSingle::Literal(
746 Value::Number(num),
747 )) => {
748 let num = num.parse::<u64>().map_err(|e| {
749 ErrorCode::InvalidInputSyntax(format!(
750 "checkpoint_frequency must be a u64 integer: {}",
751 e.as_report()
752 ))
753 })?;
754 Some(num)
755 }
756 _ => {
757 return Err(ErrorCode::InvalidInputSyntax(
758 "checkpoint_frequency must be a u64 integer or DEFAULT"
759 .to_owned(),
760 )
761 .into());
762 }
763 };
764 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
765 }
766 _ => {
767 return Err(ErrorCode::InvalidInputSyntax(format!(
768 "Unsupported database config parameter: {}",
769 param.real_value()
770 ))
771 .into());
772 }
773 };
774
775 alter_database_param::handle_alter_database_param(
776 handler_args,
777 name,
778 database_param,
779 )
780 .await
781 }
782 },
783 Statement::AlterSchema { name, operation } => match operation {
784 AlterSchemaOperation::RenameSchema { schema_name } => {
785 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
786 }
787 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
788 alter_owner::handle_alter_owner(
789 handler_args,
790 name,
791 new_owner_name,
792 StatementType::ALTER_SCHEMA,
793 )
794 .await
795 }
796 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
797 alter_swap_rename::handle_swap_rename(
798 handler_args,
799 name,
800 target_schema,
801 StatementType::ALTER_SCHEMA,
802 )
803 .await
804 }
805 },
806 Statement::AlterTable { name, operation } => match operation {
807 AlterTableOperation::AddColumn { .. }
808 | AlterTableOperation::DropColumn { .. }
809 | AlterTableOperation::AlterColumn { .. } => {
810 alter_table_column::handle_alter_table_column(handler_args, name, operation).await
811 }
812 AlterTableOperation::RenameTable { table_name } => {
813 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
814 .await
815 }
816 AlterTableOperation::ChangeOwner { new_owner_name } => {
817 alter_owner::handle_alter_owner(
818 handler_args,
819 name,
820 new_owner_name,
821 StatementType::ALTER_TABLE,
822 )
823 .await
824 }
825 AlterTableOperation::SetParallelism {
826 parallelism,
827 deferred,
828 } => {
829 alter_parallelism::handle_alter_parallelism(
830 handler_args,
831 name,
832 parallelism,
833 StatementType::ALTER_TABLE,
834 deferred,
835 )
836 .await
837 }
838 AlterTableOperation::SetSchema { new_schema_name } => {
839 alter_set_schema::handle_alter_set_schema(
840 handler_args,
841 name,
842 new_schema_name,
843 StatementType::ALTER_TABLE,
844 None,
845 )
846 .await
847 }
848 AlterTableOperation::RefreshSchema => {
849 alter_table_with_sr::handle_refresh_schema(handler_args, name).await
850 }
851 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
852 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
853 handler_args,
854 PbThrottleTarget::TableWithSource,
855 name,
856 rate_limit,
857 )
858 .await
859 }
860 AlterTableOperation::DropConnector => {
861 alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
862 .await
863 }
864 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
865 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
866 handler_args,
867 PbThrottleTarget::TableDml,
868 name,
869 rate_limit,
870 )
871 .await
872 }
873 AlterTableOperation::SetConfig { entries } => {
874 alter_streaming_config::handle_alter_streaming_set_config(
875 handler_args,
876 name,
877 entries,
878 StatementType::ALTER_TABLE,
879 )
880 .await
881 }
882 AlterTableOperation::ResetConfig { keys } => {
883 alter_streaming_config::handle_alter_streaming_reset_config(
884 handler_args,
885 name,
886 keys,
887 StatementType::ALTER_TABLE,
888 )
889 .await
890 }
891 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
892 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
893 handler_args,
894 PbThrottleTarget::CdcTable,
895 name,
896 rate_limit,
897 )
898 .await
899 }
900 AlterTableOperation::SwapRenameTable { target_table } => {
901 alter_swap_rename::handle_swap_rename(
902 handler_args,
903 name,
904 target_table,
905 StatementType::ALTER_TABLE,
906 )
907 .await
908 }
909 AlterTableOperation::AlterConnectorProps { alter_props } => {
910 alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
911 }
912 AlterTableOperation::AddConstraint { .. }
913 | AlterTableOperation::DropConstraint { .. }
914 | AlterTableOperation::RenameColumn { .. }
915 | AlterTableOperation::ChangeColumn { .. }
916 | AlterTableOperation::RenameConstraint { .. } => {
917 bail_not_implemented!(
918 "Unhandled statement: {}",
919 Statement::AlterTable { name, operation }
920 )
921 }
922 },
923 Statement::AlterIndex { name, operation } => match operation {
924 AlterIndexOperation::RenameIndex { index_name } => {
925 alter_rename::handle_rename_index(handler_args, name, index_name).await
926 }
927 AlterIndexOperation::SetParallelism {
928 parallelism,
929 deferred,
930 } => {
931 alter_parallelism::handle_alter_parallelism(
932 handler_args,
933 name,
934 parallelism,
935 StatementType::ALTER_INDEX,
936 deferred,
937 )
938 .await
939 }
940 AlterIndexOperation::SetConfig { entries } => {
941 alter_streaming_config::handle_alter_streaming_set_config(
942 handler_args,
943 name,
944 entries,
945 StatementType::ALTER_INDEX,
946 )
947 .await
948 }
949 AlterIndexOperation::ResetConfig { keys } => {
950 alter_streaming_config::handle_alter_streaming_reset_config(
951 handler_args,
952 name,
953 keys,
954 StatementType::ALTER_INDEX,
955 )
956 .await
957 }
958 },
959 Statement::AlterView {
960 materialized,
961 name,
962 operation,
963 } => {
964 let statement_type = if materialized {
965 StatementType::ALTER_MATERIALIZED_VIEW
966 } else {
967 StatementType::ALTER_VIEW
968 };
969 match operation {
970 AlterViewOperation::RenameView { view_name } => {
971 if materialized {
972 alter_rename::handle_rename_table(
973 handler_args,
974 TableType::MaterializedView,
975 name,
976 view_name,
977 )
978 .await
979 } else {
980 alter_rename::handle_rename_view(handler_args, name, view_name).await
981 }
982 }
983 AlterViewOperation::SetParallelism {
984 parallelism,
985 deferred,
986 } => {
987 if !materialized {
988 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
989 }
990 alter_parallelism::handle_alter_parallelism(
991 handler_args,
992 name,
993 parallelism,
994 statement_type,
995 deferred,
996 )
997 .await
998 }
999 AlterViewOperation::SetResourceGroup {
1000 resource_group,
1001 deferred,
1002 } => {
1003 if !materialized {
1004 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1005 }
1006 alter_resource_group::handle_alter_resource_group(
1007 handler_args,
1008 name,
1009 resource_group,
1010 statement_type,
1011 deferred,
1012 )
1013 .await
1014 }
1015 AlterViewOperation::ChangeOwner { new_owner_name } => {
1016 alter_owner::handle_alter_owner(
1017 handler_args,
1018 name,
1019 new_owner_name,
1020 statement_type,
1021 )
1022 .await
1023 }
1024 AlterViewOperation::SetSchema { new_schema_name } => {
1025 alter_set_schema::handle_alter_set_schema(
1026 handler_args,
1027 name,
1028 new_schema_name,
1029 statement_type,
1030 None,
1031 )
1032 .await
1033 }
1034 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1035 if !materialized {
1036 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1037 }
1038 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1039 handler_args,
1040 PbThrottleTarget::Mv,
1041 name,
1042 rate_limit,
1043 )
1044 .await
1045 }
1046 AlterViewOperation::SwapRenameView { target_view } => {
1047 alter_swap_rename::handle_swap_rename(
1048 handler_args,
1049 name,
1050 target_view,
1051 statement_type,
1052 )
1053 .await
1054 }
1055 AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1056 if !materialized {
1057 bail!(
1058 "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1059 );
1060 }
1061 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1062 }
1063 AlterViewOperation::AsQuery { query } => {
1064 if !materialized {
1065 bail_not_implemented!("ALTER VIEW AS QUERY");
1066 }
1067 if !cfg!(debug_assertions) {
1069 bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1070 }
1071 alter_mv::handle_alter_mv(handler_args, name, query).await
1072 }
1073 AlterViewOperation::SetConfig { entries } => {
1074 if !materialized {
1075 bail!("SET CONFIG is only supported for materialized views");
1076 }
1077 alter_streaming_config::handle_alter_streaming_set_config(
1078 handler_args,
1079 name,
1080 entries,
1081 statement_type,
1082 )
1083 .await
1084 }
1085 AlterViewOperation::ResetConfig { keys } => {
1086 if !materialized {
1087 bail!("RESET CONFIG is only supported for materialized views");
1088 }
1089 alter_streaming_config::handle_alter_streaming_reset_config(
1090 handler_args,
1091 name,
1092 keys,
1093 statement_type,
1094 )
1095 .await
1096 }
1097 }
1098 }
1099
1100 Statement::AlterSink { name, operation } => match operation {
1101 AlterSinkOperation::AlterConnectorProps {
1102 alter_props: changed_props,
1103 } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1104 AlterSinkOperation::RenameSink { sink_name } => {
1105 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1106 }
1107 AlterSinkOperation::ChangeOwner { new_owner_name } => {
1108 alter_owner::handle_alter_owner(
1109 handler_args,
1110 name,
1111 new_owner_name,
1112 StatementType::ALTER_SINK,
1113 )
1114 .await
1115 }
1116 AlterSinkOperation::SetSchema { new_schema_name } => {
1117 alter_set_schema::handle_alter_set_schema(
1118 handler_args,
1119 name,
1120 new_schema_name,
1121 StatementType::ALTER_SINK,
1122 None,
1123 )
1124 .await
1125 }
1126 AlterSinkOperation::SetParallelism {
1127 parallelism,
1128 deferred,
1129 } => {
1130 alter_parallelism::handle_alter_parallelism(
1131 handler_args,
1132 name,
1133 parallelism,
1134 StatementType::ALTER_SINK,
1135 deferred,
1136 )
1137 .await
1138 }
1139 AlterSinkOperation::SetConfig { entries } => {
1140 alter_streaming_config::handle_alter_streaming_set_config(
1141 handler_args,
1142 name,
1143 entries,
1144 StatementType::ALTER_SINK,
1145 )
1146 .await
1147 }
1148 AlterSinkOperation::ResetConfig { keys } => {
1149 alter_streaming_config::handle_alter_streaming_reset_config(
1150 handler_args,
1151 name,
1152 keys,
1153 StatementType::ALTER_SINK,
1154 )
1155 .await
1156 }
1157 AlterSinkOperation::SwapRenameSink { target_sink } => {
1158 alter_swap_rename::handle_swap_rename(
1159 handler_args,
1160 name,
1161 target_sink,
1162 StatementType::ALTER_SINK,
1163 )
1164 .await
1165 }
1166 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1167 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1168 handler_args,
1169 PbThrottleTarget::Sink,
1170 name,
1171 rate_limit,
1172 )
1173 .await
1174 }
1175 AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1176 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1177 handler_args,
1178 name,
1179 enable,
1180 )
1181 .await
1182 }
1183 },
1184 Statement::AlterSubscription { name, operation } => match operation {
1185 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1186 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1187 .await
1188 }
1189 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1190 alter_owner::handle_alter_owner(
1191 handler_args,
1192 name,
1193 new_owner_name,
1194 StatementType::ALTER_SUBSCRIPTION,
1195 )
1196 .await
1197 }
1198 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1199 alter_set_schema::handle_alter_set_schema(
1200 handler_args,
1201 name,
1202 new_schema_name,
1203 StatementType::ALTER_SUBSCRIPTION,
1204 None,
1205 )
1206 .await
1207 }
1208 AlterSubscriptionOperation::SwapRenameSubscription {
1209 target_subscription,
1210 } => {
1211 alter_swap_rename::handle_swap_rename(
1212 handler_args,
1213 name,
1214 target_subscription,
1215 StatementType::ALTER_SUBSCRIPTION,
1216 )
1217 .await
1218 }
1219 },
1220 Statement::AlterSource { name, operation } => match operation {
1221 AlterSourceOperation::AlterConnectorProps { alter_props } => {
1222 alter_source_props::handle_alter_source_connector_props(
1223 handler_args,
1224 name,
1225 alter_props,
1226 )
1227 .await
1228 }
1229 AlterSourceOperation::RenameSource { source_name } => {
1230 alter_rename::handle_rename_source(handler_args, name, source_name).await
1231 }
1232 AlterSourceOperation::AddColumn { .. } => {
1233 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1234 }
1235 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1236 alter_owner::handle_alter_owner(
1237 handler_args,
1238 name,
1239 new_owner_name,
1240 StatementType::ALTER_SOURCE,
1241 )
1242 .await
1243 }
1244 AlterSourceOperation::SetSchema { new_schema_name } => {
1245 alter_set_schema::handle_alter_set_schema(
1246 handler_args,
1247 name,
1248 new_schema_name,
1249 StatementType::ALTER_SOURCE,
1250 None,
1251 )
1252 .await
1253 }
1254 AlterSourceOperation::FormatEncode { format_encode } => {
1255 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1256 .await
1257 }
1258 AlterSourceOperation::RefreshSchema => {
1259 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1260 }
1261 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1262 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1263 handler_args,
1264 PbThrottleTarget::Source,
1265 name,
1266 rate_limit,
1267 )
1268 .await
1269 }
1270 AlterSourceOperation::SwapRenameSource { target_source } => {
1271 alter_swap_rename::handle_swap_rename(
1272 handler_args,
1273 name,
1274 target_source,
1275 StatementType::ALTER_SOURCE,
1276 )
1277 .await
1278 }
1279 AlterSourceOperation::SetParallelism {
1280 parallelism,
1281 deferred,
1282 } => {
1283 alter_parallelism::handle_alter_parallelism(
1284 handler_args,
1285 name,
1286 parallelism,
1287 StatementType::ALTER_SOURCE,
1288 deferred,
1289 )
1290 .await
1291 }
1292 AlterSourceOperation::SetConfig { entries } => {
1293 alter_streaming_config::handle_alter_streaming_set_config(
1294 handler_args,
1295 name,
1296 entries,
1297 StatementType::ALTER_SOURCE,
1298 )
1299 .await
1300 }
1301 AlterSourceOperation::ResetConfig { keys } => {
1302 alter_streaming_config::handle_alter_streaming_reset_config(
1303 handler_args,
1304 name,
1305 keys,
1306 StatementType::ALTER_SOURCE,
1307 )
1308 .await
1309 }
1310 AlterSourceOperation::ResetSource => {
1311 reset_source::handle_reset_source(handler_args, name).await
1312 }
1313 },
1314 Statement::AlterFunction {
1315 name,
1316 args,
1317 operation,
1318 } => match operation {
1319 AlterFunctionOperation::SetSchema { new_schema_name } => {
1320 alter_set_schema::handle_alter_set_schema(
1321 handler_args,
1322 name,
1323 new_schema_name,
1324 StatementType::ALTER_FUNCTION,
1325 args,
1326 )
1327 .await
1328 }
1329 },
1330 Statement::AlterConnection { name, operation } => match operation {
1331 AlterConnectionOperation::SetSchema { new_schema_name } => {
1332 alter_set_schema::handle_alter_set_schema(
1333 handler_args,
1334 name,
1335 new_schema_name,
1336 StatementType::ALTER_CONNECTION,
1337 None,
1338 )
1339 .await
1340 }
1341 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1342 alter_owner::handle_alter_owner(
1343 handler_args,
1344 name,
1345 new_owner_name,
1346 StatementType::ALTER_CONNECTION,
1347 )
1348 .await
1349 }
1350 AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1351 alter_connection_props::handle_alter_connection_connector_props(
1352 handler_args,
1353 name,
1354 alter_props,
1355 )
1356 .await
1357 }
1358 },
1359 Statement::AlterSystem { param, value } => {
1360 alter_system::handle_alter_system(handler_args, param, value).await
1361 }
1362 Statement::AlterSecret {
1363 name,
1364 with_options,
1365 operation,
1366 } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1367 Statement::AlterFragment {
1368 fragment_ids,
1369 operation,
1370 } => match operation {
1371 AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1372 let [fragment_id] = fragment_ids.as_slice() else {
1373 return Err(ErrorCode::InvalidInputSyntax(
1374 "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1375 .to_owned(),
1376 )
1377 .into());
1378 };
1379 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1380 &handler_args.session,
1381 PbThrottleTarget::Fragment,
1382 *fragment_id,
1383 rate_limit,
1384 StatementType::SET_VARIABLE,
1385 )
1386 .await
1387 }
1388 AlterFragmentOperation::SetParallelism { parallelism } => {
1389 alter_parallelism::handle_alter_fragment_parallelism(
1390 handler_args,
1391 fragment_ids.into_iter().map_into().collect(),
1392 parallelism,
1393 )
1394 .await
1395 }
1396 },
1397 Statement::AlterDefaultPrivileges { .. } => {
1398 handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1399 }
1400 Statement::StartTransaction { modes } => {
1401 transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1402 }
1403 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1404 Statement::Commit { chain } => {
1405 transaction::handle_commit(handler_args, COMMIT, chain).await
1406 }
1407 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1408 Statement::Rollback { chain } => {
1409 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1410 }
1411 Statement::SetTransaction {
1412 modes,
1413 snapshot,
1414 session,
1415 } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1416 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1417 Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1418 Statement::Comment {
1419 object_type,
1420 object_name,
1421 comment,
1422 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1423 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1424 Statement::Prepare {
1425 name,
1426 data_types,
1427 statement,
1428 } => prepared_statement::handle_prepare(name, data_types, statement).await,
1429 Statement::Deallocate { name, prepare } => {
1430 prepared_statement::handle_deallocate(name, prepare).await
1431 }
1432 Statement::Vacuum { object_name, full } => {
1433 vacuum::handle_vacuum(handler_args, object_name, full).await
1434 }
1435 Statement::Refresh { table_name } => {
1436 refresh::handle_refresh(handler_args, table_name).await
1437 }
1438 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1439 }
1440}
1441
1442fn check_ban_ddl_for_iceberg_engine_table(
1443 session: Arc<SessionImpl>,
1444 stmt: &Statement,
1445) -> Result<()> {
1446 match stmt {
1447 Statement::AlterTable {
1448 name,
1449 operation:
1450 operation @ (AlterTableOperation::AddColumn { .. }
1451 | AlterTableOperation::DropColumn { .. }),
1452 } => {
1453 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1454 if table.is_iceberg_engine_table() {
1455 bail!(
1456 "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1457 operation,
1458 schema_name,
1459 name
1460 );
1461 }
1462 }
1463
1464 Statement::AlterTable {
1465 name,
1466 operation: AlterTableOperation::RenameTable { .. },
1467 } => {
1468 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1469 if table.is_iceberg_engine_table() {
1470 bail!(
1471 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1472 schema_name,
1473 name
1474 );
1475 }
1476 }
1477
1478 Statement::AlterTable {
1479 name,
1480 operation: AlterTableOperation::SetParallelism { .. },
1481 } => {
1482 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1483 if table.is_iceberg_engine_table() {
1484 bail!(
1485 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1486 schema_name,
1487 name
1488 );
1489 }
1490 }
1491
1492 Statement::AlterTable {
1493 name,
1494 operation: AlterTableOperation::SetSchema { .. },
1495 } => {
1496 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1497 if table.is_iceberg_engine_table() {
1498 bail!(
1499 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1500 schema_name,
1501 name
1502 );
1503 }
1504 }
1505
1506 Statement::AlterTable {
1507 name,
1508 operation: AlterTableOperation::RefreshSchema,
1509 } => {
1510 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1511 if table.is_iceberg_engine_table() {
1512 bail!(
1513 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1514 schema_name,
1515 name
1516 );
1517 }
1518 }
1519
1520 Statement::AlterTable {
1521 name,
1522 operation: AlterTableOperation::SetSourceRateLimit { .. },
1523 } => {
1524 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1525 if table.is_iceberg_engine_table() {
1526 bail!(
1527 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1528 schema_name,
1529 name
1530 );
1531 }
1532 }
1533
1534 _ => {}
1535 }
1536
1537 Ok(())
1538}