use std::collections::{BTreeMap, HashMap, HashSet};
use std::rc::Rc;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use clap::ValueEnum;
use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use percent_encoding::percent_decode_str;
use pgwire::pg_response::{PgResponse, StatementType};
use prost::Message as _;
use risingwave_common::catalog::{
    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
    ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
    TableId,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::global_jvm::Jvm;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
    DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
    SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::connection_params::ConnectionType;
use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::secret::secret_ref::PbRefAsType;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::{IncludeOption, Parser};
use thiserror_ext::AsReport;

use super::RwPgResponse;
use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
use crate::binder::{Clause, SecureCompareContext, bind_data_type};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::TableVersion;
use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::HandlerArgs;
use crate::handler::create_source::{
    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
    bind_source_watermark, handle_addition_columns,
};
use crate::handler::util::SourceSchemaCompatExt;
use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
use crate::optimizer::plan_node::{
    LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::OverwriteOptions;
use crate::{Binder, Explain, TableCatalog, WithOptions};

mod col_id_gen;
pub use col_id_gen::*;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::{
    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
    COMPACTION_SMALL_FILES_THRESHOLD_MB, COMPACTION_TARGET_FILE_SIZE_MB,
    COMPACTION_TRIGGER_SNAPSHOT_COUNT, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
    ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink,
    SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
    SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
    parse_partition_by_exprs,
};
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};

use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};
fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
    for option_def in &c.options {
        match option_def.option {
            ColumnOption::GeneratedColumns(_) => {}
            ColumnOption::DefaultValue(_) => {}
            ColumnOption::DefaultValueInternal { .. } => {}
            ColumnOption::Unique { is_primary: true } => {}
            ColumnOption::Null => {}
            ColumnOption::NotNull => {}
            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
        }
    }
    Ok(())
}

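/// Binds column definitions from a `CREATE` statement into `ColumnCatalog`s.
///
/// Column IDs are left as placeholders here and are assigned later by a
/// `ColumnIdGenerator`; nullability is derived from the `NOT NULL` option.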
pub fn bind_sql_columns(
    column_defs: &[ColumnDef],
    is_for_drop_table_connector: bool,
) -> Result<Vec<ColumnCatalog>> {
    let mut columns = Vec::with_capacity(column_defs.len());

    for column in column_defs {
        ensure_column_options_supported(column)?;
        let ColumnDef {
            name,
            data_type,
            collation,
            options,
            ..
        } = column;

        let data_type = data_type
            .clone()
            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
        if let Some(collation) = collation {
            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
                bail_not_implemented!("Collation other than `C` or `POSIX` is not implemented");
            }

            match data_type {
                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
                _ => {
                    return Err(ErrorCode::NotSupported(
                        format!("{} is not a collatable data type", data_type),
                        "The only built-in collatable data types are `varchar`, `text` and `char`, please check your type".into(),
                    )
                    .into());
                }
            }
        }

        if !is_for_drop_table_connector {
            check_column_name_not_reserved(&name.real_value())?;
        }

        let nullable: bool = !options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull));

        columns.push(ColumnCatalog {
            column_desc: ColumnDesc {
                data_type: bind_data_type(&data_type)?,
                column_id: ColumnId::placeholder(),
                name: name.real_value(),
                generated_or_default_column: None,
                description: None,
                additional_column: AdditionalColumn { column_type: None },
                version: ColumnDescVersion::LATEST,
                system_column: None,
                nullable,
            },
            is_hidden: false,
        });
    }

    Ok(columns)
}

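/// Checks that a generated column neither references another generated column
/// nor appears in the primary key with an impure expression.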
fn check_generated_column_constraints(
    column_name: &String,
    column_id: ColumnId,
    expr: &ExprImpl,
    column_catalogs: &[ColumnCatalog],
    generated_column_names: &[String],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let input_refs = expr.collect_input_refs(column_catalogs.len());
    for idx in input_refs.ones() {
        let referred_generated_column = &column_catalogs[idx].column_desc.name;
        if generated_column_names
            .iter()
            .any(|c| c == referred_generated_column)
        {
            return Err(ErrorCode::BindError(format!(
                "A generated column cannot reference another generated column. \
                 But here generated column \"{}\" referenced another generated column \"{}\"",
                column_name, referred_generated_column
            ))
            .into());
        }
    }

    if pk_column_ids.contains(&column_id) && expr.is_impure() {
        return Err(ErrorCode::BindError(format!(
            "Generated columns with impure expressions should not be part of the primary key. \
             Here column \"{}\" is defined as part of the primary key.",
            column_name
        ))
        .into());
    }

    Ok(())
}

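/// Binds the generated-column and default-value options in the column
/// definitions and fills the bound results into the corresponding entries of
/// `column_catalogs`.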
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: &[ColumnDef],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let generated_column_names = {
        let mut names = vec![];
        for column in columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name, column_catalogs)?;

    for column in columns {
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            continue;
        };

        for option_def in &column.options {
            match &option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "fail to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type())?;

                    // Inline `NOW()`/`PROCTIME()` against the pinned snapshot so the
                    // default can be folded to a constant `snapshot_value`.
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                expr: Some(expr_impl.to_expr_proto()),
                            }));
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                             Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // An empty `persisted` payload means the user wrote `DEFAULT INTERNAL`
                    // directly, which is not allowed.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                             please specify a concrete default value"
                        );
                    }

                    let desc = DefaultColumnDesc::decode(&**persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                _ => {}
            }
        }
    }
    Ok(())
}

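/// Extracts the primary key column names from table-level constraints.
/// Only `PRIMARY KEY` constraints are supported so far.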
pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
    let mut pk_column_names = vec![];

    for constraint in table_constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary: true,
            } => {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
            }
            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
        }
    }
    Ok(pk_column_names)
}

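/// Merges the primary key declared at the table level with any `PRIMARY KEY`
/// column option, erroring if a primary key is defined more than once.
/// For example, `CREATE TABLE t (a INT PRIMARY KEY, b INT)` yields `["a"]`,
/// while additionally writing `PRIMARY KEY (b)` would be rejected.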
pub fn bind_sql_pk_names(
    columns_defs: &[ColumnDef],
    pk_names_from_table_constraints: Vec<String>,
) -> Result<Vec<String>> {
    let mut pk_column_names = pk_names_from_table_constraints;

    for column in columns_defs {
        for option_def in &column.options {
            if let ColumnOption::Unique { is_primary: true } = option_def.option {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names.push(column.name.real_value());
            }
        }
    }

    Ok(pk_column_names)
}

fn multiple_pk_definition_err() -> RwError {
    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
}

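/// Resolves primary key names to column IDs. If no primary key is specified
/// and `must_need_pk` holds, a hidden row id column is appended and used as
/// the primary key; its index in the column list is returned alongside.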
pub fn bind_pk_and_row_id_on_relation(
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    must_need_pk: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
    for c in &columns {
        assert!(c.column_id() != ColumnId::placeholder());
    }

    let name_to_id = columns
        .iter()
        .map(|c| (c.name(), c.column_id()))
        .collect::<HashMap<_, _>>();

    let mut pk_column_ids: Vec<_> = pk_names
        .iter()
        .map(|name| {
            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
            })
        })
        .try_collect()?;

    // Add the hidden `_row_id` column if no primary key is specified but one is required.
    let need_row_id = pk_column_ids.is_empty() && must_need_pk;

    let row_id_index = need_row_id.then(|| {
        let column = ColumnCatalog::row_id_column();
        let index = columns.len();
        pk_column_ids = vec![column.column_id()];
        columns.push(column);
        index
    });

    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
        Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{col}\" specified more than once"
        )))?;
    }

    Ok((columns, pk_column_ids, row_id_index))
}

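/// Generates the plan for a table backed by an external connector source,
/// binding the connector properties and the columns resolved from the
/// external schema (e.g. a schema registry) before planning.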
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append only table does not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let (with_properties, refresh_mode) =
        bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
        refresh_mode,
    )
    .await?;

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source.clone(),
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(source), table))
}

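/// Generates the plan for a plain table without a connector, rejecting secret
/// and connection references since there is no external source to use them.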
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan(
    context: OptimizerContext,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    mut col_id_gen: ColumnIdGenerator,
    source_watermarks: Vec<SourceWatermark>,
    props: CreateTableProps,
    is_for_replace_plan: bool,
) -> Result<(PlanRef, TableCatalog)> {
    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
    if !secret_refs.is_empty() || !connection_refs.is_empty() {
        return Err(crate::error::ErrorCode::InvalidParameterValue(
            "Secret reference and Connection reference are not allowed in options when creating table without external source"
                .to_owned(),
        )
        .into());
    }

    gen_create_table_plan_without_source(
        context,
        table_name,
        columns,
        column_defs,
        constraints,
        source_watermarks,
        col_id_gen.into_version(),
        props,
    )
}

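/// Shared planning path for tables without a connector: binds the primary key,
/// watermarks and column constraints, then hands off to `gen_table_plan_inner`.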
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}

fn gen_table_plan_with_source(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    source_catalog: SourceCatalog,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let table_name = source_catalog.name.clone();

    let info = CreateTableInfo {
        columns: source_catalog.columns.clone(),
        pk_column_ids: source_catalog.pk_col_ids.clone(),
        row_id_index: source_catalog.row_id_index,
        watermark_descs: source_catalog.watermark_descs.clone(),
        source_catalog: Some(source_catalog),
        version,
    };

    gen_table_plan_inner(context, schema_name, table_name, info, props)
}

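/// The `ON CONFLICT` behavior, either still in its AST form or already
/// resolved to a concrete [`ConflictBehavior`].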
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    Ast(Option<OnConflict>),
    Resolved(ConflictBehavior),
}

impl From<Option<OnConflict>> for EitherOnConflict {
    fn from(v: Option<OnConflict>) -> Self {
        Self::Ast(v)
    }
}

impl From<ConflictBehavior> for EitherOnConflict {
    fn from(v: ConflictBehavior) -> Self {
        Self::Resolved(v)
    }
}

impl EitherOnConflict {
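    /// Resolves the `ON CONFLICT` clause to a concrete [`ConflictBehavior`],
    /// considering whether the table is append-only and whether the hidden
    /// row id serves as the primary key.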
    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
        let conflict_behavior = match self {
            EitherOnConflict::Ast(on_conflict) => {
                if append_only {
                    if row_id_as_pk {
                        // Rows of an append-only table keyed by the hidden row id never conflict.
                        ConflictBehavior::NoCheck
                    } else {
                        if let Some(on_conflict) = on_conflict
                            && on_conflict != OnConflict::Nothing
                        {
                            return Err(ErrorCode::InvalidInputSyntax(
                                "When a PRIMARY KEY constraint is applied to an APPEND ONLY table, \
                                 the ON CONFLICT behavior must be DO NOTHING."
                                    .to_owned(),
                            )
                            .into());
                        }
                        ConflictBehavior::IgnoreConflict
                    }
                } else {
                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
                    }
                }
            }
            EitherOnConflict::Resolved(b) => b,
        };

        Ok(conflict_behavior)
    }
}

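/// The info derived from the `CREATE TABLE` statement itself (columns, primary
/// key, watermarks, optional connector source), as opposed to the table-level
/// properties in [`CreateTableProps`].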
pub struct CreateTableInfo {
    pub columns: Vec<ColumnCatalog>,
    pub pk_column_ids: Vec<ColumnId>,
    pub row_id_index: Option<usize>,
    pub watermark_descs: Vec<WatermarkDesc>,
    pub source_catalog: Option<SourceCatalog>,
    pub version: TableVersion,
}

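/// Table-level properties of a `CREATE TABLE` statement: the normalized
/// definition plus append-only, conflict-handling, version-column, webhook
/// and engine settings.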
pub struct CreateTableProps {
    pub definition: String,
    pub append_only: bool,
    pub on_conflict: EitherOnConflict,
    pub with_version_columns: Vec<String>,
    pub webhook_info: Option<PbWebhookSourceInfo>,
    pub engine: Engine,
}

#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    let source_node: LogicalPlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    if !append_only && !watermark_descs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on table requires the table to be append only.".to_owned(),
            "Use the key words `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    if !append_only && retention_seconds.is_some() {
        if session
            .config()
            .unsafe_enable_storage_retention_for_non_append_only_tables()
        {
            tracing::warn!(
                "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
                table_name
            );
            const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
                                  This may lead to stream inconsistency and unrecoverable \
                                  node failure if there is any row INSERT/UPDATE/DELETE operation \
                                  corresponding to the TTLed primary keys";
            session.notice_to_user(NOTICE);
        } else {
            return Err(ErrorCode::NotSupported(
                "Defining retention seconds on table requires the table to be append only."
                    .to_owned(),
                "Use the key words `APPEND ONLY`".to_owned(),
            )
            .into());
        }
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    Ok((materialize.into(), table))
}

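/// Generates the plan for a table on top of a shared CDC source: the plan
/// scans the external table through the source (`LogicalCdcScan`) rather than
/// ingesting a new connector, and generated columns are excluded from the
/// scanned schema.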
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String,
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, TableCatalog)> {
    let session = context.session_ctx().clone();

    // Append additional columns (e.g. from `INCLUDE` options) to the end.
    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        // Primary key columns are guaranteed to exist after binding.
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| !c.is_generated())
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();
    let cdc_table_type = ExternalCdcTableType::from_properties(&options);
    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id,
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");
    let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: LogicalPlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs: vec![],
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_columns,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.cdc_table_type = Some(cdc_table_type);
    Ok((materialize.into(), table))
}

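/// Derives the `WITH` options of a CDC table from its shared source, splitting
/// the external table name per connector convention: MySQL expects
/// `database.table`, Postgres expects `schema.table`, and SQL Server accepts
/// `database.schema.table`, `schema.table`, or a bare `table` (defaulting to
/// the `dbo` schema).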
fn derive_with_options_for_cdc_table(
    source_with_properties: &WithOptionsSecResolved,
    external_table_name: String,
) -> Result<WithOptionsSecResolved> {
    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
    let source_database_name: &str = source_with_properties
        .get("database.name")
        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
        .as_str();
    let mut with_options = source_with_properties.clone();
    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
        match connector.as_str() {
            MYSQL_CDC_CONNECTOR => {
                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
                })?;
                // `database.name` in the source configuration may be a comma-separated list.
                if !source_database_name
                    .split(',')
                    .map(|s| s.trim())
                    .any(|name| name == db_name)
                {
                    return Err(anyhow!(
                        "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
                        db_name,
                        source_database_name
                    ).into());
                }
                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            POSTGRES_CDC_CONNECTOR => {
                let (schema_name, table_name) = external_table_name
                    .split_once('.')
                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;

                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            SQL_SERVER_CDC_CONNECTOR => {
                // Accept 'database.schema.table', 'schema.table', or 'table'.
                let parts: Vec<&str> = external_table_name.split('.').collect();
                let (_, schema_name, table_name) = match parts.len() {
                    3 => {
                        let db_name = parts[0];
                        let schema_name = parts[1];
                        let table_name = parts[2];

                        if db_name != source_database_name {
                            return Err(anyhow!(
                                "The database name `{}` in the FROM clause is not the same as the database name `{}` in source definition",
                                db_name,
                                source_database_name
                            ).into());
                        }
                        (db_name, schema_name, table_name)
                    }
                    2 => {
                        let schema_name = parts[0];
                        let table_name = parts[1];
                        (source_database_name, schema_name, table_name)
                    }
                    1 => {
                        // Table name only; default to the `dbo` schema.
                        let table_name = parts[0];
                        (source_database_name, "dbo", table_name)
                    }
                    _ => {
                        return Err(anyhow!(
                            "The upstream table name must be in one of these formats: 'database.schema.table', 'schema.table', or 'table'"
                        ).into());
                    }
                };

                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            _ => {
                return Err(RwError::from(anyhow!(
                    "connector {} is not supported for cdc table",
                    connector
                )));
            }
        };
    }
    Ok(with_options)
}

#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
    handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    format_encode: Option<FormatEncodeOptions>,
    cdc_table_info: Option<CdcTableInfo>,
    table_name: &ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    engine: Engine,
) -> Result<(
    PlanRef,
    Option<SourceCatalog>,
    TableCatalog,
    TableJobType,
    Option<SourceId>,
)> {
    let col_id_gen = ColumnIdGenerator::new_initial();
    let format_encode = check_create_table_with_source(
        &handler_args.with_options,
        format_encode,
        &include_column_options,
        &cdc_table_info,
    )?;
    let webhook_info = webhook_info
        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
        .transpose()?;

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_columns: with_version_columns.clone(),
        webhook_info,
        engine,
    };

    let ((plan, source, table), job_type, shared_source_id) = match (
        format_encode,
        cdc_table_info.as_ref(),
    ) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                explain_options,
                table_name.clone(),
                column_defs,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                SqlColumnStrategy::FollowChecked,
            )
            .await?,
            TableJobType::General,
            None,
        ),
        (None, None) => {
            let context = OptimizerContext::new(handler_args, explain_options);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name.clone(),
                column_defs,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                false,
            )?;

            ((plan, None, table), TableJobType::General, None)
        }

        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &column_defs,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            generated_columns_check_for_cdc_table(&column_defs)?;
            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;

            let session = &handler_args.session;
            let db_name = &session.database();
            let user_name = &session.user_name();
            let search_path = session.config().search_path();
            let (schema_name, resolved_table_name) =
                Binder::resolve_schema_qualified_name(db_name, table_name)?;
            let (database_id, schema_id) =
                session.get_database_and_schema_id_for_create(schema_name.clone())?;

            // Resolve the schema-qualified name of the shared CDC source.
            let (source_schema_name, source_name) =
                Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;

            let source = {
                let catalog_reader = session.env().catalog_reader().read_guard();
                let schema_path =
                    SchemaPath::new(source_schema_name.as_deref(), &search_path, user_name);

                let (source, _) = catalog_reader.get_source_by_name(
                    db_name,
                    schema_path,
                    source_name.as_str(),
                )?;
                source.clone()
            };
            let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
                &source.with_properties,
                cdc_table.external_table_name.clone(),
            )?;

            let (columns, pk_names) = match wildcard_idx {
                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
                None => {
                    for column_def in &column_defs {
                        for option_def in &column_def.options {
                            if let ColumnOption::DefaultValue(_)
                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
                            {
                                return Err(ErrorCode::NotSupported(
                                    "Default value for columns defined on the table created from a CDC source".into(),
                                    "Remove the default value expression in the column definitions".into(),
                                )
                                .into());
                            }
                        }
                    }

                    let (columns, pk_names) =
                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
                    // Validate the external table options by parsing them into an
                    // `ExternalTableConfig` early, failing fast on invalid options.
                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
                        .context("failed to extract external table config")?;

                    (columns, pk_names)
                }
            };

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, explain_options).into();
            let shared_source_id = source.id;
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                cdc_table.external_table_name.clone(),
                column_defs,
                columns,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_columns,
                include_column_options,
                table_name.clone(),
                resolved_table_name,
                database_id,
                schema_id,
                TableId::placeholder(),
                engine,
            )?;

            (
                (plan, None, table),
                TableJobType::SharedCdcSource,
                Some(shared_source_id),
            )
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };
    Ok((plan, source, table, job_type, shared_source_id))
}

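/// Rejects CDC table definitions where a non-generated column follows a
/// generated one; all generated columns must come last.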
fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
    let mut found_generated_column = false;
    for column in columns {
        let mut is_generated = false;

        for option_def in &column.options {
            if let ColumnOption::GeneratedColumns(_) = option_def.option {
                is_generated = true;
                break;
            }
        }

        if is_generated {
            found_generated_column = true;
        } else if found_generated_column {
            return Err(ErrorCode::NotSupported(
                "Non-generated column found after a generated column.".into(),
                "Ensure that all generated columns appear at the end of the cdc table definition."
                    .into(),
            )
            .into());
        }
    }
    Ok(())
}

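/// Rejects explicit `NOT NULL` constraints on CDC table columns; the check is
/// skipped when the schema is derived via the wildcard.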
fn not_null_check_for_cdc_table(
    wildcard_idx: &Option<usize>,
    column_defs: &Vec<ColumnDef>,
) -> Result<()> {
    if wildcard_idx.is_none()
        && column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table with NOT NULL constraint is not supported".to_owned(),
            "Please remove the NOT NULL constraint for columns".to_owned(),
        )
        .into());
    }
    Ok(())
}

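/// Sanity checks for a table created from a CDC source: wildcard and explicit
/// columns are mutually exclusive, a primary key is required unless the schema
/// is derived via the wildcard, and `APPEND ONLY` as well as watermarks are
/// not allowed.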
fn sanity_check_for_table_on_cdc_source(
    append_only: bool,
    column_defs: &Vec<ColumnDef>,
    wildcard_idx: &Option<usize>,
    constraints: &Vec<TableConstraint>,
    source_watermarks: &Vec<SourceWatermark>,
) -> Result<()> {
    // The wildcard cannot be mixed with explicit column definitions.
    if wildcard_idx.is_some() && !column_defs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "wildcard(*) and column definitions cannot be used together".to_owned(),
            "Remove the wildcard or column definitions".to_owned(),
        )
        .into());
    }

    // A primary key must be provided explicitly unless the schema is derived
    // from the upstream table via the wildcard.
    if wildcard_idx.is_none()
        && !constraints.iter().any(|c| {
            matches!(
                c,
                TableConstraint::Unique {
                    is_primary: true,
                    ..
                }
            )
        })
        && !column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table without primary key constraint is not supported".to_owned(),
            "Please define a primary key".to_owned(),
        )
        .into());
    }

    if append_only {
        return Err(ErrorCode::NotSupported(
            "append only modifier on the table created from a CDC source".into(),
            "Remove the APPEND ONLY clause".into(),
        )
        .into());
    }

    if !source_watermarks.is_empty() {
        return Err(ErrorCode::NotSupported(
            "watermark defined on the table created from a CDC source".into(),
            "Remove the Watermark definitions".into(),
        )
        .into());
    }

    Ok(())
}

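/// Derives the table schema by connecting to the upstream database directly,
/// used when the CDC table is declared with a wildcard (`*`).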
async fn bind_cdc_table_schema_externally(
    cdc_with_options: WithOptionsSecResolved,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    // Extract the external table config and connect to the upstream to derive the schema.
    let (options, secret_refs) = cdc_with_options.into_parts();
    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
        .context("failed to extract external table config")?;

    let table = ExternalTableImpl::connect(config)
        .await
        .context("failed to auto derive table schema")?;

    Ok((
        table
            .column_descs()
            .iter()
            .cloned()
            .map(|column_desc| ColumnCatalog {
                column_desc,
                is_hidden: false,
            })
            .collect(),
        table.pk_names().clone(),
    ))
}

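/// Binds the schema of a CDC table from the user-provided column definitions
/// and constraints.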
fn bind_cdc_table_schema(
    column_defs: &Vec<ColumnDef>,
    constraints: &Vec<TableConstraint>,
    is_for_replace_plan: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;

    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
    Ok((columns, pk_names))
}

#[allow(clippy::too_many_arguments)]
pub async fn handle_create_table(
    handler_args: HandlerArgs,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    if_not_exists: bool,
    format_encode: Option<FormatEncodeOptions>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    cdc_table_info: Option<CdcTableInfo>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    ast_engine: risingwave_sqlparser::ast::Engine,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    if append_only {
        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
    }

    session.check_cluster_limits().await?;

    let engine = match ast_engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        table_name.clone(),
        StatementType::CREATE_TABLE,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (graph, source, hummock_table, job_type, shared_source_id) = {
        let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
            handler_args.clone(),
            ExplainOptions::default(),
            format_encode,
            cdc_table_info,
            &table_name,
            column_defs.clone(),
            wildcard_idx,
            constraints.clone(),
            source_watermarks,
            append_only,
            on_conflict,
            with_version_columns,
            include_column_options,
            webhook_info,
            engine,
        )
        .await?;
        tracing::trace!("table_plan: {:?}", plan.explain_to_string());

        let graph = build_graph(plan, Some(GraphJobType::Table))?;

        (graph, source, table, job_type, shared_source_id)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        table_name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

    let dependencies = shared_source_id
        .map(|id| HashSet::from([id.as_object_id()]))
        .unwrap_or_default();

    // Dispatch to the engine-specific DDL path.
    match engine {
        Engine::Hummock => {
            let catalog_writer = session.catalog_writer()?;
            catalog_writer
                .create_table(
                    source.map(|s| s.to_prost()),
                    hummock_table.to_prost(),
                    graph,
                    job_type,
                    if_not_exists,
                    dependencies,
                )
                .await?;
        }
        Engine::Iceberg => {
            // Stage the table in the session so it is resolvable while the
            // iceberg sink and source are being created below.
            let hummock_table_name = hummock_table.name.clone();
            session.create_staging_table(hummock_table.clone());
            let res = create_iceberg_engine_table(
                session.clone(),
                handler_args,
                source.map(|s| s.to_prost()),
                hummock_table,
                graph,
                table_name,
                job_type,
                if_not_exists,
            )
            .await;
            session.drop_staging_table(&hummock_table_name);
            res?
        }
    }

    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
}

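/// Creates a table with `ENGINE = iceberg`: in addition to the internal
/// (hummock) table, an iceberg sink is set up to write the table's changes out
/// to iceberg, reusing the connection named by the session variable
/// `iceberg_engine_connection`. With a hosted catalog, the iceberg JDBC
/// catalog is pointed at the meta store itself. Most of the `WITH` options
/// handled below (commit interval, compaction, snapshot expiration, write
/// mode) are validated and forwarded to that sink.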
#[allow(clippy::too_many_arguments)]
pub async fn create_iceberg_engine_table(
    session: Arc<SessionImpl>,
    handler_args: HandlerArgs,
    mut source: Option<PbSource>,
    table: TableCatalog,
    graph: StreamFragmentGraph,
    table_name: ObjectName,
    job_type: PbTableJobType,
    if_not_exists: bool,
) -> Result<()> {
    let meta_client = session.env().meta_client();
    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;

    let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
        ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
    })?;
    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
    let meta_store_user = meta_store_endpoint.username().to_owned();
    let meta_store_password = match meta_store_endpoint.password() {
        Some(password) => percent_decode_str(password)
            .decode_utf8()
            .map_err(|_| {
                ErrorCode::InternalError(
                    "failed to parse password from meta store endpoint".to_owned(),
                )
            })?
            .into_owned(),
        None => "".to_owned(),
    };
    let meta_store_host = meta_store_endpoint
        .host_str()
        .ok_or_else(|| {
            ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
        })?
        .to_owned();
    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
        ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
    })?;
    let meta_store_database = meta_store_endpoint
        .path()
        .trim_start_matches('/')
        .to_owned();

    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
        bail!("failed to parse meta backend: {}", meta_store_backend);
    };

    let catalog_uri = match meta_backend {
        MetaBackend::Postgres => {
            format!(
                "jdbc:postgresql://{}:{}/{}",
                meta_store_host, meta_store_port, meta_store_database
            )
        }
        MetaBackend::Mysql => {
            format!(
                "jdbc:mysql://{}:{}/{}",
                meta_store_host, meta_store_port, meta_store_database
            )
        }
        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
            bail!(
                "Unsupported meta backend for iceberg engine table: {}",
                meta_store_backend
            );
        }
    };

    let rw_db_name = session
        .env()
        .catalog_reader()
        .read_guard()
        .get_database_by_id(table.database_id)?
        .name()
        .to_owned();
    let rw_schema_name = session
        .env()
        .catalog_reader()
        .read_guard()
        .get_schema_by_id(table.database_id, table.schema_id)?
        .name()
        .clone();
    let iceberg_catalog_name = rw_db_name.clone();
    let iceberg_database_name = rw_schema_name.clone();
    let iceberg_table_name = table_name.0.last().unwrap().real_value();

    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
    let sink_decouple = session.config().sink_decouple();
    if matches!(sink_decouple, SinkDecouple::Disable) {
        bail!(
            "Iceberg engine table requires sink decouple to be enabled, try `set sink_decouple = true` to resolve it"
        );
    }

    let mut connection_ref = BTreeMap::new();
    let with_common = if iceberg_engine_connection.is_empty() {
        bail!(
            "To use an iceberg engine table, the session variable `iceberg_engine_connection` must be set."
        );
    } else {
        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
        assert_eq!(parts.len(), 2);
        let connection_catalog =
            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
            if params.connection_type == ConnectionType::Iceberg as i32 {
                // Reuse the connection for both the iceberg sink and source.
                connection_ref.insert(
                    "connection".to_owned(),
                    ConnectionRefValue {
                        connection_name: ObjectName::from(vec![
                            Ident::from(parts[0]),
                            Ident::from(parts[1]),
                        ]),
                    },
                );

                let mut with_common = BTreeMap::new();
                with_common.insert("connector".to_owned(), "iceberg".to_owned());
                with_common.insert("database.name".to_owned(), iceberg_database_name);
                with_common.insert("table.name".to_owned(), iceberg_table_name);

                if let Some(s) = params.properties.get("hosted_catalog")
                    && s.eq_ignore_ascii_case("true")
                {
                    // Hosted catalog: point the iceberg JDBC catalog at the meta store.
                    with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
                    with_common.insert("catalog.uri".to_owned(), catalog_uri);
                    with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
                    with_common.insert(
                        "catalog.jdbc.password".to_owned(),
                        meta_store_password.clone(),
                    );
                    with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
                }

                with_common
            } else {
                return Err(RwError::from(ErrorCode::InvalidParameterValue(
                    "Only iceberg connection could be used in iceberg engine".to_owned(),
                )));
            }
        } else {
            return Err(RwError::from(ErrorCode::InvalidParameterValue(
                "Private Link Service has been deprecated. Please create a new connection instead."
                    .to_owned(),
            )));
        }
    };

    // If the table's only primary key is the hidden row id column, expose it
    // to the iceberg sink as `RISINGWAVE_ICEBERG_ROW_ID` via a projection query.
    let mut pks = table
        .pk_column_names()
        .iter()
        .map(|c| c.to_string())
        .collect::<Vec<String>>();

    let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
            "select {} as {}, * from {}",
            ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
        ))
        .context("unable to parse query")?
        .try_into()
        .unwrap();

        let Statement::Query(query) = &stmt else {
            panic!("unexpected statement: {:?}", stmt);
        };
        CreateSink::AsQuery(query.clone())
    } else {
        CreateSink::From(table_name.clone())
    };

    let mut sink_name = table_name.clone();
    *sink_name.0.last_mut().unwrap() = Ident::from(
        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
    );
    let create_sink_stmt = CreateSinkStatement {
        if_not_exists: false,
        sink_name,
        with_properties: WithProperties(vec![]),
        sink_from,
        columns: vec![],
        emit_mode: None,
        sink_schema: None,
        into_table_name: None,
    };

    let mut sink_handler_args = handler_args.clone();

    let mut sink_with = with_common.clone();

    if table.append_only {
        sink_with.insert("type".to_owned(), "append-only".to_owned());
    } else {
        sink_with.insert("primary_key".to_owned(), pks.join(","));
        sink_with.insert("type".to_owned(), "upsert".to_owned());
    }
    // The sink commits to iceberg every `commit_checkpoint_interval` checkpoints;
    // default to 60 if not specified.
    let commit_checkpoint_interval = handler_args
        .with_options
        .get(COMMIT_CHECKPOINT_INTERVAL)
        .map(|v| v.to_owned())
        .unwrap_or_else(|| "60".to_owned());
    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
        ErrorCode::InvalidInputSyntax(format!(
            "commit_checkpoint_interval must be greater than 0: {}",
            commit_checkpoint_interval
        ))
    })?;

    if commit_checkpoint_interval == 0 {
        bail!("commit_checkpoint_interval must be greater than 0");
    }

    // The option belongs to the sink; strip it from the table's source properties.
    source
        .as_mut()
        .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));

    let sink_decouple = session.config().sink_decouple();
    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
        bail!(
            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
        )
    }

    sink_with.insert(
        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
        commit_checkpoint_interval.to_string(),
    );
    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());

    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());

    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
        match enable_compaction.to_lowercase().as_str() {
            "true" => {
                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
            }
            "false" => {
                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
            }
            _ => {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "enable_compaction must be true or false: {}",
                    enable_compaction
                ))
                .into());
            }
        }

        source
            .as_mut()
            .map(|x| x.with_properties.remove("enable_compaction"));
    } else {
        sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
    }

    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "compaction_interval_sec must be greater than 0: {}",
                compaction_interval_sec
            ))
        })?;
        if compaction_interval_sec == 0 {
            bail!("compaction_interval_sec must be greater than 0");
        }
        sink_with.insert(
            "compaction_interval_sec".to_owned(),
            compaction_interval_sec.to_string(),
        );
        source
            .as_mut()
            .map(|x| x.with_properties.remove("compaction_interval_sec"));
    }

    let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
        handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
    {
        source
            .as_mut()
            .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
        match enable_snapshot_expiration.to_lowercase().as_str() {
            "true" => {
                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
                true
            }
            "false" => {
                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
                false
            }
            _ => {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "enable_snapshot_expiration must be true or false: {}",
                    enable_snapshot_expiration
                ))
                .into());
            }
        }
    } else {
        // Snapshot expiration is on by default.
        sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
        true
    };

    if has_enabled_snapshot_expiration {
        if let Some(snapshot_expiration_retain_last) = handler_args
            .with_options
            .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
        {
            sink_with.insert(
                SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
                snapshot_expiration_retain_last.to_owned(),
            );
            source
                .as_mut()
                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
        }

        if let Some(snapshot_expiration_max_age) = handler_args
            .with_options
            .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
        {
            sink_with.insert(
                SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
                snapshot_expiration_max_age.to_owned(),
            );
            source
                .as_mut()
                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
        }

        if let Some(snapshot_expiration_clear_expired_files) = handler_args
            .with_options
            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
        {
            sink_with.insert(
                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
                snapshot_expiration_clear_expired_files.to_owned(),
            );
            source.as_mut().map(|x| {
                x.with_properties
                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
            });
        }

        if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
            .with_options
            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
        {
            sink_with.insert(
                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
                snapshot_expiration_clear_expired_meta_data.to_owned(),
            );
            source.as_mut().map(|x| {
                x.with_properties
                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
            });
        }
    }

    if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
        match write_mode.to_lowercase().as_str() {
            ICEBERG_WRITE_MODE_MERGE_ON_READ => {
                sink_with.insert(
                    WRITE_MODE.to_owned(),
                    ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
                );
            }

            ICEBERG_WRITE_MODE_COPY_ON_WRITE => {
                if table.append_only {
                    return Err(ErrorCode::NotSupported(
                        "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
                        "Please use MERGE ON READ instead".to_owned(),
                    )
                    .into());
                }

                sink_with.insert(
                    WRITE_MODE.to_owned(),
                    ICEBERG_WRITE_MODE_COPY_ON_WRITE.to_owned(),
                );
            }

            _ => {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "write_mode must be one of: {}, {}",
                    ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
                ))
                .into());
            }
        }

        source
            .as_mut()
            .map(|x| x.with_properties.remove("write_mode"));
    } else {
        // Default to merge-on-read.
        sink_with.insert(
            WRITE_MODE.to_owned(),
            ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
        );
    }

    if let Some(max_snapshots_num_before_compaction) =
        handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
    {
        let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
            .parse::<u32>()
            .map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "{} must be a positive integer: {}",
                    COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
                ))
            })?;
        if max_snapshots_num_before_compaction == 0 {
            bail!(
                "{} must be greater than 0",
                COMPACTION_MAX_SNAPSHOTS_NUM
            );
        }
        sink_with.insert(
            COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
            max_snapshots_num_before_compaction.to_string(),
        );

        if let Some(x) = source.as_mut() {
            x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM);
        }
    }

    if let Some(small_files_threshold_mb) = handler_args
        .with_options
        .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
    {
        let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "{} must be a positive integer: {}",
                COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
            ))
        })?;
        if small_files_threshold_mb == 0 {
            bail!(
                "{} must be greater than 0",
                COMPACTION_SMALL_FILES_THRESHOLD_MB
            );
        }
        sink_with.insert(
            COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
            small_files_threshold_mb.to_string(),
        );

        if let Some(x) = source.as_mut() {
            x.with_properties
                .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB);
        }
    }

    if let Some(delete_files_count_threshold) = handler_args
        .with_options
        .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
    {
        let delete_files_count_threshold =
            delete_files_count_threshold.parse::<usize>().map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "{} must be a positive integer: {}",
                    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
                ))
            })?;
        if delete_files_count_threshold == 0 {
            bail!(
                "{} must be greater than 0",
                COMPACTION_DELETE_FILES_COUNT_THRESHOLD
            );
        }
        sink_with.insert(
            COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
            delete_files_count_threshold.to_string(),
        );

        if let Some(x) = source.as_mut() {
            x.with_properties
                .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD);
        }
    }

    if let Some(trigger_snapshot_count) = handler_args
        .with_options
        .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
    {
        let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "{} must be a positive integer: {}",
                COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
            ))
        })?;
        if trigger_snapshot_count == 0 {
            bail!(
                "{} must be greater than 0",
                COMPACTION_TRIGGER_SNAPSHOT_COUNT
            );
        }
        sink_with.insert(
            COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
            trigger_snapshot_count.to_string(),
        );

        if let Some(x) = source.as_mut() {
            x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT);
        }
    }

    if let Some(target_file_size_mb) = handler_args
        .with_options
        .get(COMPACTION_TARGET_FILE_SIZE_MB)
    {
        let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "{} must be a positive integer: {}",
                COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
            ))
        })?;
        if target_file_size_mb == 0 {
            bail!("{} must be greater than 0", COMPACTION_TARGET_FILE_SIZE_MB);
        }
        sink_with.insert(
            COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
            target_file_size_mb.to_string(),
        );
        if let Some(x) = source.as_mut() {
            x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB);
        }
    }

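    // `partition_by` is validated against the table: every referenced column must exist in
    // the schema, and the partition columns must form a prefix of the primary key.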
    let partition_by = handler_args
        .with_options
        .get("partition_by")
        .map(|v| v.to_owned());

    if let Some(partition_by) = &partition_by {
        let mut partition_columns = vec![];
        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
            table
                .columns()
                .iter()
                .find(|col| col.name().eq_ignore_ascii_case(&column))
                .ok_or_else(|| {
                    ErrorCode::InvalidInputSyntax(format!(
                        "Partition source column does not exist in schema: {}",
                        column
                    ))
                })?;

            partition_columns.push(column);
        }

        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
            |_| {
                ErrorCode::InvalidInputSyntax(
                    "The partition columns should be a prefix of the primary key".to_owned(),
                )
            },
        )?;

        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());

        if let Some(x) = source.as_mut() {
            x.with_properties.remove("partition_by");
        }
    }

    sink_handler_args.with_options =
        WithOptions::new(sink_with, Default::default(), connection_ref.clone());
    let SinkPlanContext {
        sink_plan,
        sink_catalog,
        ..
    } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
    let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;

    let mut source_name = table_name.clone();
    *source_name.0.last_mut().unwrap() = Ident::from(
        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
    );
    let create_source_stmt = CreateSourceStatement {
        temporary: false,
        if_not_exists: false,
        columns: vec![],
        source_name,
        wildcard_idx: Some(0),
        constraints: vec![],
        with_properties: WithProperties(vec![]),
        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
        source_watermarks: vec![],
        include_column_options: vec![],
    };

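    // Bind the hidden iceberg source (named with `ICEBERG_SOURCE_PREFIX` above) that serves
    // reads from the engine table. It reuses the shared WITH options, minus everything that
    // was moved onto the sink.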
    let mut source_handler_args = handler_args.clone();
    let source_with = with_common;
    source_handler_args.with_options =
        WithOptions::new(source_with, Default::default(), connection_ref);

    let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
    let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
    let (with_properties, refresh_mode) =
        bind_connector_props(&source_handler_args, &format_encode, true)?;

    let (iceberg_catalog, table_identifier) = {
        // Create the physical iceberg table up front so that both the sink and the source
        // below can bind against it.
        let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
        let iceberg_sink = IcebergSink::try_from(sink_param)?;
        iceberg_sink.create_table_if_not_exists().await?;

        let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
        let table_identifier = iceberg_sink.config.full_table_name()?;
        (iceberg_catalog, table_identifier)
    };

    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        &session,
        &format_encode,
        Either::Left(&with_properties),
        create_source_type,
    )
    .await?;
    let mut col_id_gen = ColumnIdGenerator::new_initial();

    let iceberg_source_catalog = bind_create_source_or_table_with_connector(
        source_handler_args,
        create_source_stmt.source_name,
        format_encode,
        with_properties,
        &create_source_stmt.columns,
        create_source_stmt.constraints,
        create_source_stmt.wildcard_idx,
        create_source_stmt.source_watermarks,
        columns_from_resolve_source,
        source_info,
        create_source_stmt.include_column_options,
        &mut col_id_gen,
        create_source_type,
        overwrite_options.source_rate_limit,
        SqlColumnStrategy::FollowChecked,
        refresh_mode,
    )
    .await?;

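    // Eagerly initialize the JVM so that problems surface at DDL time; parts of the iceberg
    // integration (e.g. JNI-backed catalogs) may depend on it.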
    let _ = Jvm::get_or_init()?;

    let catalog_writer = session.catalog_writer()?;
    let res = catalog_writer
        .create_iceberg_table(
            PbTableJobInfo {
                source,
                table: Some(table.to_prost()),
                fragment_graph: Some(graph),
                job_type: job_type as _,
            },
            PbSinkJobInfo {
                sink: Some(sink_catalog.to_proto()),
                fragment_graph: Some(sink_graph),
            },
            iceberg_source_catalog.to_prost(),
            if_not_exists,
        )
        .await;
    if res.is_err() {
        // Best-effort cleanup: drop the iceberg table created above so a failed DDL does
        // not leak it.
        let _ = iceberg_catalog
            .drop_table(&table_identifier)
            .await
            .inspect_err(|err| {
                tracing::error!(
                    "failed to drop iceberg table {} after failing to create the iceberg engine table: {}",
                    table_identifier,
                    err.as_report()
                );
            });
        res?
    }

    Ok(())
}

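/// Validates the interplay of WITH options, FORMAT/ENCODE, and INCLUDE for `CREATE TABLE`:
/// a connector requires a FORMAT/ENCODE clause, INCLUDE requires a connector, and CDC tables
/// skip both checks. Returns the format/encode options unchanged.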
pub fn check_create_table_with_source(
    with_options: &WithOptions,
    format_encode: Option<FormatEncodeOptions>,
    include_column_options: &IncludeOption,
    cdc_table_info: &Option<CdcTableInfo>,
) -> Result<Option<FormatEncodeOptions>> {
    if cdc_table_info.is_some() {
        return Ok(format_encode);
    }
    let defined_source = with_options.is_source_connector();

    if !include_column_options.is_empty() && !defined_source {
        return Err(ErrorCode::InvalidInputSyntax(
            "INCLUDE should be used with a connector".to_owned(),
        )
        .into());
    }
    if defined_source {
        format_encode.as_ref().ok_or_else(|| {
            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
        })?;
    }
    Ok(format_encode)
}

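/// Checks that `partition_columns` is a prefix of `primary_key_columns`, e.g. `["a", "b"]`
/// is a valid prefix of primary key `["a", "b", "c"]`, while `["b"]` is not.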
fn ensure_partition_columns_are_prefix_of_primary_key(
    partition_columns: &[String],
    primary_key_columns: &[String],
) -> std::result::Result<(), String> {
    if partition_columns.len() > primary_key_columns.len() {
        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
    }

    for (i, partition_col) in partition_columns.iter().enumerate() {
        if primary_key_columns.get(i) != Some(partition_col) {
            return Err(format!(
                "Partition column '{}' is not a prefix of the primary key.",
                partition_col
            ));
        }
    }

    Ok(())
}

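/// Re-plans an existing table for schema replacement (e.g. `ALTER TABLE`): re-binds the
/// rewritten `CREATE TABLE` statement, rebuilds the stream fragment graph, and carries over
/// the original table id and associated source id. Changing the primary key is rejected.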
#[allow(clippy::too_many_arguments)]
pub async fn generate_stream_graph_for_replace_table(
    _session: &Arc<SessionImpl>,
    table_name: ObjectName,
    original_catalog: &Arc<TableCatalog>,
    handler_args: HandlerArgs,
    statement: Statement,
    col_id_gen: ColumnIdGenerator,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(
    StreamFragmentGraph,
    TableCatalog,
    Option<SourceCatalog>,
    TableJobType,
)> {
    let Statement::CreateTable {
        columns,
        constraints,
        source_watermarks,
        append_only,
        on_conflict,
        with_version_columns,
        wildcard_idx,
        cdc_table_info,
        format_encode,
        include_column_options,
        engine,
        with_options,
        ..
    } = statement
    else {
        panic!("unexpected statement type: {:?}", statement);
    };

    let format_encode = format_encode
        .clone()
        .map(|format_encode| format_encode.into_v2_with_warning());

    let engine = match engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    let is_drop_connector =
        original_catalog.associated_source_id().is_some() && format_encode.is_none();
    if is_drop_connector {
        debug_assert!(
            source_watermarks.is_empty()
                && include_column_options.is_empty()
                && with_options
                    .iter()
                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
        );
    }

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_columns: with_version_columns
            .iter()
            .map(|col| col.real_value())
            .collect(),
        webhook_info: original_catalog.webhook_info.clone(),
        engine,
    };

    let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                ExplainOptions::default(),
                table_name,
                columns,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                sql_column_strategy,
            )
            .await?,
            TableJobType::General,
        ),
        (None, None) => {
            let context = OptimizerContext::from_handler_args(handler_args);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name,
                columns,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                true,
            )?;
            ((plan, None, table), TableJobType::General)
        }
        (None, Some(cdc_table)) => {
            let session = &handler_args.session;
            let (source, resolved_table_name, database_id, schema_id) =
                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;

            let cdc_with_options = derive_with_options_for_cdc_table(
                &source.with_properties,
                cdc_table.external_table_name.clone(),
            )?;

            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                cdc_table.external_table_name.clone(),
                columns,
                column_catalogs,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_columns
                    .iter()
                    .map(|col| col.real_value())
                    .collect(),
                include_column_options,
                table_name,
                resolved_table_name,
                database_id,
                schema_id,
                original_catalog.id(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to a table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };

    if table.pk_column_ids() != original_catalog.pk_column_ids() {
        Err(ErrorCode::InvalidInputSyntax(
            "altering the primary key of a table is not supported".to_owned(),
        ))?
    }

    let graph = build_graph(plan, Some(GraphJobType::Table))?;

    table.id = original_catalog.id();
    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
        table.associated_source_id = Some(source_id);

        let source = source.as_mut().unwrap();
        source.id = source_id;
        source.associated_table_id = Some(table.id());
    }

    Ok((graph, table, source, job_type))
}

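/// Resolves the CDC source referenced by a `CREATE TABLE ... FROM <source>` statement, along
/// with the database and schema ids for the table to be created.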
fn get_source_and_resolved_table_name(
    session: &Arc<SessionImpl>,
    cdc_table: CdcTableInfo,
    table_name: ObjectName,
) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
    let db_name = &session.database();
    let (schema_name, resolved_table_name) =
        Binder::resolve_schema_qualified_name(db_name, &table_name)?;
    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

    let (source_schema_name, source_name) =
        Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;

    let source = {
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_name = source_schema_name.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
        let (source, _) = catalog_reader.get_source_by_name(
            db_name,
            SchemaPath::Name(schema_name.as_str()),
            source_name.as_str(),
        )?;
        source.clone()
    };

    Ok((source, resolved_table_name, database_id, schema_id))
}

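/// Binds the webhook-specific parts of `CREATE TABLE`: the single JSONB payload column, the
/// optional signing secret (resolved to a `PbSecretRef`), and the signature-verification
/// expression, which must be a `SECURE_COMPARE(...)` call.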
fn bind_webhook_info(
    session: &Arc<SessionImpl>,
    columns_defs: &[ColumnDef],
    webhook_info: WebhookSourceInfo,
) -> Result<PbWebhookSourceInfo> {
    // Validate the columns: exactly one JSONB payload column is allowed.
    if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref() != Some(&AstDataType::Jsonb) {
        return Err(ErrorCode::InvalidInputSyntax(
            "Table with webhook source should have exactly one JSONB column".to_owned(),
        )
        .into());
    }

    let WebhookSourceInfo {
        secret_ref,
        signature_expr,
        wait_for_persistence,
        is_batched,
    } = webhook_info;

    // Resolve the optional signing secret.
    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
        let db_name = &session.database();
        let (schema_name, secret_name) =
            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
        (
            Some(PbSecretRef {
                secret_id: secret_catalog.id,
                ref_as: match secret_ref.ref_as {
                    SecretRefAsType::Text => PbRefAsType::Text,
                    SecretRefAsType::File => PbRefAsType::File,
                }
                .into(),
            }),
            Some(secret_name),
        )
    } else {
        (None, None)
    };

    let secure_compare_context = SecureCompareContext {
        column_name: columns_defs[0].name.real_value(),
        secret_name,
    };
    let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
    let expr = binder.bind_expr(&signature_expr)?;

    // The signature expression must be a `SECURE_COMPARE()` call.
    let is_secure_compare = matches!(
        expr.as_function_call(),
        Some(call) if call.func_type()
            == crate::optimizer::plan_node::generic::ExprType::SecureCompare
    );
    if !is_secure_compare {
        return Err(ErrorCode::InvalidInputSyntax(
            "The signature verification function must be SECURE_COMPARE()".to_owned(),
        )
        .into());
    }

    let pb_webhook_info = PbWebhookSourceInfo {
        secret_ref: pb_secret_ref,
        signature_expr: Some(expr.to_expr_proto()),
        wait_for_persistence,
        is_batched,
    };

    Ok(pb_webhook_info)
}

#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use super::*;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_table_handler() {
        let sql =
            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(table.name(), "t");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "v1" => DataType::Int16,
            "v2" => StructType::new(
                vec![("v3", DataType::Int64), ("v4", DataType::Float64), ("v5", DataType::Float64)],
            )
            .with_ids([3, 4, 5].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };

        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    #[test]
    fn test_bind_primary_key() {
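        // Expected `Ok` values are column ids: the hidden row id column takes id 0, and
        // user-defined columns are assigned ids starting from 1 in definition order.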
        for (sql, expected) in [
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ] {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be create table")
            };
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }

    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // Credentials given in the FORMAT ... ENCODE clause end up in `format_encode_options`.
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // Credentials given in the WITH clause stay in `with_properties`.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        assert!(!source.with_properties.contains_key("schema.location"));
    }
}