use std::collections::{BTreeMap, HashMap};
use std::rc::Rc;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use clap::ValueEnum;
use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use percent_encoding::percent_decode_str;
use pgwire::pg_response::{PgResponse, StatementType};
use prost::Message as _;
use risingwave_common::catalog::{
    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
    RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, TableId,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::jvm_runtime::JVM;
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
    DATABASE_NAME_KEY, ExternalTableConfig, ExternalTableImpl, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::connection_params::ConnectionType;
use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
use risingwave_pb::meta::PbThrottleTarget;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::secret::secret_ref::PbRefAsType;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::{IncludeOption, Parser};

use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
use super::{RwPgResponse, alter_streaming_rate_limit, create_sink, create_source};
use crate::binder::{Clause, SecureCompareContext, bind_data_type};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableVersion};
use crate::catalog::{ColumnId, DatabaseId, SchemaId, check_column_name_not_reserved};
use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::HandlerArgs;
use crate::handler::create_source::{
    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
    bind_source_watermark, handle_addition_columns,
};
use crate::handler::util::SourceSchemaCompatExt;
use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind};
use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::OverwriteOptions;
use crate::{Binder, Explain, TableCatalog, WithOptions};

mod col_id_gen;
pub use col_id_gen::*;
use risingwave_connector::sink::iceberg::parse_partition_by_exprs;

use crate::handler::drop_table::handle_drop_table;
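
/// Checks that a column definition only carries options handled here; any
/// other column constraint bails with "not implemented".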
fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
    for option_def in &c.options {
        match option_def.option {
            ColumnOption::GeneratedColumns(_) => {}
            ColumnOption::DefaultValue(_) => {}
            ColumnOption::DefaultValueInternal { .. } => {}
            ColumnOption::Unique { is_primary: true } => {}
            ColumnOption::Null => {}
            ColumnOption::NotNull => {}
            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
        }
    }
    Ok(())
}
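
/// Binds the column definitions of a CREATE statement into `ColumnCatalog`s,
/// validating data types, collation, and reserved names along the way. Column
/// IDs are left as placeholders to be assigned by `ColumnIdGenerator` later.
/// When `is_for_drop_table_connector` is set, the reserved-name check is
/// skipped.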
pub fn bind_sql_columns(
    column_defs: &[ColumnDef],
    is_for_drop_table_connector: bool,
) -> Result<Vec<ColumnCatalog>> {
    let mut columns = Vec::with_capacity(column_defs.len());

    for column in column_defs {
        ensure_column_options_supported(column)?;
        let ColumnDef {
            name,
            data_type,
            collation,
            options,
            ..
        } = column;

        let data_type = data_type
            .clone()
            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
        if let Some(collation) = collation {
            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
                bail_not_implemented!("collations other than `C` or `POSIX` are not implemented");
            }

            match data_type {
                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
                _ => {
                    return Err(ErrorCode::NotSupported(
                        format!("{} is not a collatable data type", data_type),
                        "The only built-in collatable data types are `varchar`; please check your type".into(),
                    ).into());
                }
            }
        }

        if !is_for_drop_table_connector {
            // The reserved-name check is skipped when re-binding columns to
            // drop a table's connector, where such columns may already exist.
            check_column_name_not_reserved(&name.real_value())?;
        }

        let nullable: bool = !options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull));

        columns.push(ColumnCatalog {
            column_desc: ColumnDesc {
                data_type: bind_data_type(&data_type)?,
                column_id: ColumnId::placeholder(),
                name: name.real_value(),
                generated_or_default_column: None,
                description: None,
                additional_column: AdditionalColumn { column_type: None },
                version: ColumnDescVersion::LATEST,
                system_column: None,
                nullable,
            },
            is_hidden: false,
        });
    }

    Ok(columns)
}
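
/// Rejects generated columns that reference other generated columns, as well
/// as impure generated expressions on primary key columns.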
fn check_generated_column_constraints(
    column_name: &String,
    column_id: ColumnId,
    expr: &ExprImpl,
    column_catalogs: &[ColumnCatalog],
    generated_column_names: &[String],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let input_refs = expr.collect_input_refs(column_catalogs.len());
    for idx in input_refs.ones() {
        let referred_generated_column = &column_catalogs[idx].column_desc.name;
        if generated_column_names
            .iter()
            .any(|c| c == referred_generated_column)
        {
            return Err(ErrorCode::BindError(format!(
                "A generated column cannot reference another generated column. \
                 But here generated column \"{}\" references another generated column \"{}\"",
                column_name, referred_generated_column
            ))
            .into());
        }
    }

    if pk_column_ids.contains(&column_id) && expr.is_impure() {
        return Err(ErrorCode::BindError(format!(
            "Generated columns with impure expressions should not be part of the primary key. \
             Here column \"{}\" is defined as part of the primary key.",
            column_name
        ))
        .into());
    }

    Ok(())
}
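
/// Binds constraints that can only be specified in column definitions,
/// currently generated columns and default values, and fills the bound
/// expressions back into `column_catalogs`.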
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: Vec<ColumnDef>,
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let generated_column_names = {
        let mut names = vec![];
        for column in &columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name.clone(), column_catalogs)?;

    for column in columns {
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            // The column definition may not appear in the resolved catalog
            // (e.g. when the connector schema takes precedence); skip it then.
            continue;
        };

        for option_def in column.options {
            match option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "failed to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type().clone())?;

                    // Rewrite `now()`/`proctime()` against the pinned snapshot
                    // so the default expression can be folded into a constant
                    // snapshot value.
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                expr: Some(expr_impl.to_expr_proto()),
                            }));
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                             Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // `DefaultValueInternal` carries a pre-encoded
                    // `DefaultColumnDesc`; an empty payload means nothing was
                    // persisted, which users cannot specify themselves.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                             please specify a concrete default value"
                        );
                    }

                    let desc = DefaultColumnDesc::decode(&*persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                _ => {}
            }
        }
    }
    Ok(())
}
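
/// Extracts the primary key column names from table-level constraints. Only
/// the `PRIMARY KEY` constraint is supported; anything else bails.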
pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
    let mut pk_column_names = vec![];

    for constraint in table_constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary: true,
            } => {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
            }
            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
        }
    }
    Ok(pk_column_names)
}
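
/// Merges column-level `PRIMARY KEY` options into the primary key names
/// collected from table constraints, rejecting multiple definitions.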
pub fn bind_sql_pk_names(
    columns_defs: &[ColumnDef],
    pk_names_from_table_constraints: Vec<String>,
) -> Result<Vec<String>> {
    let mut pk_column_names = pk_names_from_table_constraints;

    for column in columns_defs {
        for option_def in &column.options {
            if let ColumnOption::Unique { is_primary: true } = option_def.option {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names.push(column.name.real_value());
            };
        }
    }

    Ok(pk_column_names)
}

fn multiple_pk_definition_err() -> RwError {
    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
}
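
/// Resolves the primary key columns by name and, if none are specified while
/// `must_need_pk` is set, appends a hidden row id column to serve as the key.
/// Returns the possibly extended columns, the pk column ids, and the index of
/// the row id column (if one was added).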
pub fn bind_pk_and_row_id_on_relation(
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    must_need_pk: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
    for c in &columns {
        assert!(c.column_id() != ColumnId::placeholder());
    }

    let name_to_id = columns
        .iter()
        .map(|c| (c.name(), c.column_id()))
        .collect::<HashMap<_, _>>();

    let mut pk_column_ids: Vec<_> = pk_names
        .iter()
        .map(|name| {
            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
            })
        })
        .try_collect()?;

    // Add a hidden row id column as the primary key if none is specified but
    // one is required.
    let need_row_id = pk_column_ids.is_empty() && must_need_pk;

    let row_id_index = need_row_id.then(|| {
        let column = ColumnCatalog::row_id_column();
        let index = columns.len();
        pk_column_ids = vec![column.column_id()];
        columns.push(column);
        index
    });

    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
        Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{col}\" specified more than once"
        )))?;
    }

    Ok((columns, pk_column_ids, row_id_index))
}
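
/// Generates the plan for creating a table with an external connector
/// (`CREATE TABLE ... WITH (connector = ...)`): binds the connector and the
/// source schema, then builds the table plan on top of the source.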
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append-only tables do not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;

    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
    )
    .await?;

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source.clone(),
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(source), table))
}
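
/// Generates the plan for creating a plain table without a connector, binding
/// the SQL columns and assigning column IDs first.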
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan(
    context: OptimizerContext,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    mut col_id_gen: ColumnIdGenerator,
    source_watermarks: Vec<SourceWatermark>,
    props: CreateTableProps,
    is_for_replace_plan: bool,
) -> Result<(PlanRef, TableCatalog)> {
    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
    if !secret_refs.is_empty() || !connection_refs.is_empty() {
        return Err(crate::error::ErrorCode::InvalidParameterValue(
            "Secret references and connection references are not allowed in options when creating a table without an external source"
                .to_owned(),
        )
        .into());
    }

    gen_create_table_plan_without_source(
        context,
        table_name,
        columns,
        column_defs,
        constraints,
        source_watermarks,
        col_id_gen.into_version(),
        props,
    )
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}
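
/// Generates the table plan from a bound source catalog, reusing its columns,
/// primary key, and watermarks.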
fn gen_table_plan_with_source(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    source_catalog: SourceCatalog,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let table_name = source_catalog.name.clone();

    let info = CreateTableInfo {
        columns: source_catalog.columns.clone(),
        pk_column_ids: source_catalog.pk_col_ids.clone(),
        row_id_index: source_catalog.row_id_index,
        watermark_descs: source_catalog.watermark_descs.clone(),
        source_catalog: Some(source_catalog),
        version,
    };

    gen_table_plan_inner(context, schema_name, table_name, info, props)
}
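
/// The `ON CONFLICT` behavior, either still in AST form or already resolved
/// into a concrete `ConflictBehavior`.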
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    Ast(Option<OnConflict>),
    Resolved(ConflictBehavior),
}

impl From<Option<OnConflict>> for EitherOnConflict {
    fn from(v: Option<OnConflict>) -> Self {
        Self::Ast(v)
    }
}

impl From<ConflictBehavior> for EitherOnConflict {
    fn from(v: ConflictBehavior) -> Self {
        Self::Resolved(v)
    }
}

impl EitherOnConflict {
    /// Resolves the conflict behavior, validating it against the table's
    /// append-only and row-id-as-primary-key properties.
    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
        let conflict_behavior = match self {
            EitherOnConflict::Ast(on_conflict) => {
                if append_only {
                    if row_id_as_pk {
                        // Append-only tables without a user-defined primary key
                        // never conflict; skip the check entirely.
                        ConflictBehavior::NoCheck
                    } else {
                        if let Some(on_conflict) = on_conflict
                            && on_conflict != OnConflict::Nothing
                        {
                            return Err(ErrorCode::InvalidInputSyntax(
                                "When a PRIMARY KEY constraint is applied to an APPEND ONLY table, \
                                 the ON CONFLICT behavior must be DO NOTHING."
                                    .to_owned(),
                            )
                            .into());
                        }
                        ConflictBehavior::IgnoreConflict
                    }
                } else {
                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
                    }
                }
            }
            EitherOnConflict::Resolved(b) => b,
        };

        Ok(conflict_behavior)
    }
}
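
/// The bound schema-related info used to generate a table plan.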
pub struct CreateTableInfo {
    pub columns: Vec<ColumnCatalog>,
    pub pk_column_ids: Vec<ColumnId>,
    pub row_id_index: Option<usize>,
    pub watermark_descs: Vec<WatermarkDesc>,
    pub source_catalog: Option<SourceCatalog>,
    pub version: TableVersion,
}
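
/// Table-level properties carried through plan generation.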
pub struct CreateTableProps {
    pub definition: String,
    pub append_only: bool,
    pub on_conflict: EitherOnConflict,
    pub with_version_column: Option<String>,
    pub webhook_info: Option<PbWebhookSourceInfo>,
    pub engine: Engine,
}
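
/// Shared tail of plan generation: builds the `LogicalSource`-rooted plan and
/// materializes it into a table, validating append-only-specific options.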
#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    let source_node: PlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    if !append_only && !watermark_descs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on a table requires the table to be append-only.".to_owned(),
            "Use the keywords `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    if !append_only && retention_seconds.is_some() {
        return Err(ErrorCode::NotSupported(
            "Defining retention seconds on a table requires the table to be append-only."
                .to_owned(),
            "Use the keywords `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    Ok((materialize.into(), table))
}
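
/// Generates the plan for creating a table backed by an existing shared CDC
/// source, scanning the external table instead of a RisingWave source.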
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String,
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, TableCatalog)> {
    let session = context.session_ctx().clone();

    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        // Primary key columns are guaranteed to exist after binding.
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| (!c.is_generated()))
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();

    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id.into(),
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");

    let options = CdcScanOptions::from_with_options(context.with_options())?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: PlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs: vec![],
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_column,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.dependent_relations = vec![source.id.into()];

    Ok((materialize.into(), table))
}
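
/// Derives the `WITH` options of a CDC table from the options of its shared
/// source, filling in the database/schema/table name parsed from the external
/// table name according to the connector's naming scheme. For example, for a
/// MySQL source, an external table name of `mydb.orders` yields
/// `database.name = 'mydb'` and `table.name = 'orders'`.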
fn derive_with_options_for_cdc_table(
    source_with_properties: &WithOptionsSecResolved,
    external_table_name: String,
) -> Result<WithOptionsSecResolved> {
    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
    let source_database_name: &str = source_with_properties
        .get("database.name")
        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
        .as_str();
    let mut with_options = source_with_properties.clone();
    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
        match connector.as_str() {
            MYSQL_CDC_CONNECTOR => {
                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
                    anyhow!("The upstream table name must contain the database name prefix, e.g. 'database.table'")
                })?;
                // The `database.name` option in the source definition may be a
                // comma-separated list of databases.
                if !source_database_name
                    .split(',')
                    .map(|s| s.trim())
                    .any(|name| name == db_name)
                {
                    return Err(anyhow!(
                        "The database name `{}` in the FROM clause is not included in the database name `{}` in the source definition",
                        db_name,
                        source_database_name
                    ).into());
                }
                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            POSTGRES_CDC_CONNECTOR => {
                let (schema_name, table_name) = external_table_name
                    .split_once('.')
                    .ok_or_else(|| anyhow!("The upstream table name must contain the schema name prefix, e.g. 'public.table'"))?;

                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            SQL_SERVER_CDC_CONNECTOR => {
                // SQL Server accepts 'database.schema.table', 'schema.table',
                // or 'table'; missing parts fall back to the source database
                // and the default `dbo` schema.
                let parts: Vec<&str> = external_table_name.split('.').collect();
                let (_, schema_name, table_name) = match parts.len() {
                    3 => {
                        let db_name = parts[0];
                        let schema_name = parts[1];
                        let table_name = parts[2];

                        if db_name != source_database_name {
                            return Err(anyhow!(
                                "The database name `{}` in the FROM clause is not the same as the database name `{}` in the source definition",
                                db_name,
                                source_database_name
                            ).into());
                        }
                        (db_name, schema_name, table_name)
                    }
                    2 => {
                        let schema_name = parts[0];
                        let table_name = parts[1];
                        (source_database_name, schema_name, table_name)
                    }
                    1 => {
                        let table_name = parts[0];
                        (source_database_name, "dbo", table_name)
                    }
                    _ => {
                        return Err(anyhow!(
                            "The upstream table name must be in one of these formats: 'database.schema.table', 'schema.table', or 'table'"
                        ).into());
                    }
                };

                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            _ => {
                return Err(RwError::from(anyhow!(
                    "connector {} is not supported for cdc table",
                    connector
                )));
            }
        };
    }
    Ok(with_options)
}
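
/// Dispatches `CREATE TABLE` planning across the three variants: with a
/// connector, plain, or from a shared CDC source. Returns the plan together
/// with the optional source catalog and the table job type.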
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
    handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    format_encode: Option<FormatEncodeOptions>,
    cdc_table_info: Option<CdcTableInfo>,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    engine: Engine,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog, TableJobType)> {
    let col_id_gen = ColumnIdGenerator::new_initial();
    let format_encode = check_create_table_with_source(
        &handler_args.with_options,
        format_encode,
        &include_column_options,
        &cdc_table_info,
    )?;
    let webhook_info = webhook_info
        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
        .transpose()?;

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_column: with_version_column.clone(),
        webhook_info,
        engine,
    };

    let ((plan, source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                explain_options,
                table_name.clone(),
                column_defs,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                SqlColumnStrategy::FollowChecked,
            )
            .await?,
            TableJobType::General,
        ),
        (None, None) => {
            let context = OptimizerContext::new(handler_args, explain_options);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name.clone(),
                column_defs,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                false,
            )?;

            ((plan, None, table), TableJobType::General)
        }

        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &column_defs,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            generated_columns_check_for_cdc_table(&column_defs)?;
            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;

            let session = &handler_args.session;
            let db_name = &session.database();
            let user_name = &session.user_name();
            let search_path = session.config().search_path();
            let (schema_name, resolved_table_name) =
                Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
            let (database_id, schema_id) =
                session.get_database_and_schema_id_for_create(schema_name.clone())?;

            // Look up the shared CDC source referenced in the FROM clause.
            let (source_schema_name, source_name) =
                Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;

            let source = {
                let catalog_reader = session.env().catalog_reader().read_guard();
                let schema_path =
                    SchemaPath::new(source_schema_name.as_deref(), &search_path, user_name);

                let (source, _) = catalog_reader.get_source_by_name(
                    db_name,
                    schema_path,
                    source_name.as_str(),
                )?;
                source.clone()
            };
            let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
                &source.with_properties,
                cdc_table.external_table_name.clone(),
            )?;

            let (columns, pk_names) = match wildcard_idx {
                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
                None => {
                    for column_def in &column_defs {
                        for option_def in &column_def.options {
                            if let ColumnOption::DefaultValue(_)
                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
                            {
                                return Err(ErrorCode::NotSupported(
                                    "Default value for columns defined on the table created from a CDC source".into(),
                                    "Remove the default value expression in the column definitions".into(),
                                )
                                .into());
                            }
                        }
                    }

                    let (columns, pk_names) =
                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
                    // Validate the connection properties eagerly so that
                    // invalid configurations fail at bind time.
                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
                        .context("failed to extract external table config")?;

                    (columns, pk_names)
                }
            };

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, explain_options).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                cdc_table.external_table_name.clone(),
                column_defs,
                columns,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_column,
                include_column_options,
                table_name,
                resolved_table_name,
                database_id,
                schema_id,
                TableId::placeholder(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to a table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };
    Ok((plan, source, table, job_type))
}
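
/// Ensures that all generated columns, if any, appear at the end of a CDC
/// table definition.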
fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
    let mut found_generated_column = false;
    for column in columns {
        let mut is_generated = false;

        for option_def in &column.options {
            if let ColumnOption::GeneratedColumns(_) = option_def.option {
                is_generated = true;
                break;
            }
        }

        if is_generated {
            found_generated_column = true;
        } else if found_generated_column {
            return Err(ErrorCode::NotSupported(
                "Non-generated column found after a generated column.".into(),
                "Ensure that all generated columns appear at the end of the cdc table definition."
                    .into(),
            )
            .into());
        }
    }
    Ok(())
}
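
/// Rejects explicit NOT NULL constraints on CDC table columns; the check is
/// skipped when the schema is derived via the wildcard.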
fn not_null_check_for_cdc_table(
    wildcard_idx: &Option<usize>,
    column_defs: &Vec<ColumnDef>,
) -> Result<()> {
    if wildcard_idx.is_none()
        && column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table with NOT NULL constraint is not supported".to_owned(),
            "Please remove the NOT NULL constraint for columns".to_owned(),
        )
        .into());
    }
    Ok(())
}
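
/// Validates a `CREATE TABLE ... FROM cdc_source` statement: wildcard and
/// explicit columns are mutually exclusive, a primary key is required, and
/// APPEND ONLY as well as watermarks are rejected.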
fn sanity_check_for_table_on_cdc_source(
    append_only: bool,
    column_defs: &Vec<ColumnDef>,
    wildcard_idx: &Option<usize>,
    constraints: &Vec<TableConstraint>,
    source_watermarks: &Vec<SourceWatermark>,
) -> Result<()> {
    if wildcard_idx.is_some() && !column_defs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "wildcard(*) and column definitions cannot be used together".to_owned(),
            "Remove the wildcard or column definitions".to_owned(),
        )
        .into());
    }

    // A primary key must be defined explicitly unless the wildcard is used, in
    // which case the primary key is derived from the external table.
    if wildcard_idx.is_none()
        && !constraints.iter().any(|c| {
            matches!(
                c,
                TableConstraint::Unique {
                    is_primary: true,
                    ..
                }
            )
        })
        && !column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table without primary key constraint is not supported".to_owned(),
            "Please define a primary key".to_owned(),
        )
        .into());
    }

    if append_only {
        return Err(ErrorCode::NotSupported(
            "append only modifier on the table created from a CDC source".into(),
            "Remove the APPEND ONLY clause".into(),
        )
        .into());
    }

    if !source_watermarks.is_empty() {
        return Err(ErrorCode::NotSupported(
            "watermark defined on the table created from a CDC source".into(),
            "Remove the Watermark definitions".into(),
        )
        .into());
    }

    Ok(())
}
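
/// Derives the CDC table schema by connecting to the external upstream
/// database, used when the statement specifies the wildcard (`*`) instead of
/// explicit columns.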
async fn bind_cdc_table_schema_externally(
    cdc_with_options: WithOptionsSecResolved,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    // Connect to the external upstream database and derive the schema there.
    let (options, secret_refs) = cdc_with_options.into_parts();
    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
        .context("failed to extract external table config")?;

    let table = ExternalTableImpl::connect(config)
        .await
        .context("failed to auto derive table schema")?;
    Ok((
        table
            .column_descs()
            .iter()
            .cloned()
            .map(|column_desc| ColumnCatalog {
                column_desc,
                is_hidden: false,
            })
            .collect(),
        table.pk_names().clone(),
    ))
}
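
/// Binds the CDC table schema from the user-specified column definitions and
/// constraints.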
fn bind_cdc_table_schema(
    column_defs: &Vec<ColumnDef>,
    constraints: &Vec<TableConstraint>,
    is_for_replace_plan: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;

    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
    Ok((columns, pk_names))
}
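
/// Entry point of `CREATE TABLE`: plans the statement, builds the stream
/// graph, and submits the creation to the meta service, dispatching on the
/// table engine (Hummock or Iceberg).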
#[allow(clippy::too_many_arguments)]
pub async fn handle_create_table(
    mut handler_args: HandlerArgs,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    if_not_exists: bool,
    format_encode: Option<FormatEncodeOptions>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    cdc_table_info: Option<CdcTableInfo>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    ast_engine: risingwave_sqlparser::ast::Engine,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    if append_only {
        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
    }

    session.check_cluster_limits().await?;

    let engine = match ast_engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };
    if engine == Engine::Iceberg && handler_args.with_options.get_connector().is_some() {
        // Pause the source during creation by forcing the rate limit to 0; it
        // is reset at the end of `create_iceberg_engine_table`. `insert`
        // returns the previous value if the user already set one, which is
        // not supported.
        if let Some(_rate_limit) = handler_args.with_options.insert(
            OverwriteOptions::SOURCE_RATE_LIMIT_KEY.to_owned(),
            "0".to_owned(),
        ) {
            return Err(ErrorCode::NotSupported(
                "source_rate_limit for iceberg table engine during table creation".to_owned(),
                "Please remove source_rate_limit from WITH options.".to_owned(),
            )
            .into());
        }
    }

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        table_name.clone(),
        StatementType::CREATE_TABLE,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (graph, source, hummock_table, job_type) = {
        let (plan, source, table, job_type) = handle_create_table_plan(
            handler_args.clone(),
            ExplainOptions::default(),
            format_encode,
            cdc_table_info,
            table_name.clone(),
            column_defs.clone(),
            wildcard_idx,
            constraints.clone(),
            source_watermarks,
            append_only,
            on_conflict,
            with_version_column,
            include_column_options,
            webhook_info,
            engine,
        )
        .await?;
        tracing::trace!("table_plan: {:?}", plan.explain_to_string());

        let graph = build_graph(plan, Some(GraphJobType::Table))?;

        (graph, source, table, job_type)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        table_name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

    match engine {
        Engine::Hummock => {
            let catalog_writer = session.catalog_writer()?;
            catalog_writer
                .create_table(
                    source.map(|s| s.to_prost()),
                    hummock_table.to_prost(),
                    graph,
                    job_type,
                    if_not_exists,
                )
                .await?;
        }
        Engine::Iceberg => {
            create_iceberg_engine_table(
                session,
                handler_args,
                source.map(|s| s.to_prost()),
                hummock_table,
                graph,
                table_name,
                job_type,
                if_not_exists,
            )
            .await?;
        }
    }

    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
}
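
/// Creates an Iceberg-engine table as three catalog objects: the internal
/// (Hummock) table, an Iceberg sink writing the table into the Iceberg
/// catalog, and an Iceberg source for reading it back. If creating the sink
/// or source fails, the already-created table is dropped to roll back.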
#[allow(clippy::too_many_arguments)]
pub async fn create_iceberg_engine_table(
    session: Arc<SessionImpl>,
    handler_args: HandlerArgs,
    mut source: Option<PbSource>,
    table: TableCatalog,
    graph: StreamFragmentGraph,
    table_name: ObjectName,
    job_type: PbTableJobType,
    if_not_exists: bool,
) -> Result<()> {
    let meta_client = session.env().meta_client();
    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;

    // Parse the meta store endpoint into the pieces needed to build a JDBC
    // catalog URI for the hosted catalog below.
    let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
        ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
    })?;
    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
    let meta_store_user = meta_store_endpoint.username().to_owned();
    let meta_store_password = match meta_store_endpoint.password() {
        Some(password) => percent_decode_str(password)
            .decode_utf8()
            .map_err(|_| {
                ErrorCode::InternalError(
                    "failed to parse password from meta store endpoint".to_owned(),
                )
            })?
            .into_owned(),
        None => "".to_owned(),
    };
    let meta_store_host = meta_store_endpoint
        .host_str()
        .ok_or_else(|| {
            ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
        })?
        .to_owned();
    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
        ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
    })?;
    let meta_store_database = meta_store_endpoint
        .path()
        .trim_start_matches('/')
        .to_owned();

    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
        bail!("failed to parse meta backend: {}", meta_store_backend);
    };

    let catalog_uri = match meta_backend {
        MetaBackend::Postgres => {
            format!(
                "jdbc:postgresql://{}:{}/{}",
                meta_store_host, meta_store_port, meta_store_database
            )
        }
        MetaBackend::Mysql => {
            format!(
                "jdbc:mysql://{}:{}/{}",
                meta_store_host, meta_store_port, meta_store_database
            )
        }
        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
            bail!(
                "Unsupported meta backend for iceberg engine table: {}",
                meta_store_backend
            );
        }
    };

    let rw_db_name = session
        .env()
        .catalog_reader()
        .read_guard()
        .get_database_by_id(&table.database_id)?
        .name()
        .to_owned();
    let rw_schema_name = session
        .env()
        .catalog_reader()
        .read_guard()
        .get_schema_by_id(&table.database_id, &table.schema_id)?
        .name()
        .clone();
    let iceberg_catalog_name = rw_db_name.clone();
    let iceberg_database_name = rw_schema_name.clone();
    let iceberg_table_name = table_name.0.last().unwrap().real_value();

    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
    let sink_decouple = session.config().sink_decouple();
    if matches!(sink_decouple, SinkDecouple::Disable) {
        bail!(
            "Iceberg engine tables require sink decoupling, try `set sink_decouple = true` to resolve it"
        );
    }

    let mut connection_ref = BTreeMap::new();
    let with_common = if iceberg_engine_connection.is_empty() {
        bail!("To use an iceberg engine table, the session variable `iceberg_engine_connection` must be set.");
    } else {
        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
        assert_eq!(parts.len(), 2);
        let connection_catalog =
            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
            if params.connection_type == ConnectionType::Iceberg as i32 {
                // Reuse the same connection for both the sink and the source.
                connection_ref.insert(
                    "connection".to_owned(),
                    ConnectionRefValue {
                        connection_name: ObjectName::from(vec![
                            Ident::from(parts[0]),
                            Ident::from(parts[1]),
                        ]),
                    },
                );

                let mut with_common = BTreeMap::new();
                with_common.insert("connector".to_owned(), "iceberg".to_owned());
                with_common.insert("database.name".to_owned(), iceberg_database_name.to_owned());
                with_common.insert("table.name".to_owned(), iceberg_table_name.to_owned());

                if let Some(s) = params.properties.get("hosted_catalog") {
                    if s.eq_ignore_ascii_case("true") {
                        // The hosted catalog is a JDBC catalog backed by the
                        // meta store parsed above.
                        with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
                        with_common.insert("catalog.uri".to_owned(), catalog_uri.to_owned());
                        with_common
                            .insert("catalog.jdbc.user".to_owned(), meta_store_user.to_owned());
                        with_common.insert(
                            "catalog.jdbc.password".to_owned(),
                            meta_store_password.clone(),
                        );
                        with_common
                            .insert("catalog.name".to_owned(), iceberg_catalog_name.to_owned());
                    }
                }

                with_common
            } else {
                return Err(RwError::from(ErrorCode::InvalidParameterValue(
                    "Only an iceberg connection can be used in the iceberg engine".to_owned(),
                )));
            }
        } else {
            return Err(RwError::from(ErrorCode::InvalidParameterValue(
                "Private Link Service has been deprecated. Please create a new connection instead."
                    .to_owned(),
            )));
        }
    };

    let mut pks = table
        .pk_column_names()
        .iter()
        .map(|c| c.to_string())
        .collect::<Vec<String>>();

    // If the table has no user-defined primary key, sink the hidden row id
    // column explicitly (hidden columns are not covered by `*`) and use it as
    // the Iceberg primary key.
    let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
            "select {} as {}, * from {}",
            ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
        ))
        .context("unable to parse query")?
        .try_into()
        .unwrap();

        let Statement::Query(query) = &stmt else {
            panic!("unexpected statement: {:?}", stmt);
        };
        CreateSink::AsQuery(query.clone())
    } else {
        CreateSink::From(table_name.clone())
    };

    let mut sink_name = table_name.clone();
    *sink_name.0.last_mut().unwrap() = Ident::from(
        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
    );
    let create_sink_stmt = CreateSinkStatement {
        if_not_exists: false,
        sink_name,
        with_properties: WithProperties(vec![]),
        sink_from,
        columns: vec![],
        emit_mode: None,
        sink_schema: None,
        into_table_name: None,
    };

    let mut sink_handler_args = handler_args.clone();

    let mut sink_with = with_common.clone();

    sink_with.insert("primary_key".to_owned(), pks.join(","));
    sink_with.insert("type".to_owned(), "upsert".to_owned());
    let commit_checkpoint_interval = handler_args
        .with_options
        .get(COMMIT_CHECKPOINT_INTERVAL)
        .map(|v| v.to_owned())
        .unwrap_or_else(|| "60".to_owned());
    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
        ErrorCode::InvalidInputSyntax(format!(
            "commit_checkpoint_interval must be a positive integer: {}",
            commit_checkpoint_interval
        ))
    })?;

    if commit_checkpoint_interval == 0 {
        bail!("commit_checkpoint_interval must be a positive integer: 0");
    }

    // The option is consumed by the sink; remove it from the source properties.
    if let Some(source) = source.as_mut() {
        source.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL);
    }

    let sink_decouple = session.config().sink_decouple();
    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
        bail!(
            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
        )
    }

    sink_with.insert(
        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
        commit_checkpoint_interval.to_string(),
    );
    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());

    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());

    const ENABLE_COMPACTION: &str = "enable_compaction";
    const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
    const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";

    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
        match enable_compaction.to_lowercase().as_str() {
            "true" => {
                // Iceberg compaction is a licensed feature.
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
            }
            "false" => {
                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
            }
            _ => {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "enable_compaction must be true or false: {}",
                    enable_compaction
                ))
                .into());
            }
        }

        if let Some(source) = source.as_mut() {
            source.with_properties.remove(ENABLE_COMPACTION);
        }
    } else {
        // Default to whatever the license allows.
        sink_with.insert(
            ENABLE_COMPACTION.to_owned(),
            risingwave_common::license::Feature::IcebergCompaction
                .check_available()
                .is_ok()
                .to_string(),
        );
    }

    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "compaction_interval_sec must be a positive integer: {}",
                compaction_interval_sec
            ))
        })?;
        if compaction_interval_sec == 0 {
            bail!("compaction_interval_sec must be a positive integer: 0");
        }
        sink_with.insert(
            COMPACTION_INTERVAL_SEC.to_owned(),
            compaction_interval_sec.to_string(),
        );
        if let Some(source) = source.as_mut() {
            source.with_properties.remove(COMPACTION_INTERVAL_SEC);
        }
    }

    if let Some(enable_snapshot_expiration) =
        handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
    {
        match enable_snapshot_expiration.to_lowercase().as_str() {
            "true" => {
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;
                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
            }
            "false" => {
                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
            }
            _ => {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "enable_snapshot_expiration must be true or false: {}",
                    enable_snapshot_expiration
                ))
                .into());
            }
        }

        if let Some(source) = source.as_mut() {
            source.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION);
        }
    } else {
        sink_with.insert(
            ENABLE_SNAPSHOT_EXPIRATION.to_owned(),
            risingwave_common::license::Feature::IcebergCompaction
                .check_available()
                .is_ok()
                .to_string(),
        );
    }

    let partition_by = handler_args
        .with_options
        .get("partition_by")
        .map(|v| v.to_owned());

    if let Some(partition_by) = &partition_by {
        let mut partition_columns = vec![];
        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
            table
                .columns()
                .iter()
                .find(|col| col.name().eq_ignore_ascii_case(&column))
                .ok_or_else(|| {
                    ErrorCode::InvalidInputSyntax(format!(
                        "Partition source column does not exist in schema: {}",
                        column
                    ))
                })?;

            partition_columns.push(column.to_owned());
        }

        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
            |_| {
                ErrorCode::InvalidInputSyntax(
                    "The partition columns should be the prefix of the primary key".to_owned(),
                )
            },
        )?;

        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());

        if let Some(source) = source.as_mut() {
            source.with_properties.remove("partition_by");
        }
    }

    sink_handler_args.with_options =
        WithOptions::new(sink_with, Default::default(), connection_ref.clone());

    let mut source_name = table_name.clone();
    *source_name.0.last_mut().unwrap() = Ident::from(
        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
    );
    let create_source_stmt = CreateSourceStatement {
        temporary: false,
        if_not_exists: false,
        columns: vec![],
        source_name,
        wildcard_idx: Some(0),
        constraints: vec![],
        with_properties: WithProperties(vec![]),
        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
        source_watermarks: vec![],
        include_column_options: vec![],
    };

    let mut source_handler_args = handler_args.clone();
    let source_with = with_common;
    source_handler_args.with_options =
        WithOptions::new(source_with, Default::default(), connection_ref);

    // Ensure the JVM is initialized before running the DDLs below, since the
    // Iceberg catalog may be accessed through JNI.
    let _ = JVM.get_or_init()?;

    let catalog_writer = session.catalog_writer()?;
    let has_connector = source.is_some();
    catalog_writer
        .create_table(source, table.to_prost(), graph, job_type, if_not_exists)
        .await?;
    let res = create_sink::handle_create_sink(sink_handler_args, create_sink_stmt, true).await;
    if res.is_err() {
        // Roll back the table creation if the sink cannot be created.
        handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
        res?;
    }
    let res = create_source::handle_create_source(source_handler_args, create_source_stmt).await;
    if res.is_err() {
        // Likewise, roll back if the source cannot be created.
        handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
        res?;
    }

    if has_connector {
        // Reset the source rate limit, which was forced to 0 in
        // `handle_create_table` for iceberg engine tables with a connector.
        alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
            handler_args,
            PbThrottleTarget::TableWithSource,
            table_name,
            -1,
        )
        .await?;
    }

    Ok(())
}
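
/// Validates that `INCLUDE` and `FORMAT ... ENCODE ...` clauses are used
/// together with a source connector; the check is skipped for CDC tables.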
pub fn check_create_table_with_source(
    with_options: &WithOptions,
    format_encode: Option<FormatEncodeOptions>,
    include_column_options: &IncludeOption,
    cdc_table_info: &Option<CdcTableInfo>,
) -> Result<Option<FormatEncodeOptions>> {
    // Skip the check for tables created from a CDC source.
    if cdc_table_info.is_some() {
        return Ok(format_encode);
    }
    let defined_source = with_options.is_source_connector();

    if !include_column_options.is_empty() && !defined_source {
        return Err(ErrorCode::InvalidInputSyntax(
            "INCLUDE should be used with a connector".to_owned(),
        )
        .into());
    }
    if defined_source {
        format_encode.as_ref().ok_or_else(|| {
            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
        })?;
    }
    Ok(format_encode)
}
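
/// Checks that the partition columns form a prefix of the primary key columns,
/// e.g. partitioning by `(a)` or `(a, b)` is accepted for the primary key
/// `(a, b, c)`, while `(b)` is rejected.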
fn ensure_partition_columns_are_prefix_of_primary_key(
    partition_columns: &[String],
    primary_key_columns: &[String],
) -> std::result::Result<(), String> {
    if partition_columns.len() > primary_key_columns.len() {
        return Err("There cannot be more partition columns than primary key columns.".to_owned());
    }

    for (i, partition_col) in partition_columns.iter().enumerate() {
        if primary_key_columns.get(i) != Some(partition_col) {
            return Err(format!(
                "Partition column '{partition_col}' does not match the primary key column at the same position."
            ));
        }
    }

    Ok(())
}

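/// Re-plan an existing table from a new `CREATE TABLE` statement, producing the
/// stream fragment graph, the replacement table catalog (keeping the original
/// table ID), the optional associated source catalog, and the table job type.
/// Used when replacing a table's definition (e.g. for `ALTER TABLE` schema
/// changes); changing the primary key is rejected.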
#[allow(clippy::too_many_arguments)]
pub async fn generate_stream_graph_for_replace_table(
    _session: &Arc<SessionImpl>,
    table_name: ObjectName,
    original_catalog: &Arc<TableCatalog>,
    handler_args: HandlerArgs,
    statement: Statement,
    col_id_gen: ColumnIdGenerator,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(
    StreamFragmentGraph,
    TableCatalog,
    Option<SourceCatalog>,
    TableJobType,
)> {
    let Statement::CreateTable {
        columns,
        constraints,
        source_watermarks,
        append_only,
        on_conflict,
        with_version_column,
        wildcard_idx,
        cdc_table_info,
        format_encode,
        include_column_options,
        engine,
        with_options,
        ..
    } = statement
    else {
        panic!("unexpected statement type: {statement:?}");
    };

    let format_encode =
        format_encode.map(|format_encode| format_encode.into_v2_with_warning());

    let engine = match engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    // Dropping the connector: the table had an associated source, but the new
    // definition no longer specifies FORMAT/ENCODE.
    let is_drop_connector =
        original_catalog.associated_source_id().is_some() && format_encode.is_none();
    if is_drop_connector {
        debug_assert!(
            source_watermarks.is_empty()
                && include_column_options.is_empty()
                && with_options
                    .iter()
                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
        );
    }

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_column: with_version_column.as_ref().map(|x| x.real_value()),
        webhook_info: original_catalog.webhook_info.clone(),
        engine,
    };

    let ((plan, mut source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                ExplainOptions::default(),
                table_name,
                columns,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                sql_column_strategy,
            )
            .await?,
            TableJobType::General,
        ),
        (None, None) => {
            let context = OptimizerContext::from_handler_args(handler_args);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name,
                columns,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                true,
            )?;
            ((plan, None, table), TableJobType::General)
        }
        (None, Some(cdc_table)) => {
            let session = &handler_args.session;
            let (source, resolved_table_name, database_id, schema_id) =
                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;

            let cdc_with_options = derive_with_options_for_cdc_table(
                &source.with_properties,
                cdc_table.external_table_name.clone(),
            )?;

            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                cdc_table.external_table_name.clone(),
                columns,
                column_catalogs,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_column.map(|x| x.real_value()),
                IncludeOption::default(),
                table_name,
                resolved_table_name,
                database_id,
                schema_id,
                original_catalog.id(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to a table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };

    if table.pk_column_ids() != original_catalog.pk_column_ids() {
        Err(ErrorCode::InvalidInputSyntax(
            "altering the primary key of a table is not supported".to_owned(),
        ))?
    }

    let graph = build_graph(plan, Some(GraphJobType::Table))?;

    // Fill in the original table ID so the new catalog replaces the old table.
    let mut table = TableCatalog {
        id: original_catalog.id(),
        ..table
    };
    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
        table.associated_source_id = Some(source_id);

        let source = source.as_mut().unwrap();
        source.id = source_id.table_id;
        source.associated_table_id = Some(table.id());
    }

    Ok((graph, table, source, job_type))
}

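/// Resolve the upstream CDC source referenced by a `CREATE TABLE ... FROM source`
/// statement, along with the resolved table name and the database/schema IDs
/// under which the table will be created.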
fn get_source_and_resolved_table_name(
    session: &Arc<SessionImpl>,
    cdc_table: CdcTableInfo,
    table_name: ObjectName,
) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
    let db_name = &session.database();
    let (schema_name, resolved_table_name) =
        Binder::resolve_schema_qualified_name(db_name, table_name)?;
    let (database_id, schema_id) =
        session.get_database_and_schema_id_for_create(schema_name.clone())?;

    let (source_schema_name, source_name) =
        Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;

    let source = {
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_name = source_schema_name.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_owned());
        let (source, _) = catalog_reader.get_source_by_name(
            db_name,
            SchemaPath::Name(schema_name.as_str()),
            source_name.as_str(),
        )?;
        source.clone()
    };

    Ok((source, resolved_table_name, database_id, schema_id))
}

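/// Bind the webhook source settings of a table: resolve the signature secret (if
/// any) and check that the signature expression is a top-level `SECURE_COMPARE()`
/// call, returning the protobuf form of the webhook info.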
fn bind_webhook_info(
    session: &Arc<SessionImpl>,
    columns_defs: &[ColumnDef],
    webhook_info: WebhookSourceInfo,
) -> Result<PbWebhookSourceInfo> {
    // A webhook table must consist of exactly one JSONB payload column.
    if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
    {
        return Err(ErrorCode::InvalidInputSyntax(
            "Table with webhook source should have exactly one JSONB column".to_owned(),
        )
        .into());
    }

    let WebhookSourceInfo {
        secret_ref,
        signature_expr,
        wait_for_persistence,
        is_batched,
    } = webhook_info;

    // Resolve the secret (if any) used to verify the webhook signature.
    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
        let db_name = &session.database();
        let (schema_name, secret_name) =
            Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
        (
            Some(PbSecretRef {
                secret_id: secret_catalog.id.secret_id(),
                ref_as: match secret_ref.ref_as {
                    SecretRefAsType::Text => PbRefAsType::Text,
                    SecretRefAsType::File => PbRefAsType::File,
                }
                .into(),
            }),
            Some(secret_name),
        )
    } else {
        (None, None)
    };

    let secure_compare_context = SecureCompareContext {
        column_name: columns_defs[0].name.real_value(),
        secret_name,
    };
    let mut binder = Binder::new_for_ddl_with_secure_compare(session, secure_compare_context);
    let expr = binder.bind_expr(signature_expr.clone())?;

    // The signature verification expression must be a top-level `SECURE_COMPARE()` call.
    let is_secure_compare = expr.as_function_call().is_some_and(|call| {
        call.func_type() == crate::optimizer::plan_node::generic::ExprType::SecureCompare
    });
    if !is_secure_compare {
        return Err(ErrorCode::InvalidInputSyntax(
            "The signature verification function must be SECURE_COMPARE()".to_owned(),
        )
        .into());
    }

    let pb_webhook_info = PbWebhookSourceInfo {
        secret_ref: pb_secret_ref,
        signature_expr: Some(expr.to_expr_proto()),
        wait_for_persistence,
        is_batched,
    };

    Ok(pb_webhook_info)
}

#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use super::*;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_table_handler() {
        let sql =
            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(table.name(), "t");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "v1" => DataType::Int16,
            "v2" => StructType::new(
                vec![("v3", DataType::Int64), ("v4", DataType::Float64), ("v5", DataType::Float64)],
            )
            .with_ids([3, 4, 5].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };

        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    #[test]
    fn test_bind_primary_key() {
        // Columns `v1` and `v2` get IDs 1 and 2; the hidden row ID column has
        // ID 0 and serves as the primary key when none is specified.
        for (sql, expected) in [
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ] {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be create table")
            };
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpected an error but got: {actual:?}"),
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }

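    // A minimal, self-contained check of the partition-prefix rule used by the
    // Iceberg engine path (see `ensure_partition_columns_are_prefix_of_primary_key`).
    #[test]
    fn test_partition_columns_are_prefix_of_primary_key() {
        let pk = ["a", "b", "c"].map(str::to_owned);
        // An empty partition list and any in-order prefix are accepted.
        assert!(ensure_partition_columns_are_prefix_of_primary_key(&[], &pk).is_ok());
        assert!(ensure_partition_columns_are_prefix_of_primary_key(&pk[..1], &pk).is_ok());
        assert!(ensure_partition_columns_are_prefix_of_primary_key(&pk[..2], &pk).is_ok());
        assert!(ensure_partition_columns_are_prefix_of_primary_key(&pk, &pk).is_ok());
        // A non-leading or out-of-order column is rejected.
        assert!(ensure_partition_columns_are_prefix_of_primary_key(&pk[1..2], &pk).is_err());
        // More partition columns than primary key columns are rejected.
        let too_long = ["a", "b", "c", "d"].map(str::to_owned);
        assert!(ensure_partition_columns_are_prefix_of_primary_key(&too_long, &pk).is_err());
    }
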
    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // Options given in the FORMAT ... ENCODE clause end up in `format_encode_options`.
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // Options given in the WITH clause stay in `with_properties`, even when duplicated.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        assert!(!source.with_properties.contains_key("schema.location"));
    }
}