1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_connection_props;
46mod alter_database_param;
47mod alter_mv;
48mod alter_owner;
49mod alter_parallelism;
50mod alter_rename;
51mod alter_resource_group;
52mod alter_secret;
53mod alter_set_schema;
54mod alter_sink_props;
55mod alter_source_column;
56mod alter_source_props;
57mod alter_source_with_sr;
58mod alter_streaming_config;
59mod alter_streaming_enable_unaligned_join;
60mod alter_streaming_rate_limit;
61mod alter_swap_rename;
62mod alter_system;
63mod alter_table_column;
64pub mod alter_table_drop_connector;
65pub mod alter_table_props;
66mod alter_table_with_sr;
67pub mod alter_user;
68mod alter_utils;
69pub mod cancel_job;
70pub mod close_cursor;
71mod comment;
72pub mod create_aggregate;
73pub mod create_connection;
74mod create_database;
75pub mod create_function;
76pub mod create_index;
77pub mod create_mv;
78pub mod create_schema;
79pub mod create_secret;
80pub mod create_sink;
81pub mod create_source;
82pub mod create_sql_function;
83pub mod create_subscription;
84pub mod create_table;
85pub mod create_table_as;
86pub mod create_user;
87pub mod create_view;
88pub mod declare_cursor;
89pub mod describe;
90pub mod discard;
91mod drop_connection;
92mod drop_database;
93pub mod drop_function;
94mod drop_index;
95pub mod drop_mv;
96mod drop_schema;
97pub mod drop_secret;
98pub mod drop_sink;
99pub mod drop_source;
100pub mod drop_subscription;
101pub mod drop_table;
102pub mod drop_user;
103mod drop_view;
104pub mod explain;
105pub mod explain_analyze_stream_job;
106pub mod extended_handle;
107pub mod fetch_cursor;
108mod flush;
109pub mod handle_privilege;
110pub mod kill_process;
111mod prepared_statement;
112pub mod privilege;
113pub mod query;
114mod recover;
115mod refresh;
116pub mod show;
117mod transaction;
118mod use_db;
119pub mod util;
120pub mod vacuum;
121pub mod variable;
122mod wait;
123
124pub use alter_table_column::{
125 fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
126};
127
/// [`PgResponseBuilder`] specialized with [`PgResponseStream`] as its type parameter.
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
130
/// [`PgResponse`] specialized with [`PgResponseStream`] as its type parameter.
/// This is the type returned by `handle` in this module.
pub type RwPgResponse = PgResponse<PgResponseStream>;
133
134#[easy_ext::ext(RwPgResponseBuilderExt)]
135impl RwPgResponseBuilder {
136 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
138 let fields = T::fields();
139 self.values(
140 rows.into_iter()
141 .map(|row| {
142 Row::new(
143 row.into_owned_row()
144 .into_iter()
145 .zip_eq_fast(&fields)
146 .map(|(datum, (_, ty))| {
147 datum.map(|scalar| {
148 scalar.as_scalar_ref_impl().text_format(ty).into()
149 })
150 })
151 .collect(),
152 )
153 })
154 .collect_vec()
155 .into(),
156 fields_to_descriptors(fields),
157 )
158 }
159}
160
161pub fn fields_to_descriptors(
162 fields: Vec<(&str, risingwave_common::types::DataType)>,
163) -> Vec<PgFieldDescriptor> {
164 fields
165 .iter()
166 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
167 .collect()
168}
169
/// Stream of row batches used as the payload of [`RwPgResponse`].
///
/// Its `Stream` implementation yields `Result<Vec<Row>, BoxedError>` items by
/// delegating to whichever inner stream the variant holds.
pub enum PgResponseStream {
    /// A [`LocalQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// A [`DistributedQueryStream`] wrapped in a [`DataChunkToRowSetAdapter`].
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// An arbitrary boxed stream of [`RowSetResult`]s, e.g. one built from an
    /// in-memory `Vec<Row>` via `From<Vec<Row>>`.
    Rows(BoxStream<'static, RowSetResult>),
}
175
176impl Stream for PgResponseStream {
177 type Item = std::result::Result<Vec<Row>, BoxedError>;
178
179 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
180 match &mut *self {
181 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
182 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
183 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
184 }
185 }
186}
187
188impl From<Vec<Row>> for PgResponseStream {
189 fn from(rows: Vec<Row>) -> Self {
190 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
191 }
192}
193
/// Arguments bundled together and passed to the individual statement handlers.
#[derive(Clone)]
pub struct HandlerArgs {
    /// Shared handle to the session the statement is handled for.
    pub session: Arc<SessionImpl>,
    /// SQL text supplied by the caller of `HandlerArgs::new`
    /// (presumably the original statement text — confirm against callers).
    pub sql: Arc<str>,
    /// The statement rendered back to a string by `HandlerArgs::normalize_sql`,
    /// with the `or_replace` / `if_not_exists` flags cleared for the `CREATE`
    /// statements that function matches on.
    pub normalized_sql: String,
    /// Options obtained from the statement via `WithOptions::try_from`.
    pub with_options: WithOptions,
}
201
202impl HandlerArgs {
203 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
204 Ok(Self {
205 session,
206 sql,
207 with_options: WithOptions::try_from(stmt)?,
208 normalized_sql: Self::normalize_sql(stmt),
209 })
210 }
211
212 fn normalize_sql(stmt: &Statement) -> String {
218 let mut stmt = stmt.clone();
219 match &mut stmt {
220 Statement::CreateView {
221 or_replace,
222 if_not_exists,
223 ..
224 } => {
225 *or_replace = false;
226 *if_not_exists = false;
227 }
228 Statement::CreateTable {
229 or_replace,
230 if_not_exists,
231 ..
232 } => {
233 *or_replace = false;
234 *if_not_exists = false;
235 }
236 Statement::CreateIndex { if_not_exists, .. } => {
237 *if_not_exists = false;
238 }
239 Statement::CreateSource {
240 stmt: CreateSourceStatement { if_not_exists, .. },
241 ..
242 } => {
243 *if_not_exists = false;
244 }
245 Statement::CreateSink {
246 stmt: CreateSinkStatement { if_not_exists, .. },
247 } => {
248 *if_not_exists = false;
249 }
250 Statement::CreateSubscription {
251 stmt: CreateSubscriptionStatement { if_not_exists, .. },
252 } => {
253 *if_not_exists = false;
254 }
255 Statement::CreateConnection {
256 stmt: CreateConnectionStatement { if_not_exists, .. },
257 } => {
258 *if_not_exists = false;
259 }
260 _ => {}
261 }
262 stmt.to_string()
263 }
264}
265
266pub async fn handle(
267 session: Arc<SessionImpl>,
268 stmt: Statement,
269 sql: Arc<str>,
270 formats: Vec<Format>,
271) -> Result<RwPgResponse> {
272 session.clear_cancel_query_flag();
273 let _guard = session.txn_begin_implicit();
274 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
275
276 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
277
278 match stmt {
279 Statement::Explain {
280 statement,
281 analyze,
282 options,
283 } => explain::handle_explain(handler_args, *statement, options, analyze).await,
284 Statement::ExplainAnalyzeStreamJob {
285 target,
286 duration_secs,
287 } => {
288 explain_analyze_stream_job::handle_explain_analyze_stream_job(
289 handler_args,
290 target,
291 duration_secs,
292 )
293 .await
294 }
295 Statement::CreateSource { stmt } => {
296 create_source::handle_create_source(handler_args, stmt).await
297 }
298 Statement::CreateSink { stmt } => {
299 create_sink::handle_create_sink(handler_args, stmt, false).await
300 }
301 Statement::CreateSubscription { stmt } => {
302 create_subscription::handle_create_subscription(handler_args, stmt).await
303 }
304 Statement::CreateConnection { stmt } => {
305 create_connection::handle_create_connection(handler_args, stmt).await
306 }
307 Statement::CreateSecret { stmt } => {
308 create_secret::handle_create_secret(handler_args, stmt).await
309 }
310 Statement::CreateFunction {
311 or_replace,
312 temporary,
313 if_not_exists,
314 name,
315 args,
316 returns,
317 params,
318 with_options,
319 } => {
320 if params.language.is_none()
323 || !params
324 .language
325 .as_ref()
326 .unwrap()
327 .real_value()
328 .eq_ignore_ascii_case("sql")
329 {
330 create_function::handle_create_function(
331 handler_args,
332 or_replace,
333 temporary,
334 if_not_exists,
335 name,
336 args,
337 returns,
338 params,
339 with_options,
340 )
341 .await
342 } else {
343 create_sql_function::handle_create_sql_function(
344 handler_args,
345 or_replace,
346 temporary,
347 if_not_exists,
348 name,
349 args,
350 returns,
351 params,
352 )
353 .await
354 }
355 }
356 Statement::CreateAggregate {
357 or_replace,
358 if_not_exists,
359 name,
360 args,
361 returns,
362 params,
363 ..
364 } => {
365 create_aggregate::handle_create_aggregate(
366 handler_args,
367 or_replace,
368 if_not_exists,
369 name,
370 args,
371 returns,
372 params,
373 )
374 .await
375 }
376 Statement::CreateTable {
377 name,
378 columns,
379 wildcard_idx,
380 constraints,
381 query,
382 with_options: _, or_replace,
385 temporary,
386 if_not_exists,
387 format_encode,
388 source_watermarks,
389 append_only,
390 on_conflict,
391 with_version_columns,
392 cdc_table_info,
393 include_column_options,
394 webhook_info,
395 engine,
396 } => {
397 if or_replace {
398 bail_not_implemented!("CREATE OR REPLACE TABLE");
399 }
400 if temporary {
401 bail_not_implemented!("CREATE TEMPORARY TABLE");
402 }
403 if let Some(query) = query {
404 return create_table_as::handle_create_as(
405 handler_args,
406 name,
407 if_not_exists,
408 query,
409 columns,
410 append_only,
411 on_conflict,
412 with_version_columns
413 .iter()
414 .map(|col| col.real_value())
415 .collect(),
416 engine,
417 )
418 .await;
419 }
420 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
421 create_table::handle_create_table(
422 handler_args,
423 name,
424 columns,
425 wildcard_idx,
426 constraints,
427 if_not_exists,
428 format_encode,
429 source_watermarks,
430 append_only,
431 on_conflict,
432 with_version_columns
433 .iter()
434 .map(|col| col.real_value())
435 .collect(),
436 cdc_table_info,
437 include_column_options,
438 webhook_info,
439 engine,
440 )
441 .await
442 }
443 Statement::CreateDatabase {
444 db_name,
445 if_not_exists,
446 owner,
447 resource_group,
448 barrier_interval_ms,
449 checkpoint_frequency,
450 } => {
451 create_database::handle_create_database(
452 handler_args,
453 db_name,
454 if_not_exists,
455 owner,
456 resource_group,
457 barrier_interval_ms,
458 checkpoint_frequency,
459 )
460 .await
461 }
462 Statement::CreateSchema {
463 schema_name,
464 if_not_exists,
465 owner,
466 } => {
467 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
468 .await
469 }
470 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
471 Statement::DeclareCursor { stmt } => {
472 declare_cursor::handle_declare_cursor(handler_args, stmt).await
473 }
474 Statement::FetchCursor { stmt } => {
475 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
476 }
477 Statement::CloseCursor { stmt } => {
478 close_cursor::handle_close_cursor(handler_args, stmt).await
479 }
480 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
481 Statement::Grant { .. } => {
482 handle_privilege::handle_grant_privilege(handler_args, stmt).await
483 }
484 Statement::Revoke { .. } => {
485 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
486 }
487 Statement::Describe { name, kind } => match kind {
488 DescribeKind::Fragments => {
489 describe::handle_describe_fragments(handler_args, name).await
490 }
491 DescribeKind::Plain => describe::handle_describe(handler_args, name),
492 },
493 Statement::DescribeFragment { fragment_id } => {
494 describe::handle_describe_fragment(handler_args, fragment_id.into()).await
495 }
496 Statement::Discard(..) => discard::handle_discard(handler_args),
497 Statement::ShowObjects {
498 object: show_object,
499 filter,
500 } => show::handle_show_object(handler_args, show_object, filter).await,
501 Statement::ShowCreateObject { create_type, name } => {
502 show::handle_show_create_object(handler_args, create_type, name)
503 }
504 Statement::ShowTransactionIsolationLevel => {
505 transaction::handle_show_isolation_level(handler_args)
506 }
507 Statement::Drop(DropStatement {
508 object_type,
509 object_name,
510 if_exists,
511 drop_mode,
512 }) => {
513 let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
514 match object_type {
515 ObjectType::MaterializedView
516 | ObjectType::View
517 | ObjectType::Sink
518 | ObjectType::Source
519 | ObjectType::Subscription
520 | ObjectType::Index
521 | ObjectType::Table
522 | ObjectType::Schema
523 | ObjectType::Connection => true,
524 ObjectType::Database | ObjectType::User | ObjectType::Secret => {
525 bail_not_implemented!("DROP CASCADE");
526 }
527 }
528 } else {
529 false
530 };
531 match object_type {
532 ObjectType::Table => {
533 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
534 .await
535 }
536 ObjectType::MaterializedView => {
537 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
538 }
539 ObjectType::Index => {
540 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
541 .await
542 }
543 ObjectType::Source => {
544 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
545 .await
546 }
547 ObjectType::Sink => {
548 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
549 }
550 ObjectType::Subscription => {
551 drop_subscription::handle_drop_subscription(
552 handler_args,
553 object_name,
554 if_exists,
555 cascade,
556 )
557 .await
558 }
559 ObjectType::Database => {
560 drop_database::handle_drop_database(handler_args, object_name, if_exists).await
561 }
562 ObjectType::Schema => {
563 drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
564 .await
565 }
566 ObjectType::User => {
567 drop_user::handle_drop_user(handler_args, object_name, if_exists).await
568 }
569 ObjectType::View => {
570 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
571 }
572 ObjectType::Connection => {
573 drop_connection::handle_drop_connection(
574 handler_args,
575 object_name,
576 if_exists,
577 cascade,
578 )
579 .await
580 }
581 ObjectType::Secret => {
582 drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
583 }
584 }
585 }
586 Statement::DropFunction {
588 if_exists,
589 func_desc,
590 option,
591 } => {
592 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
593 .await
594 }
595 Statement::DropAggregate {
596 if_exists,
597 func_desc,
598 option,
599 } => {
600 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
601 .await
602 }
603 Statement::Query(_)
604 | Statement::Insert { .. }
605 | Statement::Delete { .. }
606 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
607 Statement::Copy {
608 entity: CopyEntity::Query(query),
609 target: CopyTarget::Stdout,
610 } => {
611 let response =
612 query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
613 .await?;
614 Ok(response.into_copy_query_to_stdout())
615 }
616 Statement::CreateView {
617 materialized,
618 if_not_exists,
619 name,
620 columns,
621 query,
622 with_options: _, or_replace, emit_mode,
625 } => {
626 if or_replace {
627 bail_not_implemented!("CREATE OR REPLACE VIEW");
628 }
629 if materialized {
630 create_mv::handle_create_mv(
631 handler_args,
632 if_not_exists,
633 name,
634 *query,
635 columns,
636 emit_mode,
637 )
638 .await
639 } else {
640 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
641 .await
642 }
643 }
644 Statement::Flush => flush::handle_flush(handler_args).await,
645 Statement::Wait => wait::handle_wait(handler_args).await,
646 Statement::Recover => recover::handle_recover(handler_args).await,
647 Statement::SetVariable {
648 local: _,
649 variable,
650 value,
651 } => {
652 if variable.real_value().eq_ignore_ascii_case("database") {
654 let x = variable::set_var_to_param_str(&value);
655 let res = use_db::handle_use_db(
656 handler_args,
657 ObjectName::from(vec![Ident::from_real_value(
658 x.as_deref().unwrap_or("default"),
659 )]),
660 )?;
661 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
662 for notice in res.notices() {
663 builder = builder.notice(notice);
664 }
665 return Ok(builder.into());
666 }
667 variable::handle_set(handler_args, variable, value)
668 }
669 Statement::SetTimeZone { local: _, value } => {
670 variable::handle_set_time_zone(handler_args, value)
671 }
672 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
673 Statement::CreateIndex {
674 name,
675 table_name,
676 method,
677 columns,
678 include,
679 distributed_by,
680 unique,
681 if_not_exists,
682 with_properties: _,
683 } => {
684 if unique {
685 bail_not_implemented!("create unique index");
686 }
687
688 create_index::handle_create_index(
689 handler_args,
690 if_not_exists,
691 name,
692 table_name,
693 method,
694 columns.clone(),
695 include,
696 distributed_by,
697 )
698 .await
699 }
700 Statement::AlterDatabase { name, operation } => match operation {
701 AlterDatabaseOperation::RenameDatabase { database_name } => {
702 alter_rename::handle_rename_database(handler_args, name, database_name).await
703 }
704 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
705 alter_owner::handle_alter_owner(
706 handler_args,
707 name,
708 new_owner_name,
709 StatementType::ALTER_DATABASE,
710 )
711 .await
712 }
713 AlterDatabaseOperation::SetParam(config_param) => {
714 let ConfigParam { param, value } = config_param;
715
716 let database_param = match param.real_value().to_uppercase().as_str() {
717 "BARRIER_INTERVAL_MS" => {
718 let barrier_interval_ms = match value {
719 SetVariableValue::Default => None,
720 SetVariableValue::Single(SetVariableValueSingle::Literal(
721 Value::Number(num),
722 )) => {
723 let num = num.parse::<u32>().map_err(|e| {
724 ErrorCode::InvalidInputSyntax(format!(
725 "barrier_interval_ms must be a u32 integer: {}",
726 e.as_report()
727 ))
728 })?;
729 Some(num)
730 }
731 _ => {
732 return Err(ErrorCode::InvalidInputSyntax(
733 "barrier_interval_ms must be a u32 integer or DEFAULT"
734 .to_owned(),
735 )
736 .into());
737 }
738 };
739 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
740 }
741 "CHECKPOINT_FREQUENCY" => {
742 let checkpoint_frequency = match value {
743 SetVariableValue::Default => None,
744 SetVariableValue::Single(SetVariableValueSingle::Literal(
745 Value::Number(num),
746 )) => {
747 let num = num.parse::<u64>().map_err(|e| {
748 ErrorCode::InvalidInputSyntax(format!(
749 "checkpoint_frequency must be a u64 integer: {}",
750 e.as_report()
751 ))
752 })?;
753 Some(num)
754 }
755 _ => {
756 return Err(ErrorCode::InvalidInputSyntax(
757 "checkpoint_frequency must be a u64 integer or DEFAULT"
758 .to_owned(),
759 )
760 .into());
761 }
762 };
763 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
764 }
765 _ => {
766 return Err(ErrorCode::InvalidInputSyntax(format!(
767 "Unsupported database config parameter: {}",
768 param.real_value()
769 ))
770 .into());
771 }
772 };
773
774 alter_database_param::handle_alter_database_param(
775 handler_args,
776 name,
777 database_param,
778 )
779 .await
780 }
781 },
782 Statement::AlterSchema { name, operation } => match operation {
783 AlterSchemaOperation::RenameSchema { schema_name } => {
784 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
785 }
786 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
787 alter_owner::handle_alter_owner(
788 handler_args,
789 name,
790 new_owner_name,
791 StatementType::ALTER_SCHEMA,
792 )
793 .await
794 }
795 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
796 alter_swap_rename::handle_swap_rename(
797 handler_args,
798 name,
799 target_schema,
800 StatementType::ALTER_SCHEMA,
801 )
802 .await
803 }
804 },
805 Statement::AlterTable { name, operation } => match operation {
806 AlterTableOperation::AddColumn { .. }
807 | AlterTableOperation::DropColumn { .. }
808 | AlterTableOperation::AlterColumn { .. } => {
809 alter_table_column::handle_alter_table_column(handler_args, name, operation).await
810 }
811 AlterTableOperation::RenameTable { table_name } => {
812 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
813 .await
814 }
815 AlterTableOperation::ChangeOwner { new_owner_name } => {
816 alter_owner::handle_alter_owner(
817 handler_args,
818 name,
819 new_owner_name,
820 StatementType::ALTER_TABLE,
821 )
822 .await
823 }
824 AlterTableOperation::SetParallelism {
825 parallelism,
826 deferred,
827 } => {
828 alter_parallelism::handle_alter_parallelism(
829 handler_args,
830 name,
831 parallelism,
832 StatementType::ALTER_TABLE,
833 deferred,
834 )
835 .await
836 }
837 AlterTableOperation::SetSchema { new_schema_name } => {
838 alter_set_schema::handle_alter_set_schema(
839 handler_args,
840 name,
841 new_schema_name,
842 StatementType::ALTER_TABLE,
843 None,
844 )
845 .await
846 }
847 AlterTableOperation::RefreshSchema => {
848 alter_table_with_sr::handle_refresh_schema(handler_args, name).await
849 }
850 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
851 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
852 handler_args,
853 PbThrottleTarget::TableWithSource,
854 name,
855 rate_limit,
856 )
857 .await
858 }
859 AlterTableOperation::DropConnector => {
860 alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
861 .await
862 }
863 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
864 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
865 handler_args,
866 PbThrottleTarget::TableDml,
867 name,
868 rate_limit,
869 )
870 .await
871 }
872 AlterTableOperation::SetConfig { entries } => {
873 alter_streaming_config::handle_alter_streaming_set_config(
874 handler_args,
875 name,
876 entries,
877 StatementType::ALTER_TABLE,
878 )
879 .await
880 }
881 AlterTableOperation::ResetConfig { keys } => {
882 alter_streaming_config::handle_alter_streaming_reset_config(
883 handler_args,
884 name,
885 keys,
886 StatementType::ALTER_TABLE,
887 )
888 .await
889 }
890 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
891 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
892 handler_args,
893 PbThrottleTarget::CdcTable,
894 name,
895 rate_limit,
896 )
897 .await
898 }
899 AlterTableOperation::SwapRenameTable { target_table } => {
900 alter_swap_rename::handle_swap_rename(
901 handler_args,
902 name,
903 target_table,
904 StatementType::ALTER_TABLE,
905 )
906 .await
907 }
908 AlterTableOperation::AlterConnectorProps { alter_props } => {
909 alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
910 }
911 AlterTableOperation::AddConstraint { .. }
912 | AlterTableOperation::DropConstraint { .. }
913 | AlterTableOperation::RenameColumn { .. }
914 | AlterTableOperation::ChangeColumn { .. }
915 | AlterTableOperation::RenameConstraint { .. } => {
916 bail_not_implemented!(
917 "Unhandled statement: {}",
918 Statement::AlterTable { name, operation }
919 )
920 }
921 },
922 Statement::AlterIndex { name, operation } => match operation {
923 AlterIndexOperation::RenameIndex { index_name } => {
924 alter_rename::handle_rename_index(handler_args, name, index_name).await
925 }
926 AlterIndexOperation::SetParallelism {
927 parallelism,
928 deferred,
929 } => {
930 alter_parallelism::handle_alter_parallelism(
931 handler_args,
932 name,
933 parallelism,
934 StatementType::ALTER_INDEX,
935 deferred,
936 )
937 .await
938 }
939 AlterIndexOperation::SetConfig { entries } => {
940 alter_streaming_config::handle_alter_streaming_set_config(
941 handler_args,
942 name,
943 entries,
944 StatementType::ALTER_INDEX,
945 )
946 .await
947 }
948 AlterIndexOperation::ResetConfig { keys } => {
949 alter_streaming_config::handle_alter_streaming_reset_config(
950 handler_args,
951 name,
952 keys,
953 StatementType::ALTER_INDEX,
954 )
955 .await
956 }
957 },
958 Statement::AlterView {
959 materialized,
960 name,
961 operation,
962 } => {
963 let statement_type = if materialized {
964 StatementType::ALTER_MATERIALIZED_VIEW
965 } else {
966 StatementType::ALTER_VIEW
967 };
968 match operation {
969 AlterViewOperation::RenameView { view_name } => {
970 if materialized {
971 alter_rename::handle_rename_table(
972 handler_args,
973 TableType::MaterializedView,
974 name,
975 view_name,
976 )
977 .await
978 } else {
979 alter_rename::handle_rename_view(handler_args, name, view_name).await
980 }
981 }
982 AlterViewOperation::SetParallelism {
983 parallelism,
984 deferred,
985 } => {
986 if !materialized {
987 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
988 }
989 alter_parallelism::handle_alter_parallelism(
990 handler_args,
991 name,
992 parallelism,
993 statement_type,
994 deferred,
995 )
996 .await
997 }
998 AlterViewOperation::SetResourceGroup {
999 resource_group,
1000 deferred,
1001 } => {
1002 if !materialized {
1003 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1004 }
1005 alter_resource_group::handle_alter_resource_group(
1006 handler_args,
1007 name,
1008 resource_group,
1009 statement_type,
1010 deferred,
1011 )
1012 .await
1013 }
1014 AlterViewOperation::ChangeOwner { new_owner_name } => {
1015 alter_owner::handle_alter_owner(
1016 handler_args,
1017 name,
1018 new_owner_name,
1019 statement_type,
1020 )
1021 .await
1022 }
1023 AlterViewOperation::SetSchema { new_schema_name } => {
1024 alter_set_schema::handle_alter_set_schema(
1025 handler_args,
1026 name,
1027 new_schema_name,
1028 statement_type,
1029 None,
1030 )
1031 .await
1032 }
1033 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1034 if !materialized {
1035 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1036 }
1037 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1038 handler_args,
1039 PbThrottleTarget::Mv,
1040 name,
1041 rate_limit,
1042 )
1043 .await
1044 }
1045 AlterViewOperation::SwapRenameView { target_view } => {
1046 alter_swap_rename::handle_swap_rename(
1047 handler_args,
1048 name,
1049 target_view,
1050 statement_type,
1051 )
1052 .await
1053 }
1054 AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1055 if !materialized {
1056 bail!(
1057 "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1058 );
1059 }
1060 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1061 }
1062 AlterViewOperation::AsQuery { query } => {
1063 if !materialized {
1064 bail_not_implemented!("ALTER VIEW AS QUERY");
1065 }
1066 if !cfg!(debug_assertions) {
1068 bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1069 }
1070 alter_mv::handle_alter_mv(handler_args, name, query).await
1071 }
1072 AlterViewOperation::SetConfig { entries } => {
1073 if !materialized {
1074 bail!("SET CONFIG is only supported for materialized views");
1075 }
1076 alter_streaming_config::handle_alter_streaming_set_config(
1077 handler_args,
1078 name,
1079 entries,
1080 statement_type,
1081 )
1082 .await
1083 }
1084 AlterViewOperation::ResetConfig { keys } => {
1085 if !materialized {
1086 bail!("RESET CONFIG is only supported for materialized views");
1087 }
1088 alter_streaming_config::handle_alter_streaming_reset_config(
1089 handler_args,
1090 name,
1091 keys,
1092 statement_type,
1093 )
1094 .await
1095 }
1096 }
1097 }
1098
1099 Statement::AlterSink { name, operation } => match operation {
1100 AlterSinkOperation::AlterConnectorProps {
1101 alter_props: changed_props,
1102 } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1103 AlterSinkOperation::RenameSink { sink_name } => {
1104 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1105 }
1106 AlterSinkOperation::ChangeOwner { new_owner_name } => {
1107 alter_owner::handle_alter_owner(
1108 handler_args,
1109 name,
1110 new_owner_name,
1111 StatementType::ALTER_SINK,
1112 )
1113 .await
1114 }
1115 AlterSinkOperation::SetSchema { new_schema_name } => {
1116 alter_set_schema::handle_alter_set_schema(
1117 handler_args,
1118 name,
1119 new_schema_name,
1120 StatementType::ALTER_SINK,
1121 None,
1122 )
1123 .await
1124 }
1125 AlterSinkOperation::SetParallelism {
1126 parallelism,
1127 deferred,
1128 } => {
1129 alter_parallelism::handle_alter_parallelism(
1130 handler_args,
1131 name,
1132 parallelism,
1133 StatementType::ALTER_SINK,
1134 deferred,
1135 )
1136 .await
1137 }
1138 AlterSinkOperation::SetConfig { entries } => {
1139 alter_streaming_config::handle_alter_streaming_set_config(
1140 handler_args,
1141 name,
1142 entries,
1143 StatementType::ALTER_SINK,
1144 )
1145 .await
1146 }
1147 AlterSinkOperation::ResetConfig { keys } => {
1148 alter_streaming_config::handle_alter_streaming_reset_config(
1149 handler_args,
1150 name,
1151 keys,
1152 StatementType::ALTER_SINK,
1153 )
1154 .await
1155 }
1156 AlterSinkOperation::SwapRenameSink { target_sink } => {
1157 alter_swap_rename::handle_swap_rename(
1158 handler_args,
1159 name,
1160 target_sink,
1161 StatementType::ALTER_SINK,
1162 )
1163 .await
1164 }
1165 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1166 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1167 handler_args,
1168 PbThrottleTarget::Sink,
1169 name,
1170 rate_limit,
1171 )
1172 .await
1173 }
1174 AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1175 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1176 handler_args,
1177 name,
1178 enable,
1179 )
1180 .await
1181 }
1182 },
1183 Statement::AlterSubscription { name, operation } => match operation {
1184 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1185 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1186 .await
1187 }
1188 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1189 alter_owner::handle_alter_owner(
1190 handler_args,
1191 name,
1192 new_owner_name,
1193 StatementType::ALTER_SUBSCRIPTION,
1194 )
1195 .await
1196 }
1197 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1198 alter_set_schema::handle_alter_set_schema(
1199 handler_args,
1200 name,
1201 new_schema_name,
1202 StatementType::ALTER_SUBSCRIPTION,
1203 None,
1204 )
1205 .await
1206 }
1207 AlterSubscriptionOperation::SwapRenameSubscription {
1208 target_subscription,
1209 } => {
1210 alter_swap_rename::handle_swap_rename(
1211 handler_args,
1212 name,
1213 target_subscription,
1214 StatementType::ALTER_SUBSCRIPTION,
1215 )
1216 .await
1217 }
1218 },
1219 Statement::AlterSource { name, operation } => match operation {
1220 AlterSourceOperation::AlterConnectorProps { alter_props } => {
1221 alter_source_props::handle_alter_source_connector_props(
1222 handler_args,
1223 name,
1224 alter_props,
1225 )
1226 .await
1227 }
1228 AlterSourceOperation::RenameSource { source_name } => {
1229 alter_rename::handle_rename_source(handler_args, name, source_name).await
1230 }
1231 AlterSourceOperation::AddColumn { .. } => {
1232 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1233 }
1234 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1235 alter_owner::handle_alter_owner(
1236 handler_args,
1237 name,
1238 new_owner_name,
1239 StatementType::ALTER_SOURCE,
1240 )
1241 .await
1242 }
1243 AlterSourceOperation::SetSchema { new_schema_name } => {
1244 alter_set_schema::handle_alter_set_schema(
1245 handler_args,
1246 name,
1247 new_schema_name,
1248 StatementType::ALTER_SOURCE,
1249 None,
1250 )
1251 .await
1252 }
1253 AlterSourceOperation::FormatEncode { format_encode } => {
1254 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1255 .await
1256 }
1257 AlterSourceOperation::RefreshSchema => {
1258 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1259 }
1260 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1261 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1262 handler_args,
1263 PbThrottleTarget::Source,
1264 name,
1265 rate_limit,
1266 )
1267 .await
1268 }
1269 AlterSourceOperation::SwapRenameSource { target_source } => {
1270 alter_swap_rename::handle_swap_rename(
1271 handler_args,
1272 name,
1273 target_source,
1274 StatementType::ALTER_SOURCE,
1275 )
1276 .await
1277 }
1278 AlterSourceOperation::SetParallelism {
1279 parallelism,
1280 deferred,
1281 } => {
1282 alter_parallelism::handle_alter_parallelism(
1283 handler_args,
1284 name,
1285 parallelism,
1286 StatementType::ALTER_SOURCE,
1287 deferred,
1288 )
1289 .await
1290 }
1291 AlterSourceOperation::SetConfig { entries } => {
1292 alter_streaming_config::handle_alter_streaming_set_config(
1293 handler_args,
1294 name,
1295 entries,
1296 StatementType::ALTER_SOURCE,
1297 )
1298 .await
1299 }
1300 AlterSourceOperation::ResetConfig { keys } => {
1301 alter_streaming_config::handle_alter_streaming_reset_config(
1302 handler_args,
1303 name,
1304 keys,
1305 StatementType::ALTER_SOURCE,
1306 )
1307 .await
1308 }
1309 },
1310 Statement::AlterFunction {
1311 name,
1312 args,
1313 operation,
1314 } => match operation {
1315 AlterFunctionOperation::SetSchema { new_schema_name } => {
1316 alter_set_schema::handle_alter_set_schema(
1317 handler_args,
1318 name,
1319 new_schema_name,
1320 StatementType::ALTER_FUNCTION,
1321 args,
1322 )
1323 .await
1324 }
1325 },
1326 Statement::AlterConnection { name, operation } => match operation {
1327 AlterConnectionOperation::SetSchema { new_schema_name } => {
1328 alter_set_schema::handle_alter_set_schema(
1329 handler_args,
1330 name,
1331 new_schema_name,
1332 StatementType::ALTER_CONNECTION,
1333 None,
1334 )
1335 .await
1336 }
1337 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1338 alter_owner::handle_alter_owner(
1339 handler_args,
1340 name,
1341 new_owner_name,
1342 StatementType::ALTER_CONNECTION,
1343 )
1344 .await
1345 }
1346 AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1347 alter_connection_props::handle_alter_connection_connector_props(
1348 handler_args,
1349 name,
1350 alter_props,
1351 )
1352 .await
1353 }
1354 },
1355 Statement::AlterSystem { param, value } => {
1356 alter_system::handle_alter_system(handler_args, param, value).await
1357 }
1358 Statement::AlterSecret {
1359 name,
1360 with_options,
1361 operation,
1362 } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
1363 Statement::AlterFragment {
1364 fragment_ids,
1365 operation,
1366 } => match operation {
1367 AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1368 let [fragment_id] = fragment_ids.as_slice() else {
1369 return Err(ErrorCode::InvalidInputSyntax(
1370 "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1371 .to_owned(),
1372 )
1373 .into());
1374 };
1375 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1376 &handler_args.session,
1377 PbThrottleTarget::Fragment,
1378 *fragment_id,
1379 rate_limit,
1380 StatementType::SET_VARIABLE,
1381 )
1382 .await
1383 }
1384 AlterFragmentOperation::SetParallelism { parallelism } => {
1385 alter_parallelism::handle_alter_fragment_parallelism(
1386 handler_args,
1387 fragment_ids.into_iter().map_into().collect(),
1388 parallelism,
1389 )
1390 .await
1391 }
1392 },
1393 Statement::AlterDefaultPrivileges { .. } => {
1394 handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1395 }
1396 Statement::StartTransaction { modes } => {
1397 transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1398 }
1399 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1400 Statement::Commit { chain } => {
1401 transaction::handle_commit(handler_args, COMMIT, chain).await
1402 }
1403 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1404 Statement::Rollback { chain } => {
1405 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1406 }
1407 Statement::SetTransaction {
1408 modes,
1409 snapshot,
1410 session,
1411 } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1412 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1413 Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1414 Statement::Comment {
1415 object_type,
1416 object_name,
1417 comment,
1418 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1419 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1420 Statement::Prepare {
1421 name,
1422 data_types,
1423 statement,
1424 } => prepared_statement::handle_prepare(name, data_types, statement).await,
1425 Statement::Deallocate { name, prepare } => {
1426 prepared_statement::handle_deallocate(name, prepare).await
1427 }
1428 Statement::Vacuum { object_name, full } => {
1429 vacuum::handle_vacuum(handler_args, object_name, full).await
1430 }
1431 Statement::Refresh { table_name } => {
1432 refresh::handle_refresh(handler_args, table_name).await
1433 }
1434 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1435 }
1436}
1437
1438fn check_ban_ddl_for_iceberg_engine_table(
1439 session: Arc<SessionImpl>,
1440 stmt: &Statement,
1441) -> Result<()> {
1442 match stmt {
1443 Statement::AlterTable {
1444 name,
1445 operation:
1446 operation @ (AlterTableOperation::AddColumn { .. }
1447 | AlterTableOperation::DropColumn { .. }),
1448 } => {
1449 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1450 if table.is_iceberg_engine_table() {
1451 bail!(
1452 "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1453 operation,
1454 schema_name,
1455 name
1456 );
1457 }
1458 }
1459
1460 Statement::AlterTable {
1461 name,
1462 operation: AlterTableOperation::RenameTable { .. },
1463 } => {
1464 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1465 if table.is_iceberg_engine_table() {
1466 bail!(
1467 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1468 schema_name,
1469 name
1470 );
1471 }
1472 }
1473
1474 Statement::AlterTable {
1475 name,
1476 operation: AlterTableOperation::SetParallelism { .. },
1477 } => {
1478 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1479 if table.is_iceberg_engine_table() {
1480 bail!(
1481 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1482 schema_name,
1483 name
1484 );
1485 }
1486 }
1487
1488 Statement::AlterTable {
1489 name,
1490 operation: AlterTableOperation::SetSchema { .. },
1491 } => {
1492 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1493 if table.is_iceberg_engine_table() {
1494 bail!(
1495 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1496 schema_name,
1497 name
1498 );
1499 }
1500 }
1501
1502 Statement::AlterTable {
1503 name,
1504 operation: AlterTableOperation::RefreshSchema,
1505 } => {
1506 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1507 if table.is_iceberg_engine_table() {
1508 bail!(
1509 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1510 schema_name,
1511 name
1512 );
1513 }
1514 }
1515
1516 Statement::AlterTable {
1517 name,
1518 operation: AlterTableOperation::SetSourceRateLimit { .. },
1519 } => {
1520 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1521 if table.is_iceberg_engine_table() {
1522 bail!(
1523 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1524 schema_name,
1525 name
1526 );
1527 }
1528 }
1529
1530 _ => {}
1531 }
1532
1533 Ok(())
1534}