1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use percent_encoding::percent_decode_str;
25use pgwire::pg_response::{PgResponse, StatementType};
26use prost::Message as _;
27use risingwave_common::catalog::{
28 CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29 ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
30 TableId,
31};
32use risingwave_common::config::MetaBackend;
33use risingwave_common::global_jvm::Jvm;
34use risingwave_common::session_config::sink_decouple::SinkDecouple;
35use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
36use risingwave_common::util::value_encoding::DatumToProtoExt;
37use risingwave_common::{bail, bail_not_implemented};
38use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
39use risingwave_connector::source::cdc::build_cdc_table_id;
40use risingwave_connector::source::cdc::external::{
41 DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
42 SCHEMA_NAME_KEY, TABLE_NAME_KEY,
43};
44use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
45use risingwave_pb::catalog::connection::Info as ConnectionInfo;
46use risingwave_pb::catalog::connection_params::ConnectionType;
47use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
48use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
49use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
50use risingwave_pb::plan_common::{
51 AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
52};
53use risingwave_pb::secret::PbSecretRef;
54use risingwave_pb::secret::secret_ref::PbRefAsType;
55use risingwave_pb::stream_plan::StreamFragmentGraph;
56use risingwave_sqlparser::ast::{
57 CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
58 CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
59 FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
60 Statement, TableConstraint, WebhookSourceInfo, WithProperties,
61};
62use risingwave_sqlparser::parser::{IncludeOption, Parser};
63use thiserror_ext::AsReport;
64
65use super::RwPgResponse;
66use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
67use crate::binder::{Clause, SecureCompareContext, bind_data_type};
68use crate::catalog::root_catalog::SchemaPath;
69use crate::catalog::source_catalog::SourceCatalog;
70use crate::catalog::table_catalog::TableVersion;
71use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
72use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
73use crate::expr::{Expr, ExprImpl, ExprRewriter};
74use crate::handler::HandlerArgs;
75use crate::handler::create_source::{
76 UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
77 bind_source_watermark, handle_addition_columns,
78};
79use crate::handler::util::{
80 LongRunningNotificationAction, SourceSchemaCompatExt, execute_with_long_running_notification,
81};
82use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
83use crate::optimizer::plan_node::{
84 LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
85};
86use crate::optimizer::property::{Order, RequiredDist};
87use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
88use crate::session::SessionImpl;
89use crate::session::current::notice_to_user;
90use crate::stream_fragmenter::{GraphJobType, build_graph};
91use crate::utils::OverwriteOptions;
92use crate::{Binder, Explain, TableCatalog, WithOptions};
93
94mod col_id_gen;
95pub use col_id_gen::*;
96use risingwave_connector::sink::SinkParam;
97use risingwave_connector::sink::iceberg::{
98 COMPACTION_DELETE_FILES_COUNT_THRESHOLD, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
99 COMPACTION_SMALL_FILES_THRESHOLD_MB, COMPACTION_TARGET_FILE_SIZE_MB,
100 COMPACTION_TRIGGER_SNAPSHOT_COUNT, COMPACTION_TYPE, CompactionType, ENABLE_COMPACTION,
101 ENABLE_SNAPSHOT_EXPIRATION, ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ,
102 IcebergSink, IcebergWriteMode, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
103 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA, SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
104 SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE, parse_partition_by_exprs,
105};
106use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
107
108use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};
109
110fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
111 for option_def in &c.options {
112 match option_def.option {
113 ColumnOption::GeneratedColumns(_) => {}
114 ColumnOption::DefaultValue(_) => {}
115 ColumnOption::DefaultValueInternal { .. } => {}
116 ColumnOption::Unique { is_primary: true } => {}
117 ColumnOption::Null => {}
118 ColumnOption::NotNull => {}
119 _ => bail_not_implemented!("column constraints \"{}\"", option_def),
120 }
121 }
122 Ok(())
123}
124
125pub fn bind_sql_columns(
129 column_defs: &[ColumnDef],
130 is_for_drop_table_connector: bool,
131) -> Result<Vec<ColumnCatalog>> {
132 let mut columns = Vec::with_capacity(column_defs.len());
133
134 for column in column_defs {
135 ensure_column_options_supported(column)?;
136 let ColumnDef {
140 name,
141 data_type,
142 collation,
143 options,
144 ..
145 } = column;
146
147 let data_type = data_type
148 .clone()
149 .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
150 if let Some(collation) = collation {
151 if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
157 bail_not_implemented!(
158 "Collate collation other than `C` or `POSIX` is not implemented"
159 );
160 }
161
162 match data_type {
163 AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
164 _ => {
165 return Err(ErrorCode::NotSupported(
166 format!("{} is not a collatable data type", data_type),
167 "The only built-in collatable data types are `varchar`, please check your type".into(),
168 ).into());
169 }
170 }
171 }
172
173 if !is_for_drop_table_connector {
174 check_column_name_not_reserved(&name.real_value())?;
178 }
179
180 let nullable: bool = !options
181 .iter()
182 .any(|def| matches!(def.option, ColumnOption::NotNull));
183
184 columns.push(ColumnCatalog {
185 column_desc: ColumnDesc {
186 data_type: bind_data_type(&data_type)?,
187 column_id: ColumnId::placeholder(),
188 name: name.real_value(),
189 generated_or_default_column: None,
190 description: None,
191 additional_column: AdditionalColumn { column_type: None },
192 version: ColumnDescVersion::LATEST,
193 system_column: None,
194 nullable,
195 },
196 is_hidden: false,
197 });
198 }
199
200 Ok(columns)
201}
202
203fn check_generated_column_constraints(
204 column_name: &String,
205 column_id: ColumnId,
206 expr: &ExprImpl,
207 column_catalogs: &[ColumnCatalog],
208 generated_column_names: &[String],
209 pk_column_ids: &[ColumnId],
210) -> Result<()> {
211 let input_refs = expr.collect_input_refs(column_catalogs.len());
212 for idx in input_refs.ones() {
213 let referred_generated_column = &column_catalogs[idx].column_desc.name;
214 if generated_column_names
215 .iter()
216 .any(|c| c == referred_generated_column)
217 {
218 return Err(ErrorCode::BindError(format!(
219 "Generated can not reference another generated column. \
220 But here generated column \"{}\" referenced another generated column \"{}\"",
221 column_name, referred_generated_column
222 ))
223 .into());
224 }
225 }
226
227 if pk_column_ids.contains(&column_id) && expr.is_impure() {
228 return Err(ErrorCode::BindError(format!(
229 "Generated columns with impure expressions should not be part of the primary key. \
230 Here column \"{}\" is defined as part of the primary key.",
231 column_name
232 ))
233 .into());
234 }
235
236 Ok(())
237}
238
239pub fn bind_sql_column_constraints(
242 session: &SessionImpl,
243 table_name: String,
244 column_catalogs: &mut [ColumnCatalog],
245 columns: &[ColumnDef],
246 pk_column_ids: &[ColumnId],
247) -> Result<()> {
248 let generated_column_names = {
249 let mut names = vec![];
250 for column in columns {
251 for option_def in &column.options {
252 if let ColumnOption::GeneratedColumns(_) = option_def.option {
253 names.push(column.name.real_value());
254 break;
255 }
256 }
257 }
258 names
259 };
260
261 let mut binder = Binder::new_for_ddl(session);
262 binder.bind_columns_to_context(table_name, column_catalogs)?;
263
264 for column in columns {
265 let Some(idx) = column_catalogs
266 .iter()
267 .position(|c| c.name() == column.name.real_value())
268 else {
269 continue;
272 };
273
274 for option_def in &column.options {
275 match &option_def.option {
276 ColumnOption::GeneratedColumns(expr) => {
277 binder.set_clause(Some(Clause::GeneratedColumn));
278
279 let expr_impl = binder.bind_expr(expr).with_context(|| {
280 format!(
281 "fail to bind expression in generated column \"{}\"",
282 column.name.real_value()
283 )
284 })?;
285
286 check_generated_column_constraints(
287 &column.name.real_value(),
288 column_catalogs[idx].column_id(),
289 &expr_impl,
290 column_catalogs,
291 &generated_column_names,
292 pk_column_ids,
293 )?;
294
295 column_catalogs[idx].column_desc.generated_or_default_column = Some(
296 GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
297 expr: Some(expr_impl.to_expr_proto()),
298 }),
299 );
300 binder.set_clause(None);
301 }
302 ColumnOption::DefaultValue(expr) => {
303 let expr_impl = binder
304 .bind_expr(expr)?
305 .cast_assign(column_catalogs[idx].data_type())?;
306
307 let rewritten_expr_impl = session
315 .pinned_snapshot()
316 .inline_now_proc_time()
317 .rewrite_expr(expr_impl.clone());
318
319 if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
320 let snapshot_value = snapshot_value?;
321
322 column_catalogs[idx].column_desc.generated_or_default_column =
323 Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
324 snapshot_value: Some(snapshot_value.to_protobuf()),
325 expr: Some(expr_impl.to_expr_proto()),
326 }));
328 } else {
329 return Err(ErrorCode::BindError(format!(
330 "Default expression used in column `{}` cannot be evaluated. \
331 Use generated columns instead if you mean to reference other columns.",
332 column.name
333 ))
334 .into());
335 }
336 }
337 ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
338 if persisted.is_empty() {
343 bail_bind_error!(
344 "DEFAULT INTERNAL is only used for internal purposes, \
345 please specify a concrete default value"
346 );
347 }
348
349 let desc = DefaultColumnDesc::decode(&**persisted)
350 .expect("failed to decode persisted `DefaultColumnDesc`");
351
352 column_catalogs[idx].column_desc.generated_or_default_column =
353 Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
354 }
355 _ => {}
356 }
357 }
358 }
359 Ok(())
360}
361
362pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
364 let mut pk_column_names = vec![];
365
366 for constraint in table_constraints {
367 match constraint {
368 TableConstraint::Unique {
369 name: _,
370 columns,
371 is_primary: true,
372 } => {
373 if !pk_column_names.is_empty() {
374 return Err(multiple_pk_definition_err());
375 }
376 pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
377 }
378 _ => bail_not_implemented!("table constraint \"{}\"", constraint),
379 }
380 }
381 Ok(pk_column_names)
382}
383
384pub fn bind_sql_pk_names(
385 columns_defs: &[ColumnDef],
386 pk_names_from_table_constraints: Vec<String>,
387) -> Result<Vec<String>> {
388 let mut pk_column_names = pk_names_from_table_constraints;
389
390 for column in columns_defs {
391 for option_def in &column.options {
392 if let ColumnOption::Unique { is_primary: true } = option_def.option {
393 if !pk_column_names.is_empty() {
394 return Err(multiple_pk_definition_err());
395 }
396 pk_column_names.push(column.name.real_value());
397 };
398 }
399 }
400
401 Ok(pk_column_names)
402}
403
404fn multiple_pk_definition_err() -> RwError {
405 ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
406}
407
408pub fn bind_pk_and_row_id_on_relation(
413 mut columns: Vec<ColumnCatalog>,
414 pk_names: Vec<String>,
415 must_need_pk: bool,
416) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
417 for c in &columns {
418 assert!(c.column_id() != ColumnId::placeholder());
419 }
420
421 let name_to_id = columns
423 .iter()
424 .map(|c| (c.name(), c.column_id()))
425 .collect::<HashMap<_, _>>();
426
427 let mut pk_column_ids: Vec<_> = pk_names
428 .iter()
429 .map(|name| {
430 name_to_id.get(name.as_str()).copied().ok_or_else(|| {
431 ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
432 })
433 })
434 .try_collect()?;
435
436 let need_row_id = pk_column_ids.is_empty() && must_need_pk;
438
439 let row_id_index = need_row_id.then(|| {
440 let column = ColumnCatalog::row_id_column();
441 let index = columns.len();
442 pk_column_ids = vec![column.column_id()];
443 columns.push(column);
444 index
445 });
446
447 if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
448 Err(ErrorCode::InvalidInputSyntax(format!(
449 "column \"{col}\" specified more than once"
450 )))?;
451 }
452
453 Ok((columns, pk_column_ids, row_id_index))
454}
455
456#[allow(clippy::too_many_arguments)]
459pub(crate) async fn gen_create_table_plan_with_source(
460 mut handler_args: HandlerArgs,
461 explain_options: ExplainOptions,
462 table_name: ObjectName,
463 column_defs: Vec<ColumnDef>,
464 wildcard_idx: Option<usize>,
465 constraints: Vec<TableConstraint>,
466 format_encode: FormatEncodeOptions,
467 source_watermarks: Vec<SourceWatermark>,
468 mut col_id_gen: ColumnIdGenerator,
469 include_column_options: IncludeOption,
470 props: CreateTableProps,
471 sql_column_strategy: SqlColumnStrategy,
472) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
473 if props.append_only
474 && format_encode.format != Format::Plain
475 && format_encode.format != Format::Native
476 {
477 return Err(ErrorCode::BindError(format!(
478 "Append only table does not support format {}.",
479 format_encode.format
480 ))
481 .into());
482 }
483
484 let session = &handler_args.session;
485 let (with_properties, refresh_mode) =
486 bind_connector_props(&handler_args, &format_encode, false)?;
487 if with_properties.is_shareable_cdc_connector() {
488 generated_columns_check_for_cdc_table(&column_defs)?;
489 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
490 } else if column_defs.iter().any(|col| {
491 col.options
492 .iter()
493 .any(|def| matches!(def.option, ColumnOption::NotNull))
494 }) {
495 notice_to_user(
497 "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
498 );
499 }
500
501 let db_name: &str = &session.database();
502 let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
503
504 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
506 session,
507 &format_encode,
508 Either::Left(&with_properties),
509 CreateSourceType::Table,
510 )
511 .await?;
512
513 let overwrite_options = OverwriteOptions::new(&mut handler_args);
514 let rate_limit = overwrite_options.source_rate_limit;
515 let source = bind_create_source_or_table_with_connector(
516 handler_args.clone(),
517 table_name,
518 format_encode,
519 with_properties,
520 &column_defs,
521 constraints,
522 wildcard_idx,
523 source_watermarks,
524 columns_from_resolve_source,
525 source_info,
526 include_column_options,
527 &mut col_id_gen,
528 CreateSourceType::Table,
529 rate_limit,
530 sql_column_strategy,
531 refresh_mode,
532 )
533 .await?;
534
535 let context = OptimizerContext::new(handler_args, explain_options);
536
537 let (plan, table) = gen_table_plan_with_source(
538 context.into(),
539 schema_name,
540 source.clone(),
541 col_id_gen.into_version(),
542 props,
543 )?;
544
545 Ok((plan, Some(source), table))
546}
547
548#[allow(clippy::too_many_arguments)]
551pub(crate) fn gen_create_table_plan(
552 context: OptimizerContext,
553 table_name: ObjectName,
554 column_defs: Vec<ColumnDef>,
555 constraints: Vec<TableConstraint>,
556 mut col_id_gen: ColumnIdGenerator,
557 source_watermarks: Vec<SourceWatermark>,
558 props: CreateTableProps,
559 is_for_replace_plan: bool,
560) -> Result<(PlanRef, TableCatalog)> {
561 let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
562 for c in &mut columns {
563 col_id_gen.generate(c)?;
564 }
565
566 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
567 if !secret_refs.is_empty() || !connection_refs.is_empty() {
568 return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
569 }
570
571 gen_create_table_plan_without_source(
572 context,
573 table_name,
574 columns,
575 column_defs,
576 constraints,
577 source_watermarks,
578 col_id_gen.into_version(),
579 props,
580 )
581}
582
583#[allow(clippy::too_many_arguments)]
584pub(crate) fn gen_create_table_plan_without_source(
585 context: OptimizerContext,
586 table_name: ObjectName,
587 columns: Vec<ColumnCatalog>,
588 column_defs: Vec<ColumnDef>,
589 constraints: Vec<TableConstraint>,
590 source_watermarks: Vec<SourceWatermark>,
591 version: TableVersion,
592 props: CreateTableProps,
593) -> Result<(PlanRef, TableCatalog)> {
594 let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
596 let (mut columns, pk_column_ids, row_id_index) =
597 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
598
599 let watermark_descs = bind_source_watermark(
600 context.session_ctx(),
601 table_name.real_value(),
602 source_watermarks,
603 &columns,
604 )?;
605
606 bind_sql_column_constraints(
607 context.session_ctx(),
608 table_name.real_value(),
609 &mut columns,
610 &column_defs,
611 &pk_column_ids,
612 )?;
613 let session = context.session_ctx().clone();
614
615 let db_name = &session.database();
616 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
617
618 let info = CreateTableInfo {
619 columns,
620 pk_column_ids,
621 row_id_index,
622 watermark_descs,
623 source_catalog: None,
624 version,
625 };
626
627 gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
628}
629
630fn gen_table_plan_with_source(
631 context: OptimizerContextRef,
632 schema_name: Option<String>,
633 source_catalog: SourceCatalog,
634 version: TableVersion,
635 props: CreateTableProps,
636) -> Result<(PlanRef, TableCatalog)> {
637 let table_name = source_catalog.name.clone();
638
639 let info = CreateTableInfo {
640 columns: source_catalog.columns.clone(),
641 pk_column_ids: source_catalog.pk_col_ids.clone(),
642 row_id_index: source_catalog.row_id_index,
643 watermark_descs: source_catalog.watermark_descs.clone(),
644 source_catalog: Some(source_catalog),
645 version,
646 };
647
648 gen_table_plan_inner(context, schema_name, table_name, info, props)
649}
650
651#[derive(Clone, Copy)]
653pub enum EitherOnConflict {
654 Ast(Option<OnConflict>),
655 Resolved(ConflictBehavior),
656}
657
658impl From<Option<OnConflict>> for EitherOnConflict {
659 fn from(v: Option<OnConflict>) -> Self {
660 Self::Ast(v)
661 }
662}
663
664impl From<ConflictBehavior> for EitherOnConflict {
665 fn from(v: ConflictBehavior) -> Self {
666 Self::Resolved(v)
667 }
668}
669
670impl EitherOnConflict {
671 pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
673 let conflict_behavior = match self {
674 EitherOnConflict::Ast(on_conflict) => {
675 if append_only {
676 if row_id_as_pk {
677 ConflictBehavior::NoCheck
679 } else {
680 if let Some(on_conflict) = on_conflict
682 && on_conflict != OnConflict::Nothing
683 {
684 return Err(ErrorCode::InvalidInputSyntax(
685 "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
686 the ON CONFLICT behavior must be DO NOTHING."
687 .to_owned(),
688 )
689 .into());
690 }
691 ConflictBehavior::IgnoreConflict
692 }
693 } else {
694 match on_conflict.unwrap_or(OnConflict::UpdateFull) {
696 OnConflict::UpdateFull => ConflictBehavior::Overwrite,
697 OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
698 OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
699 }
700 }
701 }
702 EitherOnConflict::Resolved(b) => b,
703 };
704
705 Ok(conflict_behavior)
706 }
707}
708
709pub struct CreateTableInfo {
714 pub columns: Vec<ColumnCatalog>,
715 pub pk_column_ids: Vec<ColumnId>,
716 pub row_id_index: Option<usize>,
717 pub watermark_descs: Vec<WatermarkDesc>,
718 pub source_catalog: Option<SourceCatalog>,
719 pub version: TableVersion,
720}
721
722pub struct CreateTableProps {
727 pub definition: String,
728 pub append_only: bool,
729 pub on_conflict: EitherOnConflict,
730 pub with_version_columns: Vec<String>,
731 pub webhook_info: Option<PbWebhookSourceInfo>,
732 pub engine: Engine,
733}
734
735#[allow(clippy::too_many_arguments)]
736fn gen_table_plan_inner(
737 context: OptimizerContextRef,
738 schema_name: Option<String>,
739 table_name: String,
740 info: CreateTableInfo,
741 props: CreateTableProps,
742) -> Result<(PlanRef, TableCatalog)> {
743 let CreateTableInfo {
744 ref columns,
745 row_id_index,
746 ref watermark_descs,
747 ref source_catalog,
748 ..
749 } = info;
750 let CreateTableProps { append_only, .. } = props;
751
752 let (database_id, schema_id) = context
753 .session_ctx()
754 .get_database_and_schema_id_for_create(schema_name)?;
755
756 let session = context.session_ctx().clone();
757 let retention_seconds = context.with_options().retention_seconds();
758
759 let source_node: LogicalPlanRef = LogicalSource::new(
760 source_catalog.clone().map(Rc::new),
761 columns.clone(),
762 row_id_index,
763 SourceNodeKind::CreateTable,
764 context.clone(),
765 None,
766 )?
767 .into();
768
769 let required_cols = FixedBitSet::with_capacity(columns.len());
770 let plan_root = PlanRoot::new_with_logical_plan(
771 source_node,
772 RequiredDist::Any,
773 Order::any(),
774 required_cols,
775 vec![],
776 );
777
778 let has_non_ttl_watermark = watermark_descs.iter().any(|d| !d.with_ttl);
779
780 if !append_only && has_non_ttl_watermark {
781 return Err(ErrorCode::NotSupported(
782 "Defining watermarks on table requires the table to be append only.".to_owned(),
783 "Use the key words `APPEND ONLY`".to_owned(),
784 )
785 .into());
786 }
787
788 if !append_only && retention_seconds.is_some() {
789 if session
790 .config()
791 .unsafe_enable_storage_retention_for_non_append_only_tables()
792 {
793 tracing::warn!(
794 "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
795 table_name
796 );
797 const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
798 This may lead to stream inconsistency and unrecoverable \
799 node failure if there is any row INSERT/UPDATE/DELETE operation \
800 corresponding to the TTLed primary keys";
801 session.notice_to_user(NOTICE);
802 } else {
803 return Err(ErrorCode::NotSupported(
804 "Defining retention seconds on table requires the table to be append only."
805 .to_owned(),
806 "Use the key words `APPEND ONLY`".to_owned(),
807 )
808 .into());
809 }
810 }
811
812 let materialize =
813 plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;
814
815 let mut table = materialize.table().clone();
816 table.owner = session.user_id();
817
818 Ok((materialize.into(), table))
819}
820
821#[allow(clippy::too_many_arguments)]
825pub(crate) fn gen_create_table_plan_for_cdc_table(
826 context: OptimizerContextRef,
827 source: Arc<SourceCatalog>,
828 external_table_name: String,
829 column_defs: Vec<ColumnDef>,
830 mut columns: Vec<ColumnCatalog>,
831 pk_names: Vec<String>,
832 cdc_with_options: WithOptionsSecResolved,
833 mut col_id_gen: ColumnIdGenerator,
834 on_conflict: Option<OnConflict>,
835 with_version_columns: Vec<String>,
836 include_column_options: IncludeOption,
837 table_name: ObjectName,
838 resolved_table_name: String, database_id: DatabaseId,
840 schema_id: SchemaId,
841 table_id: TableId,
842 engine: Engine,
843) -> Result<(PlanRef, TableCatalog)> {
844 let session = context.session_ctx().clone();
845
846 handle_addition_columns(
848 None,
849 &cdc_with_options,
850 include_column_options,
851 &mut columns,
852 true,
853 )?;
854
855 for c in &mut columns {
856 col_id_gen.generate(c)?;
857 }
858
859 let (mut columns, pk_column_ids, _row_id_index) =
860 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
861
862 bind_sql_column_constraints(
864 context.session_ctx(),
865 table_name.real_value(),
866 &mut columns,
867 &column_defs,
868 &pk_column_ids,
869 )?;
870
871 let definition = context.normalized_sql().to_owned();
872
873 let pk_column_indices = {
874 let mut id_to_idx = HashMap::new();
875 columns.iter().enumerate().for_each(|(idx, c)| {
876 id_to_idx.insert(c.column_id(), idx);
877 });
878 pk_column_ids
880 .iter()
881 .map(|c| id_to_idx.get(c).copied().unwrap())
882 .collect_vec()
883 };
884 let table_pk = pk_column_indices
885 .iter()
886 .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
887 .collect();
888
889 let (options, secret_refs) = cdc_with_options.into_parts();
890
891 let non_generated_column_descs = columns
892 .iter()
893 .filter(|&c| !c.is_generated())
894 .map(|c| c.column_desc.clone())
895 .collect_vec();
896 let non_generated_column_num = non_generated_column_descs.len();
897 let cdc_table_type = ExternalCdcTableType::from_properties(&options);
898 let cdc_table_desc = CdcTableDesc {
899 table_id,
900 source_id: source.id, external_table_name: external_table_name.clone(),
902 pk: table_pk,
903 columns: non_generated_column_descs,
904 stream_key: pk_column_indices,
905 connect_properties: options,
906 secret_refs,
907 };
908
909 tracing::debug!(?cdc_table_desc, "create cdc table");
910 let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;
911
912 let logical_scan = LogicalCdcScan::create(
913 external_table_name.clone(),
914 Rc::new(cdc_table_desc),
915 context.clone(),
916 options,
917 );
918
919 let scan_node: LogicalPlanRef = logical_scan.into();
920 let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
921 let plan_root = PlanRoot::new_with_logical_plan(
922 scan_node,
923 RequiredDist::Any,
924 Order::any(),
925 required_cols,
926 vec![],
927 );
928
929 let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
930 let materialize = plan_root.gen_table_plan(
931 context,
932 resolved_table_name,
933 database_id,
934 schema_id,
935 CreateTableInfo {
936 columns,
937 pk_column_ids,
938 row_id_index: None,
939 watermark_descs: vec![],
940 source_catalog: Some((*source).clone()),
941 version: col_id_gen.into_version(),
942 },
943 CreateTableProps {
944 definition,
945 append_only: false,
946 on_conflict: on_conflict.into(),
947 with_version_columns,
948 webhook_info: None,
949 engine,
950 },
951 )?;
952
953 let mut table = materialize.table().clone();
954 table.owner = session.user_id();
955 table.cdc_table_id = Some(cdc_table_id);
956 table.cdc_table_type = Some(cdc_table_type);
957 Ok((materialize.into(), table))
958}
959
960fn derive_with_options_for_cdc_table(
968 source_with_properties: &WithOptionsSecResolved,
969 external_table_name: String,
970) -> Result<(WithOptionsSecResolved, String)> {
971 use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
972 let source_database_name: &str = source_with_properties
974 .get("database.name")
975 .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
976 .as_str();
977 let mut with_options = source_with_properties.clone();
978 if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
979 match connector.as_str() {
980 MYSQL_CDC_CONNECTOR => {
981 let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
984 anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
985 })?;
986 if !source_database_name
988 .split(',')
989 .map(|s| s.trim())
990 .any(|name| name == db_name)
991 {
992 return Err(anyhow!(
993 "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
994 db_name,
995 source_database_name
996 ).into());
997 }
998 with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
999 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1000 return Ok((with_options, external_table_name));
1002 }
1003 POSTGRES_CDC_CONNECTOR => {
1004 let (schema_name, table_name) = external_table_name
1005 .split_once('.')
1006 .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;
1007
1008 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1010 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1011 return Ok((with_options, external_table_name));
1013 }
1014 SQL_SERVER_CDC_CONNECTOR => {
1015 let parts: Vec<&str> = external_table_name.split('.').collect();
1023 let (schema_name, table_name) = match parts.len() {
1024 3 => {
1025 let db_name = parts[0];
1028 let schema_name = parts[1];
1029 let table_name = parts[2];
1030
1031 if db_name != source_database_name {
1032 return Err(anyhow!(
1033 "The database name '{}' in FROM clause does not match the database name '{}' specified in source definition. \
1034 You can either use 'schema.table' format (recommended) or ensure the database name matches.",
1035 db_name,
1036 source_database_name
1037 ).into());
1038 }
1039 (schema_name, table_name)
1040 }
1041 2 => {
1042 let schema_name = parts[0];
1045 let table_name = parts[1];
1046 (schema_name, table_name)
1047 }
1048 1 => {
1049 return Err(anyhow!(
1052 "Invalid table name format '{}'. For SQL Server CDC, you must specify the schema name. \
1053 Use 'schema.table' format (e.g., 'dbo.{}') or 'database.schema.table' format (e.g., '{}.dbo.{}').",
1054 external_table_name,
1055 external_table_name,
1056 source_database_name,
1057 external_table_name
1058 ).into());
1059 }
1060 _ => {
1061 return Err(anyhow!(
1063 "Invalid table name format '{}'. Expected 'schema.table' or 'database.schema.table'.",
1064 external_table_name
1065 ).into());
1066 }
1067 };
1068
1069 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1071 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1072
1073 let normalized_external_table_name = format!("{}.{}", schema_name, table_name);
1076 return Ok((with_options, normalized_external_table_name));
1077 }
1078 _ => {
1079 return Err(RwError::from(anyhow!(
1080 "connector {} is not supported for cdc table",
1081 connector
1082 )));
1083 }
1084 };
1085 }
1086 unreachable!("All valid CDC connectors should have returned by now")
1087}
1088
1089#[allow(clippy::too_many_arguments)]
1090pub(super) async fn handle_create_table_plan(
1091 handler_args: HandlerArgs,
1092 explain_options: ExplainOptions,
1093 format_encode: Option<FormatEncodeOptions>,
1094 cdc_table_info: Option<CdcTableInfo>,
1095 table_name: &ObjectName,
1096 column_defs: Vec<ColumnDef>,
1097 wildcard_idx: Option<usize>,
1098 constraints: Vec<TableConstraint>,
1099 source_watermarks: Vec<SourceWatermark>,
1100 append_only: bool,
1101 on_conflict: Option<OnConflict>,
1102 with_version_columns: Vec<String>,
1103 include_column_options: IncludeOption,
1104 webhook_info: Option<WebhookSourceInfo>,
1105 engine: Engine,
1106) -> Result<(
1107 PlanRef,
1108 Option<SourceCatalog>,
1109 TableCatalog,
1110 TableJobType,
1111 Option<SourceId>,
1112)> {
1113 let col_id_gen = ColumnIdGenerator::new_initial();
1114 let format_encode = check_create_table_with_source(
1115 &handler_args.with_options,
1116 format_encode,
1117 &include_column_options,
1118 &cdc_table_info,
1119 )?;
1120 let webhook_info = webhook_info
1121 .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1122 .transpose()?;
1123
1124 let props = CreateTableProps {
1125 definition: handler_args.normalized_sql.clone(),
1126 append_only,
1127 on_conflict: on_conflict.into(),
1128 with_version_columns: with_version_columns.clone(),
1129 webhook_info,
1130 engine,
1131 };
1132
1133 let ((plan, source, table), job_type, shared_shource_id) = match (
1134 format_encode,
1135 cdc_table_info.as_ref(),
1136 ) {
1137 (Some(format_encode), None) => (
1138 gen_create_table_plan_with_source(
1139 handler_args,
1140 explain_options,
1141 table_name.clone(),
1142 column_defs,
1143 wildcard_idx,
1144 constraints,
1145 format_encode,
1146 source_watermarks,
1147 col_id_gen,
1148 include_column_options,
1149 props,
1150 SqlColumnStrategy::FollowChecked,
1151 )
1152 .await?,
1153 TableJobType::General,
1154 None,
1155 ),
1156 (None, None) => {
1157 let context = OptimizerContext::new(handler_args, explain_options);
1158 let (plan, table) = gen_create_table_plan(
1159 context,
1160 table_name.clone(),
1161 column_defs,
1162 constraints,
1163 col_id_gen,
1164 source_watermarks,
1165 props,
1166 false,
1167 )?;
1168
1169 ((plan, None, table), TableJobType::General, None)
1170 }
1171
1172 (None, Some(cdc_table)) => {
1173 sanity_check_for_table_on_cdc_source(
1174 append_only,
1175 &column_defs,
1176 &wildcard_idx,
1177 &constraints,
1178 &source_watermarks,
1179 )?;
1180
1181 generated_columns_check_for_cdc_table(&column_defs)?;
1182 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1183
1184 let session = &handler_args.session;
1185 let db_name = &session.database();
1186 let user_name = &session.user_name();
1187 let search_path = session.config().search_path();
1188 let (schema_name, resolved_table_name) =
1189 Binder::resolve_schema_qualified_name(db_name, table_name)?;
1190 let (database_id, schema_id) =
1191 session.get_database_and_schema_id_for_create(schema_name.clone())?;
1192
1193 let (source_schema, source_name) =
1195 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
1196
1197 let source = {
1198 let catalog_reader = session.env().catalog_reader().read_guard();
1199 let schema_path =
1200 SchemaPath::new(source_schema.as_deref(), &search_path, user_name);
1201
1202 let (source, _) = catalog_reader.get_source_by_name(
1203 db_name,
1204 schema_path,
1205 source_name.as_str(),
1206 )?;
1207 source.clone()
1208 };
1209 let (cdc_with_options, normalized_external_table_name) =
1210 derive_with_options_for_cdc_table(
1211 &source.with_properties,
1212 cdc_table.external_table_name.clone(),
1213 )?;
1214
1215 let (columns, pk_names) = match wildcard_idx {
1216 Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1217 None => {
1218 for column_def in &column_defs {
1219 for option_def in &column_def.options {
1220 if let ColumnOption::DefaultValue(_)
1221 | ColumnOption::DefaultValueInternal { .. } = option_def.option
1222 {
1223 return Err(ErrorCode::NotSupported(
1224 "Default value for columns defined on the table created from a CDC source".into(),
1225 "Remove the default value expression in the column definitions".into(),
1226 )
1227 .into());
1228 }
1229 }
1230 }
1231
1232 let (columns, pk_names) =
1233 bind_cdc_table_schema(&column_defs, &constraints, false)?;
1234 let (options, secret_refs) = cdc_with_options.clone().into_parts();
1236 let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1237 .context("failed to extract external table config")?;
1238
1239 (columns, pk_names)
1240 }
1241 };
1242
1243 let context: OptimizerContextRef =
1244 OptimizerContext::new(handler_args, explain_options).into();
1245 let shared_source_id = source.id;
1246 let (plan, table) = gen_create_table_plan_for_cdc_table(
1247 context,
1248 source,
1249 normalized_external_table_name,
1250 column_defs,
1251 columns,
1252 pk_names,
1253 cdc_with_options,
1254 col_id_gen,
1255 on_conflict,
1256 with_version_columns,
1257 include_column_options,
1258 table_name.clone(),
1259 resolved_table_name,
1260 database_id,
1261 schema_id,
1262 TableId::placeholder(),
1263 engine,
1264 )?;
1265
1266 (
1267 (plan, None, table),
1268 TableJobType::SharedCdcSource,
1269 Some(shared_source_id),
1270 )
1271 }
1272 (Some(_), Some(_)) => {
1273 return Err(ErrorCode::NotSupported(
1274 "Data format and encoding format doesn't apply to table created from a CDC source"
1275 .into(),
1276 "Remove the FORMAT and ENCODE specification".into(),
1277 )
1278 .into());
1279 }
1280 };
1281 Ok((plan, source, table, job_type, shared_shource_id))
1282}
1283
1284fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1286 let mut found_generated_column = false;
1287 for column in columns {
1288 let mut is_generated = false;
1289
1290 for option_def in &column.options {
1291 if let ColumnOption::GeneratedColumns(_) = option_def.option {
1292 is_generated = true;
1293 break;
1294 }
1295 }
1296
1297 if is_generated {
1298 found_generated_column = true;
1299 } else if found_generated_column {
1300 return Err(ErrorCode::NotSupported(
1301 "Non-generated column found after a generated column.".into(),
1302 "Ensure that all generated columns appear at the end of the cdc table definition."
1303 .into(),
1304 )
1305 .into());
1306 }
1307 }
1308 Ok(())
1309}
1310
1311fn not_null_check_for_cdc_table(
1313 wildcard_idx: &Option<usize>,
1314 column_defs: &Vec<ColumnDef>,
1315) -> Result<()> {
1316 if !wildcard_idx.is_some()
1317 && column_defs.iter().any(|col| {
1318 col.options
1319 .iter()
1320 .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1321 })
1322 {
1323 return Err(ErrorCode::NotSupported(
1324 "CDC table with NOT NULL constraint is not supported".to_owned(),
1325 "Please remove the NOT NULL constraint for columns".to_owned(),
1326 )
1327 .into());
1328 }
1329 Ok(())
1330}
1331
1332fn sanity_check_for_table_on_cdc_source(
1334 append_only: bool,
1335 column_defs: &Vec<ColumnDef>,
1336 wildcard_idx: &Option<usize>,
1337 constraints: &Vec<TableConstraint>,
1338 source_watermarks: &Vec<SourceWatermark>,
1339) -> Result<()> {
1340 if wildcard_idx.is_some() && !column_defs.is_empty() {
1342 return Err(ErrorCode::NotSupported(
1343 "wildcard(*) and column definitions cannot be used together".to_owned(),
1344 "Remove the wildcard or column definitions".to_owned(),
1345 )
1346 .into());
1347 }
1348
1349 if !wildcard_idx.is_some()
1351 && !constraints.iter().any(|c| {
1352 matches!(
1353 c,
1354 TableConstraint::Unique {
1355 is_primary: true,
1356 ..
1357 }
1358 )
1359 })
1360 && !column_defs.iter().any(|col| {
1361 col.options
1362 .iter()
1363 .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1364 })
1365 {
1366 return Err(ErrorCode::NotSupported(
1367 "CDC table without primary key constraint is not supported".to_owned(),
1368 "Please define a primary key".to_owned(),
1369 )
1370 .into());
1371 }
1372
1373 if append_only {
1374 return Err(ErrorCode::NotSupported(
1375 "append only modifier on the table created from a CDC source".into(),
1376 "Remove the APPEND ONLY clause".into(),
1377 )
1378 .into());
1379 }
1380
1381 if !source_watermarks.is_empty() {
1382 return Err(ErrorCode::NotSupported(
1383 "watermark defined on the table created from a CDC source".into(),
1384 "Remove the Watermark definitions".into(),
1385 )
1386 .into());
1387 }
1388
1389 Ok(())
1390}
1391
1392async fn bind_cdc_table_schema_externally(
1394 cdc_with_options: WithOptionsSecResolved,
1395) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1396 let (options, secret_refs) = cdc_with_options.into_parts();
1398 let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1399 .context("failed to extract external table config")?;
1400
1401 let table = ExternalTableImpl::connect(config)
1402 .await
1403 .context("failed to auto derive table schema")?;
1404
1405 Ok((
1406 table
1407 .column_descs()
1408 .iter()
1409 .cloned()
1410 .map(|column_desc| ColumnCatalog {
1411 column_desc,
1412 is_hidden: false,
1413 })
1414 .collect(),
1415 table.pk_names().clone(),
1416 ))
1417}
1418
1419fn bind_cdc_table_schema(
1421 column_defs: &Vec<ColumnDef>,
1422 constraints: &Vec<TableConstraint>,
1423 is_for_replace_plan: bool,
1424) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1425 let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1426
1427 let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1428 Ok((columns, pk_names))
1429}
1430
1431#[allow(clippy::too_many_arguments)]
1432pub async fn handle_create_table(
1433 handler_args: HandlerArgs,
1434 table_name: ObjectName,
1435 column_defs: Vec<ColumnDef>,
1436 wildcard_idx: Option<usize>,
1437 constraints: Vec<TableConstraint>,
1438 if_not_exists: bool,
1439 format_encode: Option<FormatEncodeOptions>,
1440 source_watermarks: Vec<SourceWatermark>,
1441 append_only: bool,
1442 on_conflict: Option<OnConflict>,
1443 with_version_columns: Vec<String>,
1444 cdc_table_info: Option<CdcTableInfo>,
1445 include_column_options: IncludeOption,
1446 webhook_info: Option<WebhookSourceInfo>,
1447 ast_engine: risingwave_sqlparser::ast::Engine,
1448) -> Result<RwPgResponse> {
1449 let session = handler_args.session.clone();
1450
1451 if append_only {
1452 session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1453 }
1454
1455 session.check_cluster_limits().await?;
1456
1457 let engine = match ast_engine {
1458 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1459 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1460 };
1461
1462 if let Either::Right(resp) = session.check_relation_name_duplicated(
1463 table_name.clone(),
1464 StatementType::CREATE_TABLE,
1465 if_not_exists,
1466 )? {
1467 return Ok(resp);
1468 }
1469
1470 let (graph, source, hummock_table, job_type, shared_source_id) = {
1471 let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
1472 handler_args.clone(),
1473 ExplainOptions::default(),
1474 format_encode,
1475 cdc_table_info,
1476 &table_name,
1477 column_defs.clone(),
1478 wildcard_idx,
1479 constraints.clone(),
1480 source_watermarks,
1481 append_only,
1482 on_conflict,
1483 with_version_columns,
1484 include_column_options,
1485 webhook_info,
1486 engine,
1487 )
1488 .await?;
1489 tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1490
1491 let graph = build_graph(plan, Some(GraphJobType::Table))?;
1492
1493 (graph, source, table, job_type, shared_source_id)
1494 };
1495
1496 tracing::trace!(
1497 "name={}, graph=\n{}",
1498 table_name,
1499 serde_json::to_string_pretty(&graph).unwrap()
1500 );
1501
1502 let dependencies = shared_source_id
1503 .map(|id| HashSet::from([id.as_object_id()]))
1504 .unwrap_or_default();
1505
1506 match engine {
1508 Engine::Hummock => {
1509 let catalog_writer = session.catalog_writer()?;
1510 let action = match job_type {
1511 TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
1512 _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
1513 };
1514 execute_with_long_running_notification(
1515 catalog_writer.create_table(
1516 source.map(|s| s.to_prost()),
1517 hummock_table.to_prost(),
1518 graph,
1519 job_type,
1520 if_not_exists,
1521 dependencies,
1522 ),
1523 &session,
1524 "CREATE TABLE",
1525 action,
1526 )
1527 .await?;
1528 }
1529 Engine::Iceberg => {
1530 let hummock_table_name = hummock_table.name.clone();
1531 session.create_staging_table(hummock_table.clone());
1532 let res = create_iceberg_engine_table(
1533 session.clone(),
1534 handler_args,
1535 source.map(|s| s.to_prost()),
1536 hummock_table,
1537 graph,
1538 table_name,
1539 job_type,
1540 if_not_exists,
1541 )
1542 .await;
1543 session.drop_staging_table(&hummock_table_name);
1544 res?
1545 }
1546 }
1547
1548 Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1549}
1550
1551#[allow(clippy::too_many_arguments)]
1560pub async fn create_iceberg_engine_table(
1561 session: Arc<SessionImpl>,
1562 handler_args: HandlerArgs,
1563 mut source: Option<PbSource>,
1564 table: TableCatalog,
1565 graph: StreamFragmentGraph,
1566 table_name: ObjectName,
1567 job_type: PbTableJobType,
1568 if_not_exists: bool,
1569) -> Result<()> {
1570 let rw_db_name = session
1571 .env()
1572 .catalog_reader()
1573 .read_guard()
1574 .get_database_by_id(table.database_id)?
1575 .name()
1576 .to_owned();
1577 let rw_schema_name = session
1578 .env()
1579 .catalog_reader()
1580 .read_guard()
1581 .get_schema_by_id(table.database_id, table.schema_id)?
1582 .name()
1583 .clone();
1584 let iceberg_catalog_name = rw_db_name.clone();
1585 let iceberg_database_name = rw_schema_name.clone();
1586 let iceberg_table_name = table_name.0.last().unwrap().real_value();
1587
1588 let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1589 let sink_decouple = session.config().sink_decouple();
1590 if matches!(sink_decouple, SinkDecouple::Disable) {
1591 bail!(
1592 "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1593 );
1594 }
1595
1596 let mut connection_ref = BTreeMap::new();
1597 let with_common = if iceberg_engine_connection.is_empty() {
1598 bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1599 } else {
1600 let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1601 assert_eq!(parts.len(), 2);
1602 let connection_catalog =
1603 session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1604 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1605 if params.connection_type == ConnectionType::Iceberg as i32 {
1606 connection_ref.insert(
1608 "connection".to_owned(),
1609 ConnectionRefValue {
1610 connection_name: ObjectName::from(vec![
1611 Ident::from(parts[0]),
1612 Ident::from(parts[1]),
1613 ]),
1614 },
1615 );
1616
1617 let mut with_common = BTreeMap::new();
1618 with_common.insert("connector".to_owned(), "iceberg".to_owned());
1619 with_common.insert("database.name".to_owned(), iceberg_database_name);
1620 with_common.insert("table.name".to_owned(), iceberg_table_name);
1621
1622 let hosted_catalog = params
1623 .properties
1624 .get("hosted_catalog")
1625 .map(|s| s.eq_ignore_ascii_case("true"))
1626 .unwrap_or(false);
1627 if hosted_catalog {
1628 let meta_client = session.env().meta_client();
1629 let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1630
1631 let meta_store_endpoint =
1632 url::Url::parse(&meta_store_endpoint).map_err(|_| {
1633 ErrorCode::InternalError(
1634 "failed to parse the meta store endpoint".to_owned(),
1635 )
1636 })?;
1637 let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1638 let meta_store_user = meta_store_endpoint.username().to_owned();
1639 let meta_store_password = match meta_store_endpoint.password() {
1640 Some(password) => percent_decode_str(password)
1641 .decode_utf8()
1642 .map_err(|_| {
1643 ErrorCode::InternalError(
1644 "failed to parse password from meta store endpoint".to_owned(),
1645 )
1646 })?
1647 .into_owned(),
1648 None => "".to_owned(),
1649 };
1650 let meta_store_host = meta_store_endpoint
1651 .host_str()
1652 .ok_or_else(|| {
1653 ErrorCode::InternalError(
1654 "failed to parse host from meta store endpoint".to_owned(),
1655 )
1656 })?
1657 .to_owned();
1658 let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1659 ErrorCode::InternalError(
1660 "failed to parse port from meta store endpoint".to_owned(),
1661 )
1662 })?;
1663 let meta_store_database = meta_store_endpoint
1664 .path()
1665 .trim_start_matches('/')
1666 .to_owned();
1667
1668 let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1669 bail!("failed to parse meta backend: {}", meta_store_backend);
1670 };
1671
1672 let catalog_uri = match meta_backend {
1673 MetaBackend::Postgres => {
1674 format!(
1675 "jdbc:postgresql://{}:{}/{}",
1676 meta_store_host, meta_store_port, meta_store_database
1677 )
1678 }
1679 MetaBackend::Mysql => {
1680 format!(
1681 "jdbc:mysql://{}:{}/{}",
1682 meta_store_host, meta_store_port, meta_store_database
1683 )
1684 }
1685 MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1686 bail!(
1687 "Unsupported meta backend for iceberg engine table: {}",
1688 meta_store_backend
1689 );
1690 }
1691 };
1692
1693 with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1694 with_common.insert("catalog.uri".to_owned(), catalog_uri);
1695 with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
1696 with_common.insert("catalog.jdbc.password".to_owned(), meta_store_password);
1697 with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
1698 }
1699
1700 with_common
1701 } else {
1702 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1703 "Only iceberg connection could be used in iceberg engine".to_owned(),
1704 )));
1705 }
1706 } else {
1707 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1708 "Private Link Service has been deprecated. Please create a new connection instead."
1709 .to_owned(),
1710 )));
1711 }
1712 };
1713
1714 let mut pks = table
1717 .pk_column_names()
1718 .iter()
1719 .map(|c| c.to_string())
1720 .collect::<Vec<String>>();
1721
1722 let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1724 pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1725 let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1726 "select {} as {}, * from {}",
1727 ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1728 ))
1729 .context("unable to parse query")?
1730 .try_into()
1731 .unwrap();
1732
1733 let Statement::Query(query) = &stmt else {
1734 panic!("unexpected statement: {:?}", stmt);
1735 };
1736 CreateSink::AsQuery(query.clone())
1737 } else {
1738 CreateSink::From(table_name.clone())
1739 };
1740
1741 let mut sink_name = table_name.clone();
1742 *sink_name.0.last_mut().unwrap() = Ident::from(
1743 (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1744 );
1745 let create_sink_stmt = CreateSinkStatement {
1746 if_not_exists: false,
1747 sink_name,
1748 with_properties: WithProperties(vec![]),
1749 sink_from,
1750 columns: vec![],
1751 emit_mode: None,
1752 sink_schema: None,
1753 into_table_name: None,
1754 };
1755
1756 let mut sink_handler_args = handler_args.clone();
1757
1758 let mut sink_with = with_common.clone();
1759
1760 if table.append_only {
1761 sink_with.insert("type".to_owned(), "append-only".to_owned());
1762 } else {
1763 sink_with.insert("primary_key".to_owned(), pks.join(","));
1764 sink_with.insert("type".to_owned(), "upsert".to_owned());
1765 }
1766 let commit_checkpoint_interval = handler_args
1785 .with_options
1786 .get(COMMIT_CHECKPOINT_INTERVAL)
1787 .map(|v| v.to_owned())
1788 .unwrap_or_else(|| "60".to_owned());
1789 let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1790 ErrorCode::InvalidInputSyntax(format!(
1791 "commit_checkpoint_interval must be greater than 0: {}",
1792 commit_checkpoint_interval
1793 ))
1794 })?;
1795
1796 if commit_checkpoint_interval == 0 {
1797 bail!("commit_checkpoint_interval must be greater than 0");
1798 }
1799
1800 source
1802 .as_mut()
1803 .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1804
1805 let sink_decouple = session.config().sink_decouple();
1806 if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1807 bail!(
1808 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1809 )
1810 }
1811
1812 sink_with.insert(
1813 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1814 commit_checkpoint_interval.to_string(),
1815 );
1816 sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1817
1818 sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1819
1820 if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1821 match enable_compaction.to_lowercase().as_str() {
1822 "true" => {
1823 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1824 }
1825 "false" => {
1826 sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1827 }
1828 _ => {
1829 return Err(ErrorCode::InvalidInputSyntax(format!(
1830 "enable_compaction must be true or false: {}",
1831 enable_compaction
1832 ))
1833 .into());
1834 }
1835 }
1836
1837 source
1839 .as_mut()
1840 .map(|x| x.with_properties.remove("enable_compaction"));
1841 } else {
1842 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1843 }
1844
1845 if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1846 let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1847 ErrorCode::InvalidInputSyntax(format!(
1848 "compaction_interval_sec must be greater than 0: {}",
1849 commit_checkpoint_interval
1850 ))
1851 })?;
1852 if compaction_interval_sec == 0 {
1853 bail!("compaction_interval_sec must be greater than 0");
1854 }
1855 sink_with.insert(
1856 "compaction_interval_sec".to_owned(),
1857 compaction_interval_sec.to_string(),
1858 );
1859 source
1861 .as_mut()
1862 .map(|x| x.with_properties.remove("compaction_interval_sec"));
1863 }
1864
1865 let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1866 handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1867 {
1868 source
1870 .as_mut()
1871 .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1872 match enable_snapshot_expiration.to_lowercase().as_str() {
1873 "true" => {
1874 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1875 true
1876 }
1877 "false" => {
1878 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1879 false
1880 }
1881 _ => {
1882 return Err(ErrorCode::InvalidInputSyntax(format!(
1883 "enable_snapshot_expiration must be true or false: {}",
1884 enable_snapshot_expiration
1885 ))
1886 .into());
1887 }
1888 }
1889 } else {
1890 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1891 true
1892 };
1893
1894 if has_enabled_snapshot_expiration {
1895 if let Some(snapshot_expiration_retain_last) = handler_args
1897 .with_options
1898 .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
1899 {
1900 sink_with.insert(
1901 SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
1902 snapshot_expiration_retain_last.to_owned(),
1903 );
1904 source
1906 .as_mut()
1907 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
1908 }
1909
1910 if let Some(snapshot_expiration_max_age) = handler_args
1911 .with_options
1912 .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
1913 {
1914 sink_with.insert(
1915 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
1916 snapshot_expiration_max_age.to_owned(),
1917 );
1918 source
1920 .as_mut()
1921 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
1922 }
1923
1924 if let Some(snapshot_expiration_clear_expired_files) = handler_args
1925 .with_options
1926 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1927 {
1928 sink_with.insert(
1929 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
1930 snapshot_expiration_clear_expired_files.to_owned(),
1931 );
1932 source.as_mut().map(|x| {
1934 x.with_properties
1935 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1936 });
1937 }
1938
1939 if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
1940 .with_options
1941 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1942 {
1943 sink_with.insert(
1944 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
1945 snapshot_expiration_clear_expired_meta_data.to_owned(),
1946 );
1947 source.as_mut().map(|x| {
1949 x.with_properties
1950 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1951 });
1952 }
1953 }
1954
1955 if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1956 let write_mode = IcebergWriteMode::try_from(write_mode.as_str()).map_err(|_| {
1957 ErrorCode::InvalidInputSyntax(format!(
1958 "invalid write_mode: {}, must be one of: {}, {}",
1959 write_mode, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
1960 ))
1961 })?;
1962
1963 match write_mode {
1964 IcebergWriteMode::MergeOnRead => {
1965 sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
1966 }
1967
1968 IcebergWriteMode::CopyOnWrite => {
1969 if table.append_only {
1970 return Err(ErrorCode::NotSupported(
1971 "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
1972 "Please use MERGE ON READ instead".to_owned(),
1973 )
1974 .into());
1975 }
1976
1977 sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
1978 }
1979 }
1980
1981 source
1983 .as_mut()
1984 .map(|x| x.with_properties.remove("write_mode"));
1985 } else {
1986 sink_with.insert(
1987 WRITE_MODE.to_owned(),
1988 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1989 );
1990 }
1991
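// The remaining compaction options are validated (each must be a positive integer),
// forwarded to the sink, and removed from the engine source's options.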
1992 if let Some(max_snapshots_num_before_compaction) =
1993 handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
1994 {
1995 let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
1996 .parse::<u32>()
1997 .map_err(|_| {
1998 ErrorCode::InvalidInputSyntax(format!(
1999 "{} must be greater than 0: {}",
2000 COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
2001 ))
2002 })?;
2003
2004 if max_snapshots_num_before_compaction == 0 {
2005 bail!(format!(
2006 "{} must be greater than 0",
2007 COMPACTION_MAX_SNAPSHOTS_NUM
2008 ));
2009 }
2010
2011 sink_with.insert(
2012 COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
2013 max_snapshots_num_before_compaction.to_string(),
2014 );
2015
2016 source
2018 .as_mut()
2019 .map(|x| x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM));
2020 }
2021
2022 if let Some(small_files_threshold_mb) = handler_args
2023 .with_options
2024 .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2025 {
2026 let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
2027 ErrorCode::InvalidInputSyntax(format!(
2028 "{} must be greater than 0: {}",
2029 COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
2030 ))
2031 })?;
2032 if small_files_threshold_mb == 0 {
2033 bail!(format!(
2034 "{} must be a greater than 0",
2035 COMPACTION_SMALL_FILES_THRESHOLD_MB
2036 ));
2037 }
2038 sink_with.insert(
2039 COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
2040 small_files_threshold_mb.to_string(),
2041 );
2042
2043 source.as_mut().map(|x| {
2045 x.with_properties
2046 .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2047 });
2048 }
2049
2050 if let Some(delete_files_count_threshold) = handler_args
2051 .with_options
2052 .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2053 {
2054 let delete_files_count_threshold =
2055 delete_files_count_threshold.parse::<usize>().map_err(|_| {
2056 ErrorCode::InvalidInputSyntax(format!(
2057 "{} must be greater than 0: {}",
2058 COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
2059 ))
2060 })?;
2061 if delete_files_count_threshold == 0 {
2062 bail!(format!(
2063 "{} must be greater than 0",
2064 COMPACTION_DELETE_FILES_COUNT_THRESHOLD
2065 ));
2066 }
2067 sink_with.insert(
2068 COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
2069 delete_files_count_threshold.to_string(),
2070 );
2071
2072 source.as_mut().map(|x| {
2074 x.with_properties
2075 .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2076 });
2077 }
2078
2079 if let Some(trigger_snapshot_count) = handler_args
2080 .with_options
2081 .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
2082 {
2083 let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
2084 ErrorCode::InvalidInputSyntax(format!(
2085 "{} must be greater than 0: {}",
2086 COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
2087 ))
2088 })?;
2089 if trigger_snapshot_count == 0 {
2090 bail!(format!(
2091 "{} must be greater than 0",
2092 COMPACTION_TRIGGER_SNAPSHOT_COUNT
2093 ));
2094 }
2095 sink_with.insert(
2096 COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
2097 trigger_snapshot_count.to_string(),
2098 );
2099
2100 source
2102 .as_mut()
2103 .map(|x| x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT));
2104 }
2105
2106 if let Some(target_file_size_mb) = handler_args
2107 .with_options
2108 .get(COMPACTION_TARGET_FILE_SIZE_MB)
2109 {
2110 let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
2111 ErrorCode::InvalidInputSyntax(format!(
2112 "{} must be greater than 0: {}",
2113 COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
2114 ))
2115 })?;
2116 if target_file_size_mb == 0 {
2117 bail!(format!(
2118 "{} must be greater than 0",
2119 COMPACTION_TARGET_FILE_SIZE_MB
2120 ));
2121 }
2122 sink_with.insert(
2123 COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
2124 target_file_size_mb.to_string(),
2125 );
2126 source
2128 .as_mut()
2129 .map(|x| x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB));
2130 }
2131
2132 if let Some(compaction_type) = handler_args.with_options.get(COMPACTION_TYPE) {
2133 let compaction_type = CompactionType::try_from(compaction_type.as_str()).map_err(|_| {
2134 ErrorCode::InvalidInputSyntax(format!(
2135 "invalid compaction_type: {}, must be one of {:?}",
2136 compaction_type,
2137 &[
2138 CompactionType::Full,
2139 CompactionType::SmallFiles,
2140 CompactionType::FilesWithDelete
2141 ]
2142 ))
2143 })?;
2144
2145 sink_with.insert(
2146 COMPACTION_TYPE.to_owned(),
2147 compaction_type.as_str().to_owned(),
2148 );
2149
2150 source
2152 .as_mut()
2153 .map(|x| x.with_properties.remove(COMPACTION_TYPE));
2154 }
2155
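// The partition_by columns must exist in the table schema and form a prefix of the primary key.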
2156 let partition_by = handler_args
2157 .with_options
2158 .get("partition_by")
2159 .map(|v| v.to_owned());
2160
2161 if let Some(partition_by) = &partition_by {
2162 let mut partition_columns = vec![];
2163 for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
2164 table
2165 .columns()
2166 .iter()
2167 .find(|col| col.name().eq_ignore_ascii_case(&column))
2168 .ok_or_else(|| {
2169 ErrorCode::InvalidInputSyntax(format!(
2170 "Partition source column does not exist in schema: {}",
2171 column
2172 ))
2173 })?;
2174
2175 partition_columns.push(column);
2176 }
2177
2178 ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2179 |_| {
2180 ErrorCode::InvalidInputSyntax(
2181 "The partition columns should be the prefix of the primary key".to_owned(),
2182 )
2183 },
2184 )?;
2185
2186 sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2187
2188 source
2190 .as_mut()
2191 .map(|x| x.with_properties.remove("partition_by"));
2192 }
2193
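// Build the stream plan and fragment graph for the internal Iceberg sink.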
2194 sink_handler_args.with_options =
2195 WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2196 let SinkPlanContext {
2197 sink_plan,
2198 sink_catalog,
2199 ..
2200 } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2201 let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2202
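// The internal Iceberg source is named after the table with ICEBERG_SOURCE_PREFIX prepended.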
2203 let mut source_name = table_name.clone();
2204 *source_name.0.last_mut().unwrap() = Ident::from(
2205 (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2206 );
2207 let create_source_stmt = CreateSourceStatement {
2208 temporary: false,
2209 if_not_exists: false,
2210 columns: vec![],
2211 source_name,
2212 wildcard_idx: Some(0),
2213 constraints: vec![],
2214 with_properties: WithProperties(vec![]),
2215 format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2216 source_watermarks: vec![],
2217 include_column_options: vec![],
2218 };
2219
2220 let mut source_handler_args = handler_args.clone();
2221 let source_with = with_common;
2222 source_handler_args.with_options =
2223 WithOptions::new(source_with, Default::default(), connection_ref);
2224
2225 let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2226 let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2227 let (with_properties, refresh_mode) =
2228 bind_connector_props(&source_handler_args, &format_encode, true)?;
2229
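// Create the Iceberg table itself (if it does not already exist) before binding the source,
// keeping the catalog handle and table identifier so the table can be dropped if the DDL fails below.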
2230 let (iceberg_catalog, table_identifier) = {
2233 let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2234 let iceberg_sink = IcebergSink::try_from(sink_param)?;
2235 iceberg_sink.create_table_if_not_exists().await?;
2236
2237 let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2238 let table_identifier = iceberg_sink.config.full_table_name()?;
2239 (iceberg_catalog, table_identifier)
2240 };
2241
2242 let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2243 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2244 &session,
2245 &format_encode,
2246 Either::Left(&with_properties),
2247 create_source_type,
2248 )
2249 .await?;
2250 let mut col_id_gen = ColumnIdGenerator::new_initial();
2251
2252 let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2253 source_handler_args,
2254 create_source_stmt.source_name,
2255 format_encode,
2256 with_properties,
2257 &create_source_stmt.columns,
2258 create_source_stmt.constraints,
2259 create_source_stmt.wildcard_idx,
2260 create_source_stmt.source_watermarks,
2261 columns_from_resolve_source,
2262 source_info,
2263 create_source_stmt.include_column_options,
2264 &mut col_id_gen,
2265 create_source_type,
2266 overwrite_options.source_rate_limit,
2267 SqlColumnStrategy::FollowChecked,
2268 refresh_mode,
2269 )
2270 .await?;
2271
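// Eagerly initialize the embedded JVM before issuing the DDL, since Iceberg catalog access may rely on it.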
2272 let _ = Jvm::get_or_init()?;
2275
2276 let catalog_writer = session.catalog_writer()?;
2277 let action = match job_type {
2278 TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
2279 _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
2280 };
2281 let res = execute_with_long_running_notification(
2282 catalog_writer.create_iceberg_table(
2283 PbTableJobInfo {
2284 source,
2285 table: Some(table.to_prost()),
2286 fragment_graph: Some(graph),
2287 job_type: job_type as _,
2288 },
2289 PbSinkJobInfo {
2290 sink: Some(sink_catalog.to_proto()),
2291 fragment_graph: Some(sink_graph),
2292 },
2293 iceberg_source_catalog.to_prost(),
2294 if_not_exists,
2295 ),
2296 &session,
2297 "CREATE TABLE",
2298 action,
2299 )
2300 .await;
2301
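// Best-effort cleanup: if the DDL failed, drop the Iceberg table created above so it is not left orphaned.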
2302 if res.is_err() {
2303 let _ = iceberg_catalog
2304 .drop_table(&table_identifier)
2305 .await
2306 .inspect_err(|err| {
2307 tracing::error!(
2308 "failed to drop iceberg table {} after create iceberg engine table failed: {}",
2309 table_identifier,
2310 err.as_report()
2311 );
2312 });
2313 res?
2314 }
2315
2316 Ok(())
2317}
2318
2319pub fn check_create_table_with_source(
2320 with_options: &WithOptions,
2321 format_encode: Option<FormatEncodeOptions>,
2322 include_column_options: &IncludeOption,
2323 cdc_table_info: &Option<CdcTableInfo>,
2324) -> Result<Option<FormatEncodeOptions>> {
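// Skip the connector/FORMAT checks for CDC tables.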
2325 if cdc_table_info.is_some() {
2327 return Ok(format_encode);
2328 }
2329 let defined_source = with_options.is_source_connector();
2330
2331 if !include_column_options.is_empty() && !defined_source {
2332 return Err(ErrorCode::InvalidInputSyntax(
2333 "INCLUDE should be used with a connector".to_owned(),
2334 )
2335 .into());
2336 }
2337 if defined_source {
2338 format_encode.as_ref().ok_or_else(|| {
2339 ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2340 })?;
2341 }
2342 Ok(format_encode)
2343}
2344
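/// Succeeds iff `partition_columns` is a prefix of `primary_key_columns`.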
2345fn ensure_partition_columns_are_prefix_of_primary_key(
2346 partition_columns: &[String],
2347 primary_key_columns: &[String],
2348) -> std::result::Result<(), String> {
2349 if partition_columns.len() > primary_key_columns.len() {
2350 return Err("Partition columns cannot be longer than primary key columns.".to_owned());
2351 }
2352
2353 for (i, partition_col) in partition_columns.iter().enumerate() {
2354 if primary_key_columns.get(i) != Some(partition_col) {
2355 return Err(format!(
2356 "Partition column '{}' is not a prefix of the primary key.",
2357 partition_col
2358 ));
2359 }
2360 }
2361
2362 Ok(())
2363}
2364
2365#[allow(clippy::too_many_arguments)]
2366pub async fn generate_stream_graph_for_replace_table(
2367 _session: &Arc<SessionImpl>,
2368 table_name: ObjectName,
2369 original_catalog: &Arc<TableCatalog>,
2370 handler_args: HandlerArgs,
2371 statement: Statement,
2372 col_id_gen: ColumnIdGenerator,
2373 sql_column_strategy: SqlColumnStrategy,
2374) -> Result<(
2375 StreamFragmentGraph,
2376 TableCatalog,
2377 Option<SourceCatalog>,
2378 TableJobType,
2379)> {
2380 let Statement::CreateTable {
2381 columns,
2382 constraints,
2383 source_watermarks,
2384 append_only,
2385 on_conflict,
2386 with_version_columns,
2387 wildcard_idx,
2388 cdc_table_info,
2389 format_encode,
2390 include_column_options,
2391 engine,
2392 with_options,
2393 ..
2394 } = statement
2395 else {
2396 panic!("unexpected statement type: {:?}", statement);
2397 };
2398
2399 let format_encode = format_encode
2400 .clone()
2401 .map(|format_encode| format_encode.into_v2_with_warning());
2402
2403 let engine = match engine {
2404 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2405 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2406 };
2407
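// The connector is being dropped when the table previously had an associated source
// but the new definition no longer specifies FORMAT/ENCODE.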
2408 let is_drop_connector =
2409 original_catalog.associated_source_id().is_some() && format_encode.is_none();
2410 if is_drop_connector {
2411 debug_assert!(
2412 source_watermarks.is_empty()
2413 && include_column_options.is_empty()
2414 && with_options
2415 .iter()
2416 .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2417 );
2418 }
2419
2420 let props = CreateTableProps {
2421 definition: handler_args.normalized_sql.clone(),
2422 append_only,
2423 on_conflict: on_conflict.into(),
2424 with_version_columns: with_version_columns
2425 .iter()
2426 .map(|col| col.real_value())
2427 .collect(),
2428 webhook_info: original_catalog.webhook_info.clone(),
2429 engine,
2430 };
2431
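// Re-derive the plan just like CREATE TABLE would; the job type depends on whether a connector or CDC source is involved.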
2432 let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2433 (Some(format_encode), None) => (
2434 gen_create_table_plan_with_source(
2435 handler_args,
2436 ExplainOptions::default(),
2437 table_name,
2438 columns,
2439 wildcard_idx,
2440 constraints,
2441 format_encode,
2442 source_watermarks,
2443 col_id_gen,
2444 include_column_options,
2445 props,
2446 sql_column_strategy,
2447 )
2448 .await?,
2449 TableJobType::General,
2450 ),
2451 (None, None) => {
2452 let context = OptimizerContext::from_handler_args(handler_args);
2453 let (plan, table) = gen_create_table_plan(
2454 context,
2455 table_name,
2456 columns,
2457 constraints,
2458 col_id_gen,
2459 source_watermarks,
2460 props,
2461 true,
2462 )?;
2463 ((plan, None, table), TableJobType::General)
2464 }
2465 (None, Some(cdc_table)) => {
2466 let session = &handler_args.session;
2467 let (source, resolved_table_name) =
2468 get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2469
2470 let (cdc_with_options, normalized_external_table_name) =
2471 derive_with_options_for_cdc_table(
2472 &source.with_properties,
2473 cdc_table.external_table_name.clone(),
2474 )?;
2475
2476 let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2477
2478 let context: OptimizerContextRef =
2479 OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2480 let (plan, table) = gen_create_table_plan_for_cdc_table(
2481 context,
2482 source,
2483 normalized_external_table_name,
2484 columns,
2485 column_catalogs,
2486 pk_names,
2487 cdc_with_options,
2488 col_id_gen,
2489 on_conflict,
2490 with_version_columns
2491 .iter()
2492 .map(|col| col.real_value())
2493 .collect(),
2494 include_column_options,
2495 table_name,
2496 resolved_table_name,
2497 original_catalog.database_id,
2498 original_catalog.schema_id,
2499 original_catalog.id(),
2500 engine,
2501 )?;
2502
2503 ((plan, None, table), TableJobType::SharedCdcSource)
2504 }
2505 (Some(_), Some(_)) => {
2506 return Err(ErrorCode::NotSupported(
2507 "Data format and encoding format doesn't apply to table created from a CDC source"
2508 .into(),
2509 "Remove the FORMAT and ENCODE specification".into(),
2510 )
2511 .into());
2512 }
2513 };
2514
2515 if table.pk_column_ids() != original_catalog.pk_column_ids() {
2516 Err(ErrorCode::InvalidInputSyntax(
2517 "alter primary key of table is not supported".to_owned(),
2518 ))?
2519 }
2520
2521 let graph = build_graph(plan, Some(GraphJobType::Table))?;
2522
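// Reuse the IDs from the original catalog so the replacement job targets the existing table and its associated source.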
2523 table.id = original_catalog.id();
2525 if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2526 table.associated_source_id = Some(source_id);
2527
2528 let source = source.as_mut().unwrap();
2529 source.id = source_id;
2530 source.associated_table_id = Some(table.id());
2531 }
2532
2533 Ok((graph, table, source, job_type))
2534}
2535
2536fn get_source_and_resolved_table_name(
2537 session: &Arc<SessionImpl>,
2538 cdc_table: CdcTableInfo,
2539 table_name: ObjectName,
2540) -> Result<(Arc<SourceCatalog>, String)> {
2541 let db_name = &session.database();
2542 let (_, resolved_table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2543
2544 let (source_schema, source_name) =
2545 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2546
2547 let source = {
2548 let catalog_reader = session.env().catalog_reader().read_guard();
2549 let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2550 let (source, _) = catalog_reader.get_source_by_name(
2551 db_name,
2552 SchemaPath::Name(schema_name.as_str()),
2553 source_name.as_str(),
2554 )?;
2555 source.clone()
2556 };
2557
2558 Ok((source, resolved_table_name))
2559}
2560
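/// Validates a webhook table's column definition and VALIDATE clause, converting the
/// parsed `WebhookSourceInfo` into its protobuf form.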
2561fn bind_webhook_info(
2563 session: &Arc<SessionImpl>,
2564 columns_defs: &[ColumnDef],
2565 webhook_info: WebhookSourceInfo,
2566) -> Result<PbWebhookSourceInfo> {
2567 if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
2569 {
2570 return Err(ErrorCode::InvalidInputSyntax(
2571 "Table with webhook source should have exactly one JSONB column".to_owned(),
2572 )
2573 .into());
2574 }
2575
2576 let WebhookSourceInfo {
2577 secret_ref,
2578 signature_expr,
2579 wait_for_persistence,
2580 is_batched,
2581 } = webhook_info;
2582
2583 let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2585 let db_name = &session.database();
2586 let (schema_name, secret_name) =
2587 Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2588 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2589 (
2590 Some(PbSecretRef {
2591 secret_id: secret_catalog.id,
2592 ref_as: match secret_ref.ref_as {
2593 SecretRefAsType::Text => PbRefAsType::Text,
2594 SecretRefAsType::File => PbRefAsType::File,
2595 }
2596 .into(),
2597 }),
2598 Some(secret_name),
2599 )
2600 } else {
2601 (None, None)
2602 };
2603
2604 let signature_expr = if let Some(signature_expr) = signature_expr {
2605 let secure_compare_context = SecureCompareContext {
2606 column_name: columns_defs[0].name.real_value(),
2607 secret_name,
2608 };
2609 let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
2610 let expr = binder.bind_expr(&signature_expr)?;
2611
2612 if expr.as_function_call().is_none()
2614 || expr.as_function_call().unwrap().func_type()
2615 != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2616 {
2617 return Err(ErrorCode::InvalidInputSyntax(
2618 "The signature verification function must be SECURE_COMPARE()".to_owned(),
2619 )
2620 .into());
2621 }
2622
2623 Some(expr.to_expr_proto())
2624 } else {
2625 session.notice_to_user(
2626 "VALIDATE clause is strongly recommended for safety or production usages",
2627 );
2628 None
2629 };
2630
2631 let pb_webhook_info = PbWebhookSourceInfo {
2632 secret_ref: pb_secret_ref,
2633 signature_expr,
2634 wait_for_persistence,
2635 is_batched,
2636 };
2637
2638 Ok(pb_webhook_info)
2639}
2640
2641#[cfg(test)]
2642mod tests {
2643 use risingwave_common::catalog::{
2644 DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2645 };
2646 use risingwave_common::types::{DataType, StructType};
2647
2648 use super::*;
2649 use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2650
2651 #[tokio::test]
2652 async fn test_create_table_handler() {
2653 let sql =
2654 "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2655 let frontend = LocalFrontend::new(Default::default()).await;
2656 frontend.run_sql(sql).await.unwrap();
2657
2658 let session = frontend.session_ref();
2659 let catalog_reader = session.env().catalog_reader().read_guard();
2660 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2661
2662 let (table, _) = catalog_reader
2664 .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2665 .unwrap();
2666 assert_eq!(table.name(), "t");
2667
2668 let columns = table
2669 .columns
2670 .iter()
2671 .map(|col| (col.name(), col.data_type().clone()))
2672 .collect::<HashMap<&str, DataType>>();
2673
2674 let expected_columns = maplit::hashmap! {
2675 ROW_ID_COLUMN_NAME => DataType::Serial,
2676 "v1" => DataType::Int16,
2677 "v2" => StructType::new(
2678 vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
2679 )
2680 .with_ids([3, 4, 5].map(ColumnId::new))
2681 .into(),
2682 RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
2683 };
2684
2685 assert_eq!(columns, expected_columns, "{columns:#?}");
2686 }
2687
2688 #[test]
2689 fn test_bind_primary_key() {
2690 for (sql, expected) in [
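// Expected values are the bound pk column ids: the hidden row id column gets id 0
// and user columns are numbered from 1.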
2693 ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
2694 ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
2695 ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
2696 (
2697 "create table t (v1 int primary key, v2 int primary key)",
2698 Err("multiple primary keys are not allowed"),
2699 ),
2700 (
2701 "create table t (v1 int primary key primary key, v2 int)",
2702 Err("multiple primary keys are not allowed"),
2703 ),
2704 (
2705 "create table t (v1 int, v2 int, primary key (v1))",
2706 Ok(&[1]),
2707 ),
2708 (
2709 "create table t (v1 int, primary key (v2), v2 int)",
2710 Ok(&[2]),
2711 ),
2712 (
2713 "create table t (primary key (v2, v1), v1 int, v2 int)",
2714 Ok(&[2, 1]),
2715 ),
2716 (
2717 "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
2718 Err("multiple primary keys are not allowed"),
2719 ),
2720 (
2721 "create table t (v1 int primary key, primary key (v1), v2 int)",
2722 Err("multiple primary keys are not allowed"),
2723 ),
2724 (
2725 "create table t (v1 int, primary key (V3), v2 int)",
2726 Err("column \"v3\" named in key does not exist"),
2727 ),
2728 ] {
2729 let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
2730 let risingwave_sqlparser::ast::Statement::CreateTable {
2731 columns: column_defs,
2732 constraints,
2733 ..
2734 } = ast.remove(0)
2735 else {
2736 panic!("test case should be create table")
2737 };
2738 let actual: Result<_> = (|| {
2739 let mut columns = bind_sql_columns(&column_defs, false)?;
2740 let mut col_id_gen = ColumnIdGenerator::new_initial();
2741 for c in &mut columns {
2742 col_id_gen.generate(c)?;
2743 }
2744
2745 let pk_names =
2746 bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
2747 let (_, pk_column_ids, _) =
2748 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
2749 Ok(pk_column_ids)
2750 })();
2751 match (expected, actual) {
2752 (Ok(expected), Ok(actual)) => assert_eq!(
2753 expected.iter().copied().map(ColumnId::new).collect_vec(),
2754 actual,
2755 "sql: {sql}"
2756 ),
2757 (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
2758 (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
2759 (Err(expected), Err(actual)) => assert!(
2760 actual.to_string().contains(expected),
2761 "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
2762 ),
2763 }
2764 }
2765 }
2766
2767 #[tokio::test]
2768 async fn test_duplicate_props_options() {
2769 let proto_file = create_proto_file(PROTO_FILE_DATA);
2770 let sql = format!(
2771 r#"CREATE TABLE t
2772 WITH (
2773 connector = 'kinesis',
2774 aws.region='user_test_topic',
2775 endpoint='172.10.1.1:9090,172.10.1.2:9090',
2776 aws.credentials.access_key_id = 'your_access_key_1',
2777 aws.credentials.secret_access_key = 'your_secret_key_1'
2778 )
2779 FORMAT PLAIN ENCODE PROTOBUF (
2780 message = '.test.TestRecord',
2781 aws.credentials.access_key_id = 'your_access_key_2',
2782 aws.credentials.secret_access_key = 'your_secret_key_2',
2783 schema.location = 'file://{}',
2784 )"#,
2785 proto_file.path().to_str().unwrap()
2786 );
2787 let frontend = LocalFrontend::new(Default::default()).await;
2788 frontend.run_sql(sql).await.unwrap();
2789
2790 let session = frontend.session_ref();
2791 let catalog_reader = session.env().catalog_reader().read_guard();
2792 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2793
2794 let (source, _) = catalog_reader
2796 .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2797 .unwrap();
2798 assert_eq!(source.name, "t");
2799
2800 assert_eq!(
2802 source
2803 .info
2804 .format_encode_options
2805 .get("aws.credentials.access_key_id")
2806 .unwrap(),
2807 "your_access_key_2"
2808 );
2809 assert_eq!(
2810 source
2811 .info
2812 .format_encode_options
2813 .get("aws.credentials.secret_access_key")
2814 .unwrap(),
2815 "your_secret_key_2"
2816 );
2817
2818 assert_eq!(
2820 source
2821 .with_properties
2822 .get("aws.credentials.access_key_id")
2823 .unwrap(),
2824 "your_access_key_1"
2825 );
2826 assert_eq!(
2827 source
2828 .with_properties
2829 .get("aws.credentials.secret_access_key")
2830 .unwrap(),
2831 "your_secret_key_1"
2832 );
2833
2834 assert!(!source.with_properties.contains_key("schema.location"));
2836 }
2837}