1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_connection_props;
46mod alter_database_param;
47mod alter_mv;
48mod alter_owner;
49mod alter_parallelism;
50mod alter_rename;
51mod alter_resource_group;
52mod alter_secret;
53mod alter_set_schema;
54mod alter_sink_props;
55mod alter_source_column;
56mod alter_source_props;
57mod alter_source_with_sr;
58mod alter_streaming_config;
59mod alter_streaming_enable_unaligned_join;
60mod alter_streaming_rate_limit;
61mod alter_swap_rename;
62mod alter_system;
63mod alter_table_column;
64pub mod alter_table_drop_connector;
65pub mod alter_table_props;
66mod alter_table_with_sr;
67pub mod alter_user;
68mod alter_utils;
69pub mod cancel_job;
70pub mod close_cursor;
71mod comment;
72pub mod create_aggregate;
73pub mod create_connection;
74mod create_database;
75pub mod create_function;
76pub mod create_index;
77pub mod create_mv;
78pub mod create_schema;
79pub mod create_secret;
80pub mod create_sink;
81pub mod create_source;
82pub mod create_sql_function;
83pub mod create_subscription;
84pub mod create_table;
85pub mod create_table_as;
86pub mod create_user;
87pub mod create_view;
88pub mod declare_cursor;
89pub mod describe;
90pub mod discard;
91mod drop_connection;
92mod drop_database;
93pub mod drop_function;
94mod drop_index;
95pub mod drop_mv;
96mod drop_schema;
97pub mod drop_secret;
98pub mod drop_sink;
99pub mod drop_source;
100pub mod drop_subscription;
101pub mod drop_table;
102pub mod drop_user;
103mod drop_view;
104pub mod explain;
105pub mod explain_analyze_stream_job;
106pub mod extended_handle;
107pub mod fetch_cursor;
108mod flush;
109pub mod handle_privilege;
110pub mod kill_process;
111mod prepared_statement;
112pub mod privilege;
113pub mod query;
114mod recover;
115mod refresh;
116mod reset_source;
117pub mod show;
118mod transaction;
119mod use_db;
120pub mod util;
121pub mod vacuum;
122pub mod variable;
123mod wait;
124
125pub use alter_table_column::{
126 fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
127};
128
129pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
131
132pub type RwPgResponse = PgResponse<PgResponseStream>;
134
135#[easy_ext::ext(RwPgResponseBuilderExt)]
136impl RwPgResponseBuilder {
137 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
139 let fields = T::fields();
140 self.values(
141 rows.into_iter()
142 .map(|row| {
143 Row::new(
144 row.into_owned_row()
145 .into_iter()
146 .zip_eq_fast(&fields)
147 .map(|(datum, (_, ty))| {
148 datum.map(|scalar| {
149 scalar.as_scalar_ref_impl().text_format(ty).into()
150 })
151 })
152 .collect(),
153 )
154 })
155 .collect_vec()
156 .into(),
157 fields_to_descriptors(fields),
158 )
159 }
160}
161
162pub fn fields_to_descriptors(
163 fields: Vec<(&str, risingwave_common::types::DataType)>,
164) -> Vec<PgFieldDescriptor> {
165 fields
166 .iter()
167 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
168 .collect()
169}
170
171pub enum PgResponseStream {
172 LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
173 DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
174 Rows(BoxStream<'static, RowSetResult>),
175}
176
177impl Stream for PgResponseStream {
178 type Item = std::result::Result<Vec<Row>, BoxedError>;
179
180 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
181 match &mut *self {
182 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
183 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
184 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
185 }
186 }
187}
188
189impl From<Vec<Row>> for PgResponseStream {
190 fn from(rows: Vec<Row>) -> Self {
191 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
192 }
193}
194
195#[derive(Clone)]
196pub struct HandlerArgs {
197 pub session: Arc<SessionImpl>,
198 pub sql: Arc<str>,
199 pub normalized_sql: String,
200 pub with_options: WithOptions,
201}
202
203impl HandlerArgs {
204 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
205 Ok(Self {
206 session,
207 sql,
208 with_options: WithOptions::try_from(stmt)?,
209 normalized_sql: Self::normalize_sql(stmt),
210 })
211 }
212
213 fn normalize_sql(stmt: &Statement) -> String {
219 let mut stmt = stmt.clone();
220 match &mut stmt {
221 Statement::CreateView {
222 or_replace,
223 if_not_exists,
224 ..
225 } => {
226 *or_replace = false;
227 *if_not_exists = false;
228 }
229 Statement::CreateTable {
230 or_replace,
231 if_not_exists,
232 ..
233 } => {
234 *or_replace = false;
235 *if_not_exists = false;
236 }
237 Statement::CreateIndex { if_not_exists, .. } => {
238 *if_not_exists = false;
239 }
240 Statement::CreateSource {
241 stmt: CreateSourceStatement { if_not_exists, .. },
242 ..
243 } => {
244 *if_not_exists = false;
245 }
246 Statement::CreateSink {
247 stmt: CreateSinkStatement { if_not_exists, .. },
248 } => {
249 *if_not_exists = false;
250 }
251 Statement::CreateSubscription {
252 stmt: CreateSubscriptionStatement { if_not_exists, .. },
253 } => {
254 *if_not_exists = false;
255 }
256 Statement::CreateConnection {
257 stmt: CreateConnectionStatement { if_not_exists, .. },
258 } => {
259 *if_not_exists = false;
260 }
261 _ => {}
262 }
263 stmt.to_string()
264 }
265}
266
267pub async fn handle(
268 session: Arc<SessionImpl>,
269 stmt: Statement,
270 sql: Arc<str>,
271 formats: Vec<Format>,
272) -> Result<RwPgResponse> {
273 session.clear_cancel_query_flag();
274 let _guard = session.txn_begin_implicit();
275 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
276
277 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
278
279 match stmt {
280 Statement::Explain {
281 statement,
282 analyze,
283 options,
284 } => explain::handle_explain(handler_args, *statement, options, analyze).await,
285 Statement::ExplainAnalyzeStreamJob {
286 target,
287 duration_secs,
288 } => {
289 explain_analyze_stream_job::handle_explain_analyze_stream_job(
290 handler_args,
291 target,
292 duration_secs,
293 )
294 .await
295 }
296 Statement::CreateSource { stmt } => {
297 create_source::handle_create_source(handler_args, stmt).await
298 }
299 Statement::CreateSink { stmt } => {
300 create_sink::handle_create_sink(handler_args, stmt, false).await
301 }
302 Statement::CreateSubscription { stmt } => {
303 create_subscription::handle_create_subscription(handler_args, stmt).await
304 }
305 Statement::CreateConnection { stmt } => {
306 create_connection::handle_create_connection(handler_args, stmt).await
307 }
308 Statement::CreateSecret { stmt } => {
309 create_secret::handle_create_secret(handler_args, stmt).await
310 }
311 Statement::CreateFunction {
312 or_replace,
313 temporary,
314 if_not_exists,
315 name,
316 args,
317 returns,
318 params,
319 with_options,
320 } => {
321 if params.language.is_none()
324 || !params
325 .language
326 .as_ref()
327 .unwrap()
328 .real_value()
329 .eq_ignore_ascii_case("sql")
330 {
331 create_function::handle_create_function(
332 handler_args,
333 or_replace,
334 temporary,
335 if_not_exists,
336 name,
337 args,
338 returns,
339 params,
340 with_options,
341 )
342 .await
343 } else {
344 create_sql_function::handle_create_sql_function(
345 handler_args,
346 or_replace,
347 temporary,
348 if_not_exists,
349 name,
350 args,
351 returns,
352 params,
353 )
354 .await
355 }
356 }
357 Statement::CreateAggregate {
358 or_replace,
359 if_not_exists,
360 name,
361 args,
362 returns,
363 params,
364 ..
365 } => {
366 create_aggregate::handle_create_aggregate(
367 handler_args,
368 or_replace,
369 if_not_exists,
370 name,
371 args,
372 returns,
373 params,
374 )
375 .await
376 }
377 Statement::CreateTable {
378 name,
379 columns,
380 wildcard_idx,
381 constraints,
382 query,
383 with_options: _, or_replace,
386 temporary,
387 if_not_exists,
388 format_encode,
389 source_watermarks,
390 append_only,
391 on_conflict,
392 with_version_columns,
393 cdc_table_info,
394 include_column_options,
395 webhook_info,
396 engine,
397 } => {
398 if or_replace {
399 bail_not_implemented!("CREATE OR REPLACE TABLE");
400 }
401 if temporary {
402 bail_not_implemented!("CREATE TEMPORARY TABLE");
403 }
404 if let Some(query) = query {
405 return create_table_as::handle_create_as(
406 handler_args,
407 name,
408 if_not_exists,
409 query,
410 columns,
411 append_only,
412 on_conflict,
413 with_version_columns
414 .iter()
415 .map(|col| col.real_value())
416 .collect(),
417 engine,
418 )
419 .await;
420 }
421 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
422 create_table::handle_create_table(
423 handler_args,
424 name,
425 columns,
426 wildcard_idx,
427 constraints,
428 if_not_exists,
429 format_encode,
430 source_watermarks,
431 append_only,
432 on_conflict,
433 with_version_columns
434 .iter()
435 .map(|col| col.real_value())
436 .collect(),
437 cdc_table_info,
438 include_column_options,
439 webhook_info,
440 engine,
441 )
442 .await
443 }
444 Statement::CreateDatabase {
445 db_name,
446 if_not_exists,
447 owner,
448 resource_group,
449 barrier_interval_ms,
450 checkpoint_frequency,
451 } => {
452 create_database::handle_create_database(
453 handler_args,
454 db_name,
455 if_not_exists,
456 owner,
457 resource_group,
458 barrier_interval_ms,
459 checkpoint_frequency,
460 )
461 .await
462 }
463 Statement::CreateSchema {
464 schema_name,
465 if_not_exists,
466 owner,
467 } => {
468 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
469 .await
470 }
471 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
472 Statement::DeclareCursor { stmt } => {
473 declare_cursor::handle_declare_cursor(handler_args, stmt).await
474 }
475 Statement::FetchCursor { stmt } => {
476 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
477 }
478 Statement::CloseCursor { stmt } => {
479 close_cursor::handle_close_cursor(handler_args, stmt).await
480 }
481 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
482 Statement::Grant { .. } => {
483 handle_privilege::handle_grant_privilege(handler_args, stmt).await
484 }
485 Statement::Revoke { .. } => {
486 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
487 }
488 Statement::Describe { name, kind } => match kind {
489 DescribeKind::Fragments => {
490 describe::handle_describe_fragments(handler_args, name).await
491 }
492 DescribeKind::Plain => describe::handle_describe(handler_args, name),
493 },
494 Statement::DescribeFragment { fragment_id } => {
495 describe::handle_describe_fragment(handler_args, fragment_id.into()).await
496 }
497 Statement::Discard(..) => discard::handle_discard(handler_args),
498 Statement::ShowObjects {
499 object: show_object,
500 filter,
501 } => show::handle_show_object(handler_args, show_object, filter).await,
502 Statement::ShowCreateObject { create_type, name } => {
503 show::handle_show_create_object(handler_args, create_type, name)
504 }
505 Statement::ShowTransactionIsolationLevel => {
506 transaction::handle_show_isolation_level(handler_args)
507 }
508 Statement::Drop(DropStatement {
509 object_type,
510 object_name,
511 if_exists,
512 drop_mode,
513 }) => {
514 let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
515 match object_type {
516 ObjectType::MaterializedView
517 | ObjectType::View
518 | ObjectType::Sink
519 | ObjectType::Source
520 | ObjectType::Subscription
521 | ObjectType::Index
522 | ObjectType::Table
523 | ObjectType::Schema
524 | ObjectType::Connection => true,
525 ObjectType::Database | ObjectType::User | ObjectType::Secret => {
526 bail_not_implemented!("DROP CASCADE");
527 }
528 }
529 } else {
530 false
531 };
532 match object_type {
533 ObjectType::Table => {
534 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
535 .await
536 }
537 ObjectType::MaterializedView => {
538 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
539 }
540 ObjectType::Index => {
541 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
542 .await
543 }
544 ObjectType::Source => {
545 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
546 .await
547 }
548 ObjectType::Sink => {
549 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
550 }
551 ObjectType::Subscription => {
552 drop_subscription::handle_drop_subscription(
553 handler_args,
554 object_name,
555 if_exists,
556 cascade,
557 )
558 .await
559 }
560 ObjectType::Database => {
561 drop_database::handle_drop_database(handler_args, object_name, if_exists).await
562 }
563 ObjectType::Schema => {
564 drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
565 .await
566 }
567 ObjectType::User => {
568 drop_user::handle_drop_user(handler_args, object_name, if_exists).await
569 }
570 ObjectType::View => {
571 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
572 }
573 ObjectType::Connection => {
574 drop_connection::handle_drop_connection(
575 handler_args,
576 object_name,
577 if_exists,
578 cascade,
579 )
580 .await
581 }
582 ObjectType::Secret => {
583 drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await
584 }
585 }
586 }
587 Statement::DropFunction {
589 if_exists,
590 func_desc,
591 option,
592 } => {
593 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
594 .await
595 }
596 Statement::DropAggregate {
597 if_exists,
598 func_desc,
599 option,
600 } => {
601 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
602 .await
603 }
604 Statement::Query(_)
605 | Statement::Insert { .. }
606 | Statement::Delete { .. }
607 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
608 Statement::Copy {
609 entity: CopyEntity::Query(query),
610 target: CopyTarget::Stdout,
611 } => {
612 let response =
613 query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
614 .await?;
615 Ok(response.into_copy_query_to_stdout())
616 }
617 Statement::CreateView {
618 materialized,
619 if_not_exists,
620 name,
621 columns,
622 query,
623 with_options: _, or_replace, emit_mode,
626 } => {
627 if or_replace {
628 bail_not_implemented!("CREATE OR REPLACE VIEW");
629 }
630 if materialized {
631 create_mv::handle_create_mv(
632 handler_args,
633 if_not_exists,
634 name,
635 *query,
636 columns,
637 emit_mode,
638 )
639 .await
640 } else {
641 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
642 .await
643 }
644 }
645 Statement::Flush => flush::handle_flush(handler_args).await,
646 Statement::Wait => wait::handle_wait(handler_args).await,
647 Statement::Recover => recover::handle_recover(handler_args).await,
648 Statement::SetVariable {
649 local: _,
650 variable,
651 value,
652 } => {
653 if variable.real_value().eq_ignore_ascii_case("database") {
655 let x = variable::set_var_to_param_str(&value);
656 let res = use_db::handle_use_db(
657 handler_args,
658 ObjectName::from(vec![Ident::from_real_value(
659 x.as_deref().unwrap_or("default"),
660 )]),
661 )?;
662 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
663 for notice in res.notices() {
664 builder = builder.notice(notice);
665 }
666 return Ok(builder.into());
667 }
668 variable::handle_set(handler_args, variable, value)
669 }
670 Statement::SetTimeZone { local: _, value } => {
671 variable::handle_set_time_zone(handler_args, value)
672 }
673 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
674 Statement::CreateIndex {
675 name,
676 table_name,
677 method,
678 columns,
679 include,
680 distributed_by,
681 unique,
682 if_not_exists,
683 with_properties: _,
684 } => {
685 if unique {
686 bail_not_implemented!("create unique index");
687 }
688
689 create_index::handle_create_index(
690 handler_args,
691 if_not_exists,
692 name,
693 table_name,
694 method,
695 columns.clone(),
696 include,
697 distributed_by,
698 )
699 .await
700 }
701 Statement::AlterDatabase { name, operation } => match operation {
702 AlterDatabaseOperation::RenameDatabase { database_name } => {
703 alter_rename::handle_rename_database(handler_args, name, database_name).await
704 }
705 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
706 alter_owner::handle_alter_owner(
707 handler_args,
708 name,
709 new_owner_name,
710 StatementType::ALTER_DATABASE,
711 None,
712 )
713 .await
714 }
715 AlterDatabaseOperation::SetParam(config_param) => {
716 let ConfigParam { param, value } = config_param;
717
718 let database_param = match param.real_value().to_uppercase().as_str() {
719 "BARRIER_INTERVAL_MS" => {
720 let barrier_interval_ms = match value {
721 SetVariableValue::Default => None,
722 SetVariableValue::Single(SetVariableValueSingle::Literal(
723 Value::Number(num),
724 )) => {
725 let num = num.parse::<u32>().map_err(|e| {
726 ErrorCode::InvalidInputSyntax(format!(
727 "barrier_interval_ms must be a u32 integer: {}",
728 e.as_report()
729 ))
730 })?;
731 Some(num)
732 }
733 _ => {
734 return Err(ErrorCode::InvalidInputSyntax(
735 "barrier_interval_ms must be a u32 integer or DEFAULT"
736 .to_owned(),
737 )
738 .into());
739 }
740 };
741 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
742 }
743 "CHECKPOINT_FREQUENCY" => {
744 let checkpoint_frequency = match value {
745 SetVariableValue::Default => None,
746 SetVariableValue::Single(SetVariableValueSingle::Literal(
747 Value::Number(num),
748 )) => {
749 let num = num.parse::<u64>().map_err(|e| {
750 ErrorCode::InvalidInputSyntax(format!(
751 "checkpoint_frequency must be a u64 integer: {}",
752 e.as_report()
753 ))
754 })?;
755 Some(num)
756 }
757 _ => {
758 return Err(ErrorCode::InvalidInputSyntax(
759 "checkpoint_frequency must be a u64 integer or DEFAULT"
760 .to_owned(),
761 )
762 .into());
763 }
764 };
765 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
766 }
767 _ => {
768 return Err(ErrorCode::InvalidInputSyntax(format!(
769 "Unsupported database config parameter: {}",
770 param.real_value()
771 ))
772 .into());
773 }
774 };
775
776 alter_database_param::handle_alter_database_param(
777 handler_args,
778 name,
779 database_param,
780 )
781 .await
782 }
783 },
784 Statement::AlterSchema { name, operation } => match operation {
785 AlterSchemaOperation::RenameSchema { schema_name } => {
786 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
787 }
788 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
789 alter_owner::handle_alter_owner(
790 handler_args,
791 name,
792 new_owner_name,
793 StatementType::ALTER_SCHEMA,
794 None,
795 )
796 .await
797 }
798 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
799 alter_swap_rename::handle_swap_rename(
800 handler_args,
801 name,
802 target_schema,
803 StatementType::ALTER_SCHEMA,
804 )
805 .await
806 }
807 },
808 Statement::AlterTable { name, operation } => match operation {
809 AlterTableOperation::AddColumn { .. }
810 | AlterTableOperation::DropColumn { .. }
811 | AlterTableOperation::AlterColumn { .. } => {
812 alter_table_column::handle_alter_table_column(handler_args, name, operation).await
813 }
814 AlterTableOperation::RenameTable { table_name } => {
815 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
816 .await
817 }
818 AlterTableOperation::ChangeOwner { new_owner_name } => {
819 alter_owner::handle_alter_owner(
820 handler_args,
821 name,
822 new_owner_name,
823 StatementType::ALTER_TABLE,
824 None,
825 )
826 .await
827 }
828 AlterTableOperation::SetParallelism {
829 parallelism,
830 deferred,
831 } => {
832 alter_parallelism::handle_alter_parallelism(
833 handler_args,
834 name,
835 parallelism,
836 StatementType::ALTER_TABLE,
837 deferred,
838 )
839 .await
840 }
841 AlterTableOperation::SetSchema { new_schema_name } => {
842 alter_set_schema::handle_alter_set_schema(
843 handler_args,
844 name,
845 new_schema_name,
846 StatementType::ALTER_TABLE,
847 None,
848 )
849 .await
850 }
851 AlterTableOperation::RefreshSchema => {
852 alter_table_with_sr::handle_refresh_schema(handler_args, name).await
853 }
854 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
855 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
856 handler_args,
857 PbThrottleTarget::Table,
858 risingwave_pb::common::PbThrottleType::Source,
859 name,
860 rate_limit,
861 )
862 .await
863 }
864 AlterTableOperation::DropConnector => {
865 alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
866 .await
867 }
868 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
869 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
870 handler_args,
871 PbThrottleTarget::Table,
872 risingwave_pb::common::PbThrottleType::Dml,
873 name,
874 rate_limit,
875 )
876 .await
877 }
878 AlterTableOperation::SetConfig { entries } => {
879 alter_streaming_config::handle_alter_streaming_set_config(
880 handler_args,
881 name,
882 entries,
883 StatementType::ALTER_TABLE,
884 )
885 .await
886 }
887 AlterTableOperation::ResetConfig { keys } => {
888 alter_streaming_config::handle_alter_streaming_reset_config(
889 handler_args,
890 name,
891 keys,
892 StatementType::ALTER_TABLE,
893 )
894 .await
895 }
896 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
897 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
898 handler_args,
899 PbThrottleTarget::Table,
900 risingwave_pb::common::PbThrottleType::Backfill,
901 name,
902 rate_limit,
903 )
904 .await
905 }
906 AlterTableOperation::SwapRenameTable { target_table } => {
907 alter_swap_rename::handle_swap_rename(
908 handler_args,
909 name,
910 target_table,
911 StatementType::ALTER_TABLE,
912 )
913 .await
914 }
915 AlterTableOperation::AlterConnectorProps { alter_props } => {
916 alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
917 }
918 AlterTableOperation::AddConstraint { .. }
919 | AlterTableOperation::DropConstraint { .. }
920 | AlterTableOperation::RenameColumn { .. }
921 | AlterTableOperation::ChangeColumn { .. }
922 | AlterTableOperation::RenameConstraint { .. } => {
923 bail_not_implemented!(
924 "Unhandled statement: {}",
925 Statement::AlterTable { name, operation }
926 )
927 }
928 },
929 Statement::AlterIndex { name, operation } => match operation {
930 AlterIndexOperation::RenameIndex { index_name } => {
931 alter_rename::handle_rename_index(handler_args, name, index_name).await
932 }
933 AlterIndexOperation::SetParallelism {
934 parallelism,
935 deferred,
936 } => {
937 alter_parallelism::handle_alter_parallelism(
938 handler_args,
939 name,
940 parallelism,
941 StatementType::ALTER_INDEX,
942 deferred,
943 )
944 .await
945 }
946 AlterIndexOperation::SetConfig { entries } => {
947 alter_streaming_config::handle_alter_streaming_set_config(
948 handler_args,
949 name,
950 entries,
951 StatementType::ALTER_INDEX,
952 )
953 .await
954 }
955 AlterIndexOperation::ResetConfig { keys } => {
956 alter_streaming_config::handle_alter_streaming_reset_config(
957 handler_args,
958 name,
959 keys,
960 StatementType::ALTER_INDEX,
961 )
962 .await
963 }
964 },
965 Statement::AlterView {
966 materialized,
967 name,
968 operation,
969 } => {
970 let statement_type = if materialized {
971 StatementType::ALTER_MATERIALIZED_VIEW
972 } else {
973 StatementType::ALTER_VIEW
974 };
975 match operation {
976 AlterViewOperation::RenameView { view_name } => {
977 if materialized {
978 alter_rename::handle_rename_table(
979 handler_args,
980 TableType::MaterializedView,
981 name,
982 view_name,
983 )
984 .await
985 } else {
986 alter_rename::handle_rename_view(handler_args, name, view_name).await
987 }
988 }
989 AlterViewOperation::SetParallelism {
990 parallelism,
991 deferred,
992 } => {
993 if !materialized {
994 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
995 }
996 alter_parallelism::handle_alter_parallelism(
997 handler_args,
998 name,
999 parallelism,
1000 statement_type,
1001 deferred,
1002 )
1003 .await
1004 }
1005 AlterViewOperation::SetResourceGroup {
1006 resource_group,
1007 deferred,
1008 } => {
1009 if !materialized {
1010 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1011 }
1012 alter_resource_group::handle_alter_resource_group(
1013 handler_args,
1014 name,
1015 resource_group,
1016 statement_type,
1017 deferred,
1018 )
1019 .await
1020 }
1021 AlterViewOperation::ChangeOwner { new_owner_name } => {
1022 alter_owner::handle_alter_owner(
1023 handler_args,
1024 name,
1025 new_owner_name,
1026 statement_type,
1027 None,
1028 )
1029 .await
1030 }
1031 AlterViewOperation::SetSchema { new_schema_name } => {
1032 alter_set_schema::handle_alter_set_schema(
1033 handler_args,
1034 name,
1035 new_schema_name,
1036 statement_type,
1037 None,
1038 )
1039 .await
1040 }
1041 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1042 if !materialized {
1043 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1044 }
1045 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1046 handler_args,
1047 PbThrottleTarget::Mv,
1048 risingwave_pb::common::PbThrottleType::Backfill,
1049 name,
1050 rate_limit,
1051 )
1052 .await
1053 }
1054 AlterViewOperation::SwapRenameView { target_view } => {
1055 alter_swap_rename::handle_swap_rename(
1056 handler_args,
1057 name,
1058 target_view,
1059 statement_type,
1060 )
1061 .await
1062 }
1063 AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1064 if !materialized {
1065 bail!(
1066 "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1067 );
1068 }
1069 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1070 }
1071 AlterViewOperation::AsQuery { query } => {
1072 if !materialized {
1073 bail_not_implemented!("ALTER VIEW AS QUERY");
1074 }
1075 if !cfg!(debug_assertions) {
1077 bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1078 }
1079 alter_mv::handle_alter_mv(handler_args, name, query).await
1080 }
1081 AlterViewOperation::SetConfig { entries } => {
1082 if !materialized {
1083 bail!("SET CONFIG is only supported for materialized views");
1084 }
1085 alter_streaming_config::handle_alter_streaming_set_config(
1086 handler_args,
1087 name,
1088 entries,
1089 statement_type,
1090 )
1091 .await
1092 }
1093 AlterViewOperation::ResetConfig { keys } => {
1094 if !materialized {
1095 bail!("RESET CONFIG is only supported for materialized views");
1096 }
1097 alter_streaming_config::handle_alter_streaming_reset_config(
1098 handler_args,
1099 name,
1100 keys,
1101 statement_type,
1102 )
1103 .await
1104 }
1105 }
1106 }
1107
1108 Statement::AlterSink { name, operation } => match operation {
1109 AlterSinkOperation::AlterConnectorProps {
1110 alter_props: changed_props,
1111 } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1112 AlterSinkOperation::RenameSink { sink_name } => {
1113 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1114 }
1115 AlterSinkOperation::ChangeOwner { new_owner_name } => {
1116 alter_owner::handle_alter_owner(
1117 handler_args,
1118 name,
1119 new_owner_name,
1120 StatementType::ALTER_SINK,
1121 None,
1122 )
1123 .await
1124 }
1125 AlterSinkOperation::SetSchema { new_schema_name } => {
1126 alter_set_schema::handle_alter_set_schema(
1127 handler_args,
1128 name,
1129 new_schema_name,
1130 StatementType::ALTER_SINK,
1131 None,
1132 )
1133 .await
1134 }
1135 AlterSinkOperation::SetParallelism {
1136 parallelism,
1137 deferred,
1138 } => {
1139 alter_parallelism::handle_alter_parallelism(
1140 handler_args,
1141 name,
1142 parallelism,
1143 StatementType::ALTER_SINK,
1144 deferred,
1145 )
1146 .await
1147 }
1148 AlterSinkOperation::SetConfig { entries } => {
1149 alter_streaming_config::handle_alter_streaming_set_config(
1150 handler_args,
1151 name,
1152 entries,
1153 StatementType::ALTER_SINK,
1154 )
1155 .await
1156 }
1157 AlterSinkOperation::ResetConfig { keys } => {
1158 alter_streaming_config::handle_alter_streaming_reset_config(
1159 handler_args,
1160 name,
1161 keys,
1162 StatementType::ALTER_SINK,
1163 )
1164 .await
1165 }
1166 AlterSinkOperation::SwapRenameSink { target_sink } => {
1167 alter_swap_rename::handle_swap_rename(
1168 handler_args,
1169 name,
1170 target_sink,
1171 StatementType::ALTER_SINK,
1172 )
1173 .await
1174 }
1175 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1176 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1177 handler_args,
1178 PbThrottleTarget::Sink,
1179 risingwave_pb::common::PbThrottleType::Sink,
1180 name,
1181 rate_limit,
1182 )
1183 .await
1184 }
1185 AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
1186 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1187 handler_args,
1188 PbThrottleTarget::Sink,
1189 risingwave_pb::common::PbThrottleType::Backfill,
1190 name,
1191 rate_limit,
1192 )
1193 .await
1194 }
1195 AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1196 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1197 handler_args,
1198 name,
1199 enable,
1200 )
1201 .await
1202 }
1203 },
1204 Statement::AlterSubscription { name, operation } => match operation {
1205 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1206 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1207 .await
1208 }
1209 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1210 alter_owner::handle_alter_owner(
1211 handler_args,
1212 name,
1213 new_owner_name,
1214 StatementType::ALTER_SUBSCRIPTION,
1215 None,
1216 )
1217 .await
1218 }
1219 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1220 alter_set_schema::handle_alter_set_schema(
1221 handler_args,
1222 name,
1223 new_schema_name,
1224 StatementType::ALTER_SUBSCRIPTION,
1225 None,
1226 )
1227 .await
1228 }
1229 AlterSubscriptionOperation::SwapRenameSubscription {
1230 target_subscription,
1231 } => {
1232 alter_swap_rename::handle_swap_rename(
1233 handler_args,
1234 name,
1235 target_subscription,
1236 StatementType::ALTER_SUBSCRIPTION,
1237 )
1238 .await
1239 }
1240 },
1241 Statement::AlterSource { name, operation } => match operation {
1242 AlterSourceOperation::AlterConnectorProps { alter_props } => {
1243 alter_source_props::handle_alter_source_connector_props(
1244 handler_args,
1245 name,
1246 alter_props,
1247 )
1248 .await
1249 }
1250 AlterSourceOperation::RenameSource { source_name } => {
1251 alter_rename::handle_rename_source(handler_args, name, source_name).await
1252 }
1253 AlterSourceOperation::AddColumn { .. } => {
1254 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1255 }
1256 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1257 alter_owner::handle_alter_owner(
1258 handler_args,
1259 name,
1260 new_owner_name,
1261 StatementType::ALTER_SOURCE,
1262 None,
1263 )
1264 .await
1265 }
1266 AlterSourceOperation::SetSchema { new_schema_name } => {
1267 alter_set_schema::handle_alter_set_schema(
1268 handler_args,
1269 name,
1270 new_schema_name,
1271 StatementType::ALTER_SOURCE,
1272 None,
1273 )
1274 .await
1275 }
1276 AlterSourceOperation::FormatEncode { format_encode } => {
1277 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1278 .await
1279 }
1280 AlterSourceOperation::RefreshSchema => {
1281 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1282 }
1283 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1284 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1285 handler_args,
1286 PbThrottleTarget::Source,
1287 risingwave_pb::common::PbThrottleType::Source,
1288 name,
1289 rate_limit,
1290 )
1291 .await
1292 }
1293 AlterSourceOperation::SwapRenameSource { target_source } => {
1294 alter_swap_rename::handle_swap_rename(
1295 handler_args,
1296 name,
1297 target_source,
1298 StatementType::ALTER_SOURCE,
1299 )
1300 .await
1301 }
1302 AlterSourceOperation::SetParallelism {
1303 parallelism,
1304 deferred,
1305 } => {
1306 alter_parallelism::handle_alter_parallelism(
1307 handler_args,
1308 name,
1309 parallelism,
1310 StatementType::ALTER_SOURCE,
1311 deferred,
1312 )
1313 .await
1314 }
1315 AlterSourceOperation::SetConfig { entries } => {
1316 alter_streaming_config::handle_alter_streaming_set_config(
1317 handler_args,
1318 name,
1319 entries,
1320 StatementType::ALTER_SOURCE,
1321 )
1322 .await
1323 }
1324 AlterSourceOperation::ResetConfig { keys } => {
1325 alter_streaming_config::handle_alter_streaming_reset_config(
1326 handler_args,
1327 name,
1328 keys,
1329 StatementType::ALTER_SOURCE,
1330 )
1331 .await
1332 }
1333 AlterSourceOperation::ResetSource => {
1334 reset_source::handle_reset_source(handler_args, name).await
1335 }
1336 },
1337 Statement::AlterFunction {
1338 name,
1339 args,
1340 operation,
1341 } => match operation {
1342 AlterFunctionOperation::SetSchema { new_schema_name } => {
1343 alter_set_schema::handle_alter_set_schema(
1344 handler_args,
1345 name,
1346 new_schema_name,
1347 StatementType::ALTER_FUNCTION,
1348 args,
1349 )
1350 .await
1351 }
1352 AlterFunctionOperation::ChangeOwner { new_owner_name } => {
1353 alter_owner::handle_alter_owner(
1354 handler_args,
1355 name,
1356 new_owner_name,
1357 StatementType::ALTER_FUNCTION,
1358 args,
1359 )
1360 .await
1361 }
1362 },
1363 Statement::AlterConnection { name, operation } => match operation {
1364 AlterConnectionOperation::SetSchema { new_schema_name } => {
1365 alter_set_schema::handle_alter_set_schema(
1366 handler_args,
1367 name,
1368 new_schema_name,
1369 StatementType::ALTER_CONNECTION,
1370 None,
1371 )
1372 .await
1373 }
1374 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1375 alter_owner::handle_alter_owner(
1376 handler_args,
1377 name,
1378 new_owner_name,
1379 StatementType::ALTER_CONNECTION,
1380 None,
1381 )
1382 .await
1383 }
1384 AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1385 alter_connection_props::handle_alter_connection_connector_props(
1386 handler_args,
1387 name,
1388 alter_props,
1389 )
1390 .await
1391 }
1392 },
1393 Statement::AlterSystem { param, value } => {
1394 alter_system::handle_alter_system(handler_args, param, value).await
1395 }
1396 Statement::AlterSecret { name, operation } => match operation {
1397 AlterSecretOperation::ChangeCredential {
1398 with_options,
1399 new_credential,
1400 } => {
1401 alter_secret::handle_alter_secret(handler_args, name, with_options, new_credential)
1402 .await
1403 }
1404 AlterSecretOperation::ChangeOwner { new_owner_name } => {
1405 alter_owner::handle_alter_owner(
1406 handler_args,
1407 name,
1408 new_owner_name,
1409 StatementType::ALTER_SECRET,
1410 None,
1411 )
1412 .await
1413 }
1414 },
1415 Statement::AlterFragment {
1416 fragment_ids,
1417 operation,
1418 } => match operation {
1419 AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1420 let [fragment_id] = fragment_ids.as_slice() else {
1421 return Err(ErrorCode::InvalidInputSyntax(
1422 "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1423 .to_owned(),
1424 )
1425 .into());
1426 };
1427 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1428 &handler_args.session,
1429 PbThrottleTarget::Fragment,
1430 risingwave_pb::common::PbThrottleType::Backfill,
1431 *fragment_id,
1432 rate_limit,
1433 StatementType::SET_VARIABLE,
1434 )
1435 .await
1436 }
1437 AlterFragmentOperation::SetParallelism { parallelism } => {
1438 alter_parallelism::handle_alter_fragment_parallelism(
1439 handler_args,
1440 fragment_ids.into_iter().map_into().collect(),
1441 parallelism,
1442 )
1443 .await
1444 }
1445 },
1446 Statement::AlterDefaultPrivileges { .. } => {
1447 handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1448 }
1449 Statement::StartTransaction { modes } => {
1450 transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1451 }
1452 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1453 Statement::Commit { chain } => {
1454 transaction::handle_commit(handler_args, COMMIT, chain).await
1455 }
1456 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1457 Statement::Rollback { chain } => {
1458 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1459 }
1460 Statement::SetTransaction {
1461 modes,
1462 snapshot,
1463 session,
1464 } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1465 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1466 Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1467 Statement::Comment {
1468 object_type,
1469 object_name,
1470 comment,
1471 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1472 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1473 Statement::Prepare {
1474 name,
1475 data_types,
1476 statement,
1477 } => prepared_statement::handle_prepare(name, data_types, statement).await,
1478 Statement::Deallocate { name, prepare } => {
1479 prepared_statement::handle_deallocate(name, prepare).await
1480 }
1481 Statement::Vacuum { object_name, full } => {
1482 vacuum::handle_vacuum(handler_args, object_name, full).await
1483 }
1484 Statement::Refresh { table_name } => {
1485 refresh::handle_refresh(handler_args, table_name).await
1486 }
1487 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1488 }
1489}
1490
1491fn check_ban_ddl_for_iceberg_engine_table(
1492 session: Arc<SessionImpl>,
1493 stmt: &Statement,
1494) -> Result<()> {
1495 match stmt {
1496 Statement::AlterTable {
1497 name,
1498 operation:
1499 operation @ (AlterTableOperation::AddColumn { .. }
1500 | AlterTableOperation::DropColumn { .. }),
1501 } => {
1502 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1503 if table.is_iceberg_engine_table() {
1504 bail!(
1505 "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1506 operation,
1507 schema_name,
1508 name
1509 );
1510 }
1511 }
1512
1513 Statement::AlterTable {
1514 name,
1515 operation: AlterTableOperation::RenameTable { .. },
1516 } => {
1517 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1518 if table.is_iceberg_engine_table() {
1519 bail!(
1520 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1521 schema_name,
1522 name
1523 );
1524 }
1525 }
1526
1527 Statement::AlterTable {
1528 name,
1529 operation: AlterTableOperation::SetParallelism { .. },
1530 } => {
1531 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1532 if table.is_iceberg_engine_table() {
1533 bail!(
1534 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1535 schema_name,
1536 name
1537 );
1538 }
1539 }
1540
1541 Statement::AlterTable {
1542 name,
1543 operation: AlterTableOperation::SetSchema { .. },
1544 } => {
1545 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1546 if table.is_iceberg_engine_table() {
1547 bail!(
1548 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1549 schema_name,
1550 name
1551 );
1552 }
1553 }
1554
1555 Statement::AlterTable {
1556 name,
1557 operation: AlterTableOperation::RefreshSchema,
1558 } => {
1559 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1560 if table.is_iceberg_engine_table() {
1561 bail!(
1562 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1563 schema_name,
1564 name
1565 );
1566 }
1567 }
1568
1569 Statement::AlterTable {
1570 name,
1571 operation: AlterTableOperation::SetSourceRateLimit { .. },
1572 } => {
1573 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1574 if table.is_iceberg_engine_table() {
1575 bail!(
1576 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1577 schema_name,
1578 name
1579 );
1580 }
1581 }
1582
1583 _ => {}
1584 }
1585
1586 Ok(())
1587}