use std::collections::{BTreeMap, HashMap};
use std::rc::Rc;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use clap::ValueEnum;
use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use percent_encoding::percent_decode_str;
use pgwire::pg_response::{PgResponse, StatementType};
use prost::Message as _;
use risingwave_common::catalog::{
    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
    RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, TableId,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::jvm_runtime::JVM;
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
    DATABASE_NAME_KEY, ExternalTableConfig, ExternalTableImpl, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::connection_params::ConnectionType;
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::{PbSource, PbTable, PbWebhookSourceInfo, Table, WatermarkDesc};
use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
use risingwave_pb::meta::PbThrottleTarget;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::secret::secret_ref::PbRefAsType;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::{IncludeOption, Parser};

use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
use super::{RwPgResponse, alter_streaming_rate_limit, create_sink, create_source};
use crate::binder::{Clause, SecureCompareContext, bind_data_type};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableVersion};
use crate::catalog::{ColumnId, DatabaseId, SchemaId, check_column_name_not_reserved};
use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::HandlerArgs;
use crate::handler::create_source::{
    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
    bind_source_watermark, handle_addition_columns,
};
use crate::handler::util::SourceSchemaCompatExt;
use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind};
use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::OverwriteOptions;
use crate::{Binder, Explain, TableCatalog, WithOptions};

mod col_id_gen;
pub use col_id_gen::*;
use risingwave_connector::sink::iceberg::parse_partition_by_exprs;

use crate::handler::drop_table::handle_drop_table;

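/// Check that the column definition only carries options supported by `CREATE TABLE`:
/// generated columns, default values, `PRIMARY KEY`, `NULL` and `NOT NULL`.
/// Anything else is rejected as not implemented.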
fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
    for option_def in &c.options {
        match option_def.option {
            ColumnOption::GeneratedColumns(_) => {}
            ColumnOption::DefaultValue(_) => {}
            ColumnOption::DefaultValueInternal { .. } => {}
            ColumnOption::Unique { is_primary: true } => {}
            ColumnOption::Null => {}
            ColumnOption::NotNull => {}
            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
        }
    }
    Ok(())
}
109
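/// Bind the column definitions from SQL into `ColumnCatalog`s with placeholder column ids.
/// When `is_for_drop_table_connector` is set, the reserved-column-name check is skipped.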
pub fn bind_sql_columns(
    column_defs: &[ColumnDef],
    is_for_drop_table_connector: bool,
) -> Result<Vec<ColumnCatalog>> {
    let mut columns = Vec::with_capacity(column_defs.len());

    for column in column_defs {
        ensure_column_options_supported(column)?;
        let ColumnDef {
            name,
            data_type,
            collation,
            options,
            ..
        } = column;

        let data_type = data_type
            .clone()
            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
        if let Some(collation) = collation {
            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
                bail_not_implemented!(
                    "collations other than `C` or `POSIX` are not implemented"
                );
            }

            match data_type {
                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
                _ => {
                    return Err(ErrorCode::NotSupported(
                        format!("{} is not a collatable data type", data_type),
                        "The only built-in collatable data types are `varchar`, `text`, and `char`, please check your type".into(),
                    ).into());
                }
            }
        }

        if !is_for_drop_table_connector {
            check_column_name_not_reserved(&name.real_value())?;
        }

        let nullable: bool = !options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull));

        columns.push(ColumnCatalog {
            column_desc: ColumnDesc {
                data_type: bind_data_type(&data_type)?,
                column_id: ColumnId::placeholder(),
                name: name.real_value(),
                generated_or_default_column: None,
                description: None,
                additional_column: AdditionalColumn { column_type: None },
                version: ColumnDescVersion::LATEST,
                system_column: None,
                nullable,
            },
            is_hidden: false,
        });
    }

    Ok(columns)
}
187
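/// Reject a generated column that references another generated column, or that is part of the
/// primary key while its expression is impure.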
fn check_generated_column_constraints(
    column_name: &String,
    column_id: ColumnId,
    expr: &ExprImpl,
    column_catalogs: &[ColumnCatalog],
    generated_column_names: &[String],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let input_refs = expr.collect_input_refs(column_catalogs.len());
    for idx in input_refs.ones() {
        let referred_generated_column = &column_catalogs[idx].column_desc.name;
        if generated_column_names
            .iter()
            .any(|c| c == referred_generated_column)
        {
            return Err(ErrorCode::BindError(format!(
                "A generated column cannot reference another generated column, \
                 but generated column \"{}\" references generated column \"{}\"",
                column_name, referred_generated_column
            ))
            .into());
        }
    }

    if pk_column_ids.contains(&column_id) && expr.is_impure() {
        return Err(ErrorCode::BindError(format!(
            "Generated columns with impure expressions should not be part of the primary key. \
             Here column \"{}\" is defined as part of the primary key.",
            column_name
        ))
        .into());
    }

    Ok(())
}
223
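/// Bind the per-column constraints (generated columns and default values) against the already
/// bound `column_catalogs`, filling in `generated_or_default_column` for the affected columns.
/// Default expressions must fold to a constant after inlining `NOW()`/proctime from the pinned
/// snapshot; otherwise an error suggests using a generated column instead.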
224pub fn bind_sql_column_constraints(
227 session: &SessionImpl,
228 table_name: String,
229 column_catalogs: &mut [ColumnCatalog],
230 columns: Vec<ColumnDef>,
231 pk_column_ids: &[ColumnId],
232) -> Result<()> {
233 let generated_column_names = {
234 let mut names = vec![];
235 for column in &columns {
236 for option_def in &column.options {
237 if let ColumnOption::GeneratedColumns(_) = option_def.option {
238 names.push(column.name.real_value());
239 break;
240 }
241 }
242 }
243 names
244 };
245
246 let mut binder = Binder::new_for_ddl(session);
247 binder.bind_columns_to_context(table_name.clone(), column_catalogs)?;
248
249 for column in columns {
250 let Some(idx) = column_catalogs
251 .iter()
252 .position(|c| c.name() == column.name.real_value())
253 else {
254 continue;
257 };
258
259 for option_def in column.options {
260 match option_def.option {
261 ColumnOption::GeneratedColumns(expr) => {
262 binder.set_clause(Some(Clause::GeneratedColumn));
263
                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "failed to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;
270
271 check_generated_column_constraints(
272 &column.name.real_value(),
273 column_catalogs[idx].column_id(),
274 &expr_impl,
275 column_catalogs,
276 &generated_column_names,
277 pk_column_ids,
278 )?;
279
280 column_catalogs[idx].column_desc.generated_or_default_column = Some(
281 GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
282 expr: Some(expr_impl.to_expr_proto()),
283 }),
284 );
285 binder.set_clause(None);
286 }
287 ColumnOption::DefaultValue(expr) => {
288 let expr_impl = binder
289 .bind_expr(expr)?
290 .cast_assign(column_catalogs[idx].data_type().clone())?;
291
292 let rewritten_expr_impl = session
300 .pinned_snapshot()
301 .inline_now_proc_time()
302 .rewrite_expr(expr_impl.clone());
303
304 if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
305 let snapshot_value = snapshot_value?;
306
307 column_catalogs[idx].column_desc.generated_or_default_column =
308 Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
309 snapshot_value: Some(snapshot_value.to_protobuf()),
310 expr: Some(expr_impl.to_expr_proto()),
311 }));
313 } else {
314 return Err(ErrorCode::BindError(format!(
315 "Default expression used in column `{}` cannot be evaluated. \
316 Use generated columns instead if you mean to reference other columns.",
317 column.name
318 ))
319 .into());
320 }
321 }
322 ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
323 if persisted.is_empty() {
328 bail_bind_error!(
329 "DEFAULT INTERNAL is only used for internal purposes, \
330 please specify a concrete default value"
331 );
332 }
333
334 let desc = DefaultColumnDesc::decode(&*persisted)
335 .expect("failed to decode persisted `DefaultColumnDesc`");
336
337 column_catalogs[idx].column_desc.generated_or_default_column =
338 Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
339 }
340 _ => {}
341 }
342 }
343 }
344 Ok(())
345}
346
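/// Extract the primary-key column names from table-level constraints, accepting only a single
/// `PRIMARY KEY (...)` constraint and rejecting any other table constraint.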
347pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
349 let mut pk_column_names = vec![];
350
351 for constraint in table_constraints {
352 match constraint {
353 TableConstraint::Unique {
354 name: _,
355 columns,
356 is_primary: true,
357 } => {
358 if !pk_column_names.is_empty() {
359 return Err(multiple_pk_definition_err());
360 }
361 pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
362 }
363 _ => bail_not_implemented!("table constraint \"{}\"", constraint),
364 }
365 }
366 Ok(pk_column_names)
367}
368
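/// Merge the primary-key names from table-level constraints with column-level `PRIMARY KEY`
/// options, erroring out if a primary key is defined more than once.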
369pub fn bind_sql_pk_names(
370 columns_defs: &[ColumnDef],
371 pk_names_from_table_constraints: Vec<String>,
372) -> Result<Vec<String>> {
373 let mut pk_column_names = pk_names_from_table_constraints;
374
375 for column in columns_defs {
376 for option_def in &column.options {
377 if let ColumnOption::Unique { is_primary: true } = option_def.option {
378 if !pk_column_names.is_empty() {
379 return Err(multiple_pk_definition_err());
380 }
381 pk_column_names.push(column.name.real_value());
382 };
383 }
384 }
385
386 Ok(pk_column_names)
387}
388
389fn multiple_pk_definition_err() -> RwError {
390 ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
391}
392
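/// Resolve the primary-key names to column ids. If no primary key is given and `must_need_pk`
/// is true, a hidden row-id column is appended and used as the key; its index is returned as the
/// third element. Duplicate column names are rejected here as well.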
393pub fn bind_pk_and_row_id_on_relation(
398 mut columns: Vec<ColumnCatalog>,
399 pk_names: Vec<String>,
400 must_need_pk: bool,
401) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
402 for c in &columns {
403 assert!(c.column_id() != ColumnId::placeholder());
404 }
405
406 let name_to_id = columns
408 .iter()
409 .map(|c| (c.name(), c.column_id()))
410 .collect::<HashMap<_, _>>();
411
412 let mut pk_column_ids: Vec<_> = pk_names
413 .iter()
414 .map(|name| {
415 name_to_id.get(name.as_str()).copied().ok_or_else(|| {
416 ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
417 })
418 })
419 .try_collect()?;
420
421 let need_row_id = pk_column_ids.is_empty() && must_need_pk;
423
424 let row_id_index = need_row_id.then(|| {
425 let column = ColumnCatalog::row_id_column();
426 let index = columns.len();
427 pk_column_ids = vec![column.column_id()];
428 columns.push(column);
429 index
430 });
431
432 if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
433 Err(ErrorCode::InvalidInputSyntax(format!(
434 "column \"{col}\" specified more than once"
435 )))?;
436 }
437
438 Ok((columns, pk_column_ids, row_id_index))
439}
440
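/// Generate the plan for `CREATE TABLE` with a connector-backed source (`WITH (connector = ...)
/// FORMAT ... ENCODE ...`), returning the plan, the `PbSource` to create alongside, and the table.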
441#[allow(clippy::too_many_arguments)]
444pub(crate) async fn gen_create_table_plan_with_source(
445 mut handler_args: HandlerArgs,
446 explain_options: ExplainOptions,
447 table_name: ObjectName,
448 column_defs: Vec<ColumnDef>,
449 wildcard_idx: Option<usize>,
450 constraints: Vec<TableConstraint>,
451 format_encode: FormatEncodeOptions,
452 source_watermarks: Vec<SourceWatermark>,
453 mut col_id_gen: ColumnIdGenerator,
454 include_column_options: IncludeOption,
455 props: CreateTableProps,
456 sql_column_strategy: SqlColumnStrategy,
457) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
458 if props.append_only
459 && format_encode.format != Format::Plain
460 && format_encode.format != Format::Native
461 {
        return Err(ErrorCode::BindError(format!(
            "Append-only tables do not support format {}.",
            format_encode.format
        ))
        .into());
    }
468
469 let session = &handler_args.session;
470 let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
471 if with_properties.is_shareable_cdc_connector() {
472 generated_columns_check_for_cdc_table(&column_defs)?;
473 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
474 } else if column_defs.iter().any(|col| {
475 col.options
476 .iter()
477 .any(|def| matches!(def.option, ColumnOption::NotNull))
478 }) {
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be silently ignored.",
        );
483 }
484
485 let db_name: &str = &session.database();
486 let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
487
488 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
490 session,
491 &format_encode,
492 Either::Left(&with_properties),
493 CreateSourceType::Table,
494 )
495 .await?;
496
497 let overwrite_options = OverwriteOptions::new(&mut handler_args);
498 let rate_limit = overwrite_options.source_rate_limit;
499 let source_catalog = bind_create_source_or_table_with_connector(
500 handler_args.clone(),
501 table_name,
502 format_encode,
503 with_properties,
504 &column_defs,
505 constraints,
506 wildcard_idx,
507 source_watermarks,
508 columns_from_resolve_source,
509 source_info,
510 include_column_options,
511 &mut col_id_gen,
512 CreateSourceType::Table,
513 rate_limit,
514 sql_column_strategy,
515 )
516 .await?;
517
518 let pb_source = source_catalog.to_prost();
519
520 let context = OptimizerContext::new(handler_args, explain_options);
521
522 let (plan, table) = gen_table_plan_with_source(
523 context.into(),
524 schema_name,
525 source_catalog,
526 col_id_gen.into_version(),
527 props,
528 )?;
529
530 Ok((plan, Some(pb_source), table))
531}
532
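/// Generate the plan for a plain `CREATE TABLE` without a connector: bind the SQL columns, then
/// delegate to [`gen_create_table_plan_without_source`].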
533#[allow(clippy::too_many_arguments)]
536pub(crate) fn gen_create_table_plan(
537 context: OptimizerContext,
538 table_name: ObjectName,
539 column_defs: Vec<ColumnDef>,
540 constraints: Vec<TableConstraint>,
541 mut col_id_gen: ColumnIdGenerator,
542 source_watermarks: Vec<SourceWatermark>,
543 props: CreateTableProps,
544 is_for_replace_plan: bool,
545) -> Result<(PlanRef, PbTable)> {
546 let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
547 for c in &mut columns {
548 col_id_gen.generate(c)?;
549 }
550
551 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
    if !secret_refs.is_empty() || !connection_refs.is_empty() {
        return Err(crate::error::ErrorCode::InvalidParameterValue(
            "Secret and connection references are not allowed in WITH options when creating a table without an external source".to_owned(),
        )
        .into());
    }
555
556 gen_create_table_plan_without_source(
557 context,
558 table_name,
559 columns,
560 column_defs,
561 constraints,
562 source_watermarks,
563 col_id_gen.into_version(),
564 props,
565 )
566}
567
568#[allow(clippy::too_many_arguments)]
569pub(crate) fn gen_create_table_plan_without_source(
570 context: OptimizerContext,
571 table_name: ObjectName,
572 columns: Vec<ColumnCatalog>,
573 column_defs: Vec<ColumnDef>,
574 constraints: Vec<TableConstraint>,
575 source_watermarks: Vec<SourceWatermark>,
576 version: TableVersion,
577 props: CreateTableProps,
578) -> Result<(PlanRef, PbTable)> {
579 let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
581 let (mut columns, pk_column_ids, row_id_index) =
582 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
583
584 let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
585 context.session_ctx(),
586 table_name.real_value(),
587 source_watermarks,
588 &columns,
589 )?;
590
591 bind_sql_column_constraints(
592 context.session_ctx(),
593 table_name.real_value(),
594 &mut columns,
595 column_defs,
596 &pk_column_ids,
597 )?;
598 let session = context.session_ctx().clone();
599
600 let db_name = &session.database();
601 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
602
603 let info = CreateTableInfo {
604 columns,
605 pk_column_ids,
606 row_id_index,
607 watermark_descs,
608 source_catalog: None,
609 version,
610 };
611
612 gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
613}
614
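/// Build the table plan from an already bound source catalog, reusing its columns, primary key,
/// row-id index and watermarks.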
615fn gen_table_plan_with_source(
616 context: OptimizerContextRef,
617 schema_name: Option<String>,
618 source_catalog: SourceCatalog,
619 version: TableVersion,
620 props: CreateTableProps,
621) -> Result<(PlanRef, PbTable)> {
622 let table_name = source_catalog.name.clone();
623
624 let info = CreateTableInfo {
625 columns: source_catalog.columns.clone(),
626 pk_column_ids: source_catalog.pk_col_ids.clone(),
627 row_id_index: source_catalog.row_id_index,
628 watermark_descs: source_catalog.watermark_descs.clone(),
629 source_catalog: Some(source_catalog),
630 version,
631 };
632
633 gen_table_plan_inner(context, schema_name, table_name, info, props)
634}
635
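/// The `ON CONFLICT` behavior of the table, either still in AST form or already resolved.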
636#[derive(Clone, Copy)]
638pub enum EitherOnConflict {
639 Ast(Option<OnConflict>),
640 Resolved(ConflictBehavior),
641}
642
643impl From<Option<OnConflict>> for EitherOnConflict {
644 fn from(v: Option<OnConflict>) -> Self {
645 Self::Ast(v)
646 }
647}
648
649impl From<ConflictBehavior> for EitherOnConflict {
650 fn from(v: ConflictBehavior) -> Self {
651 Self::Resolved(v)
652 }
653}
654
655impl EitherOnConflict {
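    /// Resolve the conflict behavior, validating it against the append-only flag and whether the
    /// table uses the hidden row id as its primary key.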
656 pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
658 let conflict_behavior = match self {
659 EitherOnConflict::Ast(on_conflict) => {
660 if append_only {
661 if row_id_as_pk {
662 ConflictBehavior::NoCheck
664 } else {
665 if let Some(on_conflict) = on_conflict
667 && on_conflict != OnConflict::Nothing
668 {
                            return Err(ErrorCode::InvalidInputSyntax(
                                "When a PRIMARY KEY constraint is applied to an APPEND ONLY table, \
                                 the ON CONFLICT behavior must be DO NOTHING."
                                    .to_owned(),
                            )
                            .into());
675 }
676 ConflictBehavior::IgnoreConflict
677 }
678 } else {
679 match on_conflict.unwrap_or(OnConflict::UpdateFull) {
681 OnConflict::UpdateFull => ConflictBehavior::Overwrite,
682 OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
683 OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
684 }
685 }
686 }
687 EitherOnConflict::Resolved(b) => b,
688 };
689
690 Ok(conflict_behavior)
691 }
692}
693
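/// Bound schema information of the table to be created: columns, primary key, row-id index,
/// watermarks and the optional associated source catalog.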
694pub struct CreateTableInfo {
699 pub columns: Vec<ColumnCatalog>,
700 pub pk_column_ids: Vec<ColumnId>,
701 pub row_id_index: Option<usize>,
702 pub watermark_descs: Vec<WatermarkDesc>,
703 pub source_catalog: Option<SourceCatalog>,
704 pub version: TableVersion,
705}
706
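/// Additional properties of the table to be created: original definition SQL, append-only flag,
/// conflict behavior, version column, webhook info and table engine.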
707pub struct CreateTableProps {
712 pub definition: String,
713 pub append_only: bool,
714 pub on_conflict: EitherOnConflict,
715 pub with_version_column: Option<String>,
716 pub webhook_info: Option<PbWebhookSourceInfo>,
717 pub engine: Engine,
718}
719
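/// Shared tail of the planning paths above: build a `LogicalSource`-rooted plan, reject
/// watermarks and retention on non-append-only tables, and materialize the plan into a table.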
720#[allow(clippy::too_many_arguments)]
721fn gen_table_plan_inner(
722 context: OptimizerContextRef,
723 schema_name: Option<String>,
724 table_name: String,
725 info: CreateTableInfo,
726 props: CreateTableProps,
727) -> Result<(PlanRef, PbTable)> {
728 let CreateTableInfo {
729 ref columns,
730 row_id_index,
731 ref watermark_descs,
732 ref source_catalog,
733 ..
734 } = info;
735 let CreateTableProps { append_only, .. } = props;
736
737 let (database_id, schema_id) = context
738 .session_ctx()
739 .get_database_and_schema_id_for_create(schema_name)?;
740
741 let session = context.session_ctx().clone();
742 let retention_seconds = context.with_options().retention_seconds();
743
744 let source_node: PlanRef = LogicalSource::new(
745 source_catalog.clone().map(Rc::new),
746 columns.clone(),
747 row_id_index,
748 SourceNodeKind::CreateTable,
749 context.clone(),
750 None,
751 )?
752 .into();
753
754 let required_cols = FixedBitSet::with_capacity(columns.len());
755 let plan_root = PlanRoot::new_with_logical_plan(
756 source_node,
757 RequiredDist::Any,
758 Order::any(),
759 required_cols,
760 vec![],
761 );
762
    if !append_only && !watermark_descs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on a table requires the table to be append-only.".to_owned(),
            "Use the keywords `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    if !append_only && retention_seconds.is_some() {
        return Err(ErrorCode::NotSupported(
            "Defining retention seconds on a table requires the table to be append-only."
                .to_owned(),
            "Use the keywords `APPEND ONLY`".to_owned(),
        )
        .into());
    }
778
779 let materialize =
780 plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;
781
782 let mut table = materialize.table().to_prost();
783
784 table.owner = session.user_id();
785 Ok((materialize.into(), table))
786}
787
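/// Generate the plan for a table created on top of a shared CDC source
/// (`CREATE TABLE ... FROM <source> TABLE '<external table>'`), scanning the external table
/// through a `LogicalCdcScan`.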
788#[allow(clippy::too_many_arguments)]
792pub(crate) fn gen_create_table_plan_for_cdc_table(
793 context: OptimizerContextRef,
794 source: Arc<SourceCatalog>,
795 external_table_name: String,
796 column_defs: Vec<ColumnDef>,
797 mut columns: Vec<ColumnCatalog>,
798 pk_names: Vec<String>,
799 cdc_with_options: WithOptionsSecResolved,
800 mut col_id_gen: ColumnIdGenerator,
801 on_conflict: Option<OnConflict>,
802 with_version_column: Option<String>,
803 include_column_options: IncludeOption,
804 table_name: ObjectName,
    resolved_table_name: String,
    database_id: DatabaseId,
807 schema_id: SchemaId,
808 table_id: TableId,
809 engine: Engine,
810) -> Result<(PlanRef, PbTable)> {
811 let session = context.session_ctx().clone();
812
813 handle_addition_columns(
815 None,
816 &cdc_with_options,
817 include_column_options,
818 &mut columns,
819 true,
820 )?;
821
822 for c in &mut columns {
823 col_id_gen.generate(c)?;
824 }
825
826 let (mut columns, pk_column_ids, _row_id_index) =
827 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
828
829 bind_sql_column_constraints(
831 context.session_ctx(),
832 table_name.real_value(),
833 &mut columns,
834 column_defs,
835 &pk_column_ids,
836 )?;
837
838 let definition = context.normalized_sql().to_owned();
839
840 let pk_column_indices = {
841 let mut id_to_idx = HashMap::new();
842 columns.iter().enumerate().for_each(|(idx, c)| {
843 id_to_idx.insert(c.column_id(), idx);
844 });
845 pk_column_ids
847 .iter()
848 .map(|c| id_to_idx.get(c).copied().unwrap())
849 .collect_vec()
850 };
851 let table_pk = pk_column_indices
852 .iter()
853 .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
854 .collect();
855
856 let (options, secret_refs) = cdc_with_options.into_parts();
857
858 let non_generated_column_descs = columns
859 .iter()
860 .filter(|&c| (!c.is_generated()))
861 .map(|c| c.column_desc.clone())
862 .collect_vec();
863 let non_generated_column_num = non_generated_column_descs.len();
864
865 let cdc_table_desc = CdcTableDesc {
866 table_id,
        source_id: source.id.into(),
        external_table_name: external_table_name.clone(),
869 pk: table_pk,
870 columns: non_generated_column_descs,
871 stream_key: pk_column_indices,
872 connect_properties: options,
873 secret_refs,
874 };
875
876 tracing::debug!(?cdc_table_desc, "create cdc table");
877
878 let options = CdcScanOptions::from_with_options(context.with_options())?;
879
880 let logical_scan = LogicalCdcScan::create(
881 external_table_name.clone(),
882 Rc::new(cdc_table_desc),
883 context.clone(),
884 options,
885 );
886
887 let scan_node: PlanRef = logical_scan.into();
888 let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
889 let plan_root = PlanRoot::new_with_logical_plan(
890 scan_node,
891 RequiredDist::Any,
892 Order::any(),
893 required_cols,
894 vec![],
895 );
896
897 let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
898 let materialize = plan_root.gen_table_plan(
899 context,
900 resolved_table_name,
901 database_id,
902 schema_id,
903 CreateTableInfo {
904 columns,
905 pk_column_ids,
906 row_id_index: None,
907 watermark_descs: vec![],
908 source_catalog: Some((*source).clone()),
909 version: col_id_gen.into_version(),
910 },
911 CreateTableProps {
912 definition,
913 append_only: false,
914 on_conflict: on_conflict.into(),
915 with_version_column,
916 webhook_info: None,
917 engine,
918 },
919 )?;
920
921 let mut table = materialize.table().to_prost();
922 table.owner = session.user_id();
923 table.cdc_table_id = Some(cdc_table_id);
924 table.dependent_relations = vec![source.id];
925
926 Ok((materialize.into(), table))
927}
928
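/// Derive the WITH options of a CDC table from the shared source's options and the external
/// table name, filling in the connector-specific database / schema / table name keys.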
929fn derive_with_options_for_cdc_table(
930 source_with_properties: &WithOptionsSecResolved,
931 external_table_name: String,
932) -> Result<WithOptionsSecResolved> {
933 use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
934 let source_database_name: &str = source_with_properties
936 .get("database.name")
        .ok_or_else(|| anyhow!("The source properties do not contain 'database.name'"))?
938 .as_str();
939 let mut with_options = source_with_properties.clone();
940 if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
941 match connector.as_str() {
942 MYSQL_CDC_CONNECTOR => {
943 let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
946 anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
947 })?;
948 if !source_database_name
950 .split(',')
951 .map(|s| s.trim())
952 .any(|name| name == db_name)
953 {
954 return Err(anyhow!(
955 "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
956 db_name,
957 source_database_name
958 ).into());
959 }
960 with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
961 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
962 }
963 POSTGRES_CDC_CONNECTOR => {
964 let (schema_name, table_name) = external_table_name
965 .split_once('.')
966 .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;
967
968 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
970 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
971 }
972 SQL_SERVER_CDC_CONNECTOR => {
973 let (db_name, schema_table_name) =
976 external_table_name.split_once('.').ok_or_else(|| {
977 anyhow!("The upstream table name must be in 'database.schema.table' format")
978 })?;
979
980 if db_name != source_database_name {
982 return Err(anyhow!(
983 "The database name `{}` in the FROM clause is not the same as the database name `{}` in source definition",
984 db_name,
985 source_database_name
986 ).into());
987 }
988
989 let (schema_name, table_name) =
990 schema_table_name.split_once('.').ok_or_else(|| {
991 anyhow!("The table name must contain schema name prefix, e.g. 'dbo.table'")
992 })?;
993
994 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
996 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
997 }
998 _ => {
999 return Err(RwError::from(anyhow!(
1000 "connector {} is not supported for cdc table",
1001 connector
1002 )));
1003 }
1004 };
1005 }
1006 Ok(with_options)
1007}
1008
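/// Dispatch `CREATE TABLE` planning: a connector-backed table (FORMAT/ENCODE given), a plain
/// table, or a table on a shared CDC source, returning the plan together with the table job type.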
1009#[allow(clippy::too_many_arguments)]
1010pub(super) async fn handle_create_table_plan(
1011 handler_args: HandlerArgs,
1012 explain_options: ExplainOptions,
1013 format_encode: Option<FormatEncodeOptions>,
1014 cdc_table_info: Option<CdcTableInfo>,
1015 table_name: ObjectName,
1016 column_defs: Vec<ColumnDef>,
1017 wildcard_idx: Option<usize>,
1018 constraints: Vec<TableConstraint>,
1019 source_watermarks: Vec<SourceWatermark>,
1020 append_only: bool,
1021 on_conflict: Option<OnConflict>,
1022 with_version_column: Option<String>,
1023 include_column_options: IncludeOption,
1024 webhook_info: Option<WebhookSourceInfo>,
1025 engine: Engine,
1026) -> Result<(PlanRef, Option<PbSource>, PbTable, TableJobType)> {
1027 let col_id_gen = ColumnIdGenerator::new_initial();
1028 let format_encode = check_create_table_with_source(
1029 &handler_args.with_options,
1030 format_encode,
1031 &include_column_options,
1032 &cdc_table_info,
1033 )?;
1034 let webhook_info = webhook_info
1035 .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1036 .transpose()?;
1037
1038 let props = CreateTableProps {
1039 definition: handler_args.normalized_sql.clone(),
1040 append_only,
1041 on_conflict: on_conflict.into(),
1042 with_version_column: with_version_column.clone(),
1043 webhook_info,
1044 engine,
1045 };
1046
1047 let ((plan, source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
1048 (Some(format_encode), None) => (
1049 gen_create_table_plan_with_source(
1050 handler_args,
1051 explain_options,
1052 table_name.clone(),
1053 column_defs,
1054 wildcard_idx,
1055 constraints,
1056 format_encode,
1057 source_watermarks,
1058 col_id_gen,
1059 include_column_options,
1060 props,
1061 SqlColumnStrategy::FollowChecked,
1062 )
1063 .await?,
1064 TableJobType::General,
1065 ),
1066 (None, None) => {
1067 let context = OptimizerContext::new(handler_args, explain_options);
1068 let (plan, table) = gen_create_table_plan(
1069 context,
1070 table_name.clone(),
1071 column_defs,
1072 constraints,
1073 col_id_gen,
1074 source_watermarks,
1075 props,
1076 false,
1077 )?;
1078
1079 ((plan, None, table), TableJobType::General)
1080 }
1081
1082 (None, Some(cdc_table)) => {
1083 sanity_check_for_table_on_cdc_source(
1084 append_only,
1085 &column_defs,
1086 &wildcard_idx,
1087 &constraints,
1088 &source_watermarks,
1089 )?;
1090
1091 generated_columns_check_for_cdc_table(&column_defs)?;
1092 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1093
1094 let session = &handler_args.session;
1095 let db_name = &session.database();
1096 let user_name = &session.user_name();
1097 let search_path = session.config().search_path();
1098 let (schema_name, resolved_table_name) =
1099 Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
1100 let (database_id, schema_id) =
1101 session.get_database_and_schema_id_for_create(schema_name.clone())?;
1102
            let (source_schema_name, source_name) =
                Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;

            let source = {
                let catalog_reader = session.env().catalog_reader().read_guard();
                let schema_path =
                    SchemaPath::new(source_schema_name.as_deref(), &search_path, user_name);

                let (source, _) = catalog_reader.get_source_by_name(
                    db_name,
                    schema_path,
                    source_name.as_str(),
                )?;
1117 source.clone()
1118 };
1119 let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
1120 &source.with_properties,
1121 cdc_table.external_table_name.clone(),
1122 )?;
1123
1124 let (columns, pk_names) = match wildcard_idx {
1125 Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1126 None => {
1127 for column_def in &column_defs {
1128 for option_def in &column_def.options {
1129 if let ColumnOption::DefaultValue(_)
1130 | ColumnOption::DefaultValueInternal { .. } = option_def.option
1131 {
1132 return Err(ErrorCode::NotSupported(
1133 "Default value for columns defined on the table created from a CDC source".into(),
1134 "Remove the default value expression in the column definitions".into(),
1135 )
1136 .into());
1137 }
1138 }
1139 }
1140
1141 let (columns, pk_names) =
1142 bind_cdc_table_schema(&column_defs, &constraints, false)?;
1143 let (options, secret_refs) = cdc_with_options.clone().into_parts();
1145 let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1146 .context("failed to extract external table config")?;
1147
1148 (columns, pk_names)
1150 }
1151 };
1152
1153 let context: OptimizerContextRef =
1154 OptimizerContext::new(handler_args, explain_options).into();
1155 let (plan, table) = gen_create_table_plan_for_cdc_table(
1156 context,
1157 source,
1158 cdc_table.external_table_name.clone(),
1159 column_defs,
1160 columns,
1161 pk_names,
1162 cdc_with_options,
1163 col_id_gen,
1164 on_conflict,
1165 with_version_column,
1166 include_column_options,
1167 table_name,
1168 resolved_table_name,
1169 database_id,
1170 schema_id,
1171 TableId::placeholder(),
1172 engine,
1173 )?;
1174
1175 ((plan, None, table), TableJobType::SharedCdcSource)
1176 }
1177 (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to a table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
1184 }
1185 };
1186 Ok((plan, source, table, job_type))
1187}
1188
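/// For CDC tables, all generated columns must appear at the end of the column list, after every
/// non-generated column.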
1189fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1191 let mut found_generated_column = false;
1192 for column in columns {
1193 let mut is_generated = false;
1194
1195 for option_def in &column.options {
1196 if let ColumnOption::GeneratedColumns(_) = option_def.option {
1197 is_generated = true;
1198 break;
1199 }
1200 }
1201
1202 if is_generated {
1203 found_generated_column = true;
1204 } else if found_generated_column {
1205 return Err(ErrorCode::NotSupported(
1206 "Non-generated column found after a generated column.".into(),
1207 "Ensure that all generated columns appear at the end of the cdc table definition."
1208 .into(),
1209 )
1210 .into());
1211 }
1212 }
1213 Ok(())
1214}
1215
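/// CDC tables do not support `NOT NULL` constraints on explicitly listed columns.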
1216fn not_null_check_for_cdc_table(
1218 wildcard_idx: &Option<usize>,
1219 column_defs: &Vec<ColumnDef>,
1220) -> Result<()> {
    if wildcard_idx.is_none()
1222 && column_defs.iter().any(|col| {
1223 col.options
1224 .iter()
1225 .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1226 })
1227 {
1228 return Err(ErrorCode::NotSupported(
1229 "CDC table with NOT NULL constraint is not supported".to_owned(),
1230 "Please remove the NOT NULL constraint for columns".to_owned(),
1231 )
1232 .into());
1233 }
1234 Ok(())
1235}
1236
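/// Reject definitions that cannot apply to a table on a CDC source: a wildcard mixed with
/// explicit columns, a missing primary key, `APPEND ONLY`, or watermarks.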
1237fn sanity_check_for_table_on_cdc_source(
1239 append_only: bool,
1240 column_defs: &Vec<ColumnDef>,
1241 wildcard_idx: &Option<usize>,
1242 constraints: &Vec<TableConstraint>,
1243 source_watermarks: &Vec<SourceWatermark>,
1244) -> Result<()> {
1245 if wildcard_idx.is_some() && !column_defs.is_empty() {
1247 return Err(ErrorCode::NotSupported(
1248 "wildcard(*) and column definitions cannot be used together".to_owned(),
1249 "Remove the wildcard or column definitions".to_owned(),
1250 )
1251 .into());
1252 }
1253
    if wildcard_idx.is_none()
1256 && !constraints.iter().any(|c| {
1257 matches!(
1258 c,
1259 TableConstraint::Unique {
1260 is_primary: true,
1261 ..
1262 }
1263 )
1264 })
1265 && !column_defs.iter().any(|col| {
1266 col.options
1267 .iter()
1268 .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1269 })
1270 {
1271 return Err(ErrorCode::NotSupported(
1272 "CDC table without primary key constraint is not supported".to_owned(),
1273 "Please define a primary key".to_owned(),
1274 )
1275 .into());
1276 }
1277
1278 if append_only {
1279 return Err(ErrorCode::NotSupported(
1280 "append only modifier on the table created from a CDC source".into(),
1281 "Remove the APPEND ONLY clause".into(),
1282 )
1283 .into());
1284 }
1285
1286 if !source_watermarks.is_empty() {
1287 return Err(ErrorCode::NotSupported(
1288 "watermark defined on the table created from a CDC source".into(),
1289 "Remove the Watermark definitions".into(),
1290 )
1291 .into());
1292 }
1293
1294 Ok(())
1295}
1296
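/// Derive the table schema by connecting to the external upstream database; used when the
/// definition contains a wildcard (`*`).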
1297async fn bind_cdc_table_schema_externally(
1299 cdc_with_options: WithOptionsSecResolved,
1300) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1301 let (options, secret_refs) = cdc_with_options.into_parts();
1303 let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1304 .context("failed to extract external table config")?;
1305
1306 let table = ExternalTableImpl::connect(config)
1307 .await
1308 .context("failed to auto derive table schema")?;
1309 Ok((
1310 table
1311 .column_descs()
1312 .iter()
1313 .cloned()
1314 .map(|column_desc| ColumnCatalog {
1315 column_desc,
1316 is_hidden: false,
1317 })
1318 .collect(),
1319 table.pk_names().clone(),
1320 ))
1321}
1322
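/// Derive the table schema from the column definitions and constraints given in SQL.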
1323fn bind_cdc_table_schema(
1325 column_defs: &Vec<ColumnDef>,
1326 constraints: &Vec<TableConstraint>,
1327 is_for_replace_plan: bool,
1328) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1329 let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1330
1331 let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1332 Ok((columns, pk_names))
1333}
1334
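/// Entry point of `CREATE TABLE`: plan the table, build the stream graph, and create the catalog
/// objects, going through [`create_iceberg_engine_table`] for `ENGINE = iceberg`.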
1335#[allow(clippy::too_many_arguments)]
1336pub async fn handle_create_table(
1337 mut handler_args: HandlerArgs,
1338 table_name: ObjectName,
1339 column_defs: Vec<ColumnDef>,
1340 wildcard_idx: Option<usize>,
1341 constraints: Vec<TableConstraint>,
1342 if_not_exists: bool,
1343 format_encode: Option<FormatEncodeOptions>,
1344 source_watermarks: Vec<SourceWatermark>,
1345 append_only: bool,
1346 on_conflict: Option<OnConflict>,
1347 with_version_column: Option<String>,
1348 cdc_table_info: Option<CdcTableInfo>,
1349 include_column_options: IncludeOption,
1350 webhook_info: Option<WebhookSourceInfo>,
1351 ast_engine: risingwave_sqlparser::ast::Engine,
1352) -> Result<RwPgResponse> {
1353 let session = handler_args.session.clone();
1354
1355 if append_only {
1356 session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1357 }
1358
1359 session.check_cluster_limits().await?;
1360
1361 let engine = match ast_engine {
1362 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1363 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1364 };
1365 if engine == Engine::Iceberg && handler_args.with_options.get_connector().is_some() {
1366 if let Some(_rate_limit) = handler_args.with_options.insert(
1369 OverwriteOptions::SOURCE_RATE_LIMIT_KEY.to_owned(),
1370 "0".to_owned(),
1371 ) {
1372 return Err(ErrorCode::NotSupported(
1374 "source_rate_limit for iceberg table engine during table creation".to_owned(),
1375 "Please remove source_rate_limit from WITH options.".to_owned(),
1376 )
1377 .into());
1378 }
1379 }
1380
1381 if let Either::Right(resp) = session.check_relation_name_duplicated(
1382 table_name.clone(),
1383 StatementType::CREATE_TABLE,
1384 if_not_exists,
1385 )? {
1386 return Ok(resp);
1387 }
1388
1389 let (graph, source, hummock_table, job_type) = {
1390 let (plan, source, table, job_type) = handle_create_table_plan(
1391 handler_args.clone(),
1392 ExplainOptions::default(),
1393 format_encode,
1394 cdc_table_info,
1395 table_name.clone(),
1396 column_defs.clone(),
1397 wildcard_idx,
1398 constraints.clone(),
1399 source_watermarks,
1400 append_only,
1401 on_conflict,
1402 with_version_column,
1403 include_column_options,
1404 webhook_info,
1405 engine,
1406 )
1407 .await?;
1408 tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1409
1410 let graph = build_graph(plan, Some(GraphJobType::Table))?;
1411
1412 (graph, source, table, job_type)
1413 };
1414
1415 tracing::trace!(
1416 "name={}, graph=\n{}",
1417 table_name,
1418 serde_json::to_string_pretty(&graph).unwrap()
1419 );
1420
1421 match engine {
1423 Engine::Hummock => {
1424 let catalog_writer = session.catalog_writer()?;
1425 catalog_writer
1426 .create_table(source, hummock_table, graph, job_type, if_not_exists)
1427 .await?;
1428 }
1429 Engine::Iceberg => {
1430 create_iceberg_engine_table(
1431 session,
1432 handler_args,
1433 source,
1434 hummock_table,
1435 graph,
1436 table_name,
1437 job_type,
1438 if_not_exists,
1439 )
1440 .await?;
1441 }
1442 }
1443
1444 Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1445}
1446
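/// Create a table with the Iceberg engine. Besides the table itself, this creates an auxiliary
/// Iceberg sink that continuously writes the table out and an auxiliary Iceberg source for
/// reading it back, both configured from the `iceberg_engine_connection` session variable (and,
/// for a hosted catalog, from the meta store's JDBC endpoint).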
1447#[allow(clippy::too_many_arguments)]
1456pub async fn create_iceberg_engine_table(
1457 session: Arc<SessionImpl>,
1458 handler_args: HandlerArgs,
1459 mut source: Option<PbSource>,
1460 table: PbTable,
1461 graph: StreamFragmentGraph,
1462 table_name: ObjectName,
1463 job_type: PbTableJobType,
1464 if_not_exists: bool,
1465) -> Result<()> {
1466 let meta_client = session.env().meta_client();
1467 let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1468
1469 let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
1470 ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
1471 })?;
1472 let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1473 let meta_store_user = meta_store_endpoint.username().to_owned();
1474 let meta_store_password = meta_store_endpoint
1475 .password()
1476 .and_then(|p| percent_decode_str(p).decode_utf8().ok())
1477 .ok_or_else(|| {
1478 ErrorCode::InternalError("failed to parse password from meta store endpoint".to_owned())
1479 })?
1480 .into_owned();
1481 let meta_store_host = meta_store_endpoint
1482 .host_str()
1483 .ok_or_else(|| {
1484 ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
1485 })?
1486 .to_owned();
1487 let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1488 ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
1489 })?;
1490 let meta_store_database = meta_store_endpoint
1491 .path()
1492 .trim_start_matches('/')
1493 .to_owned();
1494
1495 let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1496 bail!("failed to parse meta backend: {}", meta_store_backend);
1497 };
1498
1499 let catalog_uri = match meta_backend {
1500 MetaBackend::Postgres => {
1501 format!(
1502 "jdbc:postgresql://{}:{}/{}",
1503 meta_store_host.clone(),
1504 meta_store_port.clone(),
1505 meta_store_database.clone()
1506 )
1507 }
1508 MetaBackend::Mysql => {
1509 format!(
1510 "jdbc:mysql://{}:{}/{}",
1511 meta_store_host.clone(),
1512 meta_store_port.clone(),
1513 meta_store_database.clone()
1514 )
1515 }
1516 MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1517 bail!(
1518 "Unsupported meta backend for iceberg engine table: {}",
1519 meta_store_backend
1520 );
1521 }
1522 };
1523
1524 let rw_db_name = session
1525 .env()
1526 .catalog_reader()
1527 .read_guard()
1528 .get_database_by_id(&table.database_id)?
1529 .name()
1530 .to_owned();
1531 let rw_schema_name = session
1532 .env()
1533 .catalog_reader()
1534 .read_guard()
1535 .get_schema_by_id(&table.database_id, &table.schema_id)?
1536 .name()
1537 .clone();
1538 let iceberg_catalog_name = rw_db_name.clone();
1539 let iceberg_database_name = rw_schema_name.clone();
1540 let iceberg_table_name = table_name.0.last().unwrap().real_value();
1541
1542 let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1543 let sink_decouple = session.config().sink_decouple();
    if matches!(sink_decouple, SinkDecouple::Disable) {
        bail!(
            "Iceberg engine tables require sink decouple to be enabled; try `SET sink_decouple = true` to resolve it"
        );
    }
1549
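    // Resolve the Iceberg connection configured via the `iceberg_engine_connection` session
    // variable and assemble the WITH options shared by the generated sink and source.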
1550 let mut connection_ref = BTreeMap::new();
1551 let with_common = if iceberg_engine_connection.is_empty() {
1552 bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1553 } else {
1554 let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1555 assert_eq!(parts.len(), 2);
1556 let connection_catalog =
1557 session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1558 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1559 if params.connection_type == ConnectionType::Iceberg as i32 {
1560 connection_ref.insert(
1562 "connection".to_owned(),
1563 ConnectionRefValue {
1564 connection_name: ObjectName::from(vec![
1565 Ident::from(parts[0]),
1566 Ident::from(parts[1]),
1567 ]),
1568 },
1569 );
1570
1571 let mut with_common = BTreeMap::new();
1572 with_common.insert("connector".to_owned(), "iceberg".to_owned());
1573 with_common.insert("database.name".to_owned(), iceberg_database_name.to_owned());
1574 with_common.insert("table.name".to_owned(), iceberg_table_name.to_owned());
1575
1576 if let Some(s) = params.properties.get("hosted_catalog") {
1577 if s.eq_ignore_ascii_case("true") {
1578 with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1579 with_common.insert("catalog.uri".to_owned(), catalog_uri.to_owned());
1580 with_common
1581 .insert("catalog.jdbc.user".to_owned(), meta_store_user.to_owned());
1582 with_common.insert(
1583 "catalog.jdbc.password".to_owned(),
1584 meta_store_password.clone(),
1585 );
1586 with_common
1587 .insert("catalog.name".to_owned(), iceberg_catalog_name.to_owned());
1588 }
1589 }
1590
1591 with_common
1592 } else {
1593 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1594 "Only iceberg connection could be used in iceberg engine".to_owned(),
1595 )));
1596 }
1597 } else {
1598 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1599 "Private Link Service has been deprecated. Please create a new connection instead."
1600 .to_owned(),
1601 )));
1602 }
1603 };
1604
1605 let table_catalog = TableCatalog::from(table.clone());
1606
1607 let mut pks = table_catalog
1610 .pk_column_names()
1611 .iter()
1612 .map(|c| c.to_string())
1613 .collect::<Vec<String>>();
1614
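    // If the only primary key is the hidden row id column, sink from a query that re-exposes it
    // as `RISINGWAVE_ICEBERG_ROW_ID`; otherwise sink directly from the table.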
1615 let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1617 pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1618 let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1619 "select {} as {}, * from {}",
1620 ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1621 ))
1622 .context("unable to parse query")?
1623 .try_into()
1624 .unwrap();
1625
1626 let Statement::Query(query) = &stmt else {
1627 panic!("unexpected statement: {:?}", stmt);
1628 };
1629 CreateSink::AsQuery(query.clone())
1630 } else {
1631 CreateSink::From(table_name.clone())
1632 };
1633
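    // Build the auxiliary Iceberg sink statement; its name is the table name prefixed with
    // `ICEBERG_SINK_PREFIX`.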
1634 let mut sink_name = table_name.clone();
1635 *sink_name.0.last_mut().unwrap() = Ident::from(
1636 (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1637 );
1638 let create_sink_stmt = CreateSinkStatement {
1639 if_not_exists: false,
1640 sink_name,
1641 with_properties: WithProperties(vec![]),
1642 sink_from,
1643 columns: vec![],
1644 emit_mode: None,
1645 sink_schema: None,
1646 into_table_name: None,
1647 };
1648
1649 let mut sink_handler_args = handler_args.clone();
1650
1651 let mut sink_with = with_common.clone();
1652
1653 sink_with.insert("primary_key".to_owned(), pks.join(","));
1654 sink_with.insert("type".to_owned(), "upsert".to_owned());
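
    // `commit_checkpoint_interval` controls how many checkpoints are batched into a single
    // Iceberg commit; it defaults to 60 and must be a positive integer.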
1655 let commit_checkpoint_interval = handler_args
1674 .with_options
1675 .get(COMMIT_CHECKPOINT_INTERVAL)
1676 .map(|v| v.to_owned())
1677 .unwrap_or_else(|| "60".to_owned());
1678 let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1679 ErrorCode::InvalidInputSyntax(format!(
1680 "commit_checkpoint_interval must be a positive integer: {}",
1681 commit_checkpoint_interval
1682 ))
1683 })?;
1684
1685 if commit_checkpoint_interval == 0 {
1686 bail!("commit_checkpoint_interval must be a positive integer: 0");
1687 }
1688
1689 source
1691 .as_mut()
1692 .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1693
1694 let sink_decouple = session.config().sink_decouple();
1695 if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1696 bail!(
1697 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1698 )
1699 }
1700
1701 sink_with.insert(
1702 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1703 commit_checkpoint_interval.to_string(),
1704 );
1705 sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1706
1707 sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1708
1709 const ENABLE_COMPACTION: &str = "enable_compaction";
1710 const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
1711 const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
1712
1713 if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1714 match enable_compaction.to_lowercase().as_str() {
1715 "true" => {
1716 risingwave_common::license::Feature::IcebergCompaction
1717 .check_available()
1718 .map_err(|e| anyhow::anyhow!(e))?;
1719
1720 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1721 }
1722 "false" => {
1723 sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1724 }
1725 _ => {
1726 return Err(ErrorCode::InvalidInputSyntax(format!(
1727 "enable_compaction must be true or false: {}",
1728 enable_compaction
1729 ))
1730 .into());
1731 }
1732 }
1733
1734 source
1736 .as_mut()
1737 .map(|x| x.with_properties.remove("enable_compaction"));
1738 } else {
1739 sink_with.insert(
1740 ENABLE_COMPACTION.to_owned(),
1741 risingwave_common::license::Feature::IcebergCompaction
1742 .check_available()
1743 .is_ok()
1744 .to_string(),
1745 );
1746 }
1747
1748 if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "compaction_interval_sec must be a positive integer: {}",
                compaction_interval_sec
            ))
        })?;
1755 if compaction_interval_sec == 0 {
1756 bail!("compaction_interval_sec must be a positive integer: 0");
1757 }
1758 sink_with.insert(
1759 "compaction_interval_sec".to_owned(),
1760 compaction_interval_sec.to_string(),
1761 );
1762 source
1764 .as_mut()
1765 .map(|x| x.with_properties.remove("compaction_interval_sec"));
1766 }
1767
1768 if let Some(enable_snapshot_expiration) =
1769 handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1770 {
1771 match enable_snapshot_expiration.to_lowercase().as_str() {
1772 "true" => {
1773 risingwave_common::license::Feature::IcebergCompaction
1774 .check_available()
1775 .map_err(|e| anyhow::anyhow!(e))?;
1776 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1777 }
1778 "false" => {
1779 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1780 }
1781 _ => {
1782 return Err(ErrorCode::InvalidInputSyntax(format!(
1783 "enable_snapshot_expiration must be true or false: {}",
1784 enable_snapshot_expiration
1785 ))
1786 .into());
1787 }
1788 }
1789
1790 source
1792 .as_mut()
1793 .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1794 } else {
1795 sink_with.insert(
1796 ENABLE_SNAPSHOT_EXPIRATION.to_owned(),
1797 risingwave_common::license::Feature::IcebergCompaction
1798 .check_available()
1799 .is_ok()
1800 .to_string(),
1801 );
1802 }
1803
1804 let partition_by = handler_args
1805 .with_options
1806 .get("partition_by")
1807 .map(|v| v.to_owned());
1808
1809 if let Some(partition_by) = &partition_by {
1810 let mut partition_columns = vec![];
1811 for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
1812 table_catalog
1813 .columns()
1814 .iter()
1815 .find(|col| col.name().eq_ignore_ascii_case(&column))
1816 .ok_or_else(|| {
1817 ErrorCode::InvalidInputSyntax(format!(
1818 "Partition source column does not exist in schema: {}",
1819 column
1820 ))
1821 })?;
1822
1823 partition_columns.push(column.to_owned());
1824 }
1825
1826 ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
1827 |_| {
1828 ErrorCode::InvalidInputSyntax(
1829 "The partition columns should be the prefix of the primary key".to_owned(),
1830 )
1831 },
1832 )?;
1833
1834 sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
1835
1836 source
1838 .as_mut()
1839 .map(|x| x.with_properties.remove("partition_by"));
1840 }
1841
1842 sink_handler_args.with_options =
1843 WithOptions::new(sink_with, Default::default(), connection_ref.clone());
1844
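    // Build the auxiliary Iceberg source statement; its name is the table name prefixed with
    // `ICEBERG_SOURCE_PREFIX`.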
1845 let mut source_name = table_name.clone();
1846 *source_name.0.last_mut().unwrap() = Ident::from(
1847 (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
1848 );
1849 let create_source_stmt = CreateSourceStatement {
1850 temporary: false,
1851 if_not_exists: false,
1852 columns: vec![],
1853 source_name,
1854 wildcard_idx: Some(0),
1855 constraints: vec![],
1856 with_properties: WithProperties(vec![]),
1857 format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
1858 source_watermarks: vec![],
1859 include_column_options: vec![],
1860 };
1861
1862 let mut source_handler_args = handler_args.clone();
1863 let source_with = with_common;
1864 source_handler_args.with_options =
1865 WithOptions::new(source_with, Default::default(), connection_ref);
1866
1867 let _ = JVM.get_or_init()?;
1870
1871 let catalog_writer = session.catalog_writer()?;
1872 let has_connector = source.is_some();
1874 catalog_writer
1875 .create_table(source, table, graph, job_type, if_not_exists)
1876 .await?;
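    // Best-effort rollback: if creating the auxiliary sink or source below fails, drop the table
    // that was just created before propagating the error.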
1877 let res = create_sink::handle_create_sink(sink_handler_args, create_sink_stmt, true).await;
1878 if res.is_err() {
1879 handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
1881 res?;
1882 }
1883 let res = create_source::handle_create_source(source_handler_args, create_source_stmt).await;
1884 if res.is_err() {
1885 handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
1887 res?;
1888 }
1889
1890 if has_connector {
1891 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1892 handler_args,
1893 PbThrottleTarget::TableWithSource,
1894 table_name,
1895 -1,
1896 )
1897 .await?;
1898 }
1899
1900 Ok(())
1901}
1902
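/// Validate the combination of WITH options, FORMAT/ENCODE clause and INCLUDE options: a source
/// connector requires FORMAT/ENCODE, INCLUDE may only be used with a connector, and CDC tables
/// skip the check entirely.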
1903pub fn check_create_table_with_source(
1904 with_options: &WithOptions,
1905 format_encode: Option<FormatEncodeOptions>,
1906 include_column_options: &IncludeOption,
1907 cdc_table_info: &Option<CdcTableInfo>,
1908) -> Result<Option<FormatEncodeOptions>> {
1909 if cdc_table_info.is_some() {
1911 return Ok(format_encode);
1912 }
1913 let defined_source = with_options.is_source_connector();
1914
1915 if !include_column_options.is_empty() && !defined_source {
1916 return Err(ErrorCode::InvalidInputSyntax(
1917 "INCLUDE should be used with a connector".to_owned(),
1918 )
1919 .into());
1920 }
1921 if defined_source {
1922 format_encode.as_ref().ok_or_else(|| {
1923 ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
1924 })?;
1925 }
1926 Ok(format_encode)
1927}
1928
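/// Check that the partition columns form a prefix of the primary key columns.
///
/// Illustrative only (the function is private, so this is not run as a doctest):
///
/// ```ignore
/// // With primary key [a, b], partitioning by [a] is fine, but by [b] is not.
/// assert!(ensure_partition_columns_are_prefix_of_primary_key(&["a".into()], &["a".into(), "b".into()]).is_ok());
/// assert!(ensure_partition_columns_are_prefix_of_primary_key(&["b".into()], &["a".into(), "b".into()]).is_err());
/// ```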
1929fn ensure_partition_columns_are_prefix_of_primary_key(
1930 partition_columns: &[String],
1931 primary_key_columns: &[String],
1932) -> std::result::Result<(), String> {
1933 if partition_columns.len() > primary_key_columns.len() {
1934 return Err("Partition columns cannot be longer than primary key columns.".to_owned());
1935 }
1936
1937 for (i, partition_col) in partition_columns.iter().enumerate() {
1938 if primary_key_columns.get(i) != Some(partition_col) {
1939 return Err(format!(
1940 "Partition column '{}' is not a prefix of the primary key.",
1941 partition_col
1942 ));
1943 }
1944 }
1945
1946 Ok(())
1947}
1948
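/// Re-plan an existing table from a (possibly rewritten) `CREATE TABLE` statement and build the
/// new stream graph; used when replacing a table's plan, e.g. for schema changes or for dropping
/// its connector.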
#[allow(clippy::too_many_arguments)]
pub async fn generate_stream_graph_for_replace_table(
    _session: &Arc<SessionImpl>,
    table_name: ObjectName,
    original_catalog: &Arc<TableCatalog>,
    handler_args: HandlerArgs,
    statement: Statement,
    col_id_gen: ColumnIdGenerator,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
    use risingwave_pb::catalog::table::OptionalAssociatedSourceId;

    let Statement::CreateTable {
        columns,
        constraints,
        source_watermarks,
        append_only,
        on_conflict,
        with_version_column,
        wildcard_idx,
        cdc_table_info,
        format_encode,
        include_column_options,
        engine,
        with_options,
        ..
    } = statement
    else {
        panic!("unexpected statement type: {:?}", statement);
    };

    let format_encode = format_encode
        .clone()
        .map(|format_encode| format_encode.into_v2_with_warning());

    let engine = match engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

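    // The connector is considered dropped when the original table had an associated source but
    // the new definition no longer carries a FORMAT ... ENCODE ... clause.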
    let is_drop_connector =
        original_catalog.associated_source_id().is_some() && format_encode.is_none();
    if is_drop_connector {
        debug_assert!(
            source_watermarks.is_empty()
                && include_column_options.is_empty()
                && with_options
                    .iter()
                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
        );
    }

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_column: with_version_column.as_ref().map(|x| x.real_value()),
        webhook_info: original_catalog.webhook_info.clone(),
        engine,
    };

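    // Pick the planning path: a table with a connector (FORMAT ... ENCODE ...), a plain table,
    // or a table on top of a shared CDC source. Specifying both is rejected below.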
    let ((plan, mut source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                ExplainOptions::default(),
                table_name,
                columns,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                sql_column_strategy,
            )
            .await?,
            TableJobType::General,
        ),
        (None, None) => {
            let context = OptimizerContext::from_handler_args(handler_args);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name,
                columns,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                true,
            )?;
            ((plan, None, table), TableJobType::General)
        }
        (None, Some(cdc_table)) => {
            let session = &handler_args.session;
            let (source, resolved_table_name, database_id, schema_id) =
                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;

            let cdc_with_options = derive_with_options_for_cdc_table(
                &source.with_properties,
                cdc_table.external_table_name.clone(),
            )?;

            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                cdc_table.external_table_name.clone(),
                columns,
                column_catalogs,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_column.map(|x| x.real_value()),
                IncludeOption::default(),
                table_name,
                resolved_table_name,
                database_id,
                schema_id,
                original_catalog.id(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to a table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };

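    // Replacing a table must keep its primary key: the column ids of the new plan's pk have to
    // match the original catalog exactly.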
    if TableCatalog::from(&table).pk_column_ids() != original_catalog.pk_column_ids() {
        Err(ErrorCode::InvalidInputSyntax(
            "altering the primary key of a table is not supported".to_owned(),
        ))?
    }

    let graph = build_graph(plan, Some(GraphJobType::Table))?;

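    // Reuse the original table id, and keep the table and its associated source pointing at each
    // other unless the connector is being dropped.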
    let mut table = Table {
        id: original_catalog.id().table_id(),
        ..table
    };
    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
        table.optional_associated_source_id = Some(OptionalAssociatedSourceId::AssociatedSourceId(
            source_id.table_id,
        ));
        source.as_mut().unwrap().id = source_id.table_id;
        source.as_mut().unwrap().optional_associated_table_id =
            Some(OptionalAssociatedTableId::AssociatedTableId(table.id))
    }

    Ok((graph, table, source, job_type))
}

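/// Resolves the CDC source referenced by `cdc_table`, together with the database id, schema id
/// and table name under which the new table will be created.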
fn get_source_and_resolved_table_name(
    session: &Arc<SessionImpl>,
    cdc_table: CdcTableInfo,
    table_name: ObjectName,
) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
    let db_name = &session.database();
    let (schema_name, resolved_table_name) =
        Binder::resolve_schema_qualified_name(db_name, table_name)?;
    let (database_id, schema_id) =
        session.get_database_and_schema_id_for_create(schema_name.clone())?;

    let (source_schema_name, source_name) =
        Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;

    let source = {
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_name = source_schema_name.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
        let (source, _) = catalog_reader.get_source_by_name(
            db_name,
            SchemaPath::Name(schema_name.as_str()),
            source_name.as_str(),
        )?;
        source.clone()
    };

    Ok((source, resolved_table_name, database_id, schema_id))
}

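/// Validates and binds the webhook source definition of a table: the table must have exactly one
/// JSONB column, the optional secret reference is resolved to a `PbSecretRef`, and the signature
/// expression must be a `SECURE_COMPARE()` call bound against that column.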
fn bind_webhook_info(
    session: &Arc<SessionImpl>,
    columns_defs: &[ColumnDef],
    webhook_info: WebhookSourceInfo,
) -> Result<PbWebhookSourceInfo> {
    if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref() != Some(&AstDataType::Jsonb) {
        return Err(ErrorCode::InvalidInputSyntax(
            "Table with webhook source should have exactly one JSONB column".to_owned(),
        )
        .into());
    }

    let WebhookSourceInfo {
        secret_ref,
        signature_expr,
        wait_for_persistence,
        is_batched,
    } = webhook_info;

    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
        let db_name = &session.database();
        let (schema_name, secret_name) =
            Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
        (
            Some(PbSecretRef {
                secret_id: secret_catalog.id.secret_id(),
                ref_as: match secret_ref.ref_as {
                    SecretRefAsType::Text => PbRefAsType::Text,
                    SecretRefAsType::File => PbRefAsType::File,
                }
                .into(),
            }),
            Some(secret_name),
        )
    } else {
        (None, None)
    };

    let secure_compare_context = SecureCompareContext {
        column_name: columns_defs[0].name.real_value(),
        secret_name,
    };
    let mut binder = Binder::new_for_ddl_with_secure_compare(session, secure_compare_context);
    let expr = binder.bind_expr(signature_expr.clone())?;

    if expr.as_function_call().is_none()
        || expr.as_function_call().unwrap().func_type()
            != crate::optimizer::plan_node::generic::ExprType::SecureCompare
    {
        return Err(ErrorCode::InvalidInputSyntax(
            "The signature verification function must be SECURE_COMPARE()".to_owned(),
        )
        .into());
    }

    let pb_webhook_info = PbWebhookSourceInfo {
        secret_ref: pb_secret_ref,
        signature_expr: Some(expr.to_expr_proto()),
        wait_for_persistence,
        is_batched,
    };

    Ok(pb_webhook_info)
}

#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use super::*;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_table_handler() {
        let sql =
            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(table.name(), "t");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "v1" => DataType::Int16,
            "v2" => StructType::new(
                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
            )
            .with_ids([3, 4, 5].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };

        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    #[test]
    fn test_bind_primary_key() {
        for (sql, expected) in [
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ] {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be create table")
            };
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }

    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        assert!(!source.with_properties.contains_key("schema.location"));
    }
}