1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use futures::stream::{self, BoxStream};
20use futures::{Stream, StreamExt};
21use itertools::Itertools;
22use pgwire::pg_field_descriptor::PgFieldDescriptor;
23use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION};
24use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
25use pgwire::pg_server::BoxedError;
26use pgwire::types::{Format, Row};
27use risingwave_common::catalog::AlterDatabaseParam;
28use risingwave_common::types::Fields;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::{bail, bail_not_implemented};
31use risingwave_pb::meta::PbThrottleTarget;
32use risingwave_sqlparser::ast::*;
33use thiserror_ext::AsReport;
34use util::get_table_catalog_by_table_name;
35
36use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result};
39use crate::handler::cancel_job::handle_cancel;
40use crate::handler::kill_process::handle_kill;
41use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
42use crate::session::SessionImpl;
43use crate::utils::WithOptions;
44
45mod alter_connection_props;
46mod alter_database_param;
47mod alter_mv;
48mod alter_owner;
49mod alter_parallelism;
50mod alter_rename;
51mod alter_resource_group;
52mod alter_secret;
53mod alter_set_schema;
54mod alter_sink_props;
55mod alter_source_column;
56mod alter_source_props;
57mod alter_source_with_sr;
58mod alter_streaming_config;
59mod alter_streaming_enable_unaligned_join;
60mod alter_streaming_rate_limit;
61mod alter_swap_rename;
62mod alter_system;
63mod alter_table_column;
64pub mod alter_table_drop_connector;
65pub mod alter_table_props;
66mod alter_table_with_sr;
67pub mod alter_user;
68mod alter_utils;
69pub mod cancel_job;
70pub mod close_cursor;
71mod comment;
72pub mod create_aggregate;
73pub mod create_connection;
74mod create_database;
75pub mod create_function;
76pub mod create_index;
77pub mod create_mv;
78pub mod create_schema;
79pub mod create_secret;
80pub mod create_sink;
81pub mod create_source;
82pub mod create_sql_function;
83pub mod create_subscription;
84pub mod create_table;
85pub mod create_table_as;
86pub mod create_user;
87pub mod create_view;
88pub mod declare_cursor;
89pub mod describe;
90pub mod discard;
91mod drop_connection;
92mod drop_database;
93pub mod drop_function;
94mod drop_index;
95pub mod drop_mv;
96mod drop_schema;
97pub mod drop_secret;
98pub mod drop_sink;
99pub mod drop_source;
100pub mod drop_subscription;
101pub mod drop_table;
102pub mod drop_user;
103mod drop_view;
104pub mod explain;
105pub mod explain_analyze_stream_job;
106pub mod extended_handle;
107pub mod fetch_cursor;
108mod flush;
109pub mod handle_privilege;
110pub mod kill_process;
111mod prepared_statement;
112pub mod privilege;
113pub mod query;
114mod recover;
115mod refresh;
116mod reset_source;
117pub mod show;
118mod transaction;
119mod use_db;
120pub mod util;
121pub mod vacuum;
122pub mod variable;
123mod wait;
124
125pub use alter_table_column::{
126 fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
127};
128
/// [`PgResponseBuilder`] specialized to [`PgResponseStream`] as its row-stream type.
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
131
/// [`PgResponse`] specialized to [`PgResponseStream`] as its row-stream type.
pub type RwPgResponse = PgResponse<PgResponseStream>;
134
135#[easy_ext::ext(RwPgResponseBuilderExt)]
136impl RwPgResponseBuilder {
137 pub fn rows<T: Fields>(self, rows: impl IntoIterator<Item = T>) -> Self {
139 let fields = T::fields();
140 self.values(
141 rows.into_iter()
142 .map(|row| {
143 Row::new(
144 row.into_owned_row()
145 .into_iter()
146 .zip_eq_fast(&fields)
147 .map(|(datum, (_, ty))| {
148 datum.map(|scalar| {
149 scalar.as_scalar_ref_impl().text_format(ty).into()
150 })
151 })
152 .collect(),
153 )
154 })
155 .collect_vec()
156 .into(),
157 fields_to_descriptors(fields),
158 )
159 }
160}
161
162pub fn fields_to_descriptors(
163 fields: Vec<(&str, risingwave_common::types::DataType)>,
164) -> Vec<PgFieldDescriptor> {
165 fields
166 .iter()
167 .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len()))
168 .collect()
169}
170
/// Row stream used as the stream type parameter of `RwPgResponse` and
/// `RwPgResponseBuilder`.
///
/// Each variant wraps a different underlying stream; the `Stream` implementation
/// simply forwards `poll_next` to whichever one is held.
pub enum PgResponseStream {
    /// A `LocalQueryStream` wrapped in `DataChunkToRowSetAdapter`.
    LocalQuery(DataChunkToRowSetAdapter<LocalQueryStream>),
    /// A `DistributedQueryStream` wrapped in `DataChunkToRowSetAdapter`.
    DistributedQuery(DataChunkToRowSetAdapter<DistributedQueryStream>),
    /// An already boxed stream of `RowSetResult` items (e.g. built from a `Vec<Row>`
    /// through the `From<Vec<Row>>` implementation).
    Rows(BoxStream<'static, RowSetResult>),
}
176
177impl Stream for PgResponseStream {
178 type Item = std::result::Result<Vec<Row>, BoxedError>;
179
180 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
181 match &mut *self {
182 PgResponseStream::LocalQuery(inner) => inner.poll_next_unpin(cx),
183 PgResponseStream::DistributedQuery(inner) => inner.poll_next_unpin(cx),
184 PgResponseStream::Rows(inner) => inner.poll_next_unpin(cx),
185 }
186 }
187}
188
189impl From<Vec<Row>> for PgResponseStream {
190 fn from(rows: Vec<Row>) -> Self {
191 Self::Rows(stream::iter(vec![Ok(rows)]).boxed())
192 }
193}
194
/// Arguments bundled by `HandlerArgs::new` and handed to the individual statement
/// handlers.
#[derive(Clone)]
pub struct HandlerArgs {
    /// The session passed to `HandlerArgs::new`.
    pub session: Arc<SessionImpl>,
    /// The SQL text passed to `HandlerArgs::new` alongside the parsed statement.
    pub sql: Arc<str>,
    /// The statement rendered back to a string by `normalize_sql`, with the
    /// `or_replace` / `if_not_exists` flags cleared for the `CREATE` statements
    /// handled there.
    pub normalized_sql: String,
    /// The result of `WithOptions::try_from` on the statement.
    pub with_options: WithOptions,
}
202
203impl HandlerArgs {
204 pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
205 Ok(Self {
206 session,
207 sql,
208 with_options: WithOptions::try_from(stmt)?,
209 normalized_sql: Self::normalize_sql(stmt),
210 })
211 }
212
213 fn normalize_sql(stmt: &Statement) -> String {
219 let mut stmt = stmt.clone();
220 match &mut stmt {
221 Statement::CreateView {
222 or_replace,
223 if_not_exists,
224 ..
225 } => {
226 *or_replace = false;
227 *if_not_exists = false;
228 }
229 Statement::CreateTable {
230 or_replace,
231 if_not_exists,
232 ..
233 } => {
234 *or_replace = false;
235 *if_not_exists = false;
236 }
237 Statement::CreateIndex { if_not_exists, .. } => {
238 *if_not_exists = false;
239 }
240 Statement::CreateSource {
241 stmt: CreateSourceStatement { if_not_exists, .. },
242 ..
243 } => {
244 *if_not_exists = false;
245 }
246 Statement::CreateSink {
247 stmt: CreateSinkStatement { if_not_exists, .. },
248 } => {
249 *if_not_exists = false;
250 }
251 Statement::CreateSubscription {
252 stmt: CreateSubscriptionStatement { if_not_exists, .. },
253 } => {
254 *if_not_exists = false;
255 }
256 Statement::CreateConnection {
257 stmt: CreateConnectionStatement { if_not_exists, .. },
258 } => {
259 *if_not_exists = false;
260 }
261 _ => {}
262 }
263 stmt.to_string()
264 }
265}
266
267pub async fn handle(
268 session: Arc<SessionImpl>,
269 stmt: Statement,
270 sql: Arc<str>,
271 formats: Vec<Format>,
272) -> Result<RwPgResponse> {
273 session.clear_cancel_query_flag();
274 let _guard = session.txn_begin_implicit();
275 let handler_args = HandlerArgs::new(session, &stmt, sql)?;
276
277 check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;
278
279 match stmt {
280 Statement::Explain {
281 statement,
282 analyze,
283 options,
284 } => explain::handle_explain(handler_args, *statement, options, analyze).await,
285 Statement::ExplainAnalyzeStreamJob {
286 target,
287 duration_secs,
288 } => {
289 explain_analyze_stream_job::handle_explain_analyze_stream_job(
290 handler_args,
291 target,
292 duration_secs,
293 )
294 .await
295 }
296 Statement::CreateSource { stmt } => {
297 create_source::handle_create_source(handler_args, stmt).await
298 }
299 Statement::CreateSink { stmt } => {
300 create_sink::handle_create_sink(handler_args, stmt, false).await
301 }
302 Statement::CreateSubscription { stmt } => {
303 create_subscription::handle_create_subscription(handler_args, stmt).await
304 }
305 Statement::CreateConnection { stmt } => {
306 create_connection::handle_create_connection(handler_args, stmt).await
307 }
308 Statement::CreateSecret { stmt } => {
309 create_secret::handle_create_secret(handler_args, stmt).await
310 }
311 Statement::CreateFunction {
312 or_replace,
313 temporary,
314 if_not_exists,
315 name,
316 args,
317 returns,
318 params,
319 with_options,
320 } => {
321 if params.language.is_none()
324 || !params
325 .language
326 .as_ref()
327 .unwrap()
328 .real_value()
329 .eq_ignore_ascii_case("sql")
330 {
331 create_function::handle_create_function(
332 handler_args,
333 or_replace,
334 temporary,
335 if_not_exists,
336 name,
337 args,
338 returns,
339 params,
340 with_options,
341 )
342 .await
343 } else {
344 create_sql_function::handle_create_sql_function(
345 handler_args,
346 or_replace,
347 temporary,
348 if_not_exists,
349 name,
350 args,
351 returns,
352 params,
353 )
354 .await
355 }
356 }
357 Statement::CreateAggregate {
358 or_replace,
359 if_not_exists,
360 name,
361 args,
362 returns,
363 params,
364 ..
365 } => {
366 create_aggregate::handle_create_aggregate(
367 handler_args,
368 or_replace,
369 if_not_exists,
370 name,
371 args,
372 returns,
373 params,
374 )
375 .await
376 }
377 Statement::CreateTable {
378 name,
379 columns,
380 wildcard_idx,
381 constraints,
382 query,
383 with_options: _, or_replace,
386 temporary,
387 if_not_exists,
388 format_encode,
389 source_watermarks,
390 append_only,
391 on_conflict,
392 with_version_columns,
393 cdc_table_info,
394 include_column_options,
395 webhook_info,
396 engine,
397 } => {
398 if or_replace {
399 bail_not_implemented!("CREATE OR REPLACE TABLE");
400 }
401 if temporary {
402 bail_not_implemented!("CREATE TEMPORARY TABLE");
403 }
404 if let Some(query) = query {
405 return create_table_as::handle_create_as(
406 handler_args,
407 name,
408 if_not_exists,
409 query,
410 columns,
411 append_only,
412 on_conflict,
413 with_version_columns
414 .iter()
415 .map(|col| col.real_value())
416 .collect(),
417 engine,
418 )
419 .await;
420 }
421 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
422 create_table::handle_create_table(
423 handler_args,
424 name,
425 columns,
426 wildcard_idx,
427 constraints,
428 if_not_exists,
429 format_encode,
430 source_watermarks,
431 append_only,
432 on_conflict,
433 with_version_columns
434 .iter()
435 .map(|col| col.real_value())
436 .collect(),
437 cdc_table_info,
438 include_column_options,
439 webhook_info,
440 engine,
441 )
442 .await
443 }
444 Statement::CreateDatabase {
445 db_name,
446 if_not_exists,
447 owner,
448 resource_group,
449 barrier_interval_ms,
450 checkpoint_frequency,
451 } => {
452 create_database::handle_create_database(
453 handler_args,
454 db_name,
455 if_not_exists,
456 owner,
457 resource_group,
458 barrier_interval_ms,
459 checkpoint_frequency,
460 )
461 .await
462 }
463 Statement::CreateSchema {
464 schema_name,
465 if_not_exists,
466 owner,
467 } => {
468 create_schema::handle_create_schema(handler_args, schema_name, if_not_exists, owner)
469 .await
470 }
471 Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await,
472 Statement::DeclareCursor { stmt } => {
473 declare_cursor::handle_declare_cursor(handler_args, stmt).await
474 }
475 Statement::FetchCursor { stmt } => {
476 fetch_cursor::handle_fetch_cursor(handler_args, stmt, &formats).await
477 }
478 Statement::CloseCursor { stmt } => {
479 close_cursor::handle_close_cursor(handler_args, stmt).await
480 }
481 Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await,
482 Statement::Grant { .. } => {
483 handle_privilege::handle_grant_privilege(handler_args, stmt).await
484 }
485 Statement::Revoke { .. } => {
486 handle_privilege::handle_revoke_privilege(handler_args, stmt).await
487 }
488 Statement::Describe { name, kind } => match kind {
489 DescribeKind::Fragments => {
490 describe::handle_describe_fragments(handler_args, name).await
491 }
492 DescribeKind::Plain => describe::handle_describe(handler_args, name),
493 },
494 Statement::DescribeFragment { fragment_id } => {
495 describe::handle_describe_fragment(handler_args, fragment_id.into()).await
496 }
497 Statement::Discard(..) => discard::handle_discard(handler_args),
498 Statement::ShowObjects {
499 object: show_object,
500 filter,
501 } => show::handle_show_object(handler_args, show_object, filter).await,
502 Statement::ShowCreateObject { create_type, name } => {
503 show::handle_show_create_object(handler_args, create_type, name)
504 }
505 Statement::ShowTransactionIsolationLevel => {
506 transaction::handle_show_isolation_level(handler_args)
507 }
508 Statement::Drop(DropStatement {
509 object_type,
510 object_name,
511 if_exists,
512 drop_mode,
513 }) => {
514 let cascade = if let AstOption::Some(DropMode::Cascade) = drop_mode {
515 match object_type {
516 ObjectType::MaterializedView
517 | ObjectType::View
518 | ObjectType::Sink
519 | ObjectType::Source
520 | ObjectType::Subscription
521 | ObjectType::Index
522 | ObjectType::Table
523 | ObjectType::Schema
524 | ObjectType::Connection
525 | ObjectType::Secret => true,
526 ObjectType::Database | ObjectType::User => {
527 bail_not_implemented!("DROP CASCADE");
528 }
529 }
530 } else {
531 false
532 };
533 match object_type {
534 ObjectType::Table => {
535 drop_table::handle_drop_table(handler_args, object_name, if_exists, cascade)
536 .await
537 }
538 ObjectType::MaterializedView => {
539 drop_mv::handle_drop_mv(handler_args, object_name, if_exists, cascade).await
540 }
541 ObjectType::Index => {
542 drop_index::handle_drop_index(handler_args, object_name, if_exists, cascade)
543 .await
544 }
545 ObjectType::Source => {
546 drop_source::handle_drop_source(handler_args, object_name, if_exists, cascade)
547 .await
548 }
549 ObjectType::Sink => {
550 drop_sink::handle_drop_sink(handler_args, object_name, if_exists, cascade).await
551 }
552 ObjectType::Subscription => {
553 drop_subscription::handle_drop_subscription(
554 handler_args,
555 object_name,
556 if_exists,
557 cascade,
558 )
559 .await
560 }
561 ObjectType::Database => {
562 drop_database::handle_drop_database(handler_args, object_name, if_exists).await
563 }
564 ObjectType::Schema => {
565 drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade)
566 .await
567 }
568 ObjectType::User => {
569 drop_user::handle_drop_user(handler_args, object_name, if_exists).await
570 }
571 ObjectType::View => {
572 drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await
573 }
574 ObjectType::Connection => {
575 drop_connection::handle_drop_connection(
576 handler_args,
577 object_name,
578 if_exists,
579 cascade,
580 )
581 .await
582 }
583 ObjectType::Secret => {
584 drop_secret::handle_drop_secret(handler_args, object_name, if_exists, cascade)
585 .await
586 }
587 }
588 }
589 Statement::DropFunction {
591 if_exists,
592 func_desc,
593 option,
594 } => {
595 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, false)
596 .await
597 }
598 Statement::DropAggregate {
599 if_exists,
600 func_desc,
601 option,
602 } => {
603 drop_function::handle_drop_function(handler_args, if_exists, func_desc, option, true)
604 .await
605 }
606 Statement::Query(_)
607 | Statement::Insert { .. }
608 | Statement::Delete { .. }
609 | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
610 Statement::Copy {
611 entity: CopyEntity::Query(query),
612 target: CopyTarget::Stdout,
613 } => {
614 let response =
615 query::handle_query(handler_args, Statement::Query(query), vec![Format::Text])
616 .await?;
617 Ok(response.into_copy_query_to_stdout())
618 }
619 Statement::CreateView {
620 materialized,
621 if_not_exists,
622 name,
623 columns,
624 query,
625 with_options: _, or_replace, emit_mode,
628 } => {
629 if or_replace {
630 bail_not_implemented!("CREATE OR REPLACE VIEW");
631 }
632 if materialized {
633 create_mv::handle_create_mv(
634 handler_args,
635 if_not_exists,
636 name,
637 *query,
638 columns,
639 emit_mode,
640 )
641 .await
642 } else {
643 create_view::handle_create_view(handler_args, if_not_exists, name, columns, *query)
644 .await
645 }
646 }
647 Statement::Flush => flush::handle_flush(handler_args).await,
648 Statement::Wait => wait::handle_wait(handler_args).await,
649 Statement::Recover => recover::handle_recover(handler_args).await,
650 Statement::SetVariable {
651 local: _,
652 variable,
653 value,
654 } => {
655 if variable.real_value().eq_ignore_ascii_case("database") {
657 let x = variable::set_var_to_param_str(&value);
658 let res = use_db::handle_use_db(
659 handler_args,
660 ObjectName::from(vec![Ident::from_real_value(
661 x.as_deref().unwrap_or("default"),
662 )]),
663 )?;
664 let mut builder = RwPgResponse::builder(StatementType::SET_VARIABLE);
665 for notice in res.notices() {
666 builder = builder.notice(notice);
667 }
668 return Ok(builder.into());
669 }
670 variable::handle_set(handler_args, variable, value)
671 }
672 Statement::SetTimeZone { local: _, value } => {
673 variable::handle_set_time_zone(handler_args, value)
674 }
675 Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
676 Statement::CreateIndex {
677 name,
678 table_name,
679 method,
680 columns,
681 include,
682 distributed_by,
683 unique,
684 if_not_exists,
685 with_properties: _,
686 } => {
687 if unique {
688 bail_not_implemented!("create unique index");
689 }
690
691 create_index::handle_create_index(
692 handler_args,
693 if_not_exists,
694 name,
695 table_name,
696 method,
697 columns.clone(),
698 include,
699 distributed_by,
700 )
701 .await
702 }
703 Statement::AlterDatabase { name, operation } => match operation {
704 AlterDatabaseOperation::RenameDatabase { database_name } => {
705 alter_rename::handle_rename_database(handler_args, name, database_name).await
706 }
707 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
708 alter_owner::handle_alter_owner(
709 handler_args,
710 name,
711 new_owner_name,
712 StatementType::ALTER_DATABASE,
713 None,
714 )
715 .await
716 }
717 AlterDatabaseOperation::SetParam(config_param) => {
718 let ConfigParam { param, value } = config_param;
719
720 let database_param = match param.real_value().to_uppercase().as_str() {
721 "BARRIER_INTERVAL_MS" => {
722 let barrier_interval_ms = match value {
723 SetVariableValue::Default => None,
724 SetVariableValue::Single(SetVariableValueSingle::Literal(
725 Value::Number(num),
726 )) => {
727 let num = num.parse::<u32>().map_err(|e| {
728 ErrorCode::InvalidInputSyntax(format!(
729 "barrier_interval_ms must be a u32 integer: {}",
730 e.as_report()
731 ))
732 })?;
733 Some(num)
734 }
735 _ => {
736 return Err(ErrorCode::InvalidInputSyntax(
737 "barrier_interval_ms must be a u32 integer or DEFAULT"
738 .to_owned(),
739 )
740 .into());
741 }
742 };
743 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms)
744 }
745 "CHECKPOINT_FREQUENCY" => {
746 let checkpoint_frequency = match value {
747 SetVariableValue::Default => None,
748 SetVariableValue::Single(SetVariableValueSingle::Literal(
749 Value::Number(num),
750 )) => {
751 let num = num.parse::<u64>().map_err(|e| {
752 ErrorCode::InvalidInputSyntax(format!(
753 "checkpoint_frequency must be a u64 integer: {}",
754 e.as_report()
755 ))
756 })?;
757 Some(num)
758 }
759 _ => {
760 return Err(ErrorCode::InvalidInputSyntax(
761 "checkpoint_frequency must be a u64 integer or DEFAULT"
762 .to_owned(),
763 )
764 .into());
765 }
766 };
767 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency)
768 }
769 _ => {
770 return Err(ErrorCode::InvalidInputSyntax(format!(
771 "Unsupported database config parameter: {}",
772 param.real_value()
773 ))
774 .into());
775 }
776 };
777
778 alter_database_param::handle_alter_database_param(
779 handler_args,
780 name,
781 database_param,
782 )
783 .await
784 }
785 },
786 Statement::AlterSchema { name, operation } => match operation {
787 AlterSchemaOperation::RenameSchema { schema_name } => {
788 alter_rename::handle_rename_schema(handler_args, name, schema_name).await
789 }
790 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
791 alter_owner::handle_alter_owner(
792 handler_args,
793 name,
794 new_owner_name,
795 StatementType::ALTER_SCHEMA,
796 None,
797 )
798 .await
799 }
800 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
801 alter_swap_rename::handle_swap_rename(
802 handler_args,
803 name,
804 target_schema,
805 StatementType::ALTER_SCHEMA,
806 )
807 .await
808 }
809 },
810 Statement::AlterTable { name, operation } => match operation {
811 AlterTableOperation::AddColumn { .. }
812 | AlterTableOperation::DropColumn { .. }
813 | AlterTableOperation::AlterColumn { .. } => {
814 alter_table_column::handle_alter_table_column(handler_args, name, operation).await
815 }
816 AlterTableOperation::RenameTable { table_name } => {
817 alter_rename::handle_rename_table(handler_args, TableType::Table, name, table_name)
818 .await
819 }
820 AlterTableOperation::ChangeOwner { new_owner_name } => {
821 alter_owner::handle_alter_owner(
822 handler_args,
823 name,
824 new_owner_name,
825 StatementType::ALTER_TABLE,
826 None,
827 )
828 .await
829 }
830 AlterTableOperation::SetParallelism {
831 parallelism,
832 deferred,
833 } => {
834 alter_parallelism::handle_alter_parallelism(
835 handler_args,
836 name,
837 parallelism,
838 StatementType::ALTER_TABLE,
839 deferred,
840 )
841 .await
842 }
843 AlterTableOperation::SetSchema { new_schema_name } => {
844 alter_set_schema::handle_alter_set_schema(
845 handler_args,
846 name,
847 new_schema_name,
848 StatementType::ALTER_TABLE,
849 None,
850 )
851 .await
852 }
853 AlterTableOperation::RefreshSchema => {
854 alter_table_with_sr::handle_refresh_schema(handler_args, name).await
855 }
856 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
857 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
858 handler_args,
859 PbThrottleTarget::Table,
860 risingwave_pb::common::PbThrottleType::Source,
861 name,
862 rate_limit,
863 )
864 .await
865 }
866 AlterTableOperation::DropConnector => {
867 alter_table_drop_connector::handle_alter_table_drop_connector(handler_args, name)
868 .await
869 }
870 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
871 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
872 handler_args,
873 PbThrottleTarget::Table,
874 risingwave_pb::common::PbThrottleType::Dml,
875 name,
876 rate_limit,
877 )
878 .await
879 }
880 AlterTableOperation::SetConfig { entries } => {
881 alter_streaming_config::handle_alter_streaming_set_config(
882 handler_args,
883 name,
884 entries,
885 StatementType::ALTER_TABLE,
886 )
887 .await
888 }
889 AlterTableOperation::ResetConfig { keys } => {
890 alter_streaming_config::handle_alter_streaming_reset_config(
891 handler_args,
892 name,
893 keys,
894 StatementType::ALTER_TABLE,
895 )
896 .await
897 }
898 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
899 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
900 handler_args,
901 PbThrottleTarget::Table,
902 risingwave_pb::common::PbThrottleType::Backfill,
903 name,
904 rate_limit,
905 )
906 .await
907 }
908 AlterTableOperation::SwapRenameTable { target_table } => {
909 alter_swap_rename::handle_swap_rename(
910 handler_args,
911 name,
912 target_table,
913 StatementType::ALTER_TABLE,
914 )
915 .await
916 }
917 AlterTableOperation::AlterConnectorProps { alter_props } => {
918 alter_table_props::handle_alter_table_props(handler_args, name, alter_props).await
919 }
920 AlterTableOperation::AddConstraint { .. }
921 | AlterTableOperation::DropConstraint { .. }
922 | AlterTableOperation::RenameColumn { .. }
923 | AlterTableOperation::ChangeColumn { .. }
924 | AlterTableOperation::RenameConstraint { .. } => {
925 bail_not_implemented!(
926 "Unhandled statement: {}",
927 Statement::AlterTable { name, operation }
928 )
929 }
930 },
931 Statement::AlterIndex { name, operation } => match operation {
932 AlterIndexOperation::RenameIndex { index_name } => {
933 alter_rename::handle_rename_index(handler_args, name, index_name).await
934 }
935 AlterIndexOperation::SetParallelism {
936 parallelism,
937 deferred,
938 } => {
939 alter_parallelism::handle_alter_parallelism(
940 handler_args,
941 name,
942 parallelism,
943 StatementType::ALTER_INDEX,
944 deferred,
945 )
946 .await
947 }
948 AlterIndexOperation::SetConfig { entries } => {
949 alter_streaming_config::handle_alter_streaming_set_config(
950 handler_args,
951 name,
952 entries,
953 StatementType::ALTER_INDEX,
954 )
955 .await
956 }
957 AlterIndexOperation::ResetConfig { keys } => {
958 alter_streaming_config::handle_alter_streaming_reset_config(
959 handler_args,
960 name,
961 keys,
962 StatementType::ALTER_INDEX,
963 )
964 .await
965 }
966 },
967 Statement::AlterView {
968 materialized,
969 name,
970 operation,
971 } => {
972 let statement_type = if materialized {
973 StatementType::ALTER_MATERIALIZED_VIEW
974 } else {
975 StatementType::ALTER_VIEW
976 };
977 match operation {
978 AlterViewOperation::RenameView { view_name } => {
979 if materialized {
980 alter_rename::handle_rename_table(
981 handler_args,
982 TableType::MaterializedView,
983 name,
984 view_name,
985 )
986 .await
987 } else {
988 alter_rename::handle_rename_view(handler_args, name, view_name).await
989 }
990 }
991 AlterViewOperation::SetParallelism {
992 parallelism,
993 deferred,
994 } => {
995 if !materialized {
996 bail_not_implemented!("ALTER VIEW SET PARALLELISM");
997 }
998 alter_parallelism::handle_alter_parallelism(
999 handler_args,
1000 name,
1001 parallelism,
1002 statement_type,
1003 deferred,
1004 )
1005 .await
1006 }
1007 AlterViewOperation::SetResourceGroup {
1008 resource_group,
1009 deferred,
1010 } => {
1011 if !materialized {
1012 bail_not_implemented!("ALTER VIEW SET RESOURCE GROUP");
1013 }
1014 alter_resource_group::handle_alter_resource_group(
1015 handler_args,
1016 name,
1017 resource_group,
1018 statement_type,
1019 deferred,
1020 )
1021 .await
1022 }
1023 AlterViewOperation::ChangeOwner { new_owner_name } => {
1024 alter_owner::handle_alter_owner(
1025 handler_args,
1026 name,
1027 new_owner_name,
1028 statement_type,
1029 None,
1030 )
1031 .await
1032 }
1033 AlterViewOperation::SetSchema { new_schema_name } => {
1034 alter_set_schema::handle_alter_set_schema(
1035 handler_args,
1036 name,
1037 new_schema_name,
1038 statement_type,
1039 None,
1040 )
1041 .await
1042 }
1043 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
1044 if !materialized {
1045 bail_not_implemented!("ALTER VIEW SET BACKFILL RATE LIMIT");
1046 }
1047 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1048 handler_args,
1049 PbThrottleTarget::Mv,
1050 risingwave_pb::common::PbThrottleType::Backfill,
1051 name,
1052 rate_limit,
1053 )
1054 .await
1055 }
1056 AlterViewOperation::SwapRenameView { target_view } => {
1057 alter_swap_rename::handle_swap_rename(
1058 handler_args,
1059 name,
1060 target_view,
1061 statement_type,
1062 )
1063 .await
1064 }
1065 AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
1066 if !materialized {
1067 bail!(
1068 "ALTER VIEW SET STREAMING_ENABLE_UNALIGNED_JOIN is not supported. Only supported for materialized views"
1069 );
1070 }
1071 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(handler_args, name, enable).await
1072 }
1073 AlterViewOperation::AsQuery { query } => {
1074 if !materialized {
1075 bail_not_implemented!("ALTER VIEW AS QUERY");
1076 }
1077 if !cfg!(debug_assertions) {
1079 bail_not_implemented!("ALTER MATERIALIZED VIEW AS QUERY");
1080 }
1081 alter_mv::handle_alter_mv(handler_args, name, query).await
1082 }
1083 AlterViewOperation::SetConfig { entries } => {
1084 if !materialized {
1085 bail!("SET CONFIG is only supported for materialized views");
1086 }
1087 alter_streaming_config::handle_alter_streaming_set_config(
1088 handler_args,
1089 name,
1090 entries,
1091 statement_type,
1092 )
1093 .await
1094 }
1095 AlterViewOperation::ResetConfig { keys } => {
1096 if !materialized {
1097 bail!("RESET CONFIG is only supported for materialized views");
1098 }
1099 alter_streaming_config::handle_alter_streaming_reset_config(
1100 handler_args,
1101 name,
1102 keys,
1103 statement_type,
1104 )
1105 .await
1106 }
1107 }
1108 }
1109
1110 Statement::AlterSink { name, operation } => match operation {
1111 AlterSinkOperation::AlterConnectorProps {
1112 alter_props: changed_props,
1113 } => alter_sink_props::handle_alter_sink_props(handler_args, name, changed_props).await,
1114 AlterSinkOperation::RenameSink { sink_name } => {
1115 alter_rename::handle_rename_sink(handler_args, name, sink_name).await
1116 }
1117 AlterSinkOperation::ChangeOwner { new_owner_name } => {
1118 alter_owner::handle_alter_owner(
1119 handler_args,
1120 name,
1121 new_owner_name,
1122 StatementType::ALTER_SINK,
1123 None,
1124 )
1125 .await
1126 }
1127 AlterSinkOperation::SetSchema { new_schema_name } => {
1128 alter_set_schema::handle_alter_set_schema(
1129 handler_args,
1130 name,
1131 new_schema_name,
1132 StatementType::ALTER_SINK,
1133 None,
1134 )
1135 .await
1136 }
1137 AlterSinkOperation::SetParallelism {
1138 parallelism,
1139 deferred,
1140 } => {
1141 alter_parallelism::handle_alter_parallelism(
1142 handler_args,
1143 name,
1144 parallelism,
1145 StatementType::ALTER_SINK,
1146 deferred,
1147 )
1148 .await
1149 }
1150 AlterSinkOperation::SetConfig { entries } => {
1151 alter_streaming_config::handle_alter_streaming_set_config(
1152 handler_args,
1153 name,
1154 entries,
1155 StatementType::ALTER_SINK,
1156 )
1157 .await
1158 }
1159 AlterSinkOperation::ResetConfig { keys } => {
1160 alter_streaming_config::handle_alter_streaming_reset_config(
1161 handler_args,
1162 name,
1163 keys,
1164 StatementType::ALTER_SINK,
1165 )
1166 .await
1167 }
1168 AlterSinkOperation::SwapRenameSink { target_sink } => {
1169 alter_swap_rename::handle_swap_rename(
1170 handler_args,
1171 name,
1172 target_sink,
1173 StatementType::ALTER_SINK,
1174 )
1175 .await
1176 }
1177 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
1178 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1179 handler_args,
1180 PbThrottleTarget::Sink,
1181 risingwave_pb::common::PbThrottleType::Sink,
1182 name,
1183 rate_limit,
1184 )
1185 .await
1186 }
1187 AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
1188 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1189 handler_args,
1190 PbThrottleTarget::Sink,
1191 risingwave_pb::common::PbThrottleType::Backfill,
1192 name,
1193 rate_limit,
1194 )
1195 .await
1196 }
1197 AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
1198 alter_streaming_enable_unaligned_join::handle_alter_streaming_enable_unaligned_join(
1199 handler_args,
1200 name,
1201 enable,
1202 )
1203 .await
1204 }
1205 },
1206 Statement::AlterSubscription { name, operation } => match operation {
1207 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
1208 alter_rename::handle_rename_subscription(handler_args, name, subscription_name)
1209 .await
1210 }
1211 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
1212 alter_owner::handle_alter_owner(
1213 handler_args,
1214 name,
1215 new_owner_name,
1216 StatementType::ALTER_SUBSCRIPTION,
1217 None,
1218 )
1219 .await
1220 }
1221 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
1222 alter_set_schema::handle_alter_set_schema(
1223 handler_args,
1224 name,
1225 new_schema_name,
1226 StatementType::ALTER_SUBSCRIPTION,
1227 None,
1228 )
1229 .await
1230 }
1231 AlterSubscriptionOperation::SwapRenameSubscription {
1232 target_subscription,
1233 } => {
1234 alter_swap_rename::handle_swap_rename(
1235 handler_args,
1236 name,
1237 target_subscription,
1238 StatementType::ALTER_SUBSCRIPTION,
1239 )
1240 .await
1241 }
1242 },
1243 Statement::AlterSource { name, operation } => match operation {
1244 AlterSourceOperation::AlterConnectorProps { alter_props } => {
1245 alter_source_props::handle_alter_source_connector_props(
1246 handler_args,
1247 name,
1248 alter_props,
1249 )
1250 .await
1251 }
1252 AlterSourceOperation::RenameSource { source_name } => {
1253 alter_rename::handle_rename_source(handler_args, name, source_name).await
1254 }
1255 AlterSourceOperation::AddColumn { .. } => {
1256 alter_source_column::handle_alter_source_column(handler_args, name, operation).await
1257 }
1258 AlterSourceOperation::ChangeOwner { new_owner_name } => {
1259 alter_owner::handle_alter_owner(
1260 handler_args,
1261 name,
1262 new_owner_name,
1263 StatementType::ALTER_SOURCE,
1264 None,
1265 )
1266 .await
1267 }
1268 AlterSourceOperation::SetSchema { new_schema_name } => {
1269 alter_set_schema::handle_alter_set_schema(
1270 handler_args,
1271 name,
1272 new_schema_name,
1273 StatementType::ALTER_SOURCE,
1274 None,
1275 )
1276 .await
1277 }
1278 AlterSourceOperation::FormatEncode { format_encode } => {
1279 alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode)
1280 .await
1281 }
1282 AlterSourceOperation::RefreshSchema => {
1283 alter_source_with_sr::handler_refresh_schema(handler_args, name).await
1284 }
1285 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
1286 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1287 handler_args,
1288 PbThrottleTarget::Source,
1289 risingwave_pb::common::PbThrottleType::Source,
1290 name,
1291 rate_limit,
1292 )
1293 .await
1294 }
1295 AlterSourceOperation::SwapRenameSource { target_source } => {
1296 alter_swap_rename::handle_swap_rename(
1297 handler_args,
1298 name,
1299 target_source,
1300 StatementType::ALTER_SOURCE,
1301 )
1302 .await
1303 }
1304 AlterSourceOperation::SetParallelism {
1305 parallelism,
1306 deferred,
1307 } => {
1308 alter_parallelism::handle_alter_parallelism(
1309 handler_args,
1310 name,
1311 parallelism,
1312 StatementType::ALTER_SOURCE,
1313 deferred,
1314 )
1315 .await
1316 }
1317 AlterSourceOperation::SetConfig { entries } => {
1318 alter_streaming_config::handle_alter_streaming_set_config(
1319 handler_args,
1320 name,
1321 entries,
1322 StatementType::ALTER_SOURCE,
1323 )
1324 .await
1325 }
1326 AlterSourceOperation::ResetConfig { keys } => {
1327 alter_streaming_config::handle_alter_streaming_reset_config(
1328 handler_args,
1329 name,
1330 keys,
1331 StatementType::ALTER_SOURCE,
1332 )
1333 .await
1334 }
1335 AlterSourceOperation::ResetSource => {
1336 reset_source::handle_reset_source(handler_args, name).await
1337 }
1338 },
1339 Statement::AlterFunction {
1340 name,
1341 args,
1342 operation,
1343 } => match operation {
1344 AlterFunctionOperation::SetSchema { new_schema_name } => {
1345 alter_set_schema::handle_alter_set_schema(
1346 handler_args,
1347 name,
1348 new_schema_name,
1349 StatementType::ALTER_FUNCTION,
1350 args,
1351 )
1352 .await
1353 }
1354 AlterFunctionOperation::ChangeOwner { new_owner_name } => {
1355 alter_owner::handle_alter_owner(
1356 handler_args,
1357 name,
1358 new_owner_name,
1359 StatementType::ALTER_FUNCTION,
1360 args,
1361 )
1362 .await
1363 }
1364 },
1365 Statement::AlterConnection { name, operation } => match operation {
1366 AlterConnectionOperation::SetSchema { new_schema_name } => {
1367 alter_set_schema::handle_alter_set_schema(
1368 handler_args,
1369 name,
1370 new_schema_name,
1371 StatementType::ALTER_CONNECTION,
1372 None,
1373 )
1374 .await
1375 }
1376 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
1377 alter_owner::handle_alter_owner(
1378 handler_args,
1379 name,
1380 new_owner_name,
1381 StatementType::ALTER_CONNECTION,
1382 None,
1383 )
1384 .await
1385 }
1386 AlterConnectionOperation::AlterConnectorProps { alter_props } => {
1387 alter_connection_props::handle_alter_connection_connector_props(
1388 handler_args,
1389 name,
1390 alter_props,
1391 )
1392 .await
1393 }
1394 },
1395 Statement::AlterSystem { param, value } => {
1396 alter_system::handle_alter_system(handler_args, param, value).await
1397 }
1398 Statement::AlterSecret { name, operation } => match operation {
1399 AlterSecretOperation::ChangeCredential {
1400 with_options,
1401 new_credential,
1402 } => {
1403 alter_secret::handle_alter_secret(handler_args, name, with_options, new_credential)
1404 .await
1405 }
1406 AlterSecretOperation::ChangeOwner { new_owner_name } => {
1407 alter_owner::handle_alter_owner(
1408 handler_args,
1409 name,
1410 new_owner_name,
1411 StatementType::ALTER_SECRET,
1412 None,
1413 )
1414 .await
1415 }
1416 },
1417 Statement::AlterFragment {
1418 fragment_ids,
1419 operation,
1420 } => match operation {
1421 AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1422 let [fragment_id] = fragment_ids.as_slice() else {
1423 return Err(ErrorCode::InvalidInputSyntax(
1424 "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
1425 .to_owned(),
1426 )
1427 .into());
1428 };
1429 alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1430 &handler_args.session,
1431 PbThrottleTarget::Fragment,
1432 risingwave_pb::common::PbThrottleType::Backfill,
1433 *fragment_id,
1434 rate_limit,
1435 StatementType::SET_VARIABLE,
1436 )
1437 .await
1438 }
1439 AlterFragmentOperation::SetParallelism { parallelism } => {
1440 alter_parallelism::handle_alter_fragment_parallelism(
1441 handler_args,
1442 fragment_ids.into_iter().map_into().collect(),
1443 parallelism,
1444 )
1445 .await
1446 }
1447 },
1448 Statement::AlterDefaultPrivileges { .. } => {
1449 handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
1450 }
1451 Statement::StartTransaction { modes } => {
1452 transaction::handle_begin(handler_args, START_TRANSACTION, modes).await
1453 }
1454 Statement::Begin { modes } => transaction::handle_begin(handler_args, BEGIN, modes).await,
1455 Statement::Commit { chain } => {
1456 transaction::handle_commit(handler_args, COMMIT, chain).await
1457 }
1458 Statement::Abort => transaction::handle_rollback(handler_args, ABORT, false).await,
1459 Statement::Rollback { chain } => {
1460 transaction::handle_rollback(handler_args, ROLLBACK, chain).await
1461 }
1462 Statement::SetTransaction {
1463 modes,
1464 snapshot,
1465 session,
1466 } => transaction::handle_set(handler_args, modes, snapshot, session).await,
1467 Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
1468 Statement::Kill(worker_process_id) => handle_kill(handler_args, worker_process_id).await,
1469 Statement::Comment {
1470 object_type,
1471 object_name,
1472 comment,
1473 } => comment::handle_comment(handler_args, object_type, object_name, comment).await,
1474 Statement::Use { db_name } => use_db::handle_use_db(handler_args, db_name),
1475 Statement::Prepare {
1476 name,
1477 data_types,
1478 statement,
1479 } => prepared_statement::handle_prepare(name, data_types, statement).await,
1480 Statement::Deallocate { name, prepare } => {
1481 prepared_statement::handle_deallocate(name, prepare).await
1482 }
1483 Statement::Vacuum { object_name, full } => {
1484 vacuum::handle_vacuum(handler_args, object_name, full).await
1485 }
1486 Statement::Refresh { table_name } => {
1487 refresh::handle_refresh(handler_args, table_name).await
1488 }
1489 _ => bail_not_implemented!("Unhandled statement: {}", stmt),
1490 }
1491}
1492
1493fn check_ban_ddl_for_iceberg_engine_table(
1494 session: Arc<SessionImpl>,
1495 stmt: &Statement,
1496) -> Result<()> {
1497 match stmt {
1498 Statement::AlterTable {
1499 name,
1500 operation:
1501 operation @ (AlterTableOperation::AddColumn { .. }
1502 | AlterTableOperation::DropColumn { .. }),
1503 } => {
1504 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1505 if table.is_iceberg_engine_table() {
1506 bail!(
1507 "ALTER TABLE {} is not supported for iceberg table: {}.{}",
1508 operation,
1509 schema_name,
1510 name
1511 );
1512 }
1513 }
1514
1515 Statement::AlterTable {
1516 name,
1517 operation: AlterTableOperation::RenameTable { .. },
1518 } => {
1519 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1520 if table.is_iceberg_engine_table() {
1521 bail!(
1522 "ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
1523 schema_name,
1524 name
1525 );
1526 }
1527 }
1528
1529 Statement::AlterTable {
1530 name,
1531 operation: AlterTableOperation::SetParallelism { .. },
1532 } => {
1533 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1534 if table.is_iceberg_engine_table() {
1535 bail!(
1536 "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
1537 schema_name,
1538 name
1539 );
1540 }
1541 }
1542
1543 Statement::AlterTable {
1544 name,
1545 operation: AlterTableOperation::SetSchema { .. },
1546 } => {
1547 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1548 if table.is_iceberg_engine_table() {
1549 bail!(
1550 "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
1551 schema_name,
1552 name
1553 );
1554 }
1555 }
1556
1557 Statement::AlterTable {
1558 name,
1559 operation: AlterTableOperation::RefreshSchema,
1560 } => {
1561 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1562 if table.is_iceberg_engine_table() {
1563 bail!(
1564 "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
1565 schema_name,
1566 name
1567 );
1568 }
1569 }
1570
1571 Statement::AlterTable {
1572 name,
1573 operation: AlterTableOperation::SetSourceRateLimit { .. },
1574 } => {
1575 let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
1576 if table.is_iceberg_engine_table() {
1577 bail!(
1578 "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
1579 schema_name,
1580 name
1581 );
1582 }
1583 }
1584
1585 _ => {}
1586 }
1587
1588 Ok(())
1589}