1use std::collections::{BTreeMap, HashMap};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use pgwire::pg_response::{PgResponse, StatementType};
25use prost::Message as _;
26use risingwave_common::catalog::{
27 CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
28 RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, TableId,
29};
30use risingwave_common::config::MetaBackend;
31use risingwave_common::license::Feature;
32use risingwave_common::session_config::sink_decouple::SinkDecouple;
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
35use risingwave_common::util::value_encoding::DatumToProtoExt;
36use risingwave_common::{bail, bail_not_implemented};
37use risingwave_connector::jvm_runtime::JVM;
38use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
39use risingwave_connector::source::cdc::build_cdc_table_id;
40use risingwave_connector::source::cdc::external::{
41 DATABASE_NAME_KEY, ExternalTableConfig, ExternalTableImpl, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
42};
43use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
44use risingwave_pb::catalog::connection::Info as ConnectionInfo;
45use risingwave_pb::catalog::connection_params::ConnectionType;
46use risingwave_pb::catalog::source::OptionalAssociatedTableId;
47use risingwave_pb::catalog::{PbSource, PbTable, PbWebhookSourceInfo, Table, WatermarkDesc};
48use risingwave_pb::ddl_service::TableJobType;
49use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
50use risingwave_pb::plan_common::{
51 AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
52};
53use risingwave_pb::secret::PbSecretRef;
54use risingwave_pb::secret::secret_ref::PbRefAsType;
55use risingwave_pb::stream_plan::StreamFragmentGraph;
56use risingwave_sqlparser::ast::{
57 CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
58 CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
59 FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
60 Statement, TableConstraint, WebhookSourceInfo, WithProperties,
61};
62use risingwave_sqlparser::parser::{IncludeOption, Parser};
63use thiserror_ext::AsReport;
64
65use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
66use super::{RwPgResponse, create_sink, create_source};
67use crate::binder::{Clause, SecureCompareContext, bind_data_type};
68use crate::catalog::root_catalog::SchemaPath;
69use crate::catalog::source_catalog::SourceCatalog;
70use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableVersion};
71use crate::catalog::{ColumnId, DatabaseId, SchemaId, check_column_name_not_reserved};
72use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
73use crate::expr::{Expr, ExprImpl, ExprRewriter};
74use crate::handler::HandlerArgs;
75use crate::handler::create_source::{
76 UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
77 bind_source_watermark, handle_addition_columns,
78};
79use crate::handler::util::SourceSchemaCompatExt;
80use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind};
81use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
82use crate::optimizer::property::{Order, RequiredDist};
83use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
84use crate::session::SessionImpl;
85use crate::session::current::notice_to_user;
86use crate::stream_fragmenter::build_graph;
87use crate::utils::OverwriteOptions;
88use crate::{Binder, TableCatalog, WithOptions};
89
90mod col_id_gen;
91pub use col_id_gen::*;
92
93fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
94 for option_def in &c.options {
95 match option_def.option {
96 ColumnOption::GeneratedColumns(_) => {}
97 ColumnOption::DefaultValue(_) => {}
98 ColumnOption::DefaultValueInternal { .. } => {}
99 ColumnOption::Unique { is_primary: true } => {}
100 ColumnOption::Null => {}
101 ColumnOption::NotNull => {}
102 _ => bail_not_implemented!("column constraints \"{}\"", option_def),
103 }
104 }
105 Ok(())
106}
107
108pub fn bind_sql_columns(
112 column_defs: &[ColumnDef],
113 is_for_drop_table_connector: bool,
114) -> Result<Vec<ColumnCatalog>> {
115 let mut columns = Vec::with_capacity(column_defs.len());
116
117 for column in column_defs {
118 ensure_column_options_supported(column)?;
119 let ColumnDef {
123 name,
124 data_type,
125 collation,
126 options,
127 ..
128 } = column;
129
130 let data_type = data_type
131 .clone()
132 .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
133 if let Some(collation) = collation {
134 if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
140 bail_not_implemented!(
141 "Collate collation other than `C` or `POSIX` is not implemented"
142 );
143 }
144
145 match data_type {
146 AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
147 _ => {
148 return Err(ErrorCode::NotSupported(
149 format!("{} is not a collatable data type", data_type),
150 "The only built-in collatable data types are `varchar`, please check your type".into(),
151 ).into());
152 }
153 }
154 }
155
156 if !is_for_drop_table_connector {
157 check_column_name_not_reserved(&name.real_value())?;
161 }
162
163 let nullable: bool = !options
164 .iter()
165 .any(|def| matches!(def.option, ColumnOption::NotNull));
166
167 columns.push(ColumnCatalog {
168 column_desc: ColumnDesc {
169 data_type: bind_data_type(&data_type)?,
170 column_id: ColumnId::placeholder(),
171 name: name.real_value(),
172 generated_or_default_column: None,
173 description: None,
174 additional_column: AdditionalColumn { column_type: None },
175 version: ColumnDescVersion::LATEST,
176 system_column: None,
177 nullable,
178 },
179 is_hidden: false,
180 });
181 }
182
183 Ok(columns)
184}
185
186fn check_generated_column_constraints(
187 column_name: &String,
188 column_id: ColumnId,
189 expr: &ExprImpl,
190 column_catalogs: &[ColumnCatalog],
191 generated_column_names: &[String],
192 pk_column_ids: &[ColumnId],
193) -> Result<()> {
194 let input_refs = expr.collect_input_refs(column_catalogs.len());
195 for idx in input_refs.ones() {
196 let referred_generated_column = &column_catalogs[idx].column_desc.name;
197 if generated_column_names
198 .iter()
199 .any(|c| c == referred_generated_column)
200 {
201 return Err(ErrorCode::BindError(format!(
202 "Generated can not reference another generated column. \
203 But here generated column \"{}\" referenced another generated column \"{}\"",
204 column_name, referred_generated_column
205 ))
206 .into());
207 }
208 }
209
210 if pk_column_ids.contains(&column_id) && expr.is_impure() {
211 return Err(ErrorCode::BindError(format!(
212 "Generated columns with impure expressions should not be part of the primary key. \
213 Here column \"{}\" is defined as part of the primary key.",
214 column_name
215 ))
216 .into());
217 }
218
219 Ok(())
220}
221
222pub fn bind_sql_column_constraints(
225 session: &SessionImpl,
226 table_name: String,
227 column_catalogs: &mut [ColumnCatalog],
228 columns: Vec<ColumnDef>,
229 pk_column_ids: &[ColumnId],
230) -> Result<()> {
231 let generated_column_names = {
232 let mut names = vec![];
233 for column in &columns {
234 for option_def in &column.options {
235 if let ColumnOption::GeneratedColumns(_) = option_def.option {
236 names.push(column.name.real_value());
237 break;
238 }
239 }
240 }
241 names
242 };
243
244 let mut binder = Binder::new_for_ddl(session);
245 binder.bind_columns_to_context(table_name.clone(), column_catalogs)?;
246
247 for column in columns {
248 let Some(idx) = column_catalogs
249 .iter()
250 .position(|c| c.name() == column.name.real_value())
251 else {
252 continue;
255 };
256
257 for option_def in column.options {
258 match option_def.option {
259 ColumnOption::GeneratedColumns(expr) => {
260 binder.set_clause(Some(Clause::GeneratedColumn));
261
262 let expr_impl = binder.bind_expr(expr).with_context(|| {
263 format!(
264 "fail to bind expression in generated column \"{}\"",
265 column.name.real_value()
266 )
267 })?;
268
269 check_generated_column_constraints(
270 &column.name.real_value(),
271 column_catalogs[idx].column_id(),
272 &expr_impl,
273 column_catalogs,
274 &generated_column_names,
275 pk_column_ids,
276 )?;
277
278 column_catalogs[idx].column_desc.generated_or_default_column = Some(
279 GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
280 expr: Some(expr_impl.to_expr_proto()),
281 }),
282 );
283 binder.set_clause(None);
284 }
285 ColumnOption::DefaultValue(expr) => {
286 let expr_impl = binder
287 .bind_expr(expr)?
288 .cast_assign(column_catalogs[idx].data_type().clone())?;
289
290 let rewritten_expr_impl = session
298 .pinned_snapshot()
299 .inline_now_proc_time()
300 .rewrite_expr(expr_impl.clone());
301
302 if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
303 let snapshot_value = snapshot_value?;
304
305 column_catalogs[idx].column_desc.generated_or_default_column =
306 Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
307 snapshot_value: Some(snapshot_value.to_protobuf()),
308 expr: Some(expr_impl.to_expr_proto()),
309 }));
311 } else {
312 return Err(ErrorCode::BindError(format!(
313 "Default expression used in column `{}` cannot be evaluated. \
314 Use generated columns instead if you mean to reference other columns.",
315 column.name
316 ))
317 .into());
318 }
319 }
320 ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
321 if persisted.is_empty() {
326 bail_bind_error!(
327 "DEFAULT INTERNAL is only used for internal purposes, \
328 please specify a concrete default value"
329 );
330 }
331
332 let desc = DefaultColumnDesc::decode(&*persisted)
333 .expect("failed to decode persisted `DefaultColumnDesc`");
334
335 column_catalogs[idx].column_desc.generated_or_default_column =
336 Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
337 }
338 _ => {}
339 }
340 }
341 }
342 Ok(())
343}
344
345pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
347 let mut pk_column_names = vec![];
348
349 for constraint in table_constraints {
350 match constraint {
351 TableConstraint::Unique {
352 name: _,
353 columns,
354 is_primary: true,
355 } => {
356 if !pk_column_names.is_empty() {
357 return Err(multiple_pk_definition_err());
358 }
359 pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
360 }
361 _ => bail_not_implemented!("table constraint \"{}\"", constraint),
362 }
363 }
364 Ok(pk_column_names)
365}
366
367pub fn bind_sql_pk_names(
368 columns_defs: &[ColumnDef],
369 pk_names_from_table_constraints: Vec<String>,
370) -> Result<Vec<String>> {
371 let mut pk_column_names = pk_names_from_table_constraints;
372
373 for column in columns_defs {
374 for option_def in &column.options {
375 if let ColumnOption::Unique { is_primary: true } = option_def.option {
376 if !pk_column_names.is_empty() {
377 return Err(multiple_pk_definition_err());
378 }
379 pk_column_names.push(column.name.real_value());
380 };
381 }
382 }
383
384 Ok(pk_column_names)
385}
386
387fn multiple_pk_definition_err() -> RwError {
388 ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
389}
390
391pub fn bind_pk_and_row_id_on_relation(
396 mut columns: Vec<ColumnCatalog>,
397 pk_names: Vec<String>,
398 must_need_pk: bool,
399) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
400 for c in &columns {
401 assert!(c.column_id() != ColumnId::placeholder());
402 }
403
404 let name_to_id = columns
406 .iter()
407 .map(|c| (c.name(), c.column_id()))
408 .collect::<HashMap<_, _>>();
409
410 let mut pk_column_ids: Vec<_> = pk_names
411 .iter()
412 .map(|name| {
413 name_to_id.get(name.as_str()).copied().ok_or_else(|| {
414 ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
415 })
416 })
417 .try_collect()?;
418
419 let need_row_id = pk_column_ids.is_empty() && must_need_pk;
421
422 let row_id_index = need_row_id.then(|| {
423 let column = ColumnCatalog::row_id_column();
424 let index = columns.len();
425 pk_column_ids = vec![column.column_id()];
426 columns.push(column);
427 index
428 });
429
430 if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
431 Err(ErrorCode::InvalidInputSyntax(format!(
432 "column \"{col}\" specified more than once"
433 )))?;
434 }
435
436 Ok((columns, pk_column_ids, row_id_index))
437}
438
/// Plans a `CREATE TABLE` that is backed by a connector (`FORMAT ... ENCODE ...`).
///
/// Steps, in order:
/// 1. rejects `APPEND ONLY` unless the format is `PLAIN` or `NATIVE`;
/// 2. binds the connector `WITH` properties. For shareable CDC connectors it runs the CDC
///    generated-column and NOT NULL checks; otherwise it only emits a notice when a
///    column has `NOT NULL`;
/// 3. resolves columns from the source (`bind_columns_from_source`) and builds the source
///    catalog (`bind_create_source_or_table_with_connector`), assigning column ids via
///    `col_id_gen`;
/// 4. generates the table plan from that source catalog.
///
/// Returns `(plan, Some(source proto), table proto)`.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append only table does not support format {}.",
            format_encode.format
        ))
        .into());
    }

    // Shared borrow of `handler_args`; it must end before `OverwriteOptions::new` below
    // takes `&mut handler_args`.
    let session = &handler_args.session;
    let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        // Non-CDC sources: NOT NULL is accepted, but the user is warned about its effect.
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;

    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    // NOTE(review): presumably extracts overrides (e.g. the source rate limit) from the
    // statement's options — confirm in `OverwriteOptions::new`.
    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source_catalog = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
    )
    .await?;

    let pb_source = source_catalog.to_prost();

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source_catalog,
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(pb_source), table))
}
530
531#[allow(clippy::too_many_arguments)]
534pub(crate) fn gen_create_table_plan(
535 context: OptimizerContext,
536 table_name: ObjectName,
537 column_defs: Vec<ColumnDef>,
538 constraints: Vec<TableConstraint>,
539 mut col_id_gen: ColumnIdGenerator,
540 source_watermarks: Vec<SourceWatermark>,
541 props: CreateTableProps,
542 is_for_replace_plan: bool,
543) -> Result<(PlanRef, PbTable)> {
544 let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
545 for c in &mut columns {
546 col_id_gen.generate(c)?;
547 }
548
549 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
550 if !secret_refs.is_empty() || !connection_refs.is_empty() {
551 return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
552 }
553
554 gen_create_table_plan_without_source(
555 context,
556 table_name,
557 columns,
558 column_defs,
559 constraints,
560 source_watermarks,
561 col_id_gen.into_version(),
562 props,
563 )
564}
565
566#[allow(clippy::too_many_arguments)]
567pub(crate) fn gen_create_table_plan_without_source(
568 context: OptimizerContext,
569 table_name: ObjectName,
570 columns: Vec<ColumnCatalog>,
571 column_defs: Vec<ColumnDef>,
572 constraints: Vec<TableConstraint>,
573 source_watermarks: Vec<SourceWatermark>,
574 version: TableVersion,
575 props: CreateTableProps,
576) -> Result<(PlanRef, PbTable)> {
577 let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
579 let (mut columns, pk_column_ids, row_id_index) =
580 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
581
582 let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
583 context.session_ctx(),
584 table_name.real_value(),
585 source_watermarks,
586 &columns,
587 )?;
588
589 bind_sql_column_constraints(
590 context.session_ctx(),
591 table_name.real_value(),
592 &mut columns,
593 column_defs,
594 &pk_column_ids,
595 )?;
596 let session = context.session_ctx().clone();
597
598 let db_name = &session.database();
599 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
600
601 let info = CreateTableInfo {
602 columns,
603 pk_column_ids,
604 row_id_index,
605 watermark_descs,
606 source_catalog: None,
607 version,
608 };
609
610 gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
611}
612
613fn gen_table_plan_with_source(
614 context: OptimizerContextRef,
615 schema_name: Option<String>,
616 source_catalog: SourceCatalog,
617 version: TableVersion,
618 props: CreateTableProps,
619) -> Result<(PlanRef, PbTable)> {
620 let table_name = source_catalog.name.clone();
621
622 let info = CreateTableInfo {
623 columns: source_catalog.columns.clone(),
624 pk_column_ids: source_catalog.pk_col_ids.clone(),
625 row_id_index: source_catalog.row_id_index,
626 watermark_descs: source_catalog.watermark_descs.clone(),
627 source_catalog: Some(source_catalog),
628 version,
629 };
630
631 gen_table_plan_inner(context, schema_name, table_name, info, props)
632}
633
634#[derive(Clone, Copy)]
636pub enum EitherOnConflict {
637 Ast(Option<OnConflict>),
638 Resolved(ConflictBehavior),
639}
640
641impl From<Option<OnConflict>> for EitherOnConflict {
642 fn from(v: Option<OnConflict>) -> Self {
643 Self::Ast(v)
644 }
645}
646
647impl From<ConflictBehavior> for EitherOnConflict {
648 fn from(v: ConflictBehavior) -> Self {
649 Self::Resolved(v)
650 }
651}
652
653impl EitherOnConflict {
654 pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
656 let conflict_behavior = match self {
657 EitherOnConflict::Ast(on_conflict) => {
658 if append_only {
659 if row_id_as_pk {
660 ConflictBehavior::NoCheck
662 } else {
663 if let Some(on_conflict) = on_conflict
665 && on_conflict != OnConflict::Nothing
666 {
667 return Err(ErrorCode::InvalidInputSyntax(
668 "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
669 the ON CONFLICT behavior must be DO NOTHING."
670 .to_owned(),
671 )
672 .into());
673 }
674 ConflictBehavior::IgnoreConflict
675 }
676 } else {
677 match on_conflict.unwrap_or(OnConflict::UpdateFull) {
679 OnConflict::UpdateFull => ConflictBehavior::Overwrite,
680 OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
681 OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
682 }
683 }
684 }
685 EitherOnConflict::Resolved(b) => b,
686 };
687
688 Ok(conflict_behavior)
689 }
690}
691
/// Schema-level information used to plan a `CREATE TABLE`.
pub struct CreateTableInfo {
    /// Bound columns of the table. When no primary key was given, this may include the
    /// row-id column appended by `bind_pk_and_row_id_on_relation`.
    pub columns: Vec<ColumnCatalog>,
    /// Column ids forming the primary key.
    pub pk_column_ids: Vec<ColumnId>,
    /// Index of the row-id column within `columns`, if one was added.
    pub row_id_index: Option<usize>,
    /// Bound watermark definitions.
    pub watermark_descs: Vec<WatermarkDesc>,
    /// Source catalog when the table is connector-backed; `None` for plain tables.
    pub source_catalog: Option<SourceCatalog>,
    /// Table version produced by `ColumnIdGenerator::into_version`.
    pub version: TableVersion,
}
704
/// Table-level properties of a `CREATE TABLE` statement.
pub struct CreateTableProps {
    /// Normalized SQL text of the statement, stored as the table definition.
    pub definition: String,
    /// Whether the table was declared `APPEND ONLY`.
    pub append_only: bool,
    /// `ON CONFLICT` clause, raw or already resolved.
    pub on_conflict: EitherOnConflict,
    /// Name of the `WITH VERSION COLUMN` column, if any.
    pub with_version_column: Option<String>,
    /// Bound webhook source information, if the table is a webhook table.
    pub webhook_info: Option<PbWebhookSourceInfo>,
    /// Storage engine of the table.
    pub engine: Engine,
}
717
718#[allow(clippy::too_many_arguments)]
719fn gen_table_plan_inner(
720 context: OptimizerContextRef,
721 schema_name: Option<String>,
722 table_name: String,
723 info: CreateTableInfo,
724 props: CreateTableProps,
725) -> Result<(PlanRef, PbTable)> {
726 let CreateTableInfo {
727 ref columns,
728 row_id_index,
729 ref watermark_descs,
730 ref source_catalog,
731 ..
732 } = info;
733 let CreateTableProps { append_only, .. } = props;
734
735 let (database_id, schema_id) = context
736 .session_ctx()
737 .get_database_and_schema_id_for_create(schema_name)?;
738
739 let session = context.session_ctx().clone();
740 let retention_seconds = context.with_options().retention_seconds();
741
742 let source_node: PlanRef = LogicalSource::new(
743 source_catalog.clone().map(Rc::new),
744 columns.clone(),
745 row_id_index,
746 SourceNodeKind::CreateTable,
747 context.clone(),
748 None,
749 )?
750 .into();
751
752 let required_cols = FixedBitSet::with_capacity(columns.len());
753 let plan_root = PlanRoot::new_with_logical_plan(
754 source_node,
755 RequiredDist::Any,
756 Order::any(),
757 required_cols,
758 vec![],
759 );
760
761 if !append_only && !watermark_descs.is_empty() {
762 return Err(ErrorCode::NotSupported(
763 "Defining watermarks on table requires the table to be append only.".to_owned(),
764 "Use the key words `APPEND ONLY`".to_owned(),
765 )
766 .into());
767 }
768
769 if !append_only && retention_seconds.is_some() {
770 return Err(ErrorCode::NotSupported(
771 "Defining retention seconds on table requires the table to be append only.".to_owned(),
772 "Use the key words `APPEND ONLY`".to_owned(),
773 )
774 .into());
775 }
776
777 let materialize =
778 plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;
779
780 let mut table = materialize.table().to_prost();
781
782 table.owner = session.user_id();
783 Ok((materialize.into(), table))
784}
785
/// Plans a table created from a shared CDC source (`CREATE TABLE ... FROM source TABLE ...`).
///
/// Steps, in order:
/// 1. appends any `INCLUDE` additional columns (`handle_addition_columns`);
/// 2. assigns column ids via `col_id_gen`;
/// 3. resolves the primary key from `pk_names` (`must_need_pk = true`);
/// 4. binds column constraints (generated columns, defaults);
/// 5. builds a `CdcTableDesc` over the non-generated columns, with the primary key as both
///    `pk` (ascending) and `stream_key`;
/// 6. plans a `LogicalCdcScan` feeding the table materialization.
///
/// The returned table proto has its owner, `cdc_table_id` and a dependency on the source set.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String,
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, PbTable)> {
    let session = context.session_ctx().clone();

    // Additional columns must be added before ids are generated so they get ids too.
    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    // Map pk column ids to positions in `columns`. Every pk id was resolved from `columns`
    // by `bind_pk_and_row_id_on_relation`, so the lookup is expected to succeed.
    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    // Only non-generated columns are read from the upstream table.
    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| (!c.is_generated()))
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();

    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id.into(),
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");

    let options = CdcScanOptions::from_with_options(context.with_options())?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: PlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs: vec![],
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_column,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().to_prost();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.dependent_relations = vec![source.id];

    Ok((materialize.into(), table))
}
926
927fn derive_with_options_for_cdc_table(
928 source_with_properties: &WithOptionsSecResolved,
929 external_table_name: String,
930) -> Result<WithOptionsSecResolved> {
931 use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
932 let mut with_options = source_with_properties.clone();
934 if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
935 match connector.as_str() {
936 MYSQL_CDC_CONNECTOR => {
937 let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
940 anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
941 })?;
942 with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
943 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
944 }
945 POSTGRES_CDC_CONNECTOR => {
946 let (schema_name, table_name) = external_table_name
947 .split_once('.')
948 .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;
949
950 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
952 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
953 }
954 SQL_SERVER_CDC_CONNECTOR => {
955 let schema_table_name = external_table_name
958 .split_once('.')
959 .ok_or_else(|| {
960 anyhow!("The upstream table name must be in 'database.schema.table' format")
961 })?
962 .1;
963
964 let (schema_name, table_name) =
965 schema_table_name.split_once('.').ok_or_else(|| {
966 anyhow!("The table name must contain schema name prefix, e.g. 'dbo.table'")
967 })?;
968
969 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
971 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
972 }
973 _ => {
974 return Err(RwError::from(anyhow!(
975 "connector {} is not supported for cdc table",
976 connector
977 )));
978 }
979 };
980 }
981 Ok(with_options)
982}
983
984#[allow(clippy::too_many_arguments)]
985pub(super) async fn handle_create_table_plan(
986 handler_args: HandlerArgs,
987 explain_options: ExplainOptions,
988 format_encode: Option<FormatEncodeOptions>,
989 cdc_table_info: Option<CdcTableInfo>,
990 table_name: ObjectName,
991 column_defs: Vec<ColumnDef>,
992 wildcard_idx: Option<usize>,
993 constraints: Vec<TableConstraint>,
994 source_watermarks: Vec<SourceWatermark>,
995 append_only: bool,
996 on_conflict: Option<OnConflict>,
997 with_version_column: Option<String>,
998 include_column_options: IncludeOption,
999 webhook_info: Option<WebhookSourceInfo>,
1000 engine: Engine,
1001) -> Result<(PlanRef, Option<PbSource>, PbTable, TableJobType)> {
1002 let col_id_gen = ColumnIdGenerator::new_initial();
1003 let format_encode = check_create_table_with_source(
1004 &handler_args.with_options,
1005 format_encode,
1006 &include_column_options,
1007 &cdc_table_info,
1008 )?;
1009 let webhook_info = webhook_info
1010 .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1011 .transpose()?;
1012
1013 let props = CreateTableProps {
1014 definition: handler_args.normalized_sql.clone(),
1015 append_only,
1016 on_conflict: on_conflict.into(),
1017 with_version_column: with_version_column.clone(),
1018 webhook_info,
1019 engine,
1020 };
1021
1022 let ((plan, source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
1023 (Some(format_encode), None) => (
1024 gen_create_table_plan_with_source(
1025 handler_args,
1026 explain_options,
1027 table_name.clone(),
1028 column_defs,
1029 wildcard_idx,
1030 constraints,
1031 format_encode,
1032 source_watermarks,
1033 col_id_gen,
1034 include_column_options,
1035 props,
1036 SqlColumnStrategy::FollowChecked,
1037 )
1038 .await?,
1039 TableJobType::General,
1040 ),
1041 (None, None) => {
1042 let context = OptimizerContext::new(handler_args, explain_options);
1043 let (plan, table) = gen_create_table_plan(
1044 context,
1045 table_name.clone(),
1046 column_defs,
1047 constraints,
1048 col_id_gen,
1049 source_watermarks,
1050 props,
1051 false,
1052 )?;
1053
1054 ((plan, None, table), TableJobType::General)
1055 }
1056
1057 (None, Some(cdc_table)) => {
1058 sanity_check_for_table_on_cdc_source(
1059 append_only,
1060 &column_defs,
1061 &wildcard_idx,
1062 &constraints,
1063 &source_watermarks,
1064 )?;
1065
1066 generated_columns_check_for_cdc_table(&column_defs)?;
1067 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1068
1069 let session = &handler_args.session;
1070 let db_name = &session.database();
1071 let user_name = &session.user_name();
1072 let search_path = session.config().search_path();
1073 let (schema_name, resolved_table_name) =
1074 Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
1075 let (database_id, schema_id) =
1076 session.get_database_and_schema_id_for_create(schema_name.clone())?;
1077
1078 let (format_encode, source_name) =
1080 Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;
1081
1082 let source = {
1083 let catalog_reader = session.env().catalog_reader().read_guard();
1084 let schema_path =
1085 SchemaPath::new(format_encode.as_deref(), &search_path, user_name);
1086
1087 let (source, _) = catalog_reader.get_source_by_name(
1088 db_name,
1089 schema_path,
1090 source_name.as_str(),
1091 )?;
1092 source.clone()
1093 };
1094 let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
1095 &source.with_properties,
1096 cdc_table.external_table_name.clone(),
1097 )?;
1098
1099 let (columns, pk_names) = match wildcard_idx {
1100 Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1101 None => {
1102 for column_def in &column_defs {
1103 for option_def in &column_def.options {
1104 if let ColumnOption::DefaultValue(_)
1105 | ColumnOption::DefaultValueInternal { .. } = option_def.option
1106 {
1107 return Err(ErrorCode::NotSupported(
1108 "Default value for columns defined on the table created from a CDC source".into(),
1109 "Remove the default value expression in the column definitions".into(),
1110 )
1111 .into());
1112 }
1113 }
1114 }
1115
1116 let (columns, pk_names) =
1117 bind_cdc_table_schema(&column_defs, &constraints, false)?;
1118 let (options, secret_refs) = cdc_with_options.clone().into_parts();
1120 let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1121 .context("failed to extract external table config")?;
1122
1123 (columns, pk_names)
1125 }
1126 };
1127
1128 let context: OptimizerContextRef =
1129 OptimizerContext::new(handler_args, explain_options).into();
1130 let (plan, table) = gen_create_table_plan_for_cdc_table(
1131 context,
1132 source,
1133 cdc_table.external_table_name.clone(),
1134 column_defs,
1135 columns,
1136 pk_names,
1137 cdc_with_options,
1138 col_id_gen,
1139 on_conflict,
1140 with_version_column,
1141 include_column_options,
1142 table_name,
1143 resolved_table_name,
1144 database_id,
1145 schema_id,
1146 TableId::placeholder(),
1147 engine,
1148 )?;
1149
1150 ((plan, None, table), TableJobType::SharedCdcSource)
1151 }
1152 (Some(_), Some(_)) => {
1153 return Err(ErrorCode::NotSupported(
1154 "Data format and encoding format doesn't apply to table created from a CDC source"
1155 .into(),
1156 "Remove the FORMAT and ENCODE specification".into(),
1157 )
1158 .into());
1159 }
1160 };
1161 Ok((plan, source, table, job_type))
1162}
1163
1164fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1166 let mut found_generated_column = false;
1167 for column in columns {
1168 let mut is_generated = false;
1169
1170 for option_def in &column.options {
1171 if let ColumnOption::GeneratedColumns(_) = option_def.option {
1172 is_generated = true;
1173 break;
1174 }
1175 }
1176
1177 if is_generated {
1178 found_generated_column = true;
1179 } else if found_generated_column {
1180 return Err(ErrorCode::NotSupported(
1181 "Non-generated column found after a generated column.".into(),
1182 "Ensure that all generated columns appear at the end of the cdc table definition."
1183 .into(),
1184 )
1185 .into());
1186 }
1187 }
1188 Ok(())
1189}
1190
1191fn not_null_check_for_cdc_table(
1193 wildcard_idx: &Option<usize>,
1194 column_defs: &Vec<ColumnDef>,
1195) -> Result<()> {
1196 if !wildcard_idx.is_some()
1197 && column_defs.iter().any(|col| {
1198 col.options
1199 .iter()
1200 .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1201 })
1202 {
1203 return Err(ErrorCode::NotSupported(
1204 "CDC table with NOT NULL constraint is not supported".to_owned(),
1205 "Please remove the NOT NULL constraint for columns".to_owned(),
1206 )
1207 .into());
1208 }
1209 Ok(())
1210}
1211
1212fn sanity_check_for_table_on_cdc_source(
1214 append_only: bool,
1215 column_defs: &Vec<ColumnDef>,
1216 wildcard_idx: &Option<usize>,
1217 constraints: &Vec<TableConstraint>,
1218 source_watermarks: &Vec<SourceWatermark>,
1219) -> Result<()> {
1220 if wildcard_idx.is_some() && !column_defs.is_empty() {
1222 return Err(ErrorCode::NotSupported(
1223 "wildcard(*) and column definitions cannot be used together".to_owned(),
1224 "Remove the wildcard or column definitions".to_owned(),
1225 )
1226 .into());
1227 }
1228
1229 if !wildcard_idx.is_some()
1231 && !constraints.iter().any(|c| {
1232 matches!(
1233 c,
1234 TableConstraint::Unique {
1235 is_primary: true,
1236 ..
1237 }
1238 )
1239 })
1240 && !column_defs.iter().any(|col| {
1241 col.options
1242 .iter()
1243 .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1244 })
1245 {
1246 return Err(ErrorCode::NotSupported(
1247 "CDC table without primary key constraint is not supported".to_owned(),
1248 "Please define a primary key".to_owned(),
1249 )
1250 .into());
1251 }
1252
1253 if append_only {
1254 return Err(ErrorCode::NotSupported(
1255 "append only modifier on the table created from a CDC source".into(),
1256 "Remove the APPEND ONLY clause".into(),
1257 )
1258 .into());
1259 }
1260
1261 if !source_watermarks.is_empty() {
1262 return Err(ErrorCode::NotSupported(
1263 "watermark defined on the table created from a CDC source".into(),
1264 "Remove the Watermark definitions".into(),
1265 )
1266 .into());
1267 }
1268
1269 Ok(())
1270}
1271
1272async fn bind_cdc_table_schema_externally(
1274 cdc_with_options: WithOptionsSecResolved,
1275) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1276 Feature::CdcTableSchemaMap.check_available().map_err(
1278 |err: risingwave_common::license::FeatureNotAvailable| {
1279 ErrorCode::NotSupported(
1280 err.to_report_string(),
1281 "Please define the schema manually".to_owned(),
1282 )
1283 },
1284 )?;
1285 let (options, secret_refs) = cdc_with_options.into_parts();
1286 let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1287 .context("failed to extract external table config")?;
1288
1289 let table = ExternalTableImpl::connect(config)
1290 .await
1291 .context("failed to auto derive table schema")?;
1292 Ok((
1293 table
1294 .column_descs()
1295 .iter()
1296 .cloned()
1297 .map(|column_desc| ColumnCatalog {
1298 column_desc,
1299 is_hidden: false,
1300 })
1301 .collect(),
1302 table.pk_names().clone(),
1303 ))
1304}
1305
1306fn bind_cdc_table_schema(
1308 column_defs: &Vec<ColumnDef>,
1309 constraints: &Vec<TableConstraint>,
1310 is_for_replace_plan: bool,
1311) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1312 let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1313
1314 let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1315 Ok((columns, pk_names))
1316}
1317
1318#[allow(clippy::too_many_arguments)]
1319pub async fn handle_create_table(
1320 handler_args: HandlerArgs,
1321 table_name: ObjectName,
1322 column_defs: Vec<ColumnDef>,
1323 wildcard_idx: Option<usize>,
1324 constraints: Vec<TableConstraint>,
1325 if_not_exists: bool,
1326 format_encode: Option<FormatEncodeOptions>,
1327 source_watermarks: Vec<SourceWatermark>,
1328 append_only: bool,
1329 on_conflict: Option<OnConflict>,
1330 with_version_column: Option<String>,
1331 cdc_table_info: Option<CdcTableInfo>,
1332 include_column_options: IncludeOption,
1333 webhook_info: Option<WebhookSourceInfo>,
1334 ast_engine: risingwave_sqlparser::ast::Engine,
1335) -> Result<RwPgResponse> {
1336 let session = handler_args.session.clone();
1337
1338 if append_only {
1339 session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1340 }
1341
1342 session.check_cluster_limits().await?;
1343
1344 let engine = match ast_engine {
1345 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1346 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1347 };
1348
1349 if let Either::Right(resp) = session.check_relation_name_duplicated(
1350 table_name.clone(),
1351 StatementType::CREATE_TABLE,
1352 if_not_exists,
1353 )? {
1354 return Ok(resp);
1355 }
1356
1357 let (graph, source, table, job_type) = {
1358 let (plan, source, table, job_type) = handle_create_table_plan(
1359 handler_args.clone(),
1360 ExplainOptions::default(),
1361 format_encode,
1362 cdc_table_info,
1363 table_name.clone(),
1364 column_defs.clone(),
1365 wildcard_idx,
1366 constraints.clone(),
1367 source_watermarks,
1368 append_only,
1369 on_conflict,
1370 with_version_column,
1371 include_column_options,
1372 webhook_info,
1373 engine,
1374 )
1375 .await?;
1376
1377 let graph = build_graph(plan)?;
1378
1379 (graph, source, table, job_type)
1380 };
1381
1382 tracing::trace!(
1383 "name={}, graph=\n{}",
1384 table_name,
1385 serde_json::to_string_pretty(&graph).unwrap()
1386 );
1387
1388 match engine {
1390 Engine::Hummock => {
1391 let catalog_writer = session.catalog_writer()?;
1392 catalog_writer
1393 .create_table(source, table, graph, job_type)
1394 .await?;
1395 }
1396 Engine::Iceberg => {
1397 create_iceberg_engine_table(
1398 session,
1399 handler_args,
1400 source,
1401 table,
1402 graph,
1403 job_type,
1404 column_defs,
1405 constraints,
1406 table_name,
1407 )
1408 .await?;
1409 }
1410 }
1411
1412 Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1413}
1414
1415#[allow(clippy::too_many_arguments)]
1416pub async fn create_iceberg_engine_table(
1417 session: Arc<SessionImpl>,
1418 handler_args: HandlerArgs,
1419 mut source: Option<PbSource>,
1420 table: PbTable,
1421 graph: StreamFragmentGraph,
1422 job_type: TableJobType,
1423 column_defs: Vec<ColumnDef>,
1424 constraints: Vec<TableConstraint>,
1425 table_name: ObjectName,
1426) -> Result<()> {
1427 let meta_client = session.env().meta_client();
1433 let system_params = meta_client.get_system_params().await?;
1434 let state_store_endpoint = system_params.state_store().to_owned();
1435 let data_directory = system_params.data_directory().to_owned();
1436 let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1437
1438 let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
1439 ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
1440 })?;
1441 let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1442 let meta_store_user = meta_store_endpoint.username().to_owned();
1443 let meta_store_password = meta_store_endpoint
1444 .password()
1445 .ok_or_else(|| {
1446 ErrorCode::InternalError("failed to parse password from meta store endpoint".to_owned())
1447 })?
1448 .to_owned();
1449 let meta_store_host = meta_store_endpoint
1450 .host_str()
1451 .ok_or_else(|| {
1452 ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
1453 })?
1454 .to_owned();
1455 let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1456 ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
1457 })?;
1458 let meta_store_database = meta_store_endpoint
1459 .path()
1460 .trim_start_matches('/')
1461 .to_owned();
1462
1463 let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1464 bail!("failed to parse meta backend: {}", meta_store_backend);
1465 };
1466
1467 let catalog_uri = match meta_backend {
1468 MetaBackend::Postgres => {
1469 format!(
1470 "jdbc:postgresql://{}:{}/{}",
1471 meta_store_host.clone(),
1472 meta_store_port.clone(),
1473 meta_store_database.clone()
1474 )
1475 }
1476 MetaBackend::Mysql => {
1477 format!(
1478 "jdbc:mysql://{}:{}/{}",
1479 meta_store_host.clone(),
1480 meta_store_port.clone(),
1481 meta_store_database.clone()
1482 )
1483 }
1484 MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1485 bail!(
1486 "Unsupported meta backend for iceberg engine table: {}",
1487 meta_store_backend
1488 );
1489 }
1490 };
1491
1492 let rw_db_name = session
1493 .env()
1494 .catalog_reader()
1495 .read_guard()
1496 .get_database_by_id(&table.database_id)?
1497 .name()
1498 .to_owned();
1499 let rw_schema_name = session
1500 .env()
1501 .catalog_reader()
1502 .read_guard()
1503 .get_schema_by_id(&table.database_id, &table.schema_id)?
1504 .name()
1505 .clone();
1506 let iceberg_catalog_name = rw_db_name.clone();
1507 let iceberg_database_name = rw_schema_name.clone();
1508 let iceberg_table_name = table_name.0.last().unwrap().real_value();
1509
1510 let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1511 let sink_decouple = session.config().sink_decouple();
1512 if matches!(sink_decouple, SinkDecouple::Disable) {
1513 bail!(
1514 "Iceberg engine table only supports with sink decouple, try `set sink_decouple = false` to resolve it"
1515 );
1516 }
1517
1518 let mut connection_ref = BTreeMap::new();
1519 let with_common = if iceberg_engine_connection.is_empty() {
1520 #[allow(dead_code)]
1521 {
1522 let (s3_region, s3_endpoint, s3_ak, s3_sk, warehouse_path) = match state_store_endpoint
1523 {
1524 s3 if s3.starts_with("hummock+s3://") => {
1525 let s3_region = if let Ok(s3_region) = std::env::var("AWS_REGION") {
1526 s3_region
1527 } else {
1528 bail!(
1529 "To create an iceberg engine table with s3 backend, AWS_REGION needed to be set"
1530 );
1531 };
1532 let s3_bucket = s3.strip_prefix("hummock+s3://").unwrap().to_owned();
1533 (
1534 Some(s3_region),
1535 None,
1536 None,
1537 None,
1538 Some(format!(
1539 "s3://{}/{}/iceberg/{}",
1540 s3_bucket, data_directory, iceberg_catalog_name
1541 )),
1542 )
1543 }
1544 minio if minio.starts_with("hummock+minio://") => {
1545 let server = minio.strip_prefix("hummock+minio://").unwrap();
1546 let (access_key_id, rest) = server.split_once(':').unwrap();
1547 let (secret_access_key, mut rest) = rest.split_once('@').unwrap();
1548 let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://")
1549 {
1550 rest = rest_stripped;
1551 "https://"
1552 } else if let Some(rest_stripped) = rest.strip_prefix("http://") {
1553 rest = rest_stripped;
1554 "http://"
1555 } else {
1556 "http://"
1557 };
1558 let (address, s3_bucket) = rest.split_once('/').unwrap();
1559 (
1560 Some("us-east-1".to_owned()),
1561 Some(format!("{}{}", endpoint_prefix, address)),
1562 Some(access_key_id.to_owned()),
1563 Some(secret_access_key.to_owned()),
1564 Some(format!(
1565 "s3://{}/{}/iceberg/{}",
1566 s3_bucket, data_directory, iceberg_catalog_name
1567 )),
1568 )
1569 }
1570 _ => {
1571 bail!(
1572 "iceberg engine can't operate with this state store endpoint: {}",
1573 state_store_endpoint
1574 );
1575 }
1576 };
1577
1578 let mut with_common = BTreeMap::new();
1579 with_common.insert("enable_config_load".to_owned(), "true".to_owned());
1580 with_common.insert("connector".to_owned(), "iceberg".to_owned());
1581 with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1582 if let Some(warehouse_path) = warehouse_path.clone() {
1583 with_common.insert("warehouse.path".to_owned(), warehouse_path.to_owned());
1584 }
1585 if let Some(s3_endpoint) = s3_endpoint.clone() {
1586 with_common.insert("s3.endpoint".to_owned(), s3_endpoint);
1587 }
1588 if let Some(s3_ak) = s3_ak.clone() {
1589 with_common.insert("s3.access.key".to_owned(), s3_ak.to_owned());
1590 }
1591 if let Some(s3_sk) = s3_sk.clone() {
1592 with_common.insert("s3.secret.key".to_owned(), s3_sk.to_owned());
1593 }
1594 if let Some(s3_region) = s3_region.clone() {
1595 with_common.insert("s3.region".to_owned(), s3_region.to_owned());
1596 }
1597 with_common.insert("catalog.uri".to_owned(), catalog_uri.to_owned());
1598 with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user.to_owned());
1599 with_common.insert(
1600 "catalog.jdbc.password".to_owned(),
1601 meta_store_password.clone(),
1602 );
1603 with_common.insert("catalog.name".to_owned(), iceberg_catalog_name.to_owned());
1604 with_common.insert("database.name".to_owned(), iceberg_database_name.to_owned());
1605 with_common.insert("table.name".to_owned(), iceberg_table_name.to_owned());
1606 }
1607 bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1608 } else {
1609 let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1610 assert_eq!(parts.len(), 2);
1611 let connection_catalog =
1612 session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1613 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1614 if params.connection_type == ConnectionType::Iceberg as i32 {
1615 connection_ref.insert(
1617 "connection".to_owned(),
1618 ConnectionRefValue {
1619 connection_name: ObjectName::from(vec![
1620 Ident::from(parts[0]),
1621 Ident::from(parts[1]),
1622 ]),
1623 },
1624 );
1625
1626 let mut with_common = BTreeMap::new();
1627 with_common.insert("connector".to_owned(), "iceberg".to_owned());
1628 with_common.insert("database.name".to_owned(), iceberg_database_name.to_owned());
1629 with_common.insert("table.name".to_owned(), iceberg_table_name.to_owned());
1630 with_common
1631 } else {
1632 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1633 "Only iceberg connection could be used in iceberg engine".to_owned(),
1634 )));
1635 }
1636 } else {
1637 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1638 "Private Link Service has been deprecated. Please create a new connection instead."
1639 .to_owned(),
1640 )));
1641 }
1642 };
1643
1644 let mut pks = column_defs
1647 .into_iter()
1648 .filter(|c| {
1649 c.options
1650 .iter()
1651 .any(|o| matches!(o.option, ColumnOption::Unique { is_primary: true }))
1652 })
1653 .map(|c| c.name.to_string())
1654 .collect::<Vec<String>>();
1655
1656 if pks.is_empty() {
1658 pks = constraints
1659 .into_iter()
1660 .filter(|c| {
1661 matches!(
1662 c,
1663 TableConstraint::Unique {
1664 is_primary: true,
1665 ..
1666 }
1667 )
1668 })
1669 .flat_map(|c| match c {
1670 TableConstraint::Unique { columns, .. } => columns
1671 .into_iter()
1672 .map(|c| c.to_string())
1673 .collect::<Vec<String>>(),
1674 _ => vec![],
1675 })
1676 .collect::<Vec<String>>();
1677 }
1678
1679 let sink_from = if pks.is_empty() {
1681 pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1682 let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1683 "select {} as {}, * from {}",
1684 ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1685 ))
1686 .context("unable to parse query")?
1687 .try_into()
1688 .unwrap();
1689
1690 let Statement::Query(query) = &stmt else {
1691 panic!("unexpected statement: {:?}", stmt);
1692 };
1693 CreateSink::AsQuery(query.clone())
1694 } else {
1695 CreateSink::From(table_name.clone())
1696 };
1697
1698 let mut sink_name = table_name.clone();
1699 *sink_name.0.last_mut().unwrap() = Ident::from(
1700 (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1701 );
1702 let create_sink_stmt = CreateSinkStatement {
1703 if_not_exists: false,
1704 sink_name,
1705 with_properties: WithProperties(vec![]),
1706 sink_from,
1707 columns: vec![],
1708 emit_mode: None,
1709 sink_schema: None,
1710 into_table_name: None,
1711 };
1712
1713 let mut sink_handler_args = handler_args.clone();
1714
1715 let mut sink_with = with_common.clone();
1716
1717 sink_with.insert("primary_key".to_owned(), pks.join(","));
1718 sink_with.insert("type".to_owned(), "upsert".to_owned());
1719
1720 let commit_checkpoint_interval = handler_args
1721 .with_options
1722 .get(COMMIT_CHECKPOINT_INTERVAL)
1723 .map(|v| v.to_owned())
1724 .unwrap_or_else(|| "60".to_owned());
1725 let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1726 ErrorCode::InvalidInputSyntax(format!(
1727 "commit_checkpoint_interval must be a positive integer: {}",
1728 commit_checkpoint_interval
1729 ))
1730 })?;
1731
1732 if commit_checkpoint_interval == 0 {
1733 bail!("commit_checkpoint_interval must be a positive integer: 0");
1734 }
1735
1736 source
1738 .as_mut()
1739 .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1740
1741 let sink_decouple = session.config().sink_decouple();
1742 if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1743 bail!(
1744 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1745 )
1746 }
1747
1748 sink_with.insert(
1749 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1750 commit_checkpoint_interval.to_string(),
1751 );
1752 sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1753
1754 sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1755
1756 sink_handler_args.with_options =
1757 WithOptions::new(sink_with, Default::default(), connection_ref.clone());
1758
1759 let mut source_name = table_name.clone();
1760 *source_name.0.last_mut().unwrap() = Ident::from(
1761 (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
1762 );
1763 let create_source_stmt = CreateSourceStatement {
1764 temporary: false,
1765 if_not_exists: false,
1766 columns: vec![],
1767 source_name,
1768 wildcard_idx: Some(0),
1769 constraints: vec![],
1770 with_properties: WithProperties(vec![]),
1771 format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
1772 source_watermarks: vec![],
1773 include_column_options: vec![],
1774 };
1775
1776 let mut source_handler_args = handler_args.clone();
1777 let source_with = with_common;
1778 source_handler_args.with_options =
1779 WithOptions::new(source_with, Default::default(), connection_ref);
1780
1781 let _ = JVM.get_or_init()?;
1784
1785 let catalog_writer = session.catalog_writer()?;
1786 catalog_writer
1788 .create_table(source, table, graph, job_type)
1789 .await?;
1790 create_sink::handle_create_sink(sink_handler_args, create_sink_stmt).await?;
1791 create_source::handle_create_source(source_handler_args, create_source_stmt).await?;
1792
1793 Ok(())
1794}
1795
1796pub fn check_create_table_with_source(
1797 with_options: &WithOptions,
1798 format_encode: Option<FormatEncodeOptions>,
1799 include_column_options: &IncludeOption,
1800 cdc_table_info: &Option<CdcTableInfo>,
1801) -> Result<Option<FormatEncodeOptions>> {
1802 if cdc_table_info.is_some() {
1804 return Ok(format_encode);
1805 }
1806 let defined_source = with_options.is_source_connector();
1807
1808 if !include_column_options.is_empty() && !defined_source {
1809 return Err(ErrorCode::InvalidInputSyntax(
1810 "INCLUDE should be used with a connector".to_owned(),
1811 )
1812 .into());
1813 }
1814 if defined_source {
1815 format_encode.as_ref().ok_or_else(|| {
1816 ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
1817 })?;
1818 }
1819 Ok(format_encode)
1820}
1821
1822#[allow(clippy::too_many_arguments)]
1823pub async fn generate_stream_graph_for_replace_table(
1824 _session: &Arc<SessionImpl>,
1825 table_name: ObjectName,
1826 original_catalog: &Arc<TableCatalog>,
1827 handler_args: HandlerArgs,
1828 statement: Statement,
1829 col_id_gen: ColumnIdGenerator,
1830 sql_column_strategy: SqlColumnStrategy,
1831) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
1832 use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
1833
1834 let Statement::CreateTable {
1835 columns,
1836 constraints,
1837 source_watermarks,
1838 append_only,
1839 on_conflict,
1840 with_version_column,
1841 wildcard_idx,
1842 cdc_table_info,
1843 format_encode,
1844 include_column_options,
1845 engine,
1846 with_options,
1847 ..
1848 } = statement
1849 else {
1850 panic!("unexpected statement type: {:?}", statement);
1851 };
1852
1853 let format_encode = format_encode
1854 .clone()
1855 .map(|format_encode| format_encode.into_v2_with_warning());
1856
1857 let engine = match engine {
1858 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1859 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1860 };
1861
1862 let is_drop_connector =
1863 original_catalog.associated_source_id().is_some() && format_encode.is_none();
1864 if is_drop_connector {
1865 debug_assert!(
1866 source_watermarks.is_empty()
1867 && include_column_options.is_empty()
1868 && with_options
1869 .iter()
1870 .all(|opt| opt.name.real_value().to_lowercase() != "connector")
1871 );
1872 }
1873
1874 let props = CreateTableProps {
1875 definition: handler_args.normalized_sql.clone(),
1876 append_only,
1877 on_conflict: on_conflict.into(),
1878 with_version_column: with_version_column.as_ref().map(|x| x.real_value()),
1879 webhook_info: original_catalog.webhook_info.clone(),
1880 engine,
1881 };
1882
1883 let ((plan, mut source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
1884 (Some(format_encode), None) => (
1885 gen_create_table_plan_with_source(
1886 handler_args,
1887 ExplainOptions::default(),
1888 table_name,
1889 columns,
1890 wildcard_idx,
1891 constraints,
1892 format_encode,
1893 source_watermarks,
1894 col_id_gen,
1895 include_column_options,
1896 props,
1897 sql_column_strategy,
1898 )
1899 .await?,
1900 TableJobType::General,
1901 ),
1902 (None, None) => {
1903 let context = OptimizerContext::from_handler_args(handler_args);
1904 let (plan, table) = gen_create_table_plan(
1905 context,
1906 table_name,
1907 columns,
1908 constraints,
1909 col_id_gen,
1910 source_watermarks,
1911 props,
1912 true,
1913 )?;
1914 ((plan, None, table), TableJobType::General)
1915 }
1916 (None, Some(cdc_table)) => {
1917 let session = &handler_args.session;
1918 let (source, resolved_table_name, database_id, schema_id) =
1919 get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
1920
1921 let cdc_with_options = derive_with_options_for_cdc_table(
1922 &source.with_properties,
1923 cdc_table.external_table_name.clone(),
1924 )?;
1925
1926 let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
1927
1928 let context: OptimizerContextRef =
1929 OptimizerContext::new(handler_args, ExplainOptions::default()).into();
1930 let (plan, table) = gen_create_table_plan_for_cdc_table(
1931 context,
1932 source,
1933 cdc_table.external_table_name.clone(),
1934 columns,
1935 column_catalogs,
1936 pk_names,
1937 cdc_with_options,
1938 col_id_gen,
1939 on_conflict,
1940 with_version_column.map(|x| x.real_value()),
1941 IncludeOption::default(),
1942 table_name,
1943 resolved_table_name,
1944 database_id,
1945 schema_id,
1946 original_catalog.id(),
1947 engine,
1948 )?;
1949
1950 ((plan, None, table), TableJobType::SharedCdcSource)
1951 }
1952 (Some(_), Some(_)) => {
1953 return Err(ErrorCode::NotSupported(
1954 "Data format and encoding format doesn't apply to table created from a CDC source"
1955 .into(),
1956 "Remove the FORMAT and ENCODE specification".into(),
1957 )
1958 .into());
1959 }
1960 };
1961
1962 if TableCatalog::from(&table).pk_column_ids() != original_catalog.pk_column_ids() {
1964 Err(ErrorCode::InvalidInputSyntax(
1965 "alter primary key of table is not supported".to_owned(),
1966 ))?
1967 }
1968
1969 let graph = build_graph(plan)?;
1970
1971 let mut table = Table {
1973 id: original_catalog.id().table_id(),
1974 ..table
1975 };
1976 if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
1977 table.optional_associated_source_id = Some(OptionalAssociatedSourceId::AssociatedSourceId(
1978 source_id.table_id,
1979 ));
1980 source.as_mut().unwrap().id = source_id.table_id;
1981 source.as_mut().unwrap().optional_associated_table_id =
1982 Some(OptionalAssociatedTableId::AssociatedTableId(table.id))
1983 }
1984
1985 Ok((graph, table, source, job_type))
1986}
1987
1988fn get_source_and_resolved_table_name(
1989 session: &Arc<SessionImpl>,
1990 cdc_table: CdcTableInfo,
1991 table_name: ObjectName,
1992) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
1993 let db_name = &session.database();
1994 let (schema_name, resolved_table_name) =
1995 Binder::resolve_schema_qualified_name(db_name, table_name)?;
1996 let (database_id, schema_id) =
1997 session.get_database_and_schema_id_for_create(schema_name.clone())?;
1998
1999 let (format_encode, source_name) =
2000 Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;
2001
2002 let source = {
2003 let catalog_reader = session.env().catalog_reader().read_guard();
2004 let schema_name = format_encode.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2005 let (source, _) = catalog_reader.get_source_by_name(
2006 db_name,
2007 SchemaPath::Name(schema_name.as_str()),
2008 source_name.as_str(),
2009 )?;
2010 source.clone()
2011 };
2012
2013 Ok((source, resolved_table_name, database_id, schema_id))
2014}
2015
2016fn bind_webhook_info(
2018 session: &Arc<SessionImpl>,
2019 columns_defs: &[ColumnDef],
2020 webhook_info: WebhookSourceInfo,
2021) -> Result<PbWebhookSourceInfo> {
2022 if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
2024 {
2025 return Err(ErrorCode::InvalidInputSyntax(
2026 "Table with webhook source should have exactly one JSONB column".to_owned(),
2027 )
2028 .into());
2029 }
2030
2031 let WebhookSourceInfo {
2032 secret_ref,
2033 signature_expr,
2034 wait_for_persistence,
2035 } = webhook_info;
2036
2037 let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2039 let db_name = &session.database();
2040 let (schema_name, secret_name) =
2041 Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
2042 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2043 (
2044 Some(PbSecretRef {
2045 secret_id: secret_catalog.id.secret_id(),
2046 ref_as: match secret_ref.ref_as {
2047 SecretRefAsType::Text => PbRefAsType::Text,
2048 SecretRefAsType::File => PbRefAsType::File,
2049 }
2050 .into(),
2051 }),
2052 Some(secret_name),
2053 )
2054 } else {
2055 (None, None)
2056 };
2057
2058 let secure_compare_context = SecureCompareContext {
2059 column_name: columns_defs[0].name.real_value(),
2060 secret_name,
2061 };
2062 let mut binder = Binder::new_for_ddl_with_secure_compare(session, secure_compare_context);
2063 let expr = binder.bind_expr(signature_expr.clone())?;
2064
2065 if expr.as_function_call().is_none()
2067 || expr.as_function_call().unwrap().func_type()
2068 != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2069 {
2070 return Err(ErrorCode::InvalidInputSyntax(
2071 "The signature verification function must be SECURE_COMPARE()".to_owned(),
2072 )
2073 .into());
2074 }
2075
2076 let pb_webhook_info = PbWebhookSourceInfo {
2077 secret_ref: pb_secret_ref,
2078 signature_expr: Some(expr.to_expr_proto()),
2079 wait_for_persistence,
2080 };
2081
2082 Ok(pb_webhook_info)
2083}
2084
2085#[cfg(test)]
2086mod tests {
2087 use risingwave_common::catalog::{
2088 DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2089 };
2090 use risingwave_common::types::{DataType, StructType};
2091
2092 use super::*;
2093 use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2094
2095 #[tokio::test]
2096 async fn test_create_table_handler() {
2097 let sql =
2098 "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2099 let frontend = LocalFrontend::new(Default::default()).await;
2100 frontend.run_sql(sql).await.unwrap();
2101
2102 let session = frontend.session_ref();
2103 let catalog_reader = session.env().catalog_reader().read_guard();
2104 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2105
2106 let (table, _) = catalog_reader
2108 .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2109 .unwrap();
2110 assert_eq!(table.name(), "t");
2111
2112 let columns = table
2113 .columns
2114 .iter()
2115 .map(|col| (col.name(), col.data_type().clone()))
2116 .collect::<HashMap<&str, DataType>>();
2117
2118 let expected_columns = maplit::hashmap! {
2119 ROW_ID_COLUMN_NAME => DataType::Serial,
2120 "v1" => DataType::Int16,
2121 "v2" => StructType::new(
2122 vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
2123 )
2124 .with_ids([3, 4, 5].map(ColumnId::new))
2125 .into(),
2126 RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
2127 };
2128
2129 assert_eq!(columns, expected_columns, "{columns:#?}");
2130 }
2131
2132 #[test]
2133 fn test_bind_primary_key() {
2134 for (sql, expected) in [
2137 ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
2138 ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
2139 ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
2140 (
2141 "create table t (v1 int primary key, v2 int primary key)",
2142 Err("multiple primary keys are not allowed"),
2143 ),
2144 (
2145 "create table t (v1 int primary key primary key, v2 int)",
2146 Err("multiple primary keys are not allowed"),
2147 ),
2148 (
2149 "create table t (v1 int, v2 int, primary key (v1))",
2150 Ok(&[1]),
2151 ),
2152 (
2153 "create table t (v1 int, primary key (v2), v2 int)",
2154 Ok(&[2]),
2155 ),
2156 (
2157 "create table t (primary key (v2, v1), v1 int, v2 int)",
2158 Ok(&[2, 1]),
2159 ),
2160 (
2161 "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
2162 Err("multiple primary keys are not allowed"),
2163 ),
2164 (
2165 "create table t (v1 int primary key, primary key (v1), v2 int)",
2166 Err("multiple primary keys are not allowed"),
2167 ),
2168 (
2169 "create table t (v1 int, primary key (V3), v2 int)",
2170 Err("column \"v3\" named in key does not exist"),
2171 ),
2172 ] {
2173 let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
2174 let risingwave_sqlparser::ast::Statement::CreateTable {
2175 columns: column_defs,
2176 constraints,
2177 ..
2178 } = ast.remove(0)
2179 else {
2180 panic!("test case should be create table")
2181 };
2182 let actual: Result<_> = (|| {
2183 let mut columns = bind_sql_columns(&column_defs, false)?;
2184 let mut col_id_gen = ColumnIdGenerator::new_initial();
2185 for c in &mut columns {
2186 col_id_gen.generate(c)?;
2187 }
2188
2189 let pk_names =
2190 bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
2191 let (_, pk_column_ids, _) =
2192 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
2193 Ok(pk_column_ids)
2194 })();
2195 match (expected, actual) {
2196 (Ok(expected), Ok(actual)) => assert_eq!(
2197 expected.iter().copied().map(ColumnId::new).collect_vec(),
2198 actual,
2199 "sql: {sql}"
2200 ),
2201 (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
2202 (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
2203 (Err(expected), Err(actual)) => assert!(
2204 actual.to_string().contains(expected),
2205 "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
2206 ),
2207 }
2208 }
2209 }
2210
2211 #[tokio::test]
2212 async fn test_duplicate_props_options() {
2213 let proto_file = create_proto_file(PROTO_FILE_DATA);
2214 let sql = format!(
2215 r#"CREATE TABLE t
2216 WITH (
2217 connector = 'kinesis',
2218 aws.region='user_test_topic',
2219 endpoint='172.10.1.1:9090,172.10.1.2:9090',
2220 aws.credentials.access_key_id = 'your_access_key_1',
2221 aws.credentials.secret_access_key = 'your_secret_key_1'
2222 )
2223 FORMAT PLAIN ENCODE PROTOBUF (
2224 message = '.test.TestRecord',
2225 aws.credentials.access_key_id = 'your_access_key_2',
2226 aws.credentials.secret_access_key = 'your_secret_key_2',
2227 schema.location = 'file://{}',
2228 )"#,
2229 proto_file.path().to_str().unwrap()
2230 );
2231 let frontend = LocalFrontend::new(Default::default()).await;
2232 frontend.run_sql(sql).await.unwrap();
2233
2234 let session = frontend.session_ref();
2235 let catalog_reader = session.env().catalog_reader().read_guard();
2236 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2237
2238 let (source, _) = catalog_reader
2240 .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2241 .unwrap();
2242 assert_eq!(source.name, "t");
2243
2244 assert_eq!(
2246 source
2247 .info
2248 .format_encode_options
2249 .get("aws.credentials.access_key_id")
2250 .unwrap(),
2251 "your_access_key_2"
2252 );
2253 assert_eq!(
2254 source
2255 .info
2256 .format_encode_options
2257 .get("aws.credentials.secret_access_key")
2258 .unwrap(),
2259 "your_secret_key_2"
2260 );
2261
2262 assert_eq!(
2264 source
2265 .with_properties
2266 .get("aws.credentials.access_key_id")
2267 .unwrap(),
2268 "your_access_key_1"
2269 );
2270 assert_eq!(
2271 source
2272 .with_properties
2273 .get("aws.credentials.secret_access_key")
2274 .unwrap(),
2275 "your_secret_key_1"
2276 );
2277
2278 assert!(!source.with_properties.contains_key("schema.location"));
2280 }
2281}