1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_database_param;
46mod alter_mv;
47mod alter_owner;
48mod alter_parallelism;
49mod alter_rename;
50mod alter_resource_group;
51mod alter_secret;
52mod alter_set_schema;
53mod alter_sink_props;
54mod alter_source_column;
55mod alter_source_props;
56mod alter_source_with_sr;
57mod alter_streaming_config;
58mod alter_streaming_enable_unaligned_join;
59mod alter_streaming_rate_limit;
60mod alter_swap_rename;
61mod alter_system;
62mod alter_table_column;
63pub mod alter_table_drop_connector;
64pub mod alter_table_props;
65mod alter_table_with_sr;
66pub mod alter_user;
67mod alter_utils;
68pub mod cancel_job;
69pub mod close_cursor;
70mod comment;
71pub mod create_aggregate;
72pub mod create_connection;
73mod create_database;
74pub mod create_function;
75pub mod create_index;
76pub mod create_mv;
77pub mod create_schema;
78pub mod create_secret;
79pub mod create_sink;
80pub mod create_source;
81pub mod create_sql_function;
82pub mod create_subscription;
83pub mod create_table;
84pub mod create_table_as;
85pub mod create_user;
86pub mod create_view;
87pub mod declare_cursor;
88pub mod describe;
89pub mod discard;
90mod drop_connection;
91mod drop_database;
92pub mod drop_function;
93mod drop_index;
94pub mod drop_mv;
95mod drop_schema;
96pub mod drop_secret;
97pub mod drop_sink;
98pub mod drop_source;
99pub mod drop_subscription;
100pub mod drop_table;
101pub mod drop_user;
102mod drop_view;
103pub mod explain;
104pub mod explain_analyze_stream_job;
105pub mod extended_handle;
106pub mod fetch_cursor;
107mod flush;
108pub mod handle_privilege;
109pub mod kill_process;
110mod prepared_statement;
111pub mod privilege;
112pub mod query;
113mod recover;
114mod refresh;
115pub mod show;
116mod transaction;
117mod use_db;
118pub mod util;
119pub mod vacuum;
120pub mod variable;
121mod wait;
122
123pub use alter_table_column::{
124 fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
125};
126
127pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
129
130pub type RwPgResponse = PgResponse<PgResponseStream>;
132
133#[easy_ext::ext(RwPgResponseBuilderExt)]
134impl RwPgResponseBuilder {
135 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
137 let fields = T::fields();
138 self.values(
139 rows.into_iter()
140 .map(|row| {
141 Row::new(
142 row.into_owned_row()
143 .into_iter()
144 .zip_eq_fast(&fields)
145 .map(|(datum, (_, ty))| {
146 datum.map(|scalar| {
147 scalar.as_scalar_ref_impl().text_format(ty).into()
148 })
149 })
150 .collect(),
151 )
152 })
153 .collect_vec()
154 .into(),
155 fields_to_descriptors(fields),
156 )
157 }
158}
159
160pub fn fields_to_descriptors(
161 fields: Vec<(&str, risingwave_common::types::DataType)>,
162) -> Vec<PgFieldDescriptor> {
163 fields
164 .iter()
165 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
166 .collect()
167}
168
169pub enum PgResponseStream {
170 LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
171 DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
172 Rows(BoxStream<'static, RowSetResult>),
173}
174
175impl Stream for PgResponseStream {
176 type Item = std::result::Result<Vec<Row>, BoxedError>;
177
178 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
179 match &mut *self {
180 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
181 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
182 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
183 }
184 }
185}
186
187impl From<Vec<Row>> for PgResponseStream {
188 fn from(rows: Vec<Row>) -> Self {
189 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
190 }
191}
192
193#[derive(Clone)]
194pub struct HandlerArgs {
195 pub session: Arc<SessionImpl>,
196 pub sql: Arc<str>,
197 pub normalized_sql: String,
198 pub with_options: WithOptions,
199}
200
201impl HandlerArgs {
202 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
203 Ok(Self {
204 session,
205 sql,
206 with_options: WithOptions::try_from(stmt)?,
207 normalized_sql: Self::normalize_sql(stmt),
208 })
209 }
210
211 fn normalize_sql(stmt: &Statement) -> String {
217 let mut stmt = stmt.clone();
218 match &mut stmt {
219 Statement::CreateView {
220 or_replace,
221 if_not_exists,
222 ..
223 } => {
224 *or_replace = false;
225 *if_not_exists = false;
226 }
227 Statement::CreateTable {
228 or_replace,
229 if_not_exists,
230 ..
231 } => {
232 *or_replace = false;
233 *if_not_exists = false;
234 }
235 Statement::CreateIndex { if_not_exists, .. } => {
236 *if_not_exists = false;
237 }
238 Statement::CreateSource {
239 stmt: CreateSourceStatement { if_not_exists, .. },
240 ..
241 } => {
242 *if_not_exists = false;
243 }
244 Statement::CreateSink {
245 stmt: CreateSinkStatement { if_not_exists, .. },
246 } => {
247 *if_not_exists = false;
248 }
249 Statement::CreateSubscription {
250 stmt: CreateSubscriptionStatement { if_not_exists, .. },
251 } => {
252 *if_not_exists = false;
253 }
254 Statement::CreateConnection {
255 stmt: CreateConnectionStatement { if_not_exists, .. },
256 } => {
257 *if_not_exists = false;
258 }
259 _ => {}
260 }
261 stmt.to_string()
262 }
263}
264
265pub async fn handle(
266 session: Arc<SessionImpl>,
267 stmt: Statement,
268 sql: Arc<str>,
269 formats: Vec<Format>,
270) -> Result<RwPgResponse> {
271 session.clear_cancel_query_flag();
272 let _guard = session.txn_begin_implicit();
273 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
274
275 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
276
277 match stmt {
278 Statement::Explain {
279 statement,
280 analyze,
281 options,
282 } => explain::handle_explain(handler_args, *statement, options, analyze).await,
283 Statement::ExplainAnalyzeStreamJob {
284 target,
285 duration_secs,
286 } => {
287 explain_analyze_stream_job::handle_explain_analyze_stream_job(
288 handler_args,
289 target,
290 duration_secs,
291 )
292 .await
293 }
294 Statement::CreateSource { stmt } => {
295 create_source::handle_create_source(handler_args, stmt).await
296 }
297 Statement::CreateSink { stmt } => {
298 create_sink::handle_create_sink(handler_args, stmt, false).await
299 }
300 Statement::CreateSubscription { stmt } => {
301 create_subscription::handle_create_subscription(handler_args, stmt).await
302 }
303 Statement::CreateConnection { stmt } => {
304 create_connection::handle_create_connection(handler_args, stmt).await
305 }
306 Statement::CreateSecret { stmt } => {
307 create_secret::handle_create_secret(handler_args, stmt).await
308 }
309 Statement::CreateFunction {
310 or_replace,
311 temporary,
312 if_not_exists,
313 name,
314 args,
315 returns,
316 params,
317 with_options,
318 } => {
319 if params.language.is_none()
322 || !params
323 .language
324 .as_ref()
325 .unwrap()
326 .real_value()
327 .eq_ignore_ascii_case("sql")
328 {
329 create_function::handle_create_function(
330 handler_args,
331 or_replace,
332 temporary,
333 if_not_exists,
334 name,
335 args,
336 returns,
337 params,
338 with_options,
339 )
340 .await
341 } else {
342 create_sql_function::handle_create_sql_function(
343 handler_args,
344 or_replace,
345 temporary,
346 if_not_exists,
347 name,
348 args,
349 returns,
350 params,
351 )
352 .await
353 }
354 }
355 Statement::CreateAggregate {
356 or_replace,
357 if_not_exists,
358 name,
359 args,
360 returns,
361 params,
362 ..
363 } => {
364 create_aggregate::handle_create_aggregate(
365 handler_args,
366 or_replace,
367 if_not_exists,
368 name,
369 args,
370 returns,
371 params,
372 )
373 .await
374 }
375 Statement::CreateTable {
376 name,
377 columns,
378 wildcard_idx,
379 constraints,
380 query,
381 with_options: _, or_replace,
384 temporary,
385 if_not_exists,
386 format_encode,
387 source_watermarks,
388 append_only,
389 on_conflict,
390 with_version_columns,
391 cdc_table_info,
392 include_column_options,
393 webhook_info,
394 engine,
395 } => {
396 if or_replace {
397 bail_not_implemented!("CREATE OR REPLACE TABLE");
398 }
399 if temporary {
400 bail_not_implemented!("CREATE TEMPORARY TABLE");
401 }
402 if let Some(query) = query {
403 return create_table_as::handle_create_as(
404 handler_args,
405 name,
406 if_not_exists,
407 query,
408 columns,
409 append_only,
410 on_conflict,
411 with_version_columns
412 .iter()
413 .map(|col| col.real_value())
414 .collect(),
415 engine,
416 )
417 .await;
418 }
419 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
420 create_table::handle_create_table(
421 handler_args,
422 name,
423 columns,
424 wildcard_idx,
425 constraints,
426 if_not_exists,
427 format_encode,
428 source_watermarks,
429 append_only,
430 on_conflict,
431 with_version_columns
432 .iter()
433 .map(|col| col.real_value())
434 .collect(),
435 cdc_table_info,
436 include_column_options,
437 webhook_info,
438 engine,
439 )
440 .await
441 }
442 Statement::CreateDatabase {
443 db_name,
444 if_not_exists,
445 owner,
446 resource_group,
447 barrier_interval_ms,
448 checkpoint_frequency,
449 } => {
450 create_database::handle_create_database(
451 handler_args,
452 db_name,
453 if_not_exists,
454 owner,
455 resource_group,
456 barrier_interval_ms,
457 checkpoint_frequency,
458 )
459 .await
460 }
461 Statement::CreateSchema {
462 schema_name,
463 if_not_exists,
464 owner,
465 } => {
466 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
467 .await
468 }
469 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
470 Statement::DeclareCursor { stmt } => {
471 declare_cursor::handle_declare_cursor(handler_args, stmt).await
472 }
473 Statement::FetchCursor { stmt } => {
474 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
475 }
476 Statement::CloseCursor { stmt } => {
477 close_cursor::handle_close_cursor(handler_args, stmt).await
478 }
479 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
480 Statement::Grant { .. } => {
481 handle_privilege::handle_grant_privilege(handler_args, stmt).await
482 }
483 Statement::Revoke { .. } => {
484 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
485 }
486 Statement::Describe { name, kind } => match kind {
487 DescribeKind::Fragments => {
488 describe::handle_describe_fragments(handler_args, name).await
489 }
490 DescribeKind::Plain => describe::handle_describe(handler_args, name),
491 },
492 Statement::DescribeFragment { fragment_id } => {
493 describe::handle_describe_fragment(handler_args, fragment_id.into()).await
494 }
495 Statement::Discard(..) => discard::handle_discard(handler_args),
496 Statement::ShowObjects {
497 object: show_object,
498 filter,
499 } => show::handle_show_object(handler_args, show_object, filter).await,
500 Statement::ShowCreateObject { create_type, name } => {
501 show::handle_show_create_object(handler_args, create_type, name)
502 }
503 Statement::ShowTransactionIsolationLevel => {
504 transaction::handle_show_isolation_level(handler_args)
505 }
506 Statement::Drop(DropStatement {
507 object_type,
508 object_name,
509 if_exists,
510 drop_mode,
511 }) => {
512 let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
513 match object_type {
514 ObjectType::MaterializedView
515 | ObjectType::View
516 | ObjectType::Sink
517 | ObjectType::Source
518 | ObjectType::Subscription
519 | ObjectType::Index
520 | ObjectType::Table
521 | ObjectType::Schema
522 | ObjectType::Connection => true,
523 ObjectType::Database | ObjectType::User | ObjectType::Secret => {
524 bail_not_implemented!("DROP CASCADE");
525 }
526 }
527 } else {
528 false
529 };
530 match object_type {
531 ObjectType::Table => {
532 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
533 .await
534 }
535 ObjectType::MaterializedView => {
536 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
537 }
538 ObjectType::Index => {
539 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
540 .await
541 }
542 ObjectType::Source => {
543 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
544 .await
545 }
546 ObjectType::Sink => {
547 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
548 }
549 ObjectType::Subscription => {
550 drop_subscription::handle_drop_subscription(
551 handler_args,
552 object_name,
553 if_exists,
554 cascade,
555 )
556 .await
557 }
558 ObjectType::Database => {
559 drop_database::handle_drop_database(handler_args, object_name, if_exists).await
560 }
561 ObjectType::Schema => {
562 drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
563 .await
564 }
565 ObjectType::User => {
566 drop_user::handle_drop_user(handler_args, object_name, if_exists).await
567 }
568 ObjectType::View => {
569 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
570 }
571 ObjectType::Connection => {
572 drop_connection::handle_drop_connection(
573 handler_args,
574 object_name,
575 if_exists,
576 cascade,
577 )
578 .await
579 }
580 ObjectType::Secret => {
581 drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
582 }
583 }
584 }
585 Statement::DropFunction {
587 if_exists,
588 func_desc,
589 option,
590 } => {
591 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
592 .await
593 }
594 Statement::DropAggregate {
595 if_exists,
596 func_desc,
597 option,
598 } => {
599 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
600 .await
601 }
602 Statement::Query(_)
603 | Statement::Insert { .. }
604 | Statement::Delete { .. }
605 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
606 Statement::Copy {
607 entity: CopyEntity::Query(query),
608 target: CopyTarget::Stdout,
609 } => {
610 let response =
611 query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
612 .await?;
613 Ok(response.into_copy_query_to_stdout())
614 }
615 Statement::CreateView {
616 materialized,
617 if_not_exists,
618 name,
619 columns,
620 query,
621 with_options: _, or_replace, emit_mode,
624 } => {
625 if or_replace {
626 bail_not_implemented!("CREATE OR REPLACE VIEW");
627 }
628 if materialized {
629 create_mv::handle_create_mv(
630 handler_args,
631 if_not_exists,
632 name,
633 *query,
634 columns,
635 emit_mode,
636 )
637 .await
638 } else {
639 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
640 .await
641 }
642 }
643 Statement::Flush => flush::handle_flush(handler_args).await,
644 Statement::Wait => wait::handle_wait(handler_args).await,
645 Statement::Recover => recover::handle_recover(handler_args).await,
646 Statement::SetVariable {
647 local: _,
648 variable,
649 value,
650 } => {
651 if variable.real_value().eq_ignore_ascii_case("database") {
653 let x = variable::set_var_to_param_str(&value);
654 let res = use_db::handle_use_db(
655 handler_args,
656 ObjectName::from(vec![Ident::from_real_value(
657 x.as_deref().unwrap_or("default"),
658 )]),
659 )?;
660 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
661 for notice in res.notices() {
662 builder = builder.notice(notice);
663 }
664 return Ok(builder.into());
665 }
666 variable::handle_set(handler_args, variable, value)
667 }
668 Statement::SetTimeZone { local: _, value } => {
669 variable::handle_set_time_zone(handler_args, value)
670 }
671 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
672 Statement::CreateIndex {
673 name,
674 table_name,
675 method,
676 columns,
677 include,
678 distributed_by,
679 unique,
680 if_not_exists,
681 with_properties: _,
682 } => {
683 if unique {
684 bail_not_implemented!("create unique index");
685 }
686
687 create_index::handle_create_index(
688 handler_args,
689 if_not_exists,
690 name,
691 table_name,
692 method,
693 columns.clone(),
694 include,
695 distributed_by,
696 )
697 .await
698 }
699 Statement::AlterDatabase { name, operation } => match operation {
700 AlterDatabaseOperation::RenameDatabase { database_name } => {
701 alter_rename::handle_rename_database(handler_args, name, database_name).await
702 }
703 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
704 alter_owner::handle_alter_owner(
705 handler_args,
706 name,
707 new_owner_name,
708 StatementType::ALTER_DATABASE,
709 )
710 .await
711 }
712 AlterDatabaseOperation::SetParam(config_param) => {
713 let ConfigParam { param, value } = config_param;
714
715 let database_param = match param.real_value().to_uppercase().as_str() {
716 "BARRIER_INTERVAL_MS" => {
717 let barrier_interval_ms = match value {
718 SetVariableValue::Default => None,
719 SetVariableValue::Single(SetVariableValueSingle::Literal(
720 Value::Number(num),
721 )) => {
722 let num = num.parse::<u32>().map_err(|e| {
723 ErrorCode::InvalidInputSyntax(format!(
724 "barrier_interval_ms must be a u32 integer: {}",
725 e.as_report()
726 ))
727 })?;
728 Some(num)
729 }
730 _ => {
731 return Err(ErrorCode::InvalidInputSyntax(
732 "barrier_interval_ms must be a u32 integer or DEFAULT"
733 .to_owned(),
734 )
735 .into());
736 }
737 };
738 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
739 }
740 "CHECKPOINT_FREQUENCY" => {
741 let checkpoint_frequency = match value {
742 SetVariableValue::Default => None,
743 SetVariableValue::Single(SetVariableValueSingle::Literal(
744 Value::Number(num),
745 )) => {
746 let num = num.parse::<u64>().map_err(|e| {
747 ErrorCode::InvalidInputSyntax(format!(
748 "checkpoint_frequency must be a u64 integer: {}",
749 e.as_report()
750 ))
751 })?;
752 Some(num)
753 }
754 _ => {
755 return Err(ErrorCode::InvalidInputSyntax(
756 "checkpoint_frequency must be a u64 integer or DEFAULT"
757 .to_owned(),
758 )
759 .into());
760 }
761 };
762 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
763 }
764 _ => {
765 return Err(ErrorCode::InvalidInputSyntax(format!(
766 "Unsupported database config parameter: {}",
767 param.real_value()
768 ))
769 .into());
770 }
771 };
772
773 alter_database_param::handle_alter_database_param(
774 handler_args,
775 name,
776 database_param,
777 )
778 .await
779 }
780 },
781 Statement::AlterSchema { name, operation } => match operation {
782 AlterSchemaOperation::RenameSchema { schema_name } => {
783 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
784 }
785 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
786 alter_owner::handle_alter_owner(
787 handler_args,
788 name,
789 new_owner_name,
790 StatementType::ALTER_SCHEMA,
791 )
792 .await
793 }
794 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
795 alter_swap_rename::handle_swap_rename(
796 handler_args,
797 name,
798 target_schema,
799 StatementType::ALTER_SCHEMA,
800 )
801 .await
802 }
803 },
804 Statement::AlterTable { name, operation } => match operation {
805 AlterTableOperation::AddColumn { .. }
806 | AlterTableOperation::DropColumn { .. }
807 | AlterTableOperation::AlterColumn { .. } => {
808 alter_table_column::handle_alter_table_column(handler_args, name, operation).await
809 }
810 AlterTableOperation::RenameTable { table_name } => {
811 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
812 .await
813 }
814 AlterTableOperation::ChangeOwner { new_owner_name } => {
815 alter_owner::handle_alter_owner(
816 handler_args,
817 name,
818 new_owner_name,
819 StatementType::ALTER_TABLE,
820 )
821 .await
822 }
823 AlterTableOperation::SetParallelism {
824 parallelism,
825 deferred,
826 } => {
827 alter_parallelism::handle_alter_parallelism(
828 handler_args,
829 name,
830 parallelism,
831 StatementType::ALTER_TABLE,
832 deferred,
833 )
834 .await
835 }
836 AlterTableOperation::SetSchema { new_schema_name } => {
837 alter_set_schema::handle_alter_set_schema(
838 handler_args,
839 name,
840 new_schema_name,
841 StatementType::ALTER_TABLE,
842 None,
843 )
844 .await
845 }
846 AlterTableOperation::RefreshSchema => {
847 alter_table_with_sr::handle_refresh_schema(handler_args, name).await
848 }
849 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
850 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
851 handler_args,
852 PbThrottleTarget::TableWithSource,
853 name,
854 rate_limit,
855 )
856 .await
857 }
858 AlterTableOperation::DropConnector => {
859 alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
860 .await
861 }
862 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
863 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
864 handler_args,
865 PbThrottleTarget::TableDml,
866 name,
867 rate_limit,
868 )
869 .await
870 }
871 AlterTableOperation::SetConfig { entries } => {
872 alter_streaming_config::handle_alter_streaming_set_config(
873 handler_args,
874 name,
875 entries,
876 StatementType::ALTER_TABLE,
877 )
878 .await
879 }
880 AlterTableOperation::ResetConfig { keys } => {
881 alter_streaming_config::handle_alter_streaming_reset_config(
882 handler_args,
883 name,
884 keys,
885 StatementType::ALTER_TABLE,
886 )
887 .await
888 }
889 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
890 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
891 handler_args,
892 PbThrottleTarget::CdcTable,
893 name,
894 rate_limit,
895 )
896 .await
897 }
898 AlterTableOperation::SwapRenameTable { target_table } => {
899 alter_swap_rename::handle_swap_rename(
900 handler_args,
901 name,
902 target_table,
903 StatementType::ALTER_TABLE,
904 )
905 .await
906 }
907 AlterTableOperation::AlterConnectorProps { alter_props } => {
908 alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
909 }
910 AlterTableOperation::AddConstraint { .. }
911 | AlterTableOperation::DropConstraint { .. }
912 | AlterTableOperation::RenameColumn { .. }
913 | AlterTableOperation::ChangeColumn { .. }
914 | AlterTableOperation::RenameConstraint { .. } => {
915 bail_not_implemented!(
916 "Unhandled statement: {}",
917 Statement::AlterTable { name, operation }
918 )
919 }
920 },
921 Statement::AlterIndex { name, operation } => match operation {
922 AlterIndexOperation::RenameIndex { index_name } => {
923 alter_rename::handle_rename_index(handler_args, name, index_name).await
924 }
925 AlterIndexOperation::SetParallelism {
926 parallelism,
927 deferred,
928 } => {
929 alter_parallelism::handle_alter_parallelism(
930 handler_args,
931 name,
932 parallelism,
933 StatementType::ALTER_INDEX,
934 deferred,
935 )
936 .await
937 }
938 AlterIndexOperation::SetConfig { entries } => {
939 alter_streaming_config::handle_alter_streaming_set_config(
940 handler_args,
941 name,
942 entries,
943 StatementType::ALTER_INDEX,
944 )
945 .await
946 }
947 AlterIndexOperation::ResetConfig { keys } => {
948 alter_streaming_config::handle_alter_streaming_reset_config(
949 handler_args,
950 name,
951 keys,
952 StatementType::ALTER_INDEX,
953 )
954 .await
955 }
956 },
957 Statement::AlterView {
958 materialized,
959 name,
960 operation,
961 } => {
962 let statement_type = if materialized {
963 StatementType::ALTER_MATERIALIZED_VIEW
964 } else {
965 StatementType::ALTER_VIEW
966 };
967 match operation {
968 AlterViewOperation::RenameView { view_name } => {
969 if materialized {
970 alter_rename::handle_rename_table(
971 handler_args,
972 TableType::MaterializedView,
973 name,
974 view_name,
975 )
976 .await
977 } else {
978 alter_rename::handle_rename_view(handler_args, name, view_name).await
979 }
980 }
981 AlterViewOperation::SetParallelism {
982 parallelism,
983 deferred,
984 } => {
985 if !materialized {
986 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
987 }
988 alter_parallelism::handle_alter_parallelism(
989 handler_args,
990 name,
991 parallelism,
992 statement_type,
993 deferred,
994 )
995 .await
996 }
997 AlterViewOperation::SetResourceGroup {
998 resource_group,
999 deferred,
1000 } => {
1001 if !materialized {
1002 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1003 }
1004 alter_resource_group::handle_alter_resource_group(
1005 handler_args,
1006 name,
1007 resource_group,
1008 statement_type,
1009 deferred,
1010 )
1011 .await
1012 }
1013 AlterViewOperation::ChangeOwner { new_owner_name } => {
1014 alter_owner::handle_alter_owner(
1015 handler_args,
1016 name,
1017 new_owner_name,
1018 statement_type,
1019 )
1020 .await
1021 }
1022 AlterViewOperation::SetSchema { new_schema_name } => {
1023 alter_set_schema::handle_alter_set_schema(
1024 handler_args,
1025 name,
1026 new_schema_name,
1027 statement_type,
1028 None,
1029 )
1030 .await
1031 }
1032 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1033 if !materialized {
1034 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1035 }
1036 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1037 handler_args,
1038 PbThrottleTarget::Mv,
1039 name,
1040 rate_limit,
1041 )
1042 .await
1043 }
1044 AlterViewOperation::SwapRenameView { target_view } => {
1045 alter_swap_rename::handle_swap_rename(
1046 handler_args,
1047 name,
1048 target_view,
1049 statement_type,
1050 )
1051 .await
1052 }
1053 AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1054 if !materialized {
1055 bail!(
1056 "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1057 );
1058 }
1059 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1060 }
1061 AlterViewOperation::AsQuery { query } => {
1062 if !materialized {
1063 bail_not_implemented!("ALTER VIEW AS QUERY");
1064 }
1065 if !cfg!(debug_assertions) {
1067 bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1068 }
1069 alter_mv::handle_alter_mv(handler_args, name, query).await
1070 }
1071 AlterViewOperation::SetConfig { entries } => {
1072 if !materialized {
1073 bail!("SET CONFIG is only supported for materialized views");
1074 }
1075 alter_streaming_config::handle_alter_streaming_set_config(
1076 handler_args,
1077 name,
1078 entries,
1079 statement_type,
1080 )
1081 .await
1082 }
1083 AlterViewOperation::ResetConfig { keys } => {
1084 if !materialized {
1085 bail!("RESET CONFIG is only supported for materialized views");
1086 }
1087 alter_streaming_config::handle_alter_streaming_reset_config(
1088 handler_args,
1089 name,
1090 keys,
1091 statement_type,
1092 )
1093 .await
1094 }
1095 }
1096 }
1097
1098 Statement::AlterSink { name, operation } => match operation {
1099 AlterSinkOperation::AlterConnectorProps {
1100 alter_props: changed_props,
1101 } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1102 AlterSinkOperation::RenameSink { sink_name } => {
1103 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1104 }
1105 AlterSinkOperation::ChangeOwner { new_owner_name } => {
1106 alter_owner::handle_alter_owner(
1107 handler_args,
1108 name,
1109 new_owner_name,
1110 StatementType::ALTER_SINK,
1111 )
1112 .await
1113 }
1114 AlterSinkOperation::SetSchema { new_schema_name } => {
1115 alter_set_schema::handle_alter_set_schema(
1116 handler_args,
1117 name,
1118 new_schema_name,
1119 StatementType::ALTER_SINK,
1120 None,
1121 )
1122 .await
1123 }
1124 AlterSinkOperation::SetParallelism {
1125 parallelism,
1126 deferred,
1127 } => {
1128 alter_parallelism::handle_alter_parallelism(
1129 handler_args,
1130 name,
1131 parallelism,
1132 StatementType::ALTER_SINK,
1133 deferred,
1134 )
1135 .await
1136 }
1137 AlterSinkOperation::SetConfig { entries } => {
1138 alter_streaming_config::handle_alter_streaming_set_config(
1139 handler_args,
1140 name,
1141 entries,
1142 StatementType::ALTER_SINK,
1143 )
1144 .await
1145 }
1146 AlterSinkOperation::ResetConfig { keys } => {
1147 alter_streaming_config::handle_alter_streaming_reset_config(
1148 handler_args,
1149 name,
1150 keys,
1151 StatementType::ALTER_SINK,
1152 )
1153 .await
1154 }
1155 AlterSinkOperation::SwapRenameSink { target_sink } => {
1156 alter_swap_rename::handle_swap_rename(
1157 handler_args,
1158 name,
1159 target_sink,
1160 StatementType::ALTER_SINK,
1161 )
1162 .await
1163 }
1164 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1165 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1166 handler_args,
1167 PbThrottleTarget::Sink,
1168 name,
1169 rate_limit,
1170 )
1171 .await
1172 }
1173 AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1174 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1175 handler_args,
1176 name,
1177 enable,
1178 )
1179 .await
1180 }
1181 },
1182 Statement::AlterSubscription { name, operation } => match operation {
1183 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1184 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1185 .await
1186 }
1187 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1188 alter_owner::handle_alter_owner(
1189 handler_args,
1190 name,
1191 new_owner_name,
1192 StatementType::ALTER_SUBSCRIPTION,
1193 )
1194 .await
1195 }
1196 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1197 alter_set_schema::handle_alter_set_schema(
1198 handler_args,
1199 name,
1200 new_schema_name,
1201 StatementType::ALTER_SUBSCRIPTION,
1202 None,
1203 )
1204 .await
1205 }
1206 AlterSubscriptionOperation::SwapRenameSubscription {
1207 target_subscription,
1208 } => {
1209 alter_swap_rename::handle_swap_rename(
1210 handler_args,
1211 name,
1212 target_subscription,
1213 StatementType::ALTER_SUBSCRIPTION,
1214 )
1215 .await
1216 }
1217 },
1218 Statement::AlterSource { name, operation } => match operation {
1219 AlterSourceOperation::AlterConnectorProps { alter_props } => {
1220 alter_source_props::handle_alter_source_connector_props(
1221 handler_args,
1222 name,
1223 alter_props,
1224 )
1225 .await
1226 }
1227 AlterSourceOperation::RenameSource { source_name } => {
1228 alter_rename::handle_rename_source(handler_args, name, source_name).await
1229 }
1230 AlterSourceOperation::AddColumn { .. } => {
1231 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1232 }
1233 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1234 alter_owner::handle_alter_owner(
1235 handler_args,
1236 name,
1237 new_owner_name,
1238 StatementType::ALTER_SOURCE,
1239 )
1240 .await
1241 }
1242 AlterSourceOperation::SetSchema { new_schema_name } => {
1243 alter_set_schema::handle_alter_set_schema(
1244 handler_args,
1245 name,
1246 new_schema_name,
1247 StatementType::ALTER_SOURCE,
1248 None,
1249 )
1250 .await
1251 }
1252 AlterSourceOperation::FormatEncode { format_encode } => {
1253 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1254 .await
1255 }
1256 AlterSourceOperation::RefreshSchema => {
1257 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1258 }
1259 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1260 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1261 handler_args,
1262 PbThrottleTarget::Source,
1263 name,
1264 rate_limit,
1265 )
1266 .await
1267 }
1268 AlterSourceOperation::SwapRenameSource { target_source } => {
1269 alter_swap_rename::handle_swap_rename(
1270 handler_args,
1271 name,
1272 target_source,
1273 StatementType::ALTER_SOURCE,
1274 )
1275 .await
1276 }
1277 AlterSourceOperation::SetParallelism {
1278 parallelism,
1279 deferred,
1280 } => {
1281 alter_parallelism::handle_alter_parallelism(
1282 handler_args,
1283 name,
1284 parallelism,
1285 StatementType::ALTER_SOURCE,
1286 deferred,
1287 )
1288 .await
1289 }
1290 AlterSourceOperation::SetConfig { entries } => {
1291 alter_streaming_config::handle_alter_streaming_set_config(
1292 handler_args,
1293 name,
1294 entries,
1295 StatementType::ALTER_SOURCE,
1296 )
1297 .await
1298 }
1299 AlterSourceOperation::ResetConfig { keys } => {
1300 alter_streaming_config::handle_alter_streaming_reset_config(
1301 handler_args,
1302 name,
1303 keys,
1304 StatementType::ALTER_SOURCE,
1305 )
1306 .await
1307 }
1308 },
1309 Statement::AlterFunction {
1310 name,
1311 args,
1312 operation,
1313 } => match operation {
1314 AlterFunctionOperation::SetSchema { new_schema_name } => {
1315 alter_set_schema::handle_alter_set_schema(
1316 handler_args,
1317 name,
1318 new_schema_name,
1319 StatementType::ALTER_FUNCTION,
1320 args,
1321 )
1322 .await
1323 }
1324 },
1325 Statement::AlterConnection { name, operation } => match operation {
1326 AlterConnectionOperation::SetSchema { new_schema_name } => {
1327 alter_set_schema::handle_alter_set_schema(
1328 handler_args,
1329 name,
1330 new_schema_name,
1331 StatementType::ALTER_CONNECTION,
1332 None,
1333 )
1334 .await
1335 }
1336 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1337 alter_owner::handle_alter_owner(
1338 handler_args,
1339 name,
1340 new_owner_name,
1341 StatementType::ALTER_CONNECTION,
1342 )
1343 .await
1344 }
1345 },
1346 Statement::AlterSystem { param, value } => {
1347 alter_system::handle_alter_system(handler_args, param, value).await
1348 }
1349 Statement::AlterSecret {
1350 name,
1351 with_options,
1352 operation,
1353 } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1354 Statement::AlterFragment {
1355 fragment_ids,
1356 operation,
1357 } => match operation {
1358 AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1359 let [fragment_id] = fragment_ids.as_slice() else {
1360 return Err(ErrorCode::InvalidInputSyntax(
1361 "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1362 .to_owned(),
1363 )
1364 .into());
1365 };
1366 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1367 &handler_args.session,
1368 PbThrottleTarget::Fragment,
1369 *fragment_id,
1370 rate_limit,
1371 StatementType::SET_VARIABLE,
1372 )
1373 .await
1374 }
1375 AlterFragmentOperation::SetParallelism { parallelism } => {
1376 alter_parallelism::handle_alter_fragment_parallelism(
1377 handler_args,
1378 fragment_ids.into_iter().map_into().collect(),
1379 parallelism,
1380 )
1381 .await
1382 }
1383 },
1384 Statement::AlterDefaultPrivileges { .. } => {
1385 handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1386 }
1387 Statement::StartTransaction { modes } => {
1388 transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1389 }
1390 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1391 Statement::Commit { chain } => {
1392 transaction::handle_commit(handler_args, COMMIT, chain).await
1393 }
1394 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1395 Statement::Rollback { chain } => {
1396 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1397 }
1398 Statement::SetTransaction {
1399 modes,
1400 snapshot,
1401 session,
1402 } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1403 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1404 Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1405 Statement::Comment {
1406 object_type,
1407 object_name,
1408 comment,
1409 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1410 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1411 Statement::Prepare {
1412 name,
1413 data_types,
1414 statement,
1415 } => prepared_statement::handle_prepare(name, data_types, statement).await,
1416 Statement::Deallocate { name, prepare } => {
1417 prepared_statement::handle_deallocate(name, prepare).await
1418 }
1419 Statement::Vacuum { object_name, full } => {
1420 vacuum::handle_vacuum(handler_args, object_name, full).await
1421 }
1422 Statement::Refresh { table_name } => {
1423 refresh::handle_refresh(handler_args, table_name).await
1424 }
1425 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1426 }
1427}
1428
1429fn check_ban_ddl_for_iceberg_engine_table(
1430 session: Arc<SessionImpl>,
1431 stmt: &Statement,
1432) -> Result<()> {
1433 match stmt {
1434 Statement::AlterTable {
1435 name,
1436 operation:
1437 operation @ (AlterTableOperation::AddColumn { .. }
1438 | AlterTableOperation::DropColumn { .. }),
1439 } => {
1440 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1441 if table.is_iceberg_engine_table() {
1442 bail!(
1443 "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1444 operation,
1445 schema_name,
1446 name
1447 );
1448 }
1449 }
1450
1451 Statement::AlterTable {
1452 name,
1453 operation: AlterTableOperation::RenameTable { .. },
1454 } => {
1455 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1456 if table.is_iceberg_engine_table() {
1457 bail!(
1458 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1459 schema_name,
1460 name
1461 );
1462 }
1463 }
1464
1465 Statement::AlterTable {
1466 name,
1467 operation: AlterTableOperation::ChangeOwner { .. },
1468 } => {
1469 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1470 if table.is_iceberg_engine_table() {
1471 bail!(
1472 "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
1473 schema_name,
1474 name
1475 );
1476 }
1477 }
1478
1479 Statement::AlterTable {
1480 name,
1481 operation: AlterTableOperation::SetParallelism { .. },
1482 } => {
1483 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1484 if table.is_iceberg_engine_table() {
1485 bail!(
1486 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1487 schema_name,
1488 name
1489 );
1490 }
1491 }
1492
1493 Statement::AlterTable {
1494 name,
1495 operation: AlterTableOperation::SetSchema { .. },
1496 } => {
1497 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1498 if table.is_iceberg_engine_table() {
1499 bail!(
1500 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1501 schema_name,
1502 name
1503 );
1504 }
1505 }
1506
1507 Statement::AlterTable {
1508 name,
1509 operation: AlterTableOperation::RefreshSchema,
1510 } => {
1511 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1512 if table.is_iceberg_engine_table() {
1513 bail!(
1514 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1515 schema_name,
1516 name
1517 );
1518 }
1519 }
1520
1521 Statement::AlterTable {
1522 name,
1523 operation: AlterTableOperation::SetSourceRateLimit { .. },
1524 } => {
1525 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1526 if table.is_iceberg_engine_table() {
1527 bail!(
1528 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1529 schema_name,
1530 name
1531 );
1532 }
1533 }
1534
1535 _ => {}
1536 }
1537
1538 Ok(())
1539}