use std::collections::{BTreeMap, HashMap, HashSet};
use std::rc::Rc;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use clap::ValueEnum;
use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use percent_encoding::percent_decode_str;
use pgwire::pg_response::{PgResponse, StatementType};
use prost::Message as _;
use risingwave_common::catalog::{
    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
    ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
    TableId,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::global_jvm::Jvm;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
    DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
    SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::connection_params::ConnectionType;
use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::secret::secret_ref::PbRefAsType;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::{IncludeOption, Parser};
use thiserror_ext::AsReport;

use super::RwPgResponse;
use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
use crate::binder::{Clause, SecureCompareContext, bind_data_type};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::TableVersion;
use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::HandlerArgs;
use crate::handler::create_source::{
    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
    bind_source_watermark, handle_addition_columns,
};
use crate::handler::util::{
    LongRunningNotificationAction, SourceSchemaCompatExt, execute_with_long_running_notification,
};
use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
use crate::optimizer::plan_node::{
    LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::OverwriteOptions;
use crate::{Binder, Explain, TableCatalog, WithOptions};

mod col_id_gen;
pub use col_id_gen::*;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::{
    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
    COMPACTION_SMALL_FILES_THRESHOLD_MB, COMPACTION_TARGET_FILE_SIZE_MB,
    COMPACTION_TRIGGER_SNAPSHOT_COUNT, COMPACTION_TYPE, CompactionType, ENABLE_COMPACTION,
    ENABLE_SNAPSHOT_EXPIRATION, FORMAT_VERSION, ICEBERG_WRITE_MODE_COPY_ON_WRITE,
    ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink, IcebergWriteMode,
    SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
    SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
    parse_partition_by_exprs,
};
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};

use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};
110
fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
    for option_def in &c.options {
        match option_def.option {
            ColumnOption::GeneratedColumns(_) => {}
            ColumnOption::DefaultValue(_) => {}
            ColumnOption::DefaultValueInternal { .. } => {}
            ColumnOption::Unique { is_primary: true } => {}
            ColumnOption::Null => {}
            ColumnOption::NotNull => {}
            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
        }
    }
    Ok(())
}
125
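/// Binds raw SQL column definitions into `ColumnCatalog`s.
///
/// Each column must carry an explicit data type. A collation, if present, is only
/// accepted as `C` or `POSIX` on character types, and a `NOT NULL` option clears the
/// column's `nullable` flag. Unless this is invoked for dropping a table connector
/// (`is_for_drop_table_connector`), reserved column names are rejected.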
pub fn bind_sql_columns(
    column_defs: &[ColumnDef],
    is_for_drop_table_connector: bool,
) -> Result<Vec<ColumnCatalog>> {
    let mut columns = Vec::with_capacity(column_defs.len());

    for column in column_defs {
        ensure_column_options_supported(column)?;
        let ColumnDef {
            name,
            data_type,
            collation,
            options,
            ..
        } = column;

        let data_type = data_type
            .clone()
            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
        if let Some(collation) = collation {
            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
                bail_not_implemented!(
                    "collation other than `C` or `POSIX` is not implemented"
                );
            }

            match data_type {
                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
                _ => {
                    return Err(ErrorCode::NotSupported(
                        format!("{} is not a collatable data type", data_type),
                        "The collatable data types are `varchar`, `text` and `char(n)`, please check your type".into(),
                    ).into());
                }
            }
        }

        if !is_for_drop_table_connector {
            check_column_name_not_reserved(&name.real_value())?;
        }

        let nullable: bool = !options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull));

        columns.push(ColumnCatalog {
            column_desc: ColumnDesc {
                data_type: bind_data_type(&data_type)?,
                column_id: ColumnId::placeholder(),
                name: name.real_value(),
                generated_or_default_column: None,
                description: None,
                additional_column: AdditionalColumn { column_type: None },
                version: ColumnDescVersion::LATEST,
                system_column: None,
                nullable,
            },
            is_hidden: false,
        });
    }

    Ok(columns)
}
203
fn check_generated_column_constraints(
    column_name: &String,
    column_id: ColumnId,
    expr: &ExprImpl,
    column_catalogs: &[ColumnCatalog],
    generated_column_names: &[String],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let input_refs = expr.collect_input_refs(column_catalogs.len());
    for idx in input_refs.ones() {
        let referred_generated_column = &column_catalogs[idx].column_desc.name;
        if generated_column_names
            .iter()
            .any(|c| c == referred_generated_column)
        {
            return Err(ErrorCode::BindError(format!(
                "A generated column cannot reference another generated column, \
                 but generated column \"{}\" references generated column \"{}\"",
                column_name, referred_generated_column
            ))
            .into());
        }
    }

    if pk_column_ids.contains(&column_id) && expr.is_impure() {
        return Err(ErrorCode::BindError(format!(
            "Generated columns with impure expressions should not be part of the primary key. \
             Here column \"{}\" is defined as part of the primary key.",
            column_name
        ))
        .into());
    }

    Ok(())
}
239
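/// Binds the column options that need expression binding, i.e. generated columns and
/// default values, and writes the result back into `column_catalogs`.
///
/// Generated column expressions may not reference other generated columns, and a
/// generated column that is part of the primary key must be pure. Default values must
/// fold to a constant snapshot value (after inlining `now()` and proc time);
/// otherwise a bind error is returned.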
240pub fn bind_sql_column_constraints(
243 session: &SessionImpl,
244 table_name: String,
245 column_catalogs: &mut [ColumnCatalog],
246 columns: &[ColumnDef],
247 pk_column_ids: &[ColumnId],
248) -> Result<()> {
249 let generated_column_names = {
250 let mut names = vec![];
251 for column in columns {
252 for option_def in &column.options {
253 if let ColumnOption::GeneratedColumns(_) = option_def.option {
254 names.push(column.name.real_value());
255 break;
256 }
257 }
258 }
259 names
260 };
261
262 let mut binder = Binder::new_for_ddl(session);
263 binder.bind_columns_to_context(table_name, column_catalogs)?;
264
265 for column in columns {
266 let Some(idx) = column_catalogs
267 .iter()
268 .position(|c| c.name() == column.name.real_value())
269 else {
270 continue;
273 };
274
275 for option_def in &column.options {
276 match &option_def.option {
277 ColumnOption::GeneratedColumns(expr) => {
278 binder.set_clause(Some(Clause::GeneratedColumn));
279
280 let expr_impl = binder.bind_expr(expr).with_context(|| {
281 format!(
282 "fail to bind expression in generated column \"{}\"",
283 column.name.real_value()
284 )
285 })?;
286
287 check_generated_column_constraints(
288 &column.name.real_value(),
289 column_catalogs[idx].column_id(),
290 &expr_impl,
291 column_catalogs,
292 &generated_column_names,
293 pk_column_ids,
294 )?;
295
296 column_catalogs[idx].column_desc.generated_or_default_column = Some(
297 GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
298 expr: Some(expr_impl.to_expr_proto()),
299 }),
300 );
301 binder.set_clause(None);
302 }
303 ColumnOption::DefaultValue(expr) => {
304 let expr_impl = binder
305 .bind_expr(expr)?
306 .cast_assign(column_catalogs[idx].data_type())?;
307
308 let rewritten_expr_impl = session
316 .pinned_snapshot()
317 .inline_now_proc_time()
318 .rewrite_expr(expr_impl.clone());
319
320 if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
321 let snapshot_value = snapshot_value?;
322
323 column_catalogs[idx].column_desc.generated_or_default_column =
324 Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
325 snapshot_value: Some(snapshot_value.to_protobuf()),
326 expr: Some(expr_impl.to_expr_proto()),
327 }));
329 } else {
330 return Err(ErrorCode::BindError(format!(
331 "Default expression used in column `{}` cannot be evaluated. \
332 Use generated columns instead if you mean to reference other columns.",
333 column.name
334 ))
335 .into());
336 }
337 }
338 ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
339 if persisted.is_empty() {
344 bail_bind_error!(
345 "DEFAULT INTERNAL is only used for internal purposes, \
346 please specify a concrete default value"
347 );
348 }
349
350 let desc = DefaultColumnDesc::decode(&**persisted)
351 .expect("failed to decode persisted `DefaultColumnDesc`");
352
353 column_catalogs[idx].column_desc.generated_or_default_column =
354 Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
355 }
356 _ => {}
357 }
358 }
359 }
360 Ok(())
361}
362
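/// Extracts the primary-key column names from table-level constraints.
///
/// Only `PRIMARY KEY (...)` constraints are supported; any other table constraint is
/// rejected, and defining more than one primary key is an error.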
pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
    let mut pk_column_names = vec![];

    for constraint in table_constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary: true,
            } => {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
            }
            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
        }
    }
    Ok(pk_column_names)
}
384
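/// Merges primary-key names from table-level constraints with column-level
/// `PRIMARY KEY` options, e.g. `CREATE TABLE t (a INT PRIMARY KEY)` versus
/// `CREATE TABLE t (a INT, PRIMARY KEY (a))`. Declaring a primary key in both places
/// (or more than once) is rejected.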
pub fn bind_sql_pk_names(
    columns_defs: &[ColumnDef],
    pk_names_from_table_constraints: Vec<String>,
) -> Result<Vec<String>> {
    let mut pk_column_names = pk_names_from_table_constraints;

    for column in columns_defs {
        for option_def in &column.options {
            if let ColumnOption::Unique { is_primary: true } = option_def.option {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names.push(column.name.real_value());
            };
        }
    }

    Ok(pk_column_names)
}

fn multiple_pk_definition_err() -> RwError {
    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
}
408
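/// Resolves primary-key names to column ids and, when the relation has no
/// user-defined primary key but requires one (`must_need_pk`), appends the hidden
/// row-id column and uses it as the key. Duplicated column names are rejected.
///
/// Returns the possibly extended column list, the pk column ids, and the index of the
/// row-id column if one was added.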
pub fn bind_pk_and_row_id_on_relation(
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    must_need_pk: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
    for c in &columns {
        assert!(c.column_id() != ColumnId::placeholder());
    }

    let name_to_id = columns
        .iter()
        .map(|c| (c.name(), c.column_id()))
        .collect::<HashMap<_, _>>();

    let mut pk_column_ids: Vec<_> = pk_names
        .iter()
        .map(|name| {
            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
            })
        })
        .try_collect()?;

    let need_row_id = pk_column_ids.is_empty() && must_need_pk;

    let row_id_index = need_row_id.then(|| {
        let column = ColumnCatalog::row_id_column();
        let index = columns.len();
        pk_column_ids = vec![column.column_id()];
        columns.push(column);
        index
    });

    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
        Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{col}\" specified more than once"
        )))?;
    }

    Ok((columns, pk_column_ids, row_id_index))
}
456
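/// Generates a stream plan for `CREATE TABLE ... WITH (connector = ...)`, i.e. a table
/// that ingests data from an external source. Columns are first resolved from the
/// source schema (combined with the SQL definition per `sql_column_strategy`), then
/// the regular table plan is generated on top of the bound source catalog.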
457#[allow(clippy::too_many_arguments)]
460pub(crate) async fn gen_create_table_plan_with_source(
461 mut handler_args: HandlerArgs,
462 explain_options: ExplainOptions,
463 table_name: ObjectName,
464 column_defs: Vec<ColumnDef>,
465 wildcard_idx: Option<usize>,
466 constraints: Vec<TableConstraint>,
467 format_encode: FormatEncodeOptions,
468 source_watermarks: Vec<SourceWatermark>,
469 mut col_id_gen: ColumnIdGenerator,
470 include_column_options: IncludeOption,
471 props: CreateTableProps,
472 sql_column_strategy: SqlColumnStrategy,
473) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
474 if props.append_only
475 && format_encode.format != Format::Plain
476 && format_encode.format != Format::Native
477 {
478 return Err(ErrorCode::BindError(format!(
479 "Append only table does not support format {}.",
480 format_encode.format
481 ))
482 .into());
483 }
484
485 let session = &handler_args.session;
486 let (with_properties, refresh_mode) =
487 bind_connector_props(&handler_args, &format_encode, false)?;
488 if with_properties.is_shareable_cdc_connector() {
489 generated_columns_check_for_cdc_table(&column_defs)?;
490 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
491 } else if column_defs.iter().any(|col| {
492 col.options
493 .iter()
494 .any(|def| matches!(def.option, ColumnOption::NotNull))
495 }) {
496 notice_to_user(
498 "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
499 );
500 }
501
502 let db_name: &str = &session.database();
503 let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
504
505 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
507 session,
508 &format_encode,
509 Either::Left(&with_properties),
510 CreateSourceType::Table,
511 )
512 .await?;
513
514 let overwrite_options = OverwriteOptions::new(&mut handler_args);
515 let rate_limit = overwrite_options.source_rate_limit;
516 let source = bind_create_source_or_table_with_connector(
517 handler_args.clone(),
518 table_name,
519 format_encode,
520 with_properties,
521 &column_defs,
522 constraints,
523 wildcard_idx,
524 source_watermarks,
525 columns_from_resolve_source,
526 source_info,
527 include_column_options,
528 &mut col_id_gen,
529 CreateSourceType::Table,
530 rate_limit,
531 sql_column_strategy,
532 refresh_mode,
533 )
534 .await?;
535
536 let context = OptimizerContext::new(handler_args, explain_options);
537
538 let (plan, table) = gen_table_plan_with_source(
539 context.into(),
540 schema_name,
541 source.clone(),
542 col_id_gen.into_version(),
543 props,
544 )?;
545
546 Ok((plan, Some(source), table))
547}
548
549#[allow(clippy::too_many_arguments)]
552pub(crate) fn gen_create_table_plan(
553 context: OptimizerContext,
554 table_name: ObjectName,
555 column_defs: Vec<ColumnDef>,
556 constraints: Vec<TableConstraint>,
557 mut col_id_gen: ColumnIdGenerator,
558 source_watermarks: Vec<SourceWatermark>,
559 props: CreateTableProps,
560 is_for_replace_plan: bool,
561) -> Result<(PlanRef, TableCatalog)> {
562 let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
563 for c in &mut columns {
564 col_id_gen.generate(c)?;
565 }
566
567 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
568 if !secret_refs.is_empty() || !connection_refs.is_empty() {
569 return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
570 }
571
572 gen_create_table_plan_without_source(
573 context,
574 table_name,
575 columns,
576 column_defs,
577 constraints,
578 source_watermarks,
579 col_id_gen.into_version(),
580 props,
581 )
582}
583
584#[allow(clippy::too_many_arguments)]
585pub(crate) fn gen_create_table_plan_without_source(
586 context: OptimizerContext,
587 table_name: ObjectName,
588 columns: Vec<ColumnCatalog>,
589 column_defs: Vec<ColumnDef>,
590 constraints: Vec<TableConstraint>,
591 source_watermarks: Vec<SourceWatermark>,
592 version: TableVersion,
593 props: CreateTableProps,
594) -> Result<(PlanRef, TableCatalog)> {
595 let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
597 let (mut columns, pk_column_ids, row_id_index) =
598 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
599
600 let watermark_descs = bind_source_watermark(
601 context.session_ctx(),
602 table_name.real_value(),
603 source_watermarks,
604 &columns,
605 )?;
606
607 bind_sql_column_constraints(
608 context.session_ctx(),
609 table_name.real_value(),
610 &mut columns,
611 &column_defs,
612 &pk_column_ids,
613 )?;
614 let session = context.session_ctx().clone();
615
616 let db_name = &session.database();
617 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
618
619 let info = CreateTableInfo {
620 columns,
621 pk_column_ids,
622 row_id_index,
623 watermark_descs,
624 source_catalog: None,
625 version,
626 };
627
628 gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
629}
630
631fn gen_table_plan_with_source(
632 context: OptimizerContextRef,
633 schema_name: Option<String>,
634 source_catalog: SourceCatalog,
635 version: TableVersion,
636 props: CreateTableProps,
637) -> Result<(PlanRef, TableCatalog)> {
638 let table_name = source_catalog.name.clone();
639
640 let info = CreateTableInfo {
641 columns: source_catalog.columns.clone(),
642 pk_column_ids: source_catalog.pk_col_ids.clone(),
643 row_id_index: source_catalog.row_id_index,
644 watermark_descs: source_catalog.watermark_descs.clone(),
645 source_catalog: Some(source_catalog),
646 version,
647 };
648
649 gen_table_plan_inner(context, schema_name, table_name, info, props)
650}
651
652#[derive(Clone, Copy)]
654pub enum EitherOnConflict {
655 Ast(Option<OnConflict>),
656 Resolved(ConflictBehavior),
657}
658
659impl From<Option<OnConflict>> for EitherOnConflict {
660 fn from(v: Option<OnConflict>) -> Self {
661 Self::Ast(v)
662 }
663}
664
665impl From<ConflictBehavior> for EitherOnConflict {
666 fn from(v: ConflictBehavior) -> Self {
667 Self::Resolved(v)
668 }
669}
670
671impl EitherOnConflict {
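    /// Resolves the effective `ConflictBehavior` for the table.
    ///
    /// For an append-only table the behavior must effectively be "do nothing": with a
    /// row-id key conflicts cannot happen (`NoCheck`), and with a user-defined primary
    /// key anything other than `ON CONFLICT DO NOTHING` is rejected. For
    /// non-append-only tables the default is a full-row overwrite (`UpdateFull`).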
672 pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
674 let conflict_behavior = match self {
675 EitherOnConflict::Ast(on_conflict) => {
676 if append_only {
677 if row_id_as_pk {
678 ConflictBehavior::NoCheck
680 } else {
681 if let Some(on_conflict) = on_conflict
683 && on_conflict != OnConflict::Nothing
684 {
685 return Err(ErrorCode::InvalidInputSyntax(
686 "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
687 the ON CONFLICT behavior must be DO NOTHING."
688 .to_owned(),
689 )
690 .into());
691 }
692 ConflictBehavior::IgnoreConflict
693 }
694 } else {
695 match on_conflict.unwrap_or(OnConflict::UpdateFull) {
697 OnConflict::UpdateFull => ConflictBehavior::Overwrite,
698 OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
699 OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
700 }
701 }
702 }
703 EitherOnConflict::Resolved(b) => b,
704 };
705
706 Ok(conflict_behavior)
707 }
708}
709
710pub struct CreateTableInfo {
715 pub columns: Vec<ColumnCatalog>,
716 pub pk_column_ids: Vec<ColumnId>,
717 pub row_id_index: Option<usize>,
718 pub watermark_descs: Vec<WatermarkDesc>,
719 pub source_catalog: Option<SourceCatalog>,
720 pub version: TableVersion,
721}
722
723pub struct CreateTableProps {
728 pub definition: String,
729 pub append_only: bool,
730 pub on_conflict: EitherOnConflict,
731 pub with_version_columns: Vec<String>,
732 pub webhook_info: Option<PbWebhookSourceInfo>,
733 pub engine: Engine,
734}
735
736#[allow(clippy::too_many_arguments)]
737fn gen_table_plan_inner(
738 context: OptimizerContextRef,
739 schema_name: Option<String>,
740 table_name: String,
741 info: CreateTableInfo,
742 props: CreateTableProps,
743) -> Result<(PlanRef, TableCatalog)> {
744 let CreateTableInfo {
745 ref columns,
746 row_id_index,
747 ref watermark_descs,
748 ref source_catalog,
749 ..
750 } = info;
751 let CreateTableProps { append_only, .. } = props;
752
753 let (database_id, schema_id) = context
754 .session_ctx()
755 .get_database_and_schema_id_for_create(schema_name)?;
756
757 let session = context.session_ctx().clone();
758 let retention_seconds = context.with_options().retention_seconds();
759
760 let source_node: LogicalPlanRef = LogicalSource::new(
761 source_catalog.clone().map(Rc::new),
762 columns.clone(),
763 row_id_index,
764 SourceNodeKind::CreateTable,
765 context.clone(),
766 None,
767 )?
768 .into();
769
770 let required_cols = FixedBitSet::with_capacity(columns.len());
771 let plan_root = PlanRoot::new_with_logical_plan(
772 source_node,
773 RequiredDist::Any,
774 Order::any(),
775 required_cols,
776 vec![],
777 );
778
779 let has_non_ttl_watermark = watermark_descs.iter().any(|d| !d.with_ttl);
780
781 if !append_only && has_non_ttl_watermark {
782 return Err(ErrorCode::NotSupported(
783 "Defining watermarks on table requires the table to be append only.".to_owned(),
784 "Use the key words `APPEND ONLY`".to_owned(),
785 )
786 .into());
787 }
788
789 if !append_only && retention_seconds.is_some() {
790 if session
791 .config()
792 .unsafe_enable_storage_retention_for_non_append_only_tables()
793 {
794 tracing::warn!(
795 "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
796 table_name
797 );
798 const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
799 This may lead to stream inconsistency and unrecoverable \
800 node failure if there is any row INSERT/UPDATE/DELETE operation \
801 corresponding to the TTLed primary keys";
802 session.notice_to_user(NOTICE);
803 } else {
804 return Err(ErrorCode::NotSupported(
805 "Defining retention seconds on table requires the table to be append only."
806 .to_owned(),
807 "Use the key words `APPEND ONLY`".to_owned(),
808 )
809 .into());
810 }
811 }
812
813 let materialize =
814 plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;
815
816 let mut table = materialize.table().clone();
817 table.owner = session.user_id();
818
819 Ok((materialize.into(), table))
820}
821
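/// Generates the plan for a table created on top of a shared CDC source
/// (`CREATE TABLE ... FROM source TABLE 'external_table'`). The plan scans the
/// external table via `LogicalCdcScan` using the derived CDC connect properties, and
/// the resulting table catalog records the CDC table id and type.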
822#[allow(clippy::too_many_arguments)]
826pub(crate) fn gen_create_table_plan_for_cdc_table(
827 context: OptimizerContextRef,
828 source: Arc<SourceCatalog>,
829 external_table_name: String,
830 column_defs: Vec<ColumnDef>,
831 mut columns: Vec<ColumnCatalog>,
832 pk_names: Vec<String>,
833 cdc_with_options: WithOptionsSecResolved,
834 mut col_id_gen: ColumnIdGenerator,
835 on_conflict: Option<OnConflict>,
836 with_version_columns: Vec<String>,
837 include_column_options: IncludeOption,
838 table_name: ObjectName,
    resolved_table_name: String,
    database_id: DatabaseId,
841 schema_id: SchemaId,
842 table_id: TableId,
843 engine: Engine,
844) -> Result<(PlanRef, TableCatalog)> {
845 let session = context.session_ctx().clone();
846
847 handle_addition_columns(
849 None,
850 &cdc_with_options,
851 include_column_options,
852 &mut columns,
853 true,
854 )?;
855
856 for c in &mut columns {
857 col_id_gen.generate(c)?;
858 }
859
860 let (mut columns, pk_column_ids, _row_id_index) =
861 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
862
863 bind_sql_column_constraints(
865 context.session_ctx(),
866 table_name.real_value(),
867 &mut columns,
868 &column_defs,
869 &pk_column_ids,
870 )?;
871
872 let definition = context.normalized_sql().to_owned();
873
874 let pk_column_indices = {
875 let mut id_to_idx = HashMap::new();
876 columns.iter().enumerate().for_each(|(idx, c)| {
877 id_to_idx.insert(c.column_id(), idx);
878 });
879 pk_column_ids
881 .iter()
882 .map(|c| id_to_idx.get(c).copied().unwrap())
883 .collect_vec()
884 };
885 let table_pk = pk_column_indices
886 .iter()
887 .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
888 .collect();
889
890 let (options, secret_refs) = cdc_with_options.into_parts();
891
892 let non_generated_column_descs = columns
893 .iter()
894 .filter(|&c| !c.is_generated())
895 .map(|c| c.column_desc.clone())
896 .collect_vec();
897 let non_generated_column_num = non_generated_column_descs.len();
898 let cdc_table_type = ExternalCdcTableType::from_properties(&options);
899 let cdc_table_desc = CdcTableDesc {
900 table_id,
        source_id: source.id,
        external_table_name: external_table_name.clone(),
903 pk: table_pk,
904 columns: non_generated_column_descs,
905 stream_key: pk_column_indices,
906 connect_properties: options,
907 secret_refs,
908 };
909
910 tracing::debug!(?cdc_table_desc, "create cdc table");
911 let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;
912
913 let logical_scan = LogicalCdcScan::create(
914 external_table_name.clone(),
915 Rc::new(cdc_table_desc),
916 context.clone(),
917 options,
918 );
919
920 let scan_node: LogicalPlanRef = logical_scan.into();
921 let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
922 let plan_root = PlanRoot::new_with_logical_plan(
923 scan_node,
924 RequiredDist::Any,
925 Order::any(),
926 required_cols,
927 vec![],
928 );
929
930 let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
931 let materialize = plan_root.gen_table_plan(
932 context,
933 resolved_table_name,
934 database_id,
935 schema_id,
936 CreateTableInfo {
937 columns,
938 pk_column_ids,
939 row_id_index: None,
940 watermark_descs: vec![],
941 source_catalog: Some((*source).clone()),
942 version: col_id_gen.into_version(),
943 },
944 CreateTableProps {
945 definition,
946 append_only: false,
947 on_conflict: on_conflict.into(),
948 with_version_columns,
949 webhook_info: None,
950 engine,
951 },
952 )?;
953
954 let mut table = materialize.table().clone();
955 table.owner = session.user_id();
956 table.cdc_table_id = Some(cdc_table_id);
957 table.cdc_table_type = Some(cdc_table_type);
958 Ok((materialize.into(), table))
959}
960
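/// Derives the WITH options for a CDC table from its shared source, filling in the
/// upstream database/schema/table name keys based on the connector: MySQL expects
/// `database.table`, Postgres expects `schema.table`, and SQL Server accepts
/// `schema.table` or `database.schema.table` (the database must then match the one in
/// the source definition). Returns the options together with the normalized external
/// table name.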
961fn derive_with_options_for_cdc_table(
969 source_with_properties: &WithOptionsSecResolved,
970 external_table_name: String,
971) -> Result<(WithOptionsSecResolved, String)> {
972 use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
973 let source_database_name: &str = source_with_properties
975 .get("database.name")
976 .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
977 .as_str();
978 let mut with_options = source_with_properties.clone();
979 if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
980 match connector.as_str() {
981 MYSQL_CDC_CONNECTOR => {
982 let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
985 anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
986 })?;
987 if !source_database_name
989 .split(',')
990 .map(|s| s.trim())
991 .any(|name| name == db_name)
992 {
993 return Err(anyhow!(
994 "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
995 db_name,
996 source_database_name
997 ).into());
998 }
999 with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
1000 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1001 return Ok((with_options, external_table_name));
1003 }
1004 POSTGRES_CDC_CONNECTOR => {
1005 let (schema_name, table_name) = external_table_name
1006 .split_once('.')
1007 .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;
1008
1009 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1011 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1012 return Ok((with_options, external_table_name));
1014 }
1015 SQL_SERVER_CDC_CONNECTOR => {
1016 let parts: Vec<&str> = external_table_name.split('.').collect();
1024 let (schema_name, table_name) = match parts.len() {
1025 3 => {
1026 let db_name = parts[0];
1029 let schema_name = parts[1];
1030 let table_name = parts[2];
1031
1032 if db_name != source_database_name {
1033 return Err(anyhow!(
1034 "The database name '{}' in FROM clause does not match the database name '{}' specified in source definition. \
1035 You can either use 'schema.table' format (recommended) or ensure the database name matches.",
1036 db_name,
1037 source_database_name
1038 ).into());
1039 }
1040 (schema_name, table_name)
1041 }
1042 2 => {
1043 let schema_name = parts[0];
1046 let table_name = parts[1];
1047 (schema_name, table_name)
1048 }
1049 1 => {
1050 return Err(anyhow!(
1053 "Invalid table name format '{}'. For SQL Server CDC, you must specify the schema name. \
1054 Use 'schema.table' format (e.g., 'dbo.{}') or 'database.schema.table' format (e.g., '{}.dbo.{}').",
1055 external_table_name,
1056 external_table_name,
1057 source_database_name,
1058 external_table_name
1059 ).into());
1060 }
1061 _ => {
1062 return Err(anyhow!(
1064 "Invalid table name format '{}'. Expected 'schema.table' or 'database.schema.table'.",
1065 external_table_name
1066 ).into());
1067 }
1068 };
1069
1070 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1072 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1073
1074 let normalized_external_table_name = format!("{}.{}", schema_name, table_name);
1077 return Ok((with_options, normalized_external_table_name));
1078 }
1079 _ => {
1080 return Err(RwError::from(anyhow!(
1081 "connector {} is not supported for cdc table",
1082 connector
1083 )));
1084 }
1085 };
1086 }
1087 unreachable!("All valid CDC connectors should have returned by now")
1088}
1089
1090#[allow(clippy::too_many_arguments)]
1091pub(super) async fn handle_create_table_plan(
1092 handler_args: HandlerArgs,
1093 explain_options: ExplainOptions,
1094 format_encode: Option<FormatEncodeOptions>,
1095 cdc_table_info: Option<CdcTableInfo>,
1096 table_name: &ObjectName,
1097 column_defs: Vec<ColumnDef>,
1098 wildcard_idx: Option<usize>,
1099 constraints: Vec<TableConstraint>,
1100 source_watermarks: Vec<SourceWatermark>,
1101 append_only: bool,
1102 on_conflict: Option<OnConflict>,
1103 with_version_columns: Vec<String>,
1104 include_column_options: IncludeOption,
1105 webhook_info: Option<WebhookSourceInfo>,
1106 engine: Engine,
1107) -> Result<(
1108 PlanRef,
1109 Option<SourceCatalog>,
1110 TableCatalog,
1111 TableJobType,
1112 Option<SourceId>,
1113)> {
1114 let col_id_gen = ColumnIdGenerator::new_initial();
1115 let format_encode = check_create_table_with_source(
1116 &handler_args.with_options,
1117 format_encode,
1118 &include_column_options,
1119 &cdc_table_info,
1120 )?;
1121 let webhook_info = webhook_info
1122 .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1123 .transpose()?;
1124
1125 let props = CreateTableProps {
1126 definition: handler_args.normalized_sql.clone(),
1127 append_only,
1128 on_conflict: on_conflict.into(),
1129 with_version_columns: with_version_columns.clone(),
1130 webhook_info,
1131 engine,
1132 };
1133
    let ((plan, source, table), job_type, shared_source_id) = match (
1135 format_encode,
1136 cdc_table_info.as_ref(),
1137 ) {
1138 (Some(format_encode), None) => (
1139 gen_create_table_plan_with_source(
1140 handler_args,
1141 explain_options,
1142 table_name.clone(),
1143 column_defs,
1144 wildcard_idx,
1145 constraints,
1146 format_encode,
1147 source_watermarks,
1148 col_id_gen,
1149 include_column_options,
1150 props,
1151 SqlColumnStrategy::FollowChecked,
1152 )
1153 .await?,
1154 TableJobType::General,
1155 None,
1156 ),
1157 (None, None) => {
1158 let context = OptimizerContext::new(handler_args, explain_options);
1159 let (plan, table) = gen_create_table_plan(
1160 context,
1161 table_name.clone(),
1162 column_defs,
1163 constraints,
1164 col_id_gen,
1165 source_watermarks,
1166 props,
1167 false,
1168 )?;
1169
1170 ((plan, None, table), TableJobType::General, None)
1171 }
1172
1173 (None, Some(cdc_table)) => {
1174 sanity_check_for_table_on_cdc_source(
1175 append_only,
1176 &column_defs,
1177 &wildcard_idx,
1178 &constraints,
1179 &source_watermarks,
1180 )?;
1181
1182 generated_columns_check_for_cdc_table(&column_defs)?;
1183 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1184
1185 let session = &handler_args.session;
1186 let db_name = &session.database();
1187 let user_name = &session.user_name();
1188 let search_path = session.config().search_path();
1189 let (schema_name, resolved_table_name) =
1190 Binder::resolve_schema_qualified_name(db_name, table_name)?;
1191 let (database_id, schema_id) =
1192 session.get_database_and_schema_id_for_create(schema_name.clone())?;
1193
1194 let (source_schema, source_name) =
1196 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
1197
1198 let source = {
1199 let catalog_reader = session.env().catalog_reader().read_guard();
1200 let schema_path =
1201 SchemaPath::new(source_schema.as_deref(), &search_path, user_name);
1202
1203 let (source, _) = catalog_reader.get_source_by_name(
1204 db_name,
1205 schema_path,
1206 source_name.as_str(),
1207 )?;
1208 source.clone()
1209 };
1210 let (cdc_with_options, normalized_external_table_name) =
1211 derive_with_options_for_cdc_table(
1212 &source.with_properties,
1213 cdc_table.external_table_name.clone(),
1214 )?;
1215
1216 let (columns, pk_names) = match wildcard_idx {
1217 Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1218 None => {
1219 for column_def in &column_defs {
1220 for option_def in &column_def.options {
1221 if let ColumnOption::DefaultValue(_)
1222 | ColumnOption::DefaultValueInternal { .. } = option_def.option
1223 {
1224 return Err(ErrorCode::NotSupported(
1225 "Default value for columns defined on the table created from a CDC source".into(),
1226 "Remove the default value expression in the column definitions".into(),
1227 )
1228 .into());
1229 }
1230 }
1231 }
1232
1233 let (columns, pk_names) =
1234 bind_cdc_table_schema(&column_defs, &constraints, false)?;
1235 let (options, secret_refs) = cdc_with_options.clone().into_parts();
1237 let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1238 .context("failed to extract external table config")?;
1239
1240 (columns, pk_names)
1241 }
1242 };
1243
1244 let context: OptimizerContextRef =
1245 OptimizerContext::new(handler_args, explain_options).into();
1246 let shared_source_id = source.id;
1247 let (plan, table) = gen_create_table_plan_for_cdc_table(
1248 context,
1249 source,
1250 normalized_external_table_name,
1251 column_defs,
1252 columns,
1253 pk_names,
1254 cdc_with_options,
1255 col_id_gen,
1256 on_conflict,
1257 with_version_columns,
1258 include_column_options,
1259 table_name.clone(),
1260 resolved_table_name,
1261 database_id,
1262 schema_id,
1263 TableId::placeholder(),
1264 engine,
1265 )?;
1266
1267 (
1268 (plan, None, table),
1269 TableJobType::SharedCdcSource,
1270 Some(shared_source_id),
1271 )
1272 }
1273 (Some(_), Some(_)) => {
1274 return Err(ErrorCode::NotSupported(
1275 "Data format and encoding format doesn't apply to table created from a CDC source"
1276 .into(),
1277 "Remove the FORMAT and ENCODE specification".into(),
1278 )
1279 .into());
1280 }
1281 };
    Ok((plan, source, table, job_type, shared_source_id))
1283}
1284
1285fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1287 let mut found_generated_column = false;
1288 for column in columns {
1289 let mut is_generated = false;
1290
1291 for option_def in &column.options {
1292 if let ColumnOption::GeneratedColumns(_) = option_def.option {
1293 is_generated = true;
1294 break;
1295 }
1296 }
1297
1298 if is_generated {
1299 found_generated_column = true;
1300 } else if found_generated_column {
1301 return Err(ErrorCode::NotSupported(
1302 "Non-generated column found after a generated column.".into(),
1303 "Ensure that all generated columns appear at the end of the cdc table definition."
1304 .into(),
1305 )
1306 .into());
1307 }
1308 }
1309 Ok(())
1310}
1311
1312fn not_null_check_for_cdc_table(
1314 wildcard_idx: &Option<usize>,
1315 column_defs: &Vec<ColumnDef>,
1316) -> Result<()> {
    if wildcard_idx.is_none()
1318 && column_defs.iter().any(|col| {
1319 col.options
1320 .iter()
1321 .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1322 })
1323 {
1324 return Err(ErrorCode::NotSupported(
1325 "CDC table with NOT NULL constraint is not supported".to_owned(),
1326 "Please remove the NOT NULL constraint for columns".to_owned(),
1327 )
1328 .into());
1329 }
1330 Ok(())
1331}
1332
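/// Sanity checks for `CREATE TABLE ... FROM cdc_source`: the wildcard `(*)` and
/// explicit column definitions are mutually exclusive, a primary key must be defined
/// unless the schema is auto-derived via the wildcard, and `APPEND ONLY` as well as
/// watermark definitions are not allowed.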
1333fn sanity_check_for_table_on_cdc_source(
1335 append_only: bool,
1336 column_defs: &Vec<ColumnDef>,
1337 wildcard_idx: &Option<usize>,
1338 constraints: &Vec<TableConstraint>,
1339 source_watermarks: &Vec<SourceWatermark>,
1340) -> Result<()> {
1341 if wildcard_idx.is_some() && !column_defs.is_empty() {
1343 return Err(ErrorCode::NotSupported(
1344 "wildcard(*) and column definitions cannot be used together".to_owned(),
1345 "Remove the wildcard or column definitions".to_owned(),
1346 )
1347 .into());
1348 }
1349
    if wildcard_idx.is_none()
1352 && !constraints.iter().any(|c| {
1353 matches!(
1354 c,
1355 TableConstraint::Unique {
1356 is_primary: true,
1357 ..
1358 }
1359 )
1360 })
1361 && !column_defs.iter().any(|col| {
1362 col.options
1363 .iter()
1364 .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1365 })
1366 {
1367 return Err(ErrorCode::NotSupported(
1368 "CDC table without primary key constraint is not supported".to_owned(),
1369 "Please define a primary key".to_owned(),
1370 )
1371 .into());
1372 }
1373
1374 if append_only {
1375 return Err(ErrorCode::NotSupported(
1376 "append only modifier on the table created from a CDC source".into(),
1377 "Remove the APPEND ONLY clause".into(),
1378 )
1379 .into());
1380 }
1381
1382 if !source_watermarks.is_empty() {
1383 return Err(ErrorCode::NotSupported(
1384 "watermark defined on the table created from a CDC source".into(),
1385 "Remove the Watermark definitions".into(),
1386 )
1387 .into());
1388 }
1389
1390 Ok(())
1391}
1392
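/// Derives the table schema by connecting to the external CDC upstream, returning the
/// column catalogs and primary-key column names reported by the external table.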
1393async fn bind_cdc_table_schema_externally(
1395 cdc_with_options: WithOptionsSecResolved,
1396) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1397 let (options, secret_refs) = cdc_with_options.into_parts();
1399 let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1400 .context("failed to extract external table config")?;
1401
1402 let table = ExternalTableImpl::connect(config)
1403 .await
1404 .context("failed to auto derive table schema")?;
1405
1406 Ok((
1407 table
1408 .column_descs()
1409 .iter()
1410 .cloned()
1411 .map(|column_desc| ColumnCatalog {
1412 column_desc,
1413 is_hidden: false,
1414 })
1415 .collect(),
1416 table.pk_names().clone(),
1417 ))
1418}
1419
1420fn bind_cdc_table_schema(
1422 column_defs: &Vec<ColumnDef>,
1423 constraints: &Vec<TableConstraint>,
1424 is_for_replace_plan: bool,
1425) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1426 let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1427
1428 let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1429 Ok((columns, pk_names))
1430}
1431
1432#[allow(clippy::too_many_arguments)]
1433pub async fn handle_create_table(
1434 handler_args: HandlerArgs,
1435 table_name: ObjectName,
1436 column_defs: Vec<ColumnDef>,
1437 wildcard_idx: Option<usize>,
1438 constraints: Vec<TableConstraint>,
1439 if_not_exists: bool,
1440 format_encode: Option<FormatEncodeOptions>,
1441 source_watermarks: Vec<SourceWatermark>,
1442 append_only: bool,
1443 on_conflict: Option<OnConflict>,
1444 with_version_columns: Vec<String>,
1445 cdc_table_info: Option<CdcTableInfo>,
1446 include_column_options: IncludeOption,
1447 webhook_info: Option<WebhookSourceInfo>,
1448 ast_engine: risingwave_sqlparser::ast::Engine,
1449) -> Result<RwPgResponse> {
1450 let session = handler_args.session.clone();
1451
1452 if append_only {
1453 session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1454 }
1455
1456 session.check_cluster_limits().await?;
1457
1458 let engine = match ast_engine {
1459 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1460 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1461 };
1462
1463 if let Either::Right(resp) = session.check_relation_name_duplicated(
1464 table_name.clone(),
1465 StatementType::CREATE_TABLE,
1466 if_not_exists,
1467 )? {
1468 return Ok(resp);
1469 }
1470
1471 let (graph, source, hummock_table, job_type, shared_source_id) = {
1472 let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
1473 handler_args.clone(),
1474 ExplainOptions::default(),
1475 format_encode,
1476 cdc_table_info,
1477 &table_name,
1478 column_defs.clone(),
1479 wildcard_idx,
1480 constraints.clone(),
1481 source_watermarks,
1482 append_only,
1483 on_conflict,
1484 with_version_columns,
1485 include_column_options,
1486 webhook_info,
1487 engine,
1488 )
1489 .await?;
1490 tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1491
1492 let graph = build_graph(plan, Some(GraphJobType::Table))?;
1493
1494 (graph, source, table, job_type, shared_source_id)
1495 };
1496
1497 tracing::trace!(
1498 "name={}, graph=\n{}",
1499 table_name,
1500 serde_json::to_string_pretty(&graph).unwrap()
1501 );
1502
1503 let dependencies = shared_source_id
1504 .map(|id| HashSet::from([id.as_object_id()]))
1505 .unwrap_or_default();
1506
1507 match engine {
1509 Engine::Hummock => {
1510 let catalog_writer = session.catalog_writer()?;
1511 let action = match job_type {
1512 TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
1513 _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
1514 };
1515 execute_with_long_running_notification(
1516 catalog_writer.create_table(
1517 source.map(|s| s.to_prost()),
1518 hummock_table.to_prost(),
1519 graph,
1520 job_type,
1521 if_not_exists,
1522 dependencies,
1523 ),
1524 &session,
1525 "CREATE TABLE",
1526 action,
1527 )
1528 .await?;
1529 }
1530 Engine::Iceberg => {
1531 let hummock_table_name = hummock_table.name.clone();
1532 session.create_staging_table(hummock_table.clone());
1533 let res = create_iceberg_engine_table(
1534 session.clone(),
1535 handler_args,
1536 source.map(|s| s.to_prost()),
1537 hummock_table,
1538 graph,
1539 table_name,
1540 job_type,
1541 if_not_exists,
1542 )
1543 .await;
1544 session.drop_staging_table(&hummock_table_name);
1545 res?
1546 }
1547 }
1548
1549 Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1550}
1551
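/// Creates a table with `ENGINE = iceberg`. Besides the internal (hummock) table job
/// itself, this derives and submits a companion Iceberg sink based on the session's
/// `iceberg_engine_connection`, forwarding compaction, snapshot-expiration and other
/// Iceberg-related options from the WITH clause.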
1552#[allow(clippy::too_many_arguments)]
1561pub async fn create_iceberg_engine_table(
1562 session: Arc<SessionImpl>,
1563 handler_args: HandlerArgs,
1564 mut source: Option<PbSource>,
1565 table: TableCatalog,
1566 graph: StreamFragmentGraph,
1567 table_name: ObjectName,
1568 job_type: PbTableJobType,
1569 if_not_exists: bool,
1570) -> Result<()> {
1571 let rw_db_name = session
1572 .env()
1573 .catalog_reader()
1574 .read_guard()
1575 .get_database_by_id(table.database_id)?
1576 .name()
1577 .to_owned();
1578 let rw_schema_name = session
1579 .env()
1580 .catalog_reader()
1581 .read_guard()
1582 .get_schema_by_id(table.database_id, table.schema_id)?
1583 .name()
1584 .clone();
1585 let iceberg_catalog_name = rw_db_name.clone();
1586 let iceberg_database_name = rw_schema_name.clone();
1587 let iceberg_table_name = table_name.0.last().unwrap().real_value();
1588
1589 let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1590 let sink_decouple = session.config().sink_decouple();
1591 if matches!(sink_decouple, SinkDecouple::Disable) {
        bail!(
            "Iceberg engine tables require sink decoupling; try `SET sink_decouple = true` to resolve it"
        );
1595 }
1596
1597 let mut connection_ref = BTreeMap::new();
1598 let with_common = if iceberg_engine_connection.is_empty() {
        bail!("To use an Iceberg engine table, the session variable `iceberg_engine_connection` must be set.");
1600 } else {
1601 let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1602 assert_eq!(parts.len(), 2);
1603 let connection_catalog =
1604 session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1605 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1606 if params.connection_type == ConnectionType::Iceberg as i32 {
1607 connection_ref.insert(
1609 "connection".to_owned(),
1610 ConnectionRefValue {
1611 connection_name: ObjectName::from(vec![
1612 Ident::from(parts[0]),
1613 Ident::from(parts[1]),
1614 ]),
1615 },
1616 );
1617
1618 let mut with_common = BTreeMap::new();
1619 with_common.insert("connector".to_owned(), "iceberg".to_owned());
1620 with_common.insert("database.name".to_owned(), iceberg_database_name);
1621 with_common.insert("table.name".to_owned(), iceberg_table_name);
1622
1623 let hosted_catalog = params
1624 .properties
1625 .get("hosted_catalog")
1626 .map(|s| s.eq_ignore_ascii_case("true"))
1627 .unwrap_or(false);
1628 if hosted_catalog {
1629 let meta_client = session.env().meta_client();
1630 let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1631
1632 let meta_store_endpoint =
1633 url::Url::parse(&meta_store_endpoint).map_err(|_| {
1634 ErrorCode::InternalError(
1635 "failed to parse the meta store endpoint".to_owned(),
1636 )
1637 })?;
1638 let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1639 let meta_store_user = meta_store_endpoint.username().to_owned();
1640 let meta_store_password = match meta_store_endpoint.password() {
1641 Some(password) => percent_decode_str(password)
1642 .decode_utf8()
1643 .map_err(|_| {
1644 ErrorCode::InternalError(
1645 "failed to parse password from meta store endpoint".to_owned(),
1646 )
1647 })?
1648 .into_owned(),
1649 None => "".to_owned(),
1650 };
1651 let meta_store_host = meta_store_endpoint
1652 .host_str()
1653 .ok_or_else(|| {
1654 ErrorCode::InternalError(
1655 "failed to parse host from meta store endpoint".to_owned(),
1656 )
1657 })?
1658 .to_owned();
1659 let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1660 ErrorCode::InternalError(
1661 "failed to parse port from meta store endpoint".to_owned(),
1662 )
1663 })?;
1664 let meta_store_database = meta_store_endpoint
1665 .path()
1666 .trim_start_matches('/')
1667 .to_owned();
1668
1669 let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1670 bail!("failed to parse meta backend: {}", meta_store_backend);
1671 };
1672
1673 let catalog_uri = match meta_backend {
1674 MetaBackend::Postgres => {
1675 format!(
1676 "jdbc:postgresql://{}:{}/{}",
1677 meta_store_host, meta_store_port, meta_store_database
1678 )
1679 }
1680 MetaBackend::Mysql => {
1681 format!(
1682 "jdbc:mysql://{}:{}/{}",
1683 meta_store_host, meta_store_port, meta_store_database
1684 )
1685 }
1686 MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1687 bail!(
1688 "Unsupported meta backend for iceberg engine table: {}",
1689 meta_store_backend
1690 );
1691 }
1692 };
1693
1694 with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1695 with_common.insert("catalog.uri".to_owned(), catalog_uri);
1696 with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
1697 with_common.insert("catalog.jdbc.password".to_owned(), meta_store_password);
1698 with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
1699 }
1700
1701 with_common
1702 } else {
1703 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1704 "Only iceberg connection could be used in iceberg engine".to_owned(),
1705 )));
1706 }
1707 } else {
1708 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1709 "Private Link Service has been deprecated. Please create a new connection instead."
1710 .to_owned(),
1711 )));
1712 }
1713 };
1714
1715 let mut pks = table
1718 .pk_column_names()
1719 .iter()
1720 .map(|c| c.to_string())
1721 .collect::<Vec<String>>();
1722
1723 let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1725 pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1726 let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1727 "select {} as {}, * from {}",
1728 ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1729 ))
1730 .context("unable to parse query")?
1731 .try_into()
1732 .unwrap();
1733
1734 let Statement::Query(query) = &stmt else {
1735 panic!("unexpected statement: {:?}", stmt);
1736 };
1737 CreateSink::AsQuery(query.clone())
1738 } else {
1739 CreateSink::From(table_name.clone())
1740 };
1741
1742 let mut sink_name = table_name.clone();
1743 *sink_name.0.last_mut().unwrap() = Ident::from(
1744 (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1745 );
1746 let create_sink_stmt = CreateSinkStatement {
1747 if_not_exists: false,
1748 sink_name,
1749 with_properties: WithProperties(vec![]),
1750 sink_from,
1751 columns: vec![],
1752 emit_mode: None,
1753 sink_schema: None,
1754 into_table_name: None,
1755 };
1756
1757 let mut sink_handler_args = handler_args.clone();
1758
1759 let mut sink_with = with_common.clone();
1760
1761 if table.append_only {
1762 sink_with.insert("type".to_owned(), "append-only".to_owned());
1763 } else {
1764 sink_with.insert("primary_key".to_owned(), pks.join(","));
1765 sink_with.insert("type".to_owned(), "upsert".to_owned());
1766 }
1767 let commit_checkpoint_interval = handler_args
1786 .with_options
1787 .get(COMMIT_CHECKPOINT_INTERVAL)
1788 .map(|v| v.to_owned())
1789 .unwrap_or_else(|| "60".to_owned());
1790 let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1791 ErrorCode::InvalidInputSyntax(format!(
1792 "commit_checkpoint_interval must be greater than 0: {}",
1793 commit_checkpoint_interval
1794 ))
1795 })?;
1796
1797 if commit_checkpoint_interval == 0 {
1798 bail!("commit_checkpoint_interval must be greater than 0");
1799 }
1800
1801 source
1803 .as_mut()
1804 .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1805
1806 let sink_decouple = session.config().sink_decouple();
1807 if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1808 bail!(
1809 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1810 )
1811 }
1812
1813 sink_with.insert(
1814 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1815 commit_checkpoint_interval.to_string(),
1816 );
1817 sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1818
1819 sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1820
1821 if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1822 match enable_compaction.to_lowercase().as_str() {
1823 "true" => {
1824 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1825 }
1826 "false" => {
1827 sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1828 }
1829 _ => {
1830 return Err(ErrorCode::InvalidInputSyntax(format!(
1831 "enable_compaction must be true or false: {}",
1832 enable_compaction
1833 ))
1834 .into());
1835 }
1836 }
1837
1838 source
1840 .as_mut()
1841 .map(|x| x.with_properties.remove("enable_compaction"));
1842 } else {
1843 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1844 }
1845
    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "compaction_interval_sec must be greater than 0: {}",
                compaction_interval_sec
            ))
        })?;
        if compaction_interval_sec == 0 {
            bail!("compaction_interval_sec must be greater than 0");
        }
        sink_with.insert(
            "compaction_interval_sec".to_owned(),
            compaction_interval_sec.to_string(),
        );
        source
            .as_mut()
            .map(|x| x.with_properties.remove("compaction_interval_sec"));
    }
1865
1866 let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1867 handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1868 {
1869 source
1871 .as_mut()
1872 .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1873 match enable_snapshot_expiration.to_lowercase().as_str() {
1874 "true" => {
1875 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1876 true
1877 }
1878 "false" => {
1879 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1880 false
1881 }
1882 _ => {
1883 return Err(ErrorCode::InvalidInputSyntax(format!(
1884 "enable_snapshot_expiration must be true or false: {}",
1885 enable_snapshot_expiration
1886 ))
1887 .into());
1888 }
1889 }
1890 } else {
1891 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1892 true
1893 };
1894
1895 if has_enabled_snapshot_expiration {
1896 if let Some(snapshot_expiration_retain_last) = handler_args
1898 .with_options
1899 .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
1900 {
1901 sink_with.insert(
1902 SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
1903 snapshot_expiration_retain_last.to_owned(),
1904 );
1905 source
1907 .as_mut()
1908 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
1909 }
1910
1911 if let Some(snapshot_expiration_max_age) = handler_args
1912 .with_options
1913 .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
1914 {
1915 sink_with.insert(
1916 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
1917 snapshot_expiration_max_age.to_owned(),
1918 );
1919 source
1921 .as_mut()
1922 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
1923 }
1924
1925 if let Some(snapshot_expiration_clear_expired_files) = handler_args
1926 .with_options
1927 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1928 {
1929 sink_with.insert(
1930 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
1931 snapshot_expiration_clear_expired_files.to_owned(),
1932 );
1933 source.as_mut().map(|x| {
1935 x.with_properties
1936 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1937 });
1938 }
1939
1940 if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
1941 .with_options
1942 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1943 {
1944 sink_with.insert(
1945 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
1946 snapshot_expiration_clear_expired_meta_data.to_owned(),
1947 );
1948 source.as_mut().map(|x| {
1950 x.with_properties
1951 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1952 });
1953 }
1954 }
1955
1956 if let Some(format_version) = handler_args.with_options.get(FORMAT_VERSION) {
1957 let format_version = format_version.parse::<u8>().map_err(|_| {
1958 ErrorCode::InvalidInputSyntax(format!(
1959 "format_version must be 1, 2 or 3: {}",
1960 format_version
1961 ))
1962 })?;
1963 if format_version != 1 && format_version != 2 && format_version != 3 {
1964 bail!("format_version must be 1, 2 or 3");
1965 }
1966 sink_with.insert(FORMAT_VERSION.to_owned(), format_version.to_string());
1967
1968 source
1970 .as_mut()
1971 .map(|x| x.with_properties.remove(FORMAT_VERSION));
1972 }
1973
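    // Iceberg `write_mode`: defaults to merge-on-read; copy-on-write is rejected for
    // append-only tables.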
1974 if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1975 let write_mode = IcebergWriteMode::try_from(write_mode.as_str()).map_err(|_| {
1976 ErrorCode::InvalidInputSyntax(format!(
1977 "invalid write_mode: {}, must be one of: {}, {}",
1978 write_mode, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
1979 ))
1980 })?;
1981
1982 match write_mode {
1983 IcebergWriteMode::MergeOnRead => {
1984 sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
1985 }
1986
1987 IcebergWriteMode::CopyOnWrite => {
1988 if table.append_only {
1989 return Err(ErrorCode::NotSupported(
1990 "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
1991 "Please use MERGE ON READ instead".to_owned(),
1992 )
1993 .into());
1994 }
1995
1996 sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
1997 }
1998 }
1999
2000 source
2002 .as_mut()
2003 .map(|x| x.with_properties.remove("write_mode"));
2004 } else {
2005 sink_with.insert(
2006 WRITE_MODE.to_owned(),
2007 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2008 );
2009 }
2010
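    // Compaction tuning options: each value is validated to be a positive number,
    // copied into the sink WITH clause, and stripped from the source properties.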
2011 if let Some(max_snapshots_num_before_compaction) =
2012 handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
2013 {
2014 let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
2015 .parse::<u32>()
2016 .map_err(|_| {
2017 ErrorCode::InvalidInputSyntax(format!(
2018 "{} must be greater than 0: {}",
2019 COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
2020 ))
2021 })?;
2022
2023 if max_snapshots_num_before_compaction == 0 {
2024 bail!(format!(
2025 "{} must be greater than 0",
2026 COMPACTION_MAX_SNAPSHOTS_NUM
2027 ));
2028 }
2029
2030 sink_with.insert(
2031 COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
2032 max_snapshots_num_before_compaction.to_string(),
2033 );
2034
2035 source
2037 .as_mut()
2038 .map(|x| x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM));
2039 }
2040
2041 if let Some(small_files_threshold_mb) = handler_args
2042 .with_options
2043 .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2044 {
2045 let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
2046 ErrorCode::InvalidInputSyntax(format!(
2047 "{} must be greater than 0: {}",
2048 COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
2049 ))
2050 })?;
2051 if small_files_threshold_mb == 0 {
2052 bail!(format!(
2053 "{} must be a greater than 0",
2054 COMPACTION_SMALL_FILES_THRESHOLD_MB
2055 ));
2056 }
2057 sink_with.insert(
2058 COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
2059 small_files_threshold_mb.to_string(),
2060 );
2061
2062 source.as_mut().map(|x| {
2064 x.with_properties
2065 .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2066 });
2067 }
2068
2069 if let Some(delete_files_count_threshold) = handler_args
2070 .with_options
2071 .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2072 {
2073 let delete_files_count_threshold =
2074 delete_files_count_threshold.parse::<usize>().map_err(|_| {
2075 ErrorCode::InvalidInputSyntax(format!(
2076 "{} must be greater than 0: {}",
2077 COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
2078 ))
2079 })?;
2080 if delete_files_count_threshold == 0 {
2081 bail!(format!(
2082 "{} must be greater than 0",
2083 COMPACTION_DELETE_FILES_COUNT_THRESHOLD
2084 ));
2085 }
2086 sink_with.insert(
2087 COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
2088 delete_files_count_threshold.to_string(),
2089 );
2090
2091 source.as_mut().map(|x| {
2093 x.with_properties
2094 .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2095 });
2096 }
2097
2098 if let Some(trigger_snapshot_count) = handler_args
2099 .with_options
2100 .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
2101 {
2102 let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
2103 ErrorCode::InvalidInputSyntax(format!(
2104 "{} must be greater than 0: {}",
2105 COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
2106 ))
2107 })?;
2108 if trigger_snapshot_count == 0 {
2109 bail!(format!(
2110 "{} must be greater than 0",
2111 COMPACTION_TRIGGER_SNAPSHOT_COUNT
2112 ));
2113 }
2114 sink_with.insert(
2115 COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
2116 trigger_snapshot_count.to_string(),
2117 );
2118
2119 source
2121 .as_mut()
2122 .map(|x| x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT));
2123 }
2124
2125 if let Some(target_file_size_mb) = handler_args
2126 .with_options
2127 .get(COMPACTION_TARGET_FILE_SIZE_MB)
2128 {
2129 let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
2130 ErrorCode::InvalidInputSyntax(format!(
2131 "{} must be greater than 0: {}",
2132 COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
2133 ))
2134 })?;
2135 if target_file_size_mb == 0 {
2136 bail!(format!(
2137 "{} must be greater than 0",
2138 COMPACTION_TARGET_FILE_SIZE_MB
2139 ));
2140 }
2141 sink_with.insert(
2142 COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
2143 target_file_size_mb.to_string(),
2144 );
2145 source
2147 .as_mut()
2148 .map(|x| x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB));
2149 }
2150
2151 if let Some(compaction_type) = handler_args.with_options.get(COMPACTION_TYPE) {
2152 let compaction_type = CompactionType::try_from(compaction_type.as_str()).map_err(|_| {
2153 ErrorCode::InvalidInputSyntax(format!(
2154 "invalid compaction_type: {}, must be one of {:?}",
2155 compaction_type,
2156 &[
2157 CompactionType::Full,
2158 CompactionType::SmallFiles,
2159 CompactionType::FilesWithDelete
2160 ]
2161 ))
2162 })?;
2163
2164 sink_with.insert(
2165 COMPACTION_TYPE.to_owned(),
2166 compaction_type.as_str().to_owned(),
2167 );
2168
2169 source
2171 .as_mut()
2172 .map(|x| x.with_properties.remove(COMPACTION_TYPE));
2173 }
2174
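    // `partition_by`: every referenced column must exist in the table schema, and the
    // partition columns must form a prefix of the primary key.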
2175 let partition_by = handler_args
2176 .with_options
2177 .get("partition_by")
2178 .map(|v| v.to_owned());
2179
2180 if let Some(partition_by) = &partition_by {
2181 let mut partition_columns = vec![];
2182 for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
2183 table
2184 .columns()
2185 .iter()
2186 .find(|col| col.name().eq_ignore_ascii_case(&column))
2187 .ok_or_else(|| {
2188 ErrorCode::InvalidInputSyntax(format!(
2189 "Partition source column does not exist in schema: {}",
2190 column
2191 ))
2192 })?;
2193
2194 partition_columns.push(column);
2195 }
2196
2197 ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2198 |_| {
2199 ErrorCode::InvalidInputSyntax(
2200 "The partition columns should be the prefix of the primary key".to_owned(),
2201 )
2202 },
2203 )?;
2204
2205 sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2206
2207 source
2209 .as_mut()
2210 .map(|x| x.with_properties.remove("partition_by"));
2211 }
2212
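    // Generate the plan and fragment graph for the internal Iceberg sink from the
    // assembled WITH options.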
2213 sink_handler_args.with_options =
2214 WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2215 let SinkPlanContext {
2216 sink_plan,
2217 sink_catalog,
2218 ..
2219 } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2220 let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2221
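    // Build a `CREATE SOURCE` statement for the internal Iceberg source, whose name is
    // the table name prefixed with `ICEBERG_SOURCE_PREFIX`.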
2222 let mut source_name = table_name.clone();
2223 *source_name.0.last_mut().unwrap() = Ident::from(
2224 (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2225 );
2226 let create_source_stmt = CreateSourceStatement {
2227 temporary: false,
2228 if_not_exists: false,
2229 columns: vec![],
2230 source_name,
2231 wildcard_idx: Some(0),
2232 constraints: vec![],
2233 with_properties: WithProperties(vec![]),
2234 format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2235 source_watermarks: vec![],
2236 include_column_options: vec![],
2237 };
2238
2239 let mut source_handler_args = handler_args.clone();
2240 let source_with = with_common;
2241 source_handler_args.with_options =
2242 WithOptions::new(source_with, Default::default(), connection_ref);
2243
2244 let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2245 let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2246 let (with_properties, refresh_mode) =
2247 bind_connector_props(&source_handler_args, &format_encode, true)?;
2248
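    // Create the physical Iceberg table through the sink configuration up front, and
    // keep the catalog handle and table identifier so the table can be dropped if the
    // DDL job fails later.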
2249 let (iceberg_catalog, table_identifier) = {
2252 let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2253 let iceberg_sink = IcebergSink::try_from(sink_param)?;
2254 iceberg_sink.create_table_if_not_exists().await?;
2255
2256 let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2257 let table_identifier = iceberg_sink.config.full_table_name()?;
2258 (iceberg_catalog, table_identifier)
2259 };
2260
2261 let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2262 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2263 &session,
2264 &format_encode,
2265 Either::Left(&with_properties),
2266 create_source_type,
2267 )
2268 .await?;
2269 let mut col_id_gen = ColumnIdGenerator::new_initial();
2270
2271 let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2272 source_handler_args,
2273 create_source_stmt.source_name,
2274 format_encode,
2275 with_properties,
2276 &create_source_stmt.columns,
2277 create_source_stmt.constraints,
2278 create_source_stmt.wildcard_idx,
2279 create_source_stmt.source_watermarks,
2280 columns_from_resolve_source,
2281 source_info,
2282 create_source_stmt.include_column_options,
2283 &mut col_id_gen,
2284 create_source_type,
2285 overwrite_options.source_rate_limit,
2286 SqlColumnStrategy::FollowChecked,
2287 refresh_mode,
2288 )
2289 .await?;
2290
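    // Eagerly initialize the shared JVM before submitting the DDL job.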
2291 let _ = Jvm::get_or_init()?;
2294
2295 let catalog_writer = session.catalog_writer()?;
2296 let action = match job_type {
2297 TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
2298 _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
2299 };
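    // Submit the table, sink, and source as a single iceberg-table DDL job, wrapped in
    // a long-running-operation notification.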
2300 let res = execute_with_long_running_notification(
2301 catalog_writer.create_iceberg_table(
2302 PbTableJobInfo {
2303 source,
2304 table: Some(table.to_prost()),
2305 fragment_graph: Some(graph),
2306 job_type: job_type as _,
2307 },
2308 PbSinkJobInfo {
2309 sink: Some(sink_catalog.to_proto()),
2310 fragment_graph: Some(sink_graph),
2311 },
2312 iceberg_source_catalog.to_prost(),
2313 if_not_exists,
2314 ),
2315 &session,
2316 "CREATE TABLE",
2317 action,
2318 )
2319 .await;
2320
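    // Best-effort cleanup: if the DDL job failed, try to drop the Iceberg table created
    // above before propagating the error.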
2321 if res.is_err() {
2322 let _ = iceberg_catalog
2323 .drop_table(&table_identifier)
2324 .await
2325 .inspect_err(|err| {
2326 tracing::error!(
2327 "failed to drop iceberg table {} after create iceberg engine table failed: {}",
2328 table_identifier,
2329 err.as_report()
2330 );
2331 });
2332 res?
2333 }
2334
2335 Ok(())
2336}
2337
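/// Checks the interplay between WITH options, the FORMAT/ENCODE clause, INCLUDE options,
/// and CDC table info for `CREATE TABLE`, returning the format/encode options to use for
/// the connector, if any.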
2338pub fn check_create_table_with_source(
2339 with_options: &WithOptions,
2340 format_encode: Option<FormatEncodeOptions>,
2341 include_column_options: &IncludeOption,
2342 cdc_table_info: &Option<CdcTableInfo>,
2343) -> Result<Option<FormatEncodeOptions>> {
2344 if cdc_table_info.is_some() {
2346 return Ok(format_encode);
2347 }
2348 let defined_source = with_options.is_source_connector();
2349
2350 if !include_column_options.is_empty() && !defined_source {
2351 return Err(ErrorCode::InvalidInputSyntax(
2352 "INCLUDE should be used with a connector".to_owned(),
2353 )
2354 .into());
2355 }
2356 if defined_source {
2357 format_encode.as_ref().ok_or_else(|| {
2358 ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2359 })?;
2360 }
2361 Ok(format_encode)
2362}
2363
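/// Returns an error if `partition_columns` is not a prefix of `primary_key_columns`.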
2364fn ensure_partition_columns_are_prefix_of_primary_key(
2365 partition_columns: &[String],
2366 primary_key_columns: &[String],
2367) -> std::result::Result<(), String> {
2368 if partition_columns.len() > primary_key_columns.len() {
2369 return Err("Partition columns cannot be longer than primary key columns.".to_owned());
2370 }
2371
2372 for (i, partition_col) in partition_columns.iter().enumerate() {
2373 if primary_key_columns.get(i) != Some(partition_col) {
2374 return Err(format!(
2375 "Partition column '{}' is not a prefix of the primary key.",
2376 partition_col
2377 ));
2378 }
2379 }
2380
2381 Ok(())
2382}
2383
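/// Generates the stream fragment graph, new table catalog, optional source catalog, and
/// table job type used to replace an existing table definition, dispatching on whether
/// the statement defines a connector, a plain table, or a CDC-backed table.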
2384#[allow(clippy::too_many_arguments)]
2385pub async fn generate_stream_graph_for_replace_table(
2386 _session: &Arc<SessionImpl>,
2387 table_name: ObjectName,
2388 original_catalog: &Arc<TableCatalog>,
2389 handler_args: HandlerArgs,
2390 statement: Statement,
2391 col_id_gen: ColumnIdGenerator,
2392 sql_column_strategy: SqlColumnStrategy,
2393) -> Result<(
2394 StreamFragmentGraph,
2395 TableCatalog,
2396 Option<SourceCatalog>,
2397 TableJobType,
2398)> {
2399 let Statement::CreateTable {
2400 columns,
2401 constraints,
2402 source_watermarks,
2403 append_only,
2404 on_conflict,
2405 with_version_columns,
2406 wildcard_idx,
2407 cdc_table_info,
2408 format_encode,
2409 include_column_options,
2410 engine,
2411 with_options,
2412 ..
2413 } = statement
2414 else {
2415 panic!("unexpected statement type: {:?}", statement);
2416 };
2417
2418 let format_encode = format_encode
2419 .clone()
2420 .map(|format_encode| format_encode.into_v2_with_warning());
2421
2422 let engine = match engine {
2423 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2424 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2425 };
2426
2427 let is_drop_connector =
2428 original_catalog.associated_source_id().is_some() && format_encode.is_none();
2429 if is_drop_connector {
2430 debug_assert!(
2431 source_watermarks.is_empty()
2432 && include_column_options.is_empty()
2433 && with_options
2434 .iter()
2435 .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2436 );
2437 }
2438
2439 let props = CreateTableProps {
2440 definition: handler_args.normalized_sql.clone(),
2441 append_only,
2442 on_conflict: on_conflict.into(),
2443 with_version_columns: with_version_columns
2444 .iter()
2445 .map(|col| col.real_value())
2446 .collect(),
2447 webhook_info: original_catalog.webhook_info.clone(),
2448 engine,
2449 };
2450
2451 let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2452 (Some(format_encode), None) => (
2453 gen_create_table_plan_with_source(
2454 handler_args,
2455 ExplainOptions::default(),
2456 table_name,
2457 columns,
2458 wildcard_idx,
2459 constraints,
2460 format_encode,
2461 source_watermarks,
2462 col_id_gen,
2463 include_column_options,
2464 props,
2465 sql_column_strategy,
2466 )
2467 .await?,
2468 TableJobType::General,
2469 ),
2470 (None, None) => {
2471 let context = OptimizerContext::from_handler_args(handler_args);
2472 let (plan, table) = gen_create_table_plan(
2473 context,
2474 table_name,
2475 columns,
2476 constraints,
2477 col_id_gen,
2478 source_watermarks,
2479 props,
2480 true,
2481 )?;
2482 ((plan, None, table), TableJobType::General)
2483 }
2484 (None, Some(cdc_table)) => {
2485 let session = &handler_args.session;
2486 let (source, resolved_table_name) =
2487 get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2488
2489 let (cdc_with_options, normalized_external_table_name) =
2490 derive_with_options_for_cdc_table(
2491 &source.with_properties,
2492 cdc_table.external_table_name.clone(),
2493 )?;
2494
2495 let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2496
2497 let context: OptimizerContextRef =
2498 OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2499 let (plan, table) = gen_create_table_plan_for_cdc_table(
2500 context,
2501 source,
2502 normalized_external_table_name,
2503 columns,
2504 column_catalogs,
2505 pk_names,
2506 cdc_with_options,
2507 col_id_gen,
2508 on_conflict,
2509 with_version_columns
2510 .iter()
2511 .map(|col| col.real_value())
2512 .collect(),
2513 include_column_options,
2514 table_name,
2515 resolved_table_name,
2516 original_catalog.database_id,
2517 original_catalog.schema_id,
2518 original_catalog.id(),
2519 engine,
2520 )?;
2521
2522 ((plan, None, table), TableJobType::SharedCdcSource)
2523 }
2524 (Some(_), Some(_)) => {
2525 return Err(ErrorCode::NotSupported(
2526 "Data format and encoding format doesn't apply to table created from a CDC source"
2527 .into(),
2528 "Remove the FORMAT and ENCODE specification".into(),
2529 )
2530 .into());
2531 }
2532 };
2533
2534 if table.pk_column_ids() != original_catalog.pk_column_ids() {
2535 Err(ErrorCode::InvalidInputSyntax(
2536 "alter primary key of table is not supported".to_owned(),
2537 ))?
2538 }
2539
2540 let graph = build_graph(plan, Some(GraphJobType::Table))?;
2541
2542 table.id = original_catalog.id();
2544 if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2545 table.associated_source_id = Some(source_id);
2546
2547 let source = source.as_mut().unwrap();
2548 source.id = source_id;
2549 source.associated_table_id = Some(table.id());
2550 }
2551
2552 Ok((graph, table, source, job_type))
2553}
2554
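/// Resolves the upstream CDC source referenced by `cdc_table` and the target table name
/// in the current database, returning the source catalog and the resolved table name.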
2555fn get_source_and_resolved_table_name(
2556 session: &Arc<SessionImpl>,
2557 cdc_table: CdcTableInfo,
2558 table_name: ObjectName,
2559) -> Result<(Arc<SourceCatalog>, String)> {
2560 let db_name = &session.database();
2561 let (_, resolved_table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2562
2563 let (source_schema, source_name) =
2564 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2565
2566 let source = {
2567 let catalog_reader = session.env().catalog_reader().read_guard();
2568 let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2569 let (source, _) = catalog_reader.get_source_by_name(
2570 db_name,
2571 SchemaPath::Name(schema_name.as_str()),
2572 source_name.as_str(),
2573 )?;
2574 source.clone()
2575 };
2576
2577 Ok((source, resolved_table_name))
2578}
2579
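/// Validates and binds the webhook source definition: the table must have exactly one
/// JSONB column, and the optional VALIDATE expression must be a `SECURE_COMPARE()` call.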
2580fn bind_webhook_info(
2582 session: &Arc<SessionImpl>,
2583 columns_defs: &[ColumnDef],
2584 webhook_info: WebhookSourceInfo,
2585) -> Result<PbWebhookSourceInfo> {
2586 if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
2588 {
2589 return Err(ErrorCode::InvalidInputSyntax(
2590 "Table with webhook source should have exactly one JSONB column".to_owned(),
2591 )
2592 .into());
2593 }
2594
2595 let WebhookSourceInfo {
2596 secret_ref,
2597 signature_expr,
2598 wait_for_persistence,
2599 is_batched,
2600 } = webhook_info;
2601
2602 let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2604 let db_name = &session.database();
2605 let (schema_name, secret_name) =
2606 Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2607 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2608 (
2609 Some(PbSecretRef {
2610 secret_id: secret_catalog.id,
2611 ref_as: match secret_ref.ref_as {
2612 SecretRefAsType::Text => PbRefAsType::Text,
2613 SecretRefAsType::File => PbRefAsType::File,
2614 }
2615 .into(),
2616 }),
2617 Some(secret_name),
2618 )
2619 } else {
2620 (None, None)
2621 };
2622
2623 let signature_expr = if let Some(signature_expr) = signature_expr {
2624 let secure_compare_context = SecureCompareContext {
2625 column_name: columns_defs[0].name.real_value(),
2626 secret_name,
2627 };
2628 let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
2629 let expr = binder.bind_expr(&signature_expr)?;
2630
2631 if expr.as_function_call().is_none()
2633 || expr.as_function_call().unwrap().func_type()
2634 != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2635 {
2636 return Err(ErrorCode::InvalidInputSyntax(
2637 "The signature verification function must be SECURE_COMPARE()".to_owned(),
2638 )
2639 .into());
2640 }
2641
2642 Some(expr.to_expr_proto())
2643 } else {
2644 session.notice_to_user(
2645 "VALIDATE clause is strongly recommended for safety or production usages",
2646 );
2647 None
2648 };
2649
2650 let pb_webhook_info = PbWebhookSourceInfo {
2651 secret_ref: pb_secret_ref,
2652 signature_expr,
2653 wait_for_persistence,
2654 is_batched,
2655 };
2656
2657 Ok(pb_webhook_info)
2658}
2659
2660#[cfg(test)]
2661mod tests {
2662 use risingwave_common::catalog::{
2663 DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2664 };
2665 use risingwave_common::types::{DataType, StructType};
2666
2667 use super::*;
2668 use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2669
2670 #[tokio::test]
2671 async fn test_create_table_handler() {
2672 let sql =
2673 "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2674 let frontend = LocalFrontend::new(Default::default()).await;
2675 frontend.run_sql(sql).await.unwrap();
2676
2677 let session = frontend.session_ref();
2678 let catalog_reader = session.env().catalog_reader().read_guard();
2679 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2680
2681 let (table, _) = catalog_reader
2683 .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2684 .unwrap();
2685 assert_eq!(table.name(), "t");
2686
2687 let columns = table
2688 .columns
2689 .iter()
2690 .map(|col| (col.name(), col.data_type().clone()))
2691 .collect::<HashMap<&str, DataType>>();
2692
2693 let expected_columns = maplit::hashmap! {
2694 ROW_ID_COLUMN_NAME => DataType::Serial,
2695 "v1" => DataType::Int16,
2696 "v2" => StructType::new(
2697 vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
2698 )
2699 .with_ids([3, 4, 5].map(ColumnId::new))
2700 .into(),
2701 RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
2702 };
2703
2704 assert_eq!(columns, expected_columns, "{columns:#?}");
2705 }
2706
2707 #[test]
2708 fn test_bind_primary_key() {
2709 for (sql, expected) in [
2712 ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
2713 ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
2714 ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
2715 (
2716 "create table t (v1 int primary key, v2 int primary key)",
2717 Err("multiple primary keys are not allowed"),
2718 ),
2719 (
2720 "create table t (v1 int primary key primary key, v2 int)",
2721 Err("multiple primary keys are not allowed"),
2722 ),
2723 (
2724 "create table t (v1 int, v2 int, primary key (v1))",
2725 Ok(&[1]),
2726 ),
2727 (
2728 "create table t (v1 int, primary key (v2), v2 int)",
2729 Ok(&[2]),
2730 ),
2731 (
2732 "create table t (primary key (v2, v1), v1 int, v2 int)",
2733 Ok(&[2, 1]),
2734 ),
2735 (
2736 "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
2737 Err("multiple primary keys are not allowed"),
2738 ),
2739 (
2740 "create table t (v1 int primary key, primary key (v1), v2 int)",
2741 Err("multiple primary keys are not allowed"),
2742 ),
2743 (
2744 "create table t (v1 int, primary key (V3), v2 int)",
2745 Err("column \"v3\" named in key does not exist"),
2746 ),
2747 ] {
2748 let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
2749 let risingwave_sqlparser::ast::Statement::CreateTable {
2750 columns: column_defs,
2751 constraints,
2752 ..
2753 } = ast.remove(0)
2754 else {
2755 panic!("test case should be create table")
2756 };
2757 let actual: Result<_> = (|| {
2758 let mut columns = bind_sql_columns(&column_defs, false)?;
2759 let mut col_id_gen = ColumnIdGenerator::new_initial();
2760 for c in &mut columns {
2761 col_id_gen.generate(c)?;
2762 }
2763
2764 let pk_names =
2765 bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
2766 let (_, pk_column_ids, _) =
2767 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
2768 Ok(pk_column_ids)
2769 })();
2770 match (expected, actual) {
2771 (Ok(expected), Ok(actual)) => assert_eq!(
2772 expected.iter().copied().map(ColumnId::new).collect_vec(),
2773 actual,
2774 "sql: {sql}"
2775 ),
2776 (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
2777 (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
2778 (Err(expected), Err(actual)) => assert!(
2779 actual.to_string().contains(expected),
2780 "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
2781 ),
2782 }
2783 }
2784 }
2785
2786 #[tokio::test]
2787 async fn test_duplicate_props_options() {
2788 let proto_file = create_proto_file(PROTO_FILE_DATA);
2789 let sql = format!(
2790 r#"CREATE TABLE t
2791 WITH (
2792 connector = 'kinesis',
2793 aws.region='user_test_topic',
2794 endpoint='172.10.1.1:9090,172.10.1.2:9090',
2795 aws.credentials.access_key_id = 'your_access_key_1',
2796 aws.credentials.secret_access_key = 'your_secret_key_1'
2797 )
2798 FORMAT PLAIN ENCODE PROTOBUF (
2799 message = '.test.TestRecord',
2800 aws.credentials.access_key_id = 'your_access_key_2',
2801 aws.credentials.secret_access_key = 'your_secret_key_2',
2802 schema.location = 'file://{}',
2803 )"#,
2804 proto_file.path().to_str().unwrap()
2805 );
2806 let frontend = LocalFrontend::new(Default::default()).await;
2807 frontend.run_sql(sql).await.unwrap();
2808
2809 let session = frontend.session_ref();
2810 let catalog_reader = session.env().catalog_reader().read_guard();
2811 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2812
2813 let (source, _) = catalog_reader
2815 .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2816 .unwrap();
2817 assert_eq!(source.name, "t");
2818
2819 assert_eq!(
2821 source
2822 .info
2823 .format_encode_options
2824 .get("aws.credentials.access_key_id")
2825 .unwrap(),
2826 "your_access_key_2"
2827 );
2828 assert_eq!(
2829 source
2830 .info
2831 .format_encode_options
2832 .get("aws.credentials.secret_access_key")
2833 .unwrap(),
2834 "your_secret_key_2"
2835 );
2836
2837 assert_eq!(
2839 source
2840 .with_properties
2841 .get("aws.credentials.access_key_id")
2842 .unwrap(),
2843 "your_access_key_1"
2844 );
2845 assert_eq!(
2846 source
2847 .with_properties
2848 .get("aws.credentials.secret_access_key")
2849 .unwrap(),
2850 "your_secret_key_1"
2851 );
2852
2853 assert!(!source.with_properties.contains_key("schema.location"));
2855 }
2856}