use std::collections::{BTreeMap, HashMap};
use std::rc::Rc;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use clap::ValueEnum;
use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use prost::Message as _;
use risingwave_common::catalog::{
    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
    RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, TableId,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::license::Feature;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::jvm_runtime::JVM;
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
    DATABASE_NAME_KEY, ExternalTableConfig, ExternalTableImpl, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::connection_params::ConnectionType;
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::{PbSource, PbTable, PbWebhookSourceInfo, Table, WatermarkDesc};
use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
use risingwave_pb::meta::PbThrottleTarget;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::secret::secret_ref::PbRefAsType;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::{IncludeOption, Parser};
use thiserror_ext::AsReport;

use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
use super::{RwPgResponse, alter_streaming_rate_limit, create_sink, create_source};
use crate::binder::{Clause, SecureCompareContext, bind_data_type};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableVersion};
use crate::catalog::{ColumnId, DatabaseId, SchemaId, check_column_name_not_reserved};
use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::HandlerArgs;
use crate::handler::create_source::{
    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
    bind_source_watermark, handle_addition_columns,
};
use crate::handler::util::SourceSchemaCompatExt;
use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind};
use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::OverwriteOptions;
use crate::{Binder, Explain, TableCatalog, WithOptions};

mod col_id_gen;
pub use col_id_gen::*;
use risingwave_connector::sink::iceberg::parse_partition_by_exprs;

use crate::handler::drop_table::handle_drop_table;

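/// Ensure that only column options supported by `CREATE TABLE` are present:
/// generated columns, default values, primary key, and (not-)null constraints.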
fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
    for option_def in &c.options {
        match option_def.option {
            ColumnOption::GeneratedColumns(_) => {}
            ColumnOption::DefaultValue(_) => {}
            ColumnOption::DefaultValueInternal { .. } => {}
            ColumnOption::Unique { is_primary: true } => {}
            ColumnOption::Null => {}
            ColumnOption::NotNull => {}
            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
        }
    }
    Ok(())
}

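/// Binds the column schemas declared in the `CREATE` statement into `ColumnCatalog`s.
///
/// The returned catalogs are newly created, so their column IDs are placeholders
/// that must be filled in by a `ColumnIdGenerator` before use.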
pub fn bind_sql_columns(
    column_defs: &[ColumnDef],
    is_for_drop_table_connector: bool,
) -> Result<Vec<ColumnCatalog>> {
    let mut columns = Vec::with_capacity(column_defs.len());

    for column in column_defs {
        ensure_column_options_supported(column)?;
        let ColumnDef {
            name,
            data_type,
            collation,
            options,
            ..
        } = column;

        let data_type = data_type
            .clone()
            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
        if let Some(collation) = collation {
            // Only the pseudo collations `C` and `POSIX` are accepted; both compare by
            // byte order, so no real collation support is required.
            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
                bail_not_implemented!(
                    "collations other than `C` or `POSIX` are not implemented"
                );
            }

            match data_type {
                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
                _ => {
                    return Err(ErrorCode::NotSupported(
                        format!("{} is not a collatable data type", data_type),
                        "The only built-in collatable data types are `text`, `varchar` and `char`, please check your type".into(),
                    ).into());
                }
            }
        }

        if !is_for_drop_table_connector {
            // Columns for `ALTER TABLE ... DROP CONNECTOR` are derived from the
            // existing catalog and may legitimately contain reserved (hidden)
            // columns, so the reserved-name check is skipped in that case.
            check_column_name_not_reserved(&name.real_value())?;
        }

        let nullable: bool = !options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull));

        columns.push(ColumnCatalog {
            column_desc: ColumnDesc {
                data_type: bind_data_type(&data_type)?,
                column_id: ColumnId::placeholder(),
                name: name.real_value(),
                generated_or_default_column: None,
                description: None,
                additional_column: AdditionalColumn { column_type: None },
                version: ColumnDescVersion::LATEST,
                system_column: None,
                nullable,
            },
            is_hidden: false,
        });
    }

    Ok(columns)
}

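/// Checks that a generated column neither references another generated column
/// nor, if its expression is impure, participates in the primary key.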
fn check_generated_column_constraints(
    column_name: &str,
    column_id: ColumnId,
    expr: &ExprImpl,
    column_catalogs: &[ColumnCatalog],
    generated_column_names: &[String],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let input_refs = expr.collect_input_refs(column_catalogs.len());
    for idx in input_refs.ones() {
        let referred_generated_column = &column_catalogs[idx].column_desc.name;
        if generated_column_names
            .iter()
            .any(|c| c == referred_generated_column)
        {
            return Err(ErrorCode::BindError(format!(
                "A generated column cannot reference another generated column. \
                 But here generated column \"{}\" referenced another generated column \"{}\"",
                column_name, referred_generated_column
            ))
            .into());
        }
    }

    if pk_column_ids.contains(&column_id) && expr.is_impure() {
        return Err(ErrorCode::BindError(format!(
            "Generated columns with impure expressions should not be part of the primary key. \
             Here column \"{}\" is defined as part of the primary key.",
            column_name
        ))
        .into());
    }

    Ok(())
}

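/// Binds the constraints that can only be specified in column definitions,
/// currently generated columns and default values.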
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: Vec<ColumnDef>,
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let generated_column_names = {
        let mut names = vec![];
        for column in &columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name.clone(), column_catalogs)?;

    for column in columns {
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            // A column defined in SQL may be absent from the catalogs, e.g. when the
            // columns were resolved from the source instead. Simply skip it.
            continue;
        };

        for option_def in column.options {
            match option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "fail to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type().clone())?;

                    // Rewrite the expression against the pinned snapshot so that it can
                    // be folded into a constant snapshot value (e.g. inlining `now()`).
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                expr: Some(expr_impl.to_expr_proto()),
                            }));
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                             Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // `DEFAULT INTERNAL` carries a pre-encoded `DefaultColumnDesc` that
                    // is set internally. An empty payload means the user wrote it
                    // directly in SQL, which is not allowed.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                             please specify a concrete default value"
                        );
                    }

                    let desc = DefaultColumnDesc::decode(&*persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                _ => {}
            }
        }
    }
    Ok(())
}

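/// Extracts the primary key column names from the table-level constraints,
/// rejecting any constraint other than `PRIMARY KEY`.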
pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
    let mut pk_column_names = vec![];

    for constraint in table_constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary: true,
            } => {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
            }
            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
        }
    }
    Ok(pk_column_names)
}

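/// Merges primary key names from column definitions with those from table-level
/// constraints, ensuring the primary key is defined at most once. E.g. both
/// `CREATE TABLE t (id INT PRIMARY KEY)` and `CREATE TABLE t (id INT, PRIMARY KEY (id))`
/// yield `["id"]`, while specifying both forms at once is rejected.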
pub fn bind_sql_pk_names(
    columns_defs: &[ColumnDef],
    pk_names_from_table_constraints: Vec<String>,
) -> Result<Vec<String>> {
    let mut pk_column_names = pk_names_from_table_constraints;

    for column in columns_defs {
        for option_def in &column.options {
            if let ColumnOption::Unique { is_primary: true } = option_def.option {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names.push(column.name.real_value());
            }
        }
    }

    Ok(pk_column_names)
}

fn multiple_pk_definition_err() -> RwError {
    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
}

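/// Binds the primary key columns by name and, if none are specified while
/// `must_need_pk` holds, appends a hidden `_row_id` column to serve as the key.
/// Returns the final columns, the primary key column IDs, and the index of the
/// row-id column if one was added.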
pub fn bind_pk_and_row_id_on_relation(
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    must_need_pk: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
    for c in &columns {
        assert!(c.column_id() != ColumnId::placeholder());
    }

    // Mapping from column name to column id.
    let name_to_id = columns
        .iter()
        .map(|c| (c.name(), c.column_id()))
        .collect::<HashMap<_, _>>();

    let mut pk_column_ids: Vec<_> = pk_names
        .iter()
        .map(|name| {
            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
            })
        })
        .try_collect()?;

    // Add a hidden `_row_id` column if no primary key is specified but one is required.
    let need_row_id = pk_column_ids.is_empty() && must_need_pk;

    let row_id_index = need_row_id.then(|| {
        let column = ColumnCatalog::row_id_column();
        let index = columns.len();
        pk_column_ids = vec![column.column_id()];
        columns.push(column);
        index
    });

    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
        Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{col}\" specified more than once"
        )))?;
    }

    Ok((columns, pk_column_ids, row_id_index))
}

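/// Generates the plan for a table backed by an external connector. The columns
/// may be resolved from the source (e.g. an external schema registry) according
/// to `sql_column_strategy`.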
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append only table does not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        // For non-CDC connectors, `NOT NULL` is enforced by silently dropping
        // violating rows, so warn the user.
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;

    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source_catalog = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
    )
    .await?;

    let pb_source = source_catalog.to_prost();

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source_catalog,
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(pb_source), table))
}

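/// Generates the plan for a plain table without an external connector.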
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan(
    context: OptimizerContext,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    mut col_id_gen: ColumnIdGenerator,
    source_watermarks: Vec<SourceWatermark>,
    props: CreateTableProps,
    is_for_replace_plan: bool,
) -> Result<(PlanRef, PbTable)> {
    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
    if !secret_refs.is_empty() || !connection_refs.is_empty() {
        return Err(crate::error::ErrorCode::InvalidParameterValue(
            "Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned(),
        )
        .into());
    }

    gen_create_table_plan_without_source(
        context,
        table_name,
        columns,
        column_defs,
        constraints,
        source_watermarks,
        col_id_gen.into_version(),
        props,
    )
}

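/// Generates the plan for a table from already-bound column catalogs plus the
/// original column definitions; shared by table creation and the replace-table
/// (`ALTER TABLE`) path.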
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, PbTable)> {
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}

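/// Generates the table plan from a bound source catalog, reusing its columns,
/// primary key, row-id index, and watermarks.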
fn gen_table_plan_with_source(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    source_catalog: SourceCatalog,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, PbTable)> {
    let table_name = source_catalog.name.clone();

    let info = CreateTableInfo {
        columns: source_catalog.columns.clone(),
        pk_column_ids: source_catalog.pk_col_ids.clone(),
        row_id_index: source_catalog.row_id_index,
        watermark_descs: source_catalog.watermark_descs.clone(),
        source_catalog: Some(source_catalog),
        version,
    };

    gen_table_plan_inner(context, schema_name, table_name, info, props)
}

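/// An `ON CONFLICT` behavior that is either unresolved (straight from the AST)
/// or already resolved to a concrete `ConflictBehavior`.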
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    Ast(Option<OnConflict>),
    Resolved(ConflictBehavior),
}

impl From<Option<OnConflict>> for EitherOnConflict {
    fn from(v: Option<OnConflict>) -> Self {
        Self::Ast(v)
    }
}

impl From<ConflictBehavior> for EitherOnConflict {
    fn from(v: ConflictBehavior) -> Self {
        Self::Resolved(v)
    }
}

impl EitherOnConflict {
    /// Resolves the conflict behavior, validating it against the table's
    /// `APPEND ONLY` property and whether `_row_id` serves as the primary key.
    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
        let conflict_behavior = match self {
            EitherOnConflict::Ast(on_conflict) => {
                if append_only {
                    if row_id_as_pk {
                        // Rows in an append-only table without a user-defined primary
                        // key can never conflict, so no check is needed.
                        ConflictBehavior::NoCheck
                    } else {
                        if let Some(on_conflict) = on_conflict
                            && on_conflict != OnConflict::Nothing
                        {
                            return Err(ErrorCode::InvalidInputSyntax(
                                "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
                                 the ON CONFLICT behavior must be DO NOTHING."
                                    .to_owned(),
                            )
                            .into());
                        }
                        ConflictBehavior::IgnoreConflict
                    }
                } else {
                    // `UPDATE FULL` is the default conflict behavior.
                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
                    }
                }
            }
            EitherOnConflict::Resolved(b) => b,
        };

        Ok(conflict_behavior)
    }
}

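/// The bound schema information passed around when planning a table.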
pub struct CreateTableInfo {
    pub columns: Vec<ColumnCatalog>,
    pub pk_column_ids: Vec<ColumnId>,
    pub row_id_index: Option<usize>,
    pub watermark_descs: Vec<WatermarkDesc>,
    pub source_catalog: Option<SourceCatalog>,
    pub version: TableVersion,
}

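/// The properties of a table, parsed from the statement but not yet bound.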
pub struct CreateTableProps {
    pub definition: String,
    pub append_only: bool,
    pub on_conflict: EitherOnConflict,
    pub with_version_column: Option<String>,
    pub webhook_info: Option<PbWebhookSourceInfo>,
    pub engine: Engine,
}

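/// Shared tail of the planning paths above: builds the `LogicalSource` node,
/// validates append-only-specific options, and generates the materialize plan.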
#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, PbTable)> {
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    let source_node: PlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    // Watermarks and retention are only meaningful for append-only tables.
    if !append_only && !watermark_descs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on table requires the table to be append only.".to_owned(),
            "Use the key words `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    if !append_only && retention_seconds.is_some() {
        return Err(ErrorCode::NotSupported(
            "Defining retention seconds on table requires the table to be append only.".to_owned(),
            "Use the key words `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().to_prost();

    table.owner = session.user_id();
    Ok((materialize.into(), table))
}

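/// Generates the plan for a table created `FROM` a shared CDC source, which
/// backfills and consumes changes of the external table through the source
/// instead of ingesting data directly.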
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String,
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, PbTable)> {
    let session = context.session_ctx().clone();

    // Append additional (INCLUDE) columns to the end.
    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        // Primary key columns are guaranteed to exist at this point.
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    // Generated columns only exist in RisingWave, not in the external table.
    let non_generated_column_descs = columns
        .iter()
        .filter(|c| !c.is_generated())
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();

    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id.into(),
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");

    let options = CdcScanOptions::from_with_options(context.with_options())?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: PlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs: vec![],
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_column,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().to_prost();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.dependent_relations = vec![source.id];

    Ok((materialize.into(), table))
}

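/// Derives the WITH options of a CDC table from its upstream source, splitting
/// the external table name into connector-specific parts. E.g. for MySQL,
/// `mydb.orders` yields `database.name = 'mydb'` and `table.name = 'orders'`,
/// while for Postgres `public.orders` yields `schema.name` and `table.name`.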
fn derive_with_options_for_cdc_table(
    source_with_properties: &WithOptionsSecResolved,
    external_table_name: String,
) -> Result<WithOptionsSecResolved> {
    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
    let source_database_name: &str = source_with_properties
        .get("database.name")
        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
        .as_str();
    let mut with_options = source_with_properties.clone();
    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
        match connector.as_str() {
            MYSQL_CDC_CONNECTOR => {
                // MySQL has no schema concept, so the external table name is in
                // `database.table` format.
                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
                })?;
                // The `database.name` of a MySQL source may be a comma-separated list;
                // the database in the FROM clause must be one of them.
                if !source_database_name
                    .split(',')
                    .map(|s| s.trim())
                    .any(|name| name == db_name)
                {
                    return Err(anyhow!(
                        "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
                        db_name,
                        source_database_name
                    ).into());
                }
                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            POSTGRES_CDC_CONNECTOR => {
                let (schema_name, table_name) = external_table_name
                    .split_once('.')
                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;

                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            SQL_SERVER_CDC_CONNECTOR => {
                // The SQL Server external table name is in `database.schema.table` format.
                let (db_name, schema_table_name) =
                    external_table_name.split_once('.').ok_or_else(|| {
                        anyhow!("The upstream table name must be in 'database.schema.table' format")
                    })?;

                // Unlike MySQL, a SQL Server source covers exactly one database.
                if db_name != source_database_name {
                    return Err(anyhow!(
                        "The database name `{}` in the FROM clause is not the same as the database name `{}` in source definition",
                        db_name,
                        source_database_name
                    ).into());
                }

                let (schema_name, table_name) =
                    schema_table_name.split_once('.').ok_or_else(|| {
                        anyhow!("The table name must contain schema name prefix, e.g. 'dbo.table'")
                    })?;

                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            _ => {
                return Err(RwError::from(anyhow!(
                    "connector {} is not supported for cdc table",
                    connector
                )));
            }
        };
    }
    Ok(with_options)
}

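/// Dispatches planning for `CREATE TABLE`: with a connector, without one, or
/// `FROM` a shared CDC source. FORMAT/ENCODE and a CDC source are mutually
/// exclusive.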
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
    handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    format_encode: Option<FormatEncodeOptions>,
    cdc_table_info: Option<CdcTableInfo>,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    engine: Engine,
) -> Result<(PlanRef, Option<PbSource>, PbTable, TableJobType)> {
    let col_id_gen = ColumnIdGenerator::new_initial();
    let format_encode = check_create_table_with_source(
        &handler_args.with_options,
        format_encode,
        &include_column_options,
        &cdc_table_info,
    )?;
    let webhook_info = webhook_info
        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
        .transpose()?;

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_column: with_version_column.clone(),
        webhook_info,
        engine,
    };

    let ((plan, source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                explain_options,
                table_name.clone(),
                column_defs,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                SqlColumnStrategy::FollowChecked,
            )
            .await?,
            TableJobType::General,
        ),
        (None, None) => {
            let context = OptimizerContext::new(handler_args, explain_options);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name.clone(),
                column_defs,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                false,
            )?;

            ((plan, None, table), TableJobType::General)
        }
        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &column_defs,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            generated_columns_check_for_cdc_table(&column_defs)?;
            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;

            let session = &handler_args.session;
            let db_name = &session.database();
            let user_name = &session.user_name();
            let search_path = session.config().search_path();
            let (schema_name, resolved_table_name) =
                Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
            let (database_id, schema_id) =
                session.get_database_and_schema_id_for_create(schema_name.clone())?;

            // Resolve the CDC source that this table is created from.
            let (source_schema_name, source_name) =
                Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;

            let source = {
                let catalog_reader = session.env().catalog_reader().read_guard();
                let schema_path =
                    SchemaPath::new(source_schema_name.as_deref(), &search_path, user_name);

                let (source, _) = catalog_reader.get_source_by_name(
                    db_name,
                    schema_path,
                    source_name.as_str(),
                )?;
                source.clone()
            };
            let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
                &source.with_properties,
                cdc_table.external_table_name.clone(),
            )?;

            let (columns, pk_names) = match wildcard_idx {
                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
                None => {
                    for column_def in &column_defs {
                        for option_def in &column_def.options {
                            if let ColumnOption::DefaultValue(_)
                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
                            {
                                return Err(ErrorCode::NotSupported(
                                    "Default value for columns defined on the table created from a CDC source".into(),
                                    "Remove the default value expression in the column definitions".into(),
                                )
                                .into());
                            }
                        }
                    }

                    let (columns, pk_names) =
                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
                    // Validate the connector properties eagerly by parsing them into the
                    // external table config.
                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
                        .context("failed to extract external table config")?;

                    (columns, pk_names)
                }
            };

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, explain_options).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                cdc_table.external_table_name.clone(),
                column_defs,
                columns,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_column,
                include_column_options,
                table_name,
                resolved_table_name,
                database_id,
                schema_id,
                TableId::placeholder(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format doesn't apply to table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };
    Ok((plan, source, table, job_type))
}

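/// For CDC tables, all generated columns must come after the non-generated ones.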
fn generated_columns_check_for_cdc_table(columns: &[ColumnDef]) -> Result<()> {
    let mut found_generated_column = false;
    for column in columns {
        let mut is_generated = false;

        for option_def in &column.options {
            if let ColumnOption::GeneratedColumns(_) = option_def.option {
                is_generated = true;
                break;
            }
        }

        if is_generated {
            found_generated_column = true;
        } else if found_generated_column {
            return Err(ErrorCode::NotSupported(
                "Non-generated column found after a generated column.".into(),
                "Ensure that all generated columns appear at the end of the cdc table definition."
                    .into(),
            )
            .into());
        }
    }
    Ok(())
}

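/// CDC tables with an explicit column list must not carry NOT NULL constraints.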
fn not_null_check_for_cdc_table(
    wildcard_idx: &Option<usize>,
    column_defs: &[ColumnDef],
) -> Result<()> {
    if wildcard_idx.is_none()
        && column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table with NOT NULL constraint is not supported".to_owned(),
            "Please remove the NOT NULL constraint for columns".to_owned(),
        )
        .into());
    }
    Ok(())
}

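/// Validates a table created from a CDC source: wildcard and explicit columns
/// are mutually exclusive, a primary key is required, and APPEND ONLY as well
/// as watermarks are rejected.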
fn sanity_check_for_table_on_cdc_source(
    append_only: bool,
    column_defs: &[ColumnDef],
    wildcard_idx: &Option<usize>,
    constraints: &[TableConstraint],
    source_watermarks: &[SourceWatermark],
) -> Result<()> {
    // The wildcard cannot be used together with explicit column definitions.
    if wildcard_idx.is_some() && !column_defs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "wildcard(*) and column definitions cannot be used together".to_owned(),
            "Remove the wildcard or column definitions".to_owned(),
        )
        .into());
    }

    // A primary key must be defined explicitly unless the schema (including the
    // key) is derived from the external table via the wildcard.
    if wildcard_idx.is_none()
        && !constraints.iter().any(|c| {
            matches!(
                c,
                TableConstraint::Unique {
                    is_primary: true,
                    ..
                }
            )
        })
        && !column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table without primary key constraint is not supported".to_owned(),
            "Please define a primary key".to_owned(),
        )
        .into());
    }

    if append_only {
        return Err(ErrorCode::NotSupported(
            "append only modifier on the table created from a CDC source".into(),
            "Remove the APPEND ONLY clause".into(),
        )
        .into());
    }

    if !source_watermarks.is_empty() {
        return Err(ErrorCode::NotSupported(
            "watermark defined on the table created from a CDC source".into(),
            "Remove the Watermark definitions".into(),
        )
        .into());
    }

    Ok(())
}

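/// Derives the schema of a CDC table by connecting to the external upstream
/// table; used when the table is declared with a wildcard (`*`).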
async fn bind_cdc_table_schema_externally(
    cdc_with_options: WithOptionsSecResolved,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    Feature::CdcTableSchemaMap.check_available().map_err(
        |err: risingwave_common::license::FeatureNotAvailable| {
            ErrorCode::NotSupported(
                err.to_report_string(),
                "Please define the schema manually".to_owned(),
            )
        },
    )?;
    let (options, secret_refs) = cdc_with_options.into_parts();
    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
        .context("failed to extract external table config")?;

    let table = ExternalTableImpl::connect(config)
        .await
        .context("failed to auto derive table schema")?;
    Ok((
        table
            .column_descs()
            .iter()
            .cloned()
            .map(|column_desc| ColumnCatalog {
                column_desc,
                is_hidden: false,
            })
            .collect(),
        table.pk_names().clone(),
    ))
}

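/// Derives the schema of a CDC table from the column definitions and
/// constraints given in the statement.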
fn bind_cdc_table_schema(
    column_defs: &[ColumnDef],
    constraints: &[TableConstraint],
    is_for_replace_plan: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;

    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
    Ok((columns, pk_names))
}

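/// Entry point of the `CREATE TABLE` handler, e.g.:
///
/// ```sql
/// CREATE TABLE t (id INT PRIMARY KEY, v INT);
/// ```
///
/// Plans the table, builds the stream graph, and submits it to the meta
/// service; iceberg engine tables are routed to [`create_iceberg_engine_table`].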
#[allow(clippy::too_many_arguments)]
pub async fn handle_create_table(
    mut handler_args: HandlerArgs,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    if_not_exists: bool,
    format_encode: Option<FormatEncodeOptions>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    cdc_table_info: Option<CdcTableInfo>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    ast_engine: risingwave_sqlparser::ast::Engine,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    if append_only {
        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
    }

    session.check_cluster_limits().await?;

    let engine = match ast_engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };
    if engine == Engine::Iceberg && handler_args.with_options.get_connector().is_some() {
        // For an iceberg engine table with a connector, inject `source_rate_limit = 0`
        // during creation (it is lifted again at the end of
        // `create_iceberg_engine_table`), and reject a user-specified rate limit.
        if let Some(_rate_limit) = handler_args.with_options.insert(
            OverwriteOptions::SOURCE_RATE_LIMIT_KEY.to_owned(),
            "0".to_owned(),
        ) {
            return Err(ErrorCode::NotSupported(
                "source_rate_limit for iceberg table engine during table creation".to_owned(),
                "Please remove source_rate_limit from WITH options.".to_owned(),
            )
            .into());
        }
    }

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        table_name.clone(),
        StatementType::CREATE_TABLE,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (graph, source, hummock_table, job_type) = {
        let (plan, source, table, job_type) = handle_create_table_plan(
            handler_args.clone(),
            ExplainOptions::default(),
            format_encode,
            cdc_table_info,
            table_name.clone(),
            column_defs.clone(),
            wildcard_idx,
            constraints.clone(),
            source_watermarks,
            append_only,
            on_conflict,
            with_version_column,
            include_column_options,
            webhook_info,
            engine,
        )
        .await?;
        tracing::trace!("table_plan: {:?}", plan.explain_to_string());

        let graph = build_graph(plan, Some(GraphJobType::Table))?;

        (graph, source, table, job_type)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        table_name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

    match engine {
        Engine::Hummock => {
            let catalog_writer = session.catalog_writer()?;
            catalog_writer
                .create_table(source, hummock_table, graph, job_type)
                .await?;
        }
        Engine::Iceberg => {
            create_iceberg_engine_table(
                session,
                handler_args,
                source,
                hummock_table,
                graph,
                table_name,
                job_type,
            )
            .await?;
        }
    }

    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
}

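/// Creates an iceberg engine table: besides the table itself, an internal
/// iceberg sink (prefixed with `ICEBERG_SINK_PREFIX`) writes the table's
/// changes to iceberg, and an internal iceberg source (prefixed with
/// `ICEBERG_SOURCE_PREFIX`) serves reads. Creation is not atomic: if the sink
/// or source fails to create, the table is dropped as cleanup.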
#[allow(clippy::too_many_arguments)]
pub async fn create_iceberg_engine_table(
    session: Arc<SessionImpl>,
    handler_args: HandlerArgs,
    mut source: Option<PbSource>,
    table: PbTable,
    graph: StreamFragmentGraph,
    table_name: ObjectName,
    job_type: PbTableJobType,
) -> Result<()> {
    let meta_client = session.env().meta_client();
    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;

    let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
        ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
    })?;
    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
    let meta_store_user = meta_store_endpoint.username().to_owned();
    let meta_store_password = meta_store_endpoint
        .password()
        .ok_or_else(|| {
            ErrorCode::InternalError("failed to parse password from meta store endpoint".to_owned())
        })?
        .to_owned();
    let meta_store_host = meta_store_endpoint
        .host_str()
        .ok_or_else(|| {
            ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
        })?
        .to_owned();
    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
        ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
    })?;
    let meta_store_database = meta_store_endpoint
        .path()
        .trim_start_matches('/')
        .to_owned();

    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
        bail!("failed to parse meta backend: {}", meta_store_backend);
    };

    // The hosted iceberg catalog is a JDBC catalog backed by the meta store.
    let catalog_uri = match meta_backend {
        MetaBackend::Postgres => {
            format!(
                "jdbc:postgresql://{}:{}/{}",
                meta_store_host, meta_store_port, meta_store_database
            )
        }
        MetaBackend::Mysql => {
            format!(
                "jdbc:mysql://{}:{}/{}",
                meta_store_host, meta_store_port, meta_store_database
            )
        }
        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
            bail!(
                "Unsupported meta backend for iceberg engine table: {}",
                meta_store_backend
            );
        }
    };

    let rw_db_name = session
        .env()
        .catalog_reader()
        .read_guard()
        .get_database_by_id(&table.database_id)?
        .name()
        .to_owned();
    let rw_schema_name = session
        .env()
        .catalog_reader()
        .read_guard()
        .get_schema_by_id(&table.database_id, &table.schema_id)?
        .name()
        .clone();
    let iceberg_catalog_name = rw_db_name.clone();
    let iceberg_database_name = rw_schema_name.clone();
    let iceberg_table_name = table_name.0.last().unwrap().real_value();

    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
    let sink_decouple = session.config().sink_decouple();
    if matches!(sink_decouple, SinkDecouple::Disable) {
        bail!(
            "Iceberg engine tables require sink decouple to be enabled, try `SET sink_decouple = true` to resolve it"
        );
    }

    let mut connection_ref = BTreeMap::new();
    let with_common = if iceberg_engine_connection.is_empty() {
        bail!(
            "To create an iceberg engine table, the session variable `iceberg_engine_connection` must be set."
        );
    } else {
        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
        assert_eq!(
            parts.len(),
            2,
            "`iceberg_engine_connection` must be in `schema.connection` format"
        );
        let connection_catalog =
            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
            if params.connection_type == ConnectionType::Iceberg as i32 {
                // The internal sink and source created for the table both use the
                // iceberg connection.
                connection_ref.insert(
                    "connection".to_owned(),
                    ConnectionRefValue {
                        connection_name: ObjectName::from(vec![
                            Ident::from(parts[0]),
                            Ident::from(parts[1]),
                        ]),
                    },
                );

                let mut with_common = BTreeMap::new();
                with_common.insert("connector".to_owned(), "iceberg".to_owned());
                with_common.insert("database.name".to_owned(), iceberg_database_name.to_owned());
                with_common.insert("table.name".to_owned(), iceberg_table_name.to_owned());

                if let Some(s) = params.properties.get("hosted_catalog") {
                    if s.eq_ignore_ascii_case("true") {
                        with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
                        with_common.insert("catalog.uri".to_owned(), catalog_uri.to_owned());
                        with_common
                            .insert("catalog.jdbc.user".to_owned(), meta_store_user.to_owned());
                        with_common.insert(
                            "catalog.jdbc.password".to_owned(),
                            meta_store_password.clone(),
                        );
                        with_common
                            .insert("catalog.name".to_owned(), iceberg_catalog_name.to_owned());
                    }
                }

                with_common
            } else {
                return Err(RwError::from(ErrorCode::InvalidParameterValue(
                    "Only iceberg connection could be used in iceberg engine".to_owned(),
                )));
            }
        } else {
            return Err(RwError::from(ErrorCode::InvalidParameterValue(
                "Private Link Service has been deprecated. Please create a new connection instead."
                    .to_owned(),
            )));
        }
    };

    let table_catalog = TableCatalog::from(table.clone());

    // The iceberg sink needs a primary key. If the table's only key is the hidden
    // row-id column, expose it as `RISINGWAVE_ICEBERG_ROW_ID` via a projection.
    let mut pks = table_catalog
        .pk_column_names()
        .iter()
        .map(|c| c.to_string())
        .collect::<Vec<String>>();

    let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
            "select {} as {}, * from {}",
            ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
        ))
        .context("unable to parse query")?
        .try_into()
        .unwrap();

        let Statement::Query(query) = &stmt else {
            panic!("unexpected statement: {:?}", stmt);
        };
        CreateSink::AsQuery(query.clone())
    } else {
        CreateSink::From(table_name.clone())
    };

    let mut sink_name = table_name.clone();
    *sink_name.0.last_mut().unwrap() = Ident::from(
        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
    );
    let create_sink_stmt = CreateSinkStatement {
        if_not_exists: false,
        sink_name,
        with_properties: WithProperties(vec![]),
        sink_from,
        columns: vec![],
        emit_mode: None,
        sink_schema: None,
        into_table_name: None,
    };

    let mut sink_handler_args = handler_args.clone();

    let mut sink_with = with_common.clone();

    sink_with.insert("primary_key".to_owned(), pks.join(","));
    sink_with.insert("type".to_owned(), "upsert".to_owned());
    // The iceberg sink commits every 60 seconds by default.
    let commit_checkpoint_interval = handler_args
        .with_options
        .get(COMMIT_CHECKPOINT_INTERVAL)
        .map(|v| v.to_owned())
        .unwrap_or_else(|| "60".to_owned());
    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
        ErrorCode::InvalidInputSyntax(format!(
            "commit_checkpoint_interval must be a positive integer: {}",
            commit_checkpoint_interval
        ))
    })?;

    if commit_checkpoint_interval == 0 {
        bail!("commit_checkpoint_interval must be a positive integer: 0");
    }

    // Drop `commit_checkpoint_interval` from the associated source's WITH options;
    // it is consumed by the iceberg sink instead.
    source
        .as_mut()
        .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));

    let sink_decouple = session.config().sink_decouple();
    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
        bail!(
            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
        )
    }

    sink_with.insert(
        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
        commit_checkpoint_interval.to_string(),
    );
    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());

    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());

    let partition_by = handler_args
        .with_options
        .get("partition_by")
        .map(|v| v.to_owned());

    if let Some(partition_by) = &partition_by {
        let mut partition_columns = vec![];
        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
            table_catalog
                .columns()
                .iter()
                .find(|col| col.name().eq_ignore_ascii_case(&column))
                .ok_or_else(|| {
                    ErrorCode::InvalidInputSyntax(format!(
                        "Partition source column does not exist in schema: {}",
                        column
                    ))
                })?;

            partition_columns.push(column.to_owned());
        }

        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
            |_| {
                ErrorCode::InvalidInputSyntax(
                    "The partition columns should be the prefix of the primary key".to_owned(),
                )
            },
        )?;

        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());

        // Drop `partition_by` from the associated source's WITH options; it is
        // consumed by the iceberg sink instead.
        source
            .as_mut()
            .map(|x| x.with_properties.remove("partition_by"));
    }

    sink_handler_args.with_options =
        WithOptions::new(sink_with, Default::default(), connection_ref.clone());

    let mut source_name = table_name.clone();
    *source_name.0.last_mut().unwrap() = Ident::from(
        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
    );
    let create_source_stmt = CreateSourceStatement {
        temporary: false,
        if_not_exists: false,
        columns: vec![],
        source_name,
        wildcard_idx: Some(0),
        constraints: vec![],
        with_properties: WithProperties(vec![]),
        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
        source_watermarks: vec![],
        include_column_options: vec![],
    };

    let mut source_handler_args = handler_args.clone();
    let source_with = with_common;
    source_handler_args.with_options =
        WithOptions::new(source_with, Default::default(), connection_ref);

    // Ensure the JVM is initialized before creating the table, since the JDBC
    // catalog is accessed through the JVM.
    let _ = JVM.get_or_init()?;

    let catalog_writer = session.catalog_writer()?;
    let has_connector = source.is_some();
    catalog_writer
        .create_table(source, table, graph, job_type)
        .await?;
    let res = create_sink::handle_create_sink(sink_handler_args, create_sink_stmt, true).await;
    if res.is_err() {
        // DDL is not atomic here, so drop the table as cleanup if creating the sink fails.
        handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
        res?;
    }
    let res = create_source::handle_create_source(source_handler_args, create_source_stmt).await;
    if res.is_err() {
        // Likewise, drop the table as cleanup if creating the source fails.
        handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
        res?;
    }

    if has_connector {
        // Lift the `source_rate_limit = 0` injected in `handle_create_table` now
        // that the sink and source have been created.
        alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
            handler_args,
            PbThrottleTarget::TableWithSource,
            table_name,
            -1,
        )
        .await?;
    }

    Ok(())
}

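/// Decides whether the table is backed by a source connector and validates the
/// FORMAT/ENCODE and INCLUDE clauses accordingly.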
pub fn check_create_table_with_source(
    with_options: &WithOptions,
    format_encode: Option<FormatEncodeOptions>,
    include_column_options: &IncludeOption,
    cdc_table_info: &Option<CdcTableInfo>,
) -> Result<Option<FormatEncodeOptions>> {
    // Skip the check for tables created from a CDC source.
    if cdc_table_info.is_some() {
        return Ok(format_encode);
    }
    let defined_source = with_options.is_source_connector();

    if !include_column_options.is_empty() && !defined_source {
        return Err(ErrorCode::InvalidInputSyntax(
            "INCLUDE should be used with a connector".to_owned(),
        )
        .into());
    }
    if defined_source {
        format_encode.as_ref().ok_or_else(|| {
            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
        })?;
    }
    Ok(format_encode)
}

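/// Checks that the partition columns are a prefix of the primary key columns,
/// e.g. `["a", "b"]` is a valid prefix of `["a", "b", "c"]` while `["b"]` is not.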
fn ensure_partition_columns_are_prefix_of_primary_key(
    partition_columns: &[String],
    primary_key_columns: &[String],
) -> std::result::Result<(), String> {
    if partition_columns.len() > primary_key_columns.len() {
        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
    }

    for (i, partition_col) in partition_columns.iter().enumerate() {
        if primary_key_columns.get(i) != Some(partition_col) {
            return Err(format!(
                "Partition column '{}' is not a prefix of the primary key.",
                partition_col
            ));
        }
    }

    Ok(())
}

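/// Re-plans an existing table from a (possibly modified) `CREATE TABLE`
/// statement for replace-table (`ALTER TABLE`) jobs, keeping the original table
/// ID and associated source ID. Changing the primary key is rejected.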
1860#[allow(clippy::too_many_arguments)]
1861pub async fn generate_stream_graph_for_replace_table(
1862 _session: &Arc<SessionImpl>,
1863 table_name: ObjectName,
1864 original_catalog: &Arc<TableCatalog>,
1865 handler_args: HandlerArgs,
1866 statement: Statement,
1867 col_id_gen: ColumnIdGenerator,
1868 sql_column_strategy: SqlColumnStrategy,
1869) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
1870 use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
1871
1872 let Statement::CreateTable {
1873 columns,
1874 constraints,
1875 source_watermarks,
1876 append_only,
1877 on_conflict,
1878 with_version_column,
1879 wildcard_idx,
1880 cdc_table_info,
1881 format_encode,
1882 include_column_options,
1883 engine,
1884 with_options,
1885 ..
1886 } = statement
1887 else {
1888 panic!("unexpected statement type: {:?}", statement);
1889 };
1890
1891 let format_encode = format_encode
1892 .clone()
1893 .map(|format_encode| format_encode.into_v2_with_warning());
1894
1895 let engine = match engine {
1896 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1897 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1898 };
1899
1900 let is_drop_connector =
1901 original_catalog.associated_source_id().is_some() && format_encode.is_none();
1902 if is_drop_connector {
1903 debug_assert!(
1904 source_watermarks.is_empty()
1905 && include_column_options.is_empty()
1906 && with_options
1907 .iter()
1908 .all(|opt| opt.name.real_value().to_lowercase() != "connector")
1909 );
1910 }
1911
1912 let props = CreateTableProps {
1913 definition: handler_args.normalized_sql.clone(),
1914 append_only,
1915 on_conflict: on_conflict.into(),
1916 with_version_column: with_version_column.as_ref().map(|x| x.real_value()),
1917 webhook_info: original_catalog.webhook_info.clone(),
1918 engine,
1919 };
1920
    let ((plan, mut source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                ExplainOptions::default(),
                table_name,
                columns,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                sql_column_strategy,
            )
            .await?,
            TableJobType::General,
        ),
        (None, None) => {
            let context = OptimizerContext::from_handler_args(handler_args);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name,
                columns,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                true,
            )?;
            ((plan, None, table), TableJobType::General)
        }
        (None, Some(cdc_table)) => {
            let session = &handler_args.session;
            let (source, resolved_table_name, database_id, schema_id) =
                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;

            let cdc_with_options = derive_with_options_for_cdc_table(
                &source.with_properties,
                cdc_table.external_table_name.clone(),
            )?;

            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                cdc_table.external_table_name.clone(),
                columns,
                column_catalogs,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_column.map(|x| x.real_value()),
                IncludeOption::default(),
                table_name,
                resolved_table_name,
                database_id,
                schema_id,
                original_catalog.id(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to a table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };

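    // Replacing a table must not change its primary key.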
    if TableCatalog::from(&table).pk_column_ids() != original_catalog.pk_column_ids() {
        Err(ErrorCode::InvalidInputSyntax(
            "altering the primary key of a table is not supported".to_owned(),
        ))?
    }

    let graph = build_graph(plan, Some(GraphJobType::Table))?;

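    // Reuse the original table id so the new plan replaces the existing table, and re-wire
    // the table/source association unless the connector is being dropped.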
    let mut table = Table {
        id: original_catalog.id().table_id(),
        ..table
    };
    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
        table.optional_associated_source_id = Some(OptionalAssociatedSourceId::AssociatedSourceId(
            source_id.table_id,
        ));
        source.as_mut().unwrap().id = source_id.table_id;
        source.as_mut().unwrap().optional_associated_table_id =
            Some(OptionalAssociatedTableId::AssociatedTableId(table.id));
    }

    Ok((graph, table, source, job_type))
}

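/// Resolves the new table's qualified name and the CDC source it references, returning the
/// source catalog along with the resolved table name and the target database and schema ids.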
fn get_source_and_resolved_table_name(
    session: &Arc<SessionImpl>,
    cdc_table: CdcTableInfo,
    table_name: ObjectName,
) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
    let db_name = &session.database();
    let (schema_name, resolved_table_name) =
        Binder::resolve_schema_qualified_name(db_name, table_name)?;
    let (database_id, schema_id) =
        session.get_database_and_schema_id_for_create(schema_name.clone())?;

    let (source_schema_name, source_name) =
        Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;

    let source = {
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_name = source_schema_name.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_owned());
        let (source, _) = catalog_reader.get_source_by_name(
            db_name,
            SchemaPath::Name(schema_name.as_str()),
            source_name.as_str(),
        )?;
        source.clone()
    };

    Ok((source, resolved_table_name, database_id, schema_id))
}

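/// Binds the webhook source definition for a table: requires exactly one JSONB column,
/// resolves the optional secret reference, and binds the signature expression, which must
/// be a `SECURE_COMPARE()` call.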
fn bind_webhook_info(
    session: &Arc<SessionImpl>,
    columns_defs: &[ColumnDef],
    webhook_info: WebhookSourceInfo,
) -> Result<PbWebhookSourceInfo> {
    if columns_defs.len() != 1
        || columns_defs[0].data_type.as_ref() != Some(&AstDataType::Jsonb)
    {
        return Err(ErrorCode::InvalidInputSyntax(
            "Table with webhook source should have exactly one JSONB column".to_owned(),
        )
        .into());
    }

    let WebhookSourceInfo {
        secret_ref,
        signature_expr,
        wait_for_persistence,
    } = webhook_info;

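    // Resolve the optional secret reference used for signature verification.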
    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
        let db_name = &session.database();
        let (schema_name, secret_name) =
            Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
        (
            Some(PbSecretRef {
                secret_id: secret_catalog.id.secret_id(),
                ref_as: match secret_ref.ref_as {
                    SecretRefAsType::Text => PbRefAsType::Text,
                    SecretRefAsType::File => PbRefAsType::File,
                }
                .into(),
            }),
            Some(secret_name),
        )
    } else {
        (None, None)
    };

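    // Bind the signature expression against the payload column (and secret, if any); it must
    // be a SECURE_COMPARE() call.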
    let secure_compare_context = SecureCompareContext {
        column_name: columns_defs[0].name.real_value(),
        secret_name,
    };
    let mut binder = Binder::new_for_ddl_with_secure_compare(session, secure_compare_context);
    let expr = binder.bind_expr(signature_expr.clone())?;

    let is_secure_compare = expr.as_function_call().is_some_and(|call| {
        call.func_type() == crate::optimizer::plan_node::generic::ExprType::SecureCompare
    });
    if !is_secure_compare {
        return Err(ErrorCode::InvalidInputSyntax(
            "The signature verification function must be SECURE_COMPARE()".to_owned(),
        )
        .into());
    }

    let pb_webhook_info = PbWebhookSourceInfo {
        secret_ref: pb_secret_ref,
        signature_expr: Some(expr.to_expr_proto()),
        wait_for_persistence,
    };

    Ok(pb_webhook_info)
}

2123#[cfg(test)]
2124mod tests {
2125 use risingwave_common::catalog::{
2126 DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2127 };
2128 use risingwave_common::types::{DataType, StructType};
2129
2130 use super::*;
2131 use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2132
2133 #[tokio::test]
2134 async fn test_create_table_handler() {
2135 let sql =
2136 "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2137 let frontend = LocalFrontend::new(Default::default()).await;
2138 frontend.run_sql(sql).await.unwrap();
2139
2140 let session = frontend.session_ref();
2141 let catalog_reader = session.env().catalog_reader().read_guard();
2142 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2143
2144 let (table, _) = catalog_reader
2146 .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2147 .unwrap();
2148 assert_eq!(table.name(), "t");
2149
2150 let columns = table
2151 .columns
2152 .iter()
2153 .map(|col| (col.name(), col.data_type().clone()))
2154 .collect::<HashMap<&str, DataType>>();
2155
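        // Besides the user-defined columns, the hidden ROW_ID_COLUMN_NAME and
        // RW_TIMESTAMP_COLUMN_NAME columns are expected in the catalog.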
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "v1" => DataType::Int16,
            "v2" => StructType::new(
                vec![("v3", DataType::Int64), ("v4", DataType::Float64), ("v5", DataType::Float64)],
            )
            .with_ids([3, 4, 5].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };

        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    #[test]
    fn test_bind_primary_key() {
        for (sql, expected) in [
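            // Each case is (SQL, expected primary key column ids): id 0 is the implicit
            // row id column appended when no primary key is declared; user columns get
            // ids starting from 1.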
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ] {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be create table")
            };
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpected an error but got: {actual:?}"),
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }

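    // The same option may appear in both the WITH clause and the FORMAT ... ENCODE clause;
    // the two sets of properties must be stored separately without overwriting each other.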
    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

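        // The WITH clause keeps its own values for the duplicated credential options.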
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

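        // ENCODE-only options such as schema.location must not leak into the WITH properties.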
        assert!(!source.with_properties.contains_key("schema.location"));
    }
}