1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use percent_encoding::percent_decode_str;
25use pgwire::pg_response::{PgResponse, StatementType};
26use prost::Message as _;
27use risingwave_common::catalog::{
28 CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29 ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
30 TableId,
31};
32use risingwave_common::config::MetaBackend;
33use risingwave_common::global_jvm::Jvm;
34use risingwave_common::session_config::sink_decouple::SinkDecouple;
35use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
36use risingwave_common::util::value_encoding::DatumToProtoExt;
37use risingwave_common::{bail, bail_not_implemented};
38use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
39use risingwave_connector::source::cdc::build_cdc_table_id;
40use risingwave_connector::source::cdc::external::{
41 DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
42 SCHEMA_NAME_KEY, TABLE_NAME_KEY,
43};
44use risingwave_connector::{
45 AUTO_SCHEMA_CHANGE_KEY, WithOptionsSecResolved, WithPropertiesExt, source,
46};
47use risingwave_pb::catalog::connection::Info as ConnectionInfo;
48use risingwave_pb::catalog::connection_params::ConnectionType;
49use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
50use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
51use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
52use risingwave_pb::plan_common::{
53 AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
54};
55use risingwave_pb::secret::PbSecretRef;
56use risingwave_pb::secret::secret_ref::PbRefAsType;
57use risingwave_pb::stream_plan::StreamFragmentGraph;
58use risingwave_sqlparser::ast::{
59 CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
60 CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
61 FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
62 Statement, TableConstraint, WebhookSourceInfo, WithProperties,
63};
64use risingwave_sqlparser::parser::IncludeOption;
65use thiserror_ext::AsReport;
66
67use super::RwPgResponse;
68use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
69use crate::binder::{Clause, SecureCompareContext, WEBHOOK_PAYLOAD_FIELD_NAME, bind_data_type};
70use crate::catalog::root_catalog::SchemaPath;
71use crate::catalog::source_catalog::SourceCatalog;
72use crate::catalog::table_catalog::TableVersion;
73use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
74use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
75use crate::expr::{Expr, ExprImpl, ExprRewriter};
76use crate::handler::HandlerArgs;
77use crate::handler::create_source::{
78 UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
79 bind_source_watermark, handle_addition_columns,
80};
81use crate::handler::util::{
82 LongRunningNotificationAction, SourceSchemaCompatExt, execute_with_long_running_notification,
83};
84use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
85use crate::optimizer::plan_node::{
86 LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
87 ensure_sync_log_store_fragment_root,
88};
89use crate::optimizer::property::{Order, RequiredDist};
90use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
91use crate::session::SessionImpl;
92use crate::session::current::notice_to_user;
93use crate::stream_fragmenter::{GraphJobType, build_graph};
94use crate::utils::OverwriteOptions;
95use crate::{Binder, Explain, TableCatalog, WithOptions};
96
97mod col_id_gen;
98pub use col_id_gen::*;
99use risingwave_connector::sink::SinkParam;
100use risingwave_connector::sink::iceberg::{
101 COMPACTION_DELETE_FILES_COUNT_THRESHOLD, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
102 COMPACTION_SMALL_FILES_THRESHOLD_MB, COMPACTION_TARGET_FILE_SIZE_MB,
103 COMPACTION_TRIGGER_SNAPSHOT_COUNT, COMPACTION_TYPE, COMPACTION_WRITE_PARQUET_COMPRESSION,
104 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES, COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS,
105 CompactionType, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION, FORMAT_VERSION,
106 ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink,
107 IcebergWriteMode, ORDER_KEY, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
108 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA, SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
109 SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE, parse_partition_by_exprs,
110 validate_order_key_columns,
111};
112use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
113
114use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};
115
116fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
117 for option_def in &c.options {
118 match option_def.option {
119 ColumnOption::GeneratedColumns(_) => {}
120 ColumnOption::DefaultValue(_) => {}
121 ColumnOption::DefaultValueInternal { .. } => {}
122 ColumnOption::Unique { is_primary: true } => {}
123 ColumnOption::Null => {}
124 ColumnOption::NotNull => {}
125 _ => bail_not_implemented!("column constraints \"{}\"", option_def),
126 }
127 }
128 Ok(())
129}
130
131pub fn bind_sql_columns(
135 column_defs: &[ColumnDef],
136 is_for_drop_table_connector: bool,
137) -> Result<Vec<ColumnCatalog>> {
138 let mut columns = Vec::with_capacity(column_defs.len());
139
140 for column in column_defs {
141 ensure_column_options_supported(column)?;
142 let ColumnDef {
146 name,
147 data_type,
148 collation,
149 options,
150 ..
151 } = column;
152
153 let data_type = data_type
154 .clone()
155 .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
156 if let Some(collation) = collation {
157 if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
163 bail_not_implemented!(
164 "Collate collation other than `C` or `POSIX` is not implemented"
165 );
166 }
167
168 match data_type {
169 AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
170 _ => {
171 return Err(ErrorCode::NotSupported(
172 format!("{} is not a collatable data type", data_type),
173 "The only built-in collatable data types are `varchar`, please check your type".into(),
174 ).into());
175 }
176 }
177 }
178
179 if !is_for_drop_table_connector {
180 check_column_name_not_reserved(&name.real_value())?;
184 }
185
186 let nullable: bool = !options
187 .iter()
188 .any(|def| matches!(def.option, ColumnOption::NotNull));
189
190 columns.push(ColumnCatalog {
191 column_desc: ColumnDesc {
192 data_type: bind_data_type(&data_type)?,
193 column_id: ColumnId::placeholder(),
194 name: name.real_value(),
195 generated_or_default_column: None,
196 description: None,
197 additional_column: AdditionalColumn { column_type: None },
198 version: ColumnDescVersion::LATEST,
199 system_column: None,
200 nullable,
201 },
202 is_hidden: false,
203 });
204 }
205
206 Ok(columns)
207}
208
209fn check_generated_column_constraints(
210 column_name: &String,
211 column_id: ColumnId,
212 expr: &ExprImpl,
213 column_catalogs: &[ColumnCatalog],
214 generated_column_names: &[String],
215 pk_column_ids: &[ColumnId],
216) -> Result<()> {
217 let input_refs = expr.collect_input_refs(column_catalogs.len());
218 for idx in input_refs.ones() {
219 let referred_generated_column = &column_catalogs[idx].column_desc.name;
220 if generated_column_names
221 .iter()
222 .any(|c| c == referred_generated_column)
223 {
224 return Err(ErrorCode::BindError(format!(
225 "Generated can not reference another generated column. \
226 But here generated column \"{}\" referenced another generated column \"{}\"",
227 column_name, referred_generated_column
228 ))
229 .into());
230 }
231 }
232
233 if pk_column_ids.contains(&column_id) && expr.is_impure() {
234 return Err(ErrorCode::BindError(format!(
235 "Generated columns with impure expressions should not be part of the primary key. \
236 Here column \"{}\" is defined as part of the primary key.",
237 column_name
238 ))
239 .into());
240 }
241
242 Ok(())
243}
244
245pub fn bind_sql_column_constraints(
248 session: &SessionImpl,
249 table_name: String,
250 column_catalogs: &mut [ColumnCatalog],
251 columns: &[ColumnDef],
252 pk_column_ids: &[ColumnId],
253) -> Result<()> {
254 let generated_column_names = {
255 let mut names = vec![];
256 for column in columns {
257 for option_def in &column.options {
258 if let ColumnOption::GeneratedColumns(_) = option_def.option {
259 names.push(column.name.real_value());
260 break;
261 }
262 }
263 }
264 names
265 };
266
267 let mut binder = Binder::new_for_ddl(session);
268 binder.bind_columns_to_context(table_name, column_catalogs)?;
269
270 for column in columns {
271 let Some(idx) = column_catalogs
272 .iter()
273 .position(|c| c.name() == column.name.real_value())
274 else {
275 continue;
278 };
279
280 for option_def in &column.options {
281 match &option_def.option {
282 ColumnOption::GeneratedColumns(expr) => {
283 binder.set_clause(Some(Clause::GeneratedColumn));
284
285 let expr_impl = binder.bind_expr(expr).with_context(|| {
286 format!(
287 "fail to bind expression in generated column \"{}\"",
288 column.name.real_value()
289 )
290 })?;
291
292 check_generated_column_constraints(
293 &column.name.real_value(),
294 column_catalogs[idx].column_id(),
295 &expr_impl,
296 column_catalogs,
297 &generated_column_names,
298 pk_column_ids,
299 )?;
300
301 column_catalogs[idx].column_desc.generated_or_default_column = Some(
302 GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
303 expr: Some(expr_impl.to_expr_proto()),
304 }),
305 );
306 binder.set_clause(None);
307 }
308 ColumnOption::DefaultValue(expr) => {
309 let expr_impl = binder
310 .bind_expr(expr)?
311 .cast_assign(column_catalogs[idx].data_type())?;
312
313 let rewritten_expr_impl = session
321 .pinned_snapshot()
322 .inline_now_proc_time()
323 .rewrite_expr(expr_impl.clone());
324
325 if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
326 let snapshot_value = snapshot_value?;
327
328 column_catalogs[idx].column_desc.generated_or_default_column =
329 Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
330 snapshot_value: Some(snapshot_value.to_protobuf()),
331 expr: Some(expr_impl.to_expr_proto()),
332 }));
334 } else {
335 return Err(ErrorCode::BindError(format!(
336 "Default expression used in column `{}` cannot be evaluated. \
337 Use generated columns instead if you mean to reference other columns.",
338 column.name
339 ))
340 .into());
341 }
342 }
343 ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
344 if persisted.is_empty() {
349 bail_bind_error!(
350 "DEFAULT INTERNAL is only used for internal purposes, \
351 please specify a concrete default value"
352 );
353 }
354
355 let desc = DefaultColumnDesc::decode(&**persisted)
356 .expect("failed to decode persisted `DefaultColumnDesc`");
357
358 column_catalogs[idx].column_desc.generated_or_default_column =
359 Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
360 }
361 _ => {}
362 }
363 }
364 }
365 Ok(())
366}
367
368pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
370 let mut pk_column_names = vec![];
371
372 for constraint in table_constraints {
373 match constraint {
374 TableConstraint::Unique {
375 name: _,
376 columns,
377 is_primary: true,
378 } => {
379 if !pk_column_names.is_empty() {
380 return Err(multiple_pk_definition_err());
381 }
382 pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
383 }
384 _ => bail_not_implemented!("table constraint \"{}\"", constraint),
385 }
386 }
387 Ok(pk_column_names)
388}
389
390pub fn bind_sql_pk_names(
391 columns_defs: &[ColumnDef],
392 pk_names_from_table_constraints: Vec<String>,
393) -> Result<Vec<String>> {
394 let mut pk_column_names = pk_names_from_table_constraints;
395
396 for column in columns_defs {
397 for option_def in &column.options {
398 if let ColumnOption::Unique { is_primary: true } = option_def.option {
399 if !pk_column_names.is_empty() {
400 return Err(multiple_pk_definition_err());
401 }
402 pk_column_names.push(column.name.real_value());
403 };
404 }
405 }
406
407 Ok(pk_column_names)
408}
409
410fn multiple_pk_definition_err() -> RwError {
411 ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
412}
413
414pub fn bind_pk_and_row_id_on_relation(
419 mut columns: Vec<ColumnCatalog>,
420 pk_names: Vec<String>,
421 must_need_pk: bool,
422) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
423 for c in &columns {
424 assert!(c.column_id() != ColumnId::placeholder());
425 }
426
427 let name_to_id = columns
429 .iter()
430 .map(|c| (c.name(), c.column_id()))
431 .collect::<HashMap<_, _>>();
432
433 let mut pk_column_ids: Vec<_> = pk_names
434 .iter()
435 .map(|name| {
436 name_to_id.get(name.as_str()).copied().ok_or_else(|| {
437 ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
438 })
439 })
440 .try_collect()?;
441
442 let need_row_id = pk_column_ids.is_empty() && must_need_pk;
444
445 let row_id_index = need_row_id.then(|| {
446 let column = ColumnCatalog::row_id_column();
447 let index = columns.len();
448 pk_column_ids = vec![column.column_id()];
449 columns.push(column);
450 index
451 });
452
453 if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
454 Err(ErrorCode::InvalidInputSyntax(format!(
455 "column \"{col}\" specified more than once"
456 )))?;
457 }
458
459 Ok((columns, pk_column_ids, row_id_index))
460}
461
462#[allow(clippy::too_many_arguments)]
465pub(crate) async fn gen_create_table_plan_with_source(
466 mut handler_args: HandlerArgs,
467 explain_options: ExplainOptions,
468 table_name: ObjectName,
469 column_defs: Vec<ColumnDef>,
470 wildcard_idx: Option<usize>,
471 constraints: Vec<TableConstraint>,
472 format_encode: FormatEncodeOptions,
473 source_watermarks: Vec<SourceWatermark>,
474 mut col_id_gen: ColumnIdGenerator,
475 include_column_options: IncludeOption,
476 props: CreateTableProps,
477 sql_column_strategy: SqlColumnStrategy,
478) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
479 if props.append_only
480 && format_encode.format != Format::Plain
481 && format_encode.format != Format::Native
482 {
483 return Err(ErrorCode::BindError(format!(
484 "Append only table does not support format {}.",
485 format_encode.format
486 ))
487 .into());
488 }
489
490 let session = &handler_args.session;
491 let (with_properties, refresh_mode) =
492 bind_connector_props(&handler_args, &format_encode, false)?;
493 if with_properties.is_shareable_cdc_connector() {
494 generated_columns_check_for_cdc_table(&column_defs)?;
495 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
496 } else if column_defs.iter().any(|col| {
497 col.options
498 .iter()
499 .any(|def| matches!(def.option, ColumnOption::NotNull))
500 }) {
501 notice_to_user(
503 "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
504 );
505 }
506
507 let db_name: &str = &session.database();
508 let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
509
510 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
512 session,
513 &format_encode,
514 Either::Left(&with_properties),
515 CreateSourceType::Table,
516 )
517 .await?;
518
519 let overwrite_options = OverwriteOptions::new(&mut handler_args);
520 let rate_limit = overwrite_options.source_rate_limit;
521 let source = bind_create_source_or_table_with_connector(
522 handler_args.clone(),
523 table_name,
524 format_encode,
525 with_properties,
526 &column_defs,
527 constraints,
528 wildcard_idx,
529 source_watermarks,
530 columns_from_resolve_source,
531 source_info,
532 include_column_options,
533 &mut col_id_gen,
534 CreateSourceType::Table,
535 rate_limit,
536 sql_column_strategy,
537 refresh_mode,
538 )
539 .await?;
540
541 let context = OptimizerContext::new(handler_args, explain_options);
542
543 let (plan, table) = gen_table_plan_with_source(
544 context.into(),
545 schema_name,
546 source.clone(),
547 col_id_gen.into_version(),
548 props,
549 )?;
550
551 Ok((plan, Some(source), table))
552}
553
554#[allow(clippy::too_many_arguments)]
557pub(crate) fn gen_create_table_plan(
558 context: OptimizerContext,
559 table_name: ObjectName,
560 column_defs: Vec<ColumnDef>,
561 constraints: Vec<TableConstraint>,
562 mut col_id_gen: ColumnIdGenerator,
563 source_watermarks: Vec<SourceWatermark>,
564 props: CreateTableProps,
565 is_for_replace_plan: bool,
566) -> Result<(PlanRef, TableCatalog)> {
567 let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
568 for c in &mut columns {
569 col_id_gen.generate(c)?;
570 }
571
572 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
573 if !secret_refs.is_empty() || !connection_refs.is_empty() {
574 return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
575 }
576
577 gen_create_table_plan_without_source(
578 context,
579 table_name,
580 columns,
581 column_defs,
582 constraints,
583 source_watermarks,
584 col_id_gen.into_version(),
585 props,
586 )
587}
588
589#[allow(clippy::too_many_arguments)]
590pub(crate) fn gen_create_table_plan_without_source(
591 context: OptimizerContext,
592 table_name: ObjectName,
593 columns: Vec<ColumnCatalog>,
594 column_defs: Vec<ColumnDef>,
595 constraints: Vec<TableConstraint>,
596 source_watermarks: Vec<SourceWatermark>,
597 version: TableVersion,
598 props: CreateTableProps,
599) -> Result<(PlanRef, TableCatalog)> {
600 let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
602 let (mut columns, pk_column_ids, row_id_index) =
603 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
604
605 let watermark_descs = bind_source_watermark(
606 context.session_ctx(),
607 table_name.real_value(),
608 source_watermarks,
609 &columns,
610 )?;
611
612 bind_sql_column_constraints(
613 context.session_ctx(),
614 table_name.real_value(),
615 &mut columns,
616 &column_defs,
617 &pk_column_ids,
618 )?;
619 let session = context.session_ctx().clone();
620
621 let db_name = &session.database();
622 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
623
624 let info = CreateTableInfo {
625 columns,
626 pk_column_ids,
627 row_id_index,
628 watermark_descs,
629 source_catalog: None,
630 version,
631 };
632
633 gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
634}
635
636fn gen_table_plan_with_source(
637 context: OptimizerContextRef,
638 schema_name: Option<String>,
639 source_catalog: SourceCatalog,
640 version: TableVersion,
641 props: CreateTableProps,
642) -> Result<(PlanRef, TableCatalog)> {
643 let table_name = source_catalog.name.clone();
644
645 let info = CreateTableInfo {
646 columns: source_catalog.columns.clone(),
647 pk_column_ids: source_catalog.pk_col_ids.clone(),
648 row_id_index: source_catalog.row_id_index,
649 watermark_descs: source_catalog.watermark_descs.clone(),
650 source_catalog: Some(source_catalog),
651 version,
652 };
653
654 gen_table_plan_inner(context, schema_name, table_name, info, props)
655}
656
657#[derive(Clone, Copy)]
659pub enum EitherOnConflict {
660 Ast(Option<OnConflict>),
661 Resolved(ConflictBehavior),
662}
663
664impl From<Option<OnConflict>> for EitherOnConflict {
665 fn from(v: Option<OnConflict>) -> Self {
666 Self::Ast(v)
667 }
668}
669
670impl From<ConflictBehavior> for EitherOnConflict {
671 fn from(v: ConflictBehavior) -> Self {
672 Self::Resolved(v)
673 }
674}
675
676impl EitherOnConflict {
677 pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
679 let conflict_behavior = match self {
680 EitherOnConflict::Ast(on_conflict) => {
681 if append_only {
682 if row_id_as_pk {
683 ConflictBehavior::NoCheck
685 } else {
686 if let Some(on_conflict) = on_conflict
688 && on_conflict != OnConflict::Nothing
689 {
690 return Err(ErrorCode::InvalidInputSyntax(
691 "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
692 the ON CONFLICT behavior must be DO NOTHING."
693 .to_owned(),
694 )
695 .into());
696 }
697 ConflictBehavior::IgnoreConflict
698 }
699 } else {
700 match on_conflict.unwrap_or(OnConflict::UpdateFull) {
702 OnConflict::UpdateFull => ConflictBehavior::Overwrite,
703 OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
704 OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
705 }
706 }
707 }
708 EitherOnConflict::Resolved(b) => b,
709 };
710
711 Ok(conflict_behavior)
712 }
713}
714
715pub struct CreateTableInfo {
720 pub columns: Vec<ColumnCatalog>,
721 pub pk_column_ids: Vec<ColumnId>,
722 pub row_id_index: Option<usize>,
723 pub watermark_descs: Vec<WatermarkDesc>,
724 pub source_catalog: Option<SourceCatalog>,
725 pub version: TableVersion,
726}
727
728pub struct CreateTableProps {
733 pub definition: String,
734 pub append_only: bool,
735 pub on_conflict: EitherOnConflict,
736 pub with_version_columns: Vec<String>,
737 pub webhook_info: Option<PbWebhookSourceInfo>,
738 pub engine: Engine,
739}
740
741#[allow(clippy::too_many_arguments)]
742fn gen_table_plan_inner(
743 context: OptimizerContextRef,
744 schema_name: Option<String>,
745 table_name: String,
746 info: CreateTableInfo,
747 props: CreateTableProps,
748) -> Result<(PlanRef, TableCatalog)> {
749 let CreateTableInfo {
750 ref columns,
751 row_id_index,
752 ref watermark_descs,
753 ref source_catalog,
754 ..
755 } = info;
756 let CreateTableProps { append_only, .. } = props;
757
758 let (database_id, schema_id) = context
759 .session_ctx()
760 .get_database_and_schema_id_for_create(schema_name)?;
761
762 let session = context.session_ctx().clone();
763 let retention_seconds = context.with_options().retention_seconds();
764
765 let source_node: LogicalPlanRef = LogicalSource::new(
766 source_catalog.clone().map(Rc::new),
767 columns.clone(),
768 row_id_index,
769 SourceNodeKind::CreateTable,
770 context.clone(),
771 None,
772 )?
773 .into();
774
775 let required_cols = FixedBitSet::with_capacity(columns.len());
776 let plan_root = PlanRoot::new_with_logical_plan(
777 source_node,
778 RequiredDist::Any,
779 Order::any(),
780 required_cols,
781 vec![],
782 );
783
784 let has_non_ttl_watermark = watermark_descs.iter().any(|d| !d.with_ttl);
785
786 if !append_only && has_non_ttl_watermark {
787 return Err(ErrorCode::NotSupported(
788 "Defining watermarks on table requires the table to be append only.".to_owned(),
789 "Use the key words `APPEND ONLY`".to_owned(),
790 )
791 .into());
792 }
793
794 if !append_only && retention_seconds.is_some() {
795 if session
796 .config()
797 .unsafe_enable_storage_retention_for_non_append_only_tables()
798 {
799 tracing::warn!(
800 "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
801 table_name
802 );
803 const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
804 This may lead to stream inconsistency and unrecoverable \
805 node failure if there is any row INSERT/UPDATE/DELETE operation \
806 corresponding to the TTLed primary keys";
807 session.notice_to_user(NOTICE);
808 } else {
809 return Err(ErrorCode::NotSupported(
810 "Defining retention seconds on table requires the table to be append only."
811 .to_owned(),
812 "Use the key words `APPEND ONLY`".to_owned(),
813 )
814 .into());
815 }
816 }
817
818 let materialize =
819 plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;
820
821 let mut table = materialize.table().clone();
822 table.owner = session.user_id();
823
824 Ok((
825 ensure_sync_log_store_fragment_root(materialize.into()),
826 table,
827 ))
828}
829
830#[allow(clippy::too_many_arguments)]
834pub(crate) fn gen_create_table_plan_for_cdc_table(
835 context: OptimizerContextRef,
836 source: Arc<SourceCatalog>,
837 external_table_name: String,
838 column_defs: Vec<ColumnDef>,
839 source_watermarks: Vec<SourceWatermark>,
840 mut columns: Vec<ColumnCatalog>,
841 pk_names: Vec<String>,
842 cdc_with_options: WithOptionsSecResolved,
843 mut col_id_gen: ColumnIdGenerator,
844 on_conflict: Option<OnConflict>,
845 with_version_columns: Vec<String>,
846 include_column_options: IncludeOption,
847 table_name: ObjectName,
848 resolved_table_name: String, database_id: DatabaseId,
850 schema_id: SchemaId,
851 table_id: TableId,
852 engine: Engine,
853) -> Result<(PlanRef, TableCatalog)> {
854 let session = context.session_ctx().clone();
855
856 handle_addition_columns(
858 None,
859 &cdc_with_options,
860 include_column_options,
861 &mut columns,
862 true,
863 )?;
864
865 for c in &mut columns {
866 col_id_gen.generate(c)?;
867 }
868
869 let (mut columns, pk_column_ids, _row_id_index) =
870 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
871
872 let watermark_descs = bind_source_watermark(
873 context.session_ctx(),
874 table_name.real_value(),
875 source_watermarks,
876 &columns,
877 )?;
878
879 bind_sql_column_constraints(
881 context.session_ctx(),
882 table_name.real_value(),
883 &mut columns,
884 &column_defs,
885 &pk_column_ids,
886 )?;
887
888 let definition = context.normalized_sql().to_owned();
889
890 let pk_column_indices = {
891 let mut id_to_idx = HashMap::new();
892 columns.iter().enumerate().for_each(|(idx, c)| {
893 id_to_idx.insert(c.column_id(), idx);
894 });
895 pk_column_ids
897 .iter()
898 .map(|c| id_to_idx.get(c).copied().unwrap())
899 .collect_vec()
900 };
901 let table_pk = pk_column_indices
902 .iter()
903 .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
904 .collect();
905
906 let (options, secret_refs) = cdc_with_options.into_parts();
907
908 let non_generated_column_descs = columns
909 .iter()
910 .filter(|&c| !c.is_generated())
911 .map(|c| c.column_desc.clone())
912 .collect_vec();
913 let non_generated_column_num = non_generated_column_descs.len();
914 let cdc_table_type = ExternalCdcTableType::from_properties(&options);
915 let cdc_table_desc = CdcTableDesc {
916 table_id,
917 source_id: source.id, external_table_name: external_table_name.clone(),
919 pk: table_pk,
920 columns: non_generated_column_descs,
921 stream_key: pk_column_indices,
922 connect_properties: options,
923 secret_refs,
924 };
925
926 tracing::debug!(?cdc_table_desc, "create cdc table");
927 let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;
928
929 let logical_scan = LogicalCdcScan::create(
930 external_table_name.clone(),
931 Rc::new(cdc_table_desc),
932 context.clone(),
933 options,
934 );
935
936 let scan_node: LogicalPlanRef = logical_scan.into();
937 let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
938 let plan_root = PlanRoot::new_with_logical_plan(
939 scan_node,
940 RequiredDist::Any,
941 Order::any(),
942 required_cols,
943 vec![],
944 );
945
946 let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
947 let materialize = plan_root.gen_table_plan(
948 context,
949 resolved_table_name,
950 database_id,
951 schema_id,
952 CreateTableInfo {
953 columns,
954 pk_column_ids,
955 row_id_index: None,
956 watermark_descs,
957 source_catalog: Some((*source).clone()),
958 version: col_id_gen.into_version(),
959 },
960 CreateTableProps {
961 definition,
962 append_only: false,
963 on_conflict: on_conflict.into(),
964 with_version_columns,
965 webhook_info: None,
966 engine,
967 },
968 )?;
969
970 let mut table = materialize.table().clone();
971 table.owner = session.user_id();
972 table.cdc_table_id = Some(cdc_table_id);
973 table.cdc_table_type = Some(cdc_table_type);
974 Ok((
975 ensure_sync_log_store_fragment_root(materialize.into()),
976 table,
977 ))
978}
979
980fn derive_with_options_for_cdc_table(
988 source_with_properties: &WithOptionsSecResolved,
989 external_table_name: String,
990) -> Result<(WithOptionsSecResolved, String)> {
991 use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
992 let source_database_name: &str = source_with_properties
994 .get("database.name")
995 .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
996 .as_str();
997 let mut with_options = source_with_properties.clone();
998 if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
999 match connector.as_str() {
1000 MYSQL_CDC_CONNECTOR => {
1001 let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
1004 anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
1005 })?;
1006 if !source_database_name
1008 .split(',')
1009 .map(|s| s.trim())
1010 .any(|name| name == db_name)
1011 {
1012 return Err(anyhow!(
1013 "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
1014 db_name,
1015 source_database_name
1016 ).into());
1017 }
1018 with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
1019 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1020 return Ok((with_options, external_table_name));
1022 }
1023 POSTGRES_CDC_CONNECTOR => {
1024 let (schema_name, table_name) = external_table_name
1025 .split_once('.')
1026 .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;
1027
1028 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1030 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1031 return Ok((with_options, external_table_name));
1033 }
1034 SQL_SERVER_CDC_CONNECTOR => {
1035 let parts: Vec<&str> = external_table_name.split('.').collect();
1043 let (schema_name, table_name) = match parts.len() {
1044 3 => {
1045 let db_name = parts[0];
1048 let schema_name = parts[1];
1049 let table_name = parts[2];
1050
1051 if db_name != source_database_name {
1052 return Err(anyhow!(
1053 "The database name '{}' in FROM clause does not match the database name '{}' specified in source definition. \
1054 You can either use 'schema.table' format (recommended) or ensure the database name matches.",
1055 db_name,
1056 source_database_name
1057 ).into());
1058 }
1059 (schema_name, table_name)
1060 }
1061 2 => {
1062 let schema_name = parts[0];
1065 let table_name = parts[1];
1066 (schema_name, table_name)
1067 }
1068 1 => {
1069 return Err(anyhow!(
1072 "Invalid table name format '{}'. For SQL Server CDC, you must specify the schema name. \
1073 Use 'schema.table' format (e.g., 'dbo.{}') or 'database.schema.table' format (e.g., '{}.dbo.{}').",
1074 external_table_name,
1075 external_table_name,
1076 source_database_name,
1077 external_table_name
1078 ).into());
1079 }
1080 _ => {
1081 return Err(anyhow!(
1083 "Invalid table name format '{}'. Expected 'schema.table' or 'database.schema.table'.",
1084 external_table_name
1085 ).into());
1086 }
1087 };
1088
1089 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1091 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1092
1093 let normalized_external_table_name = format!("{}.{}", schema_name, table_name);
1096 return Ok((with_options, normalized_external_table_name));
1097 }
1098 _ => {
1099 return Err(RwError::from(anyhow!(
1100 "connector {} is not supported for cdc table",
1101 connector
1102 )));
1103 }
1104 };
1105 }
1106 unreachable!("All valid CDC connectors should have returned by now")
1107}
1108
/// Builds the stream plan and catalog entries for a `CREATE TABLE` statement.
///
/// The work is dispatched on the presence of `format_encode` (as returned by
/// `check_create_table_with_source`) and `cdc_table_info`:
///
/// - `format_encode` only: delegates to `gen_create_table_plan_with_source`.
/// - neither: delegates to `gen_create_table_plan`.
/// - `cdc_table_info` only: validates the definition, looks up the referenced source in the
///   catalog and delegates to `gen_create_table_plan_for_cdc_table`.
/// - both: rejected with `ErrorCode::NotSupported`.
///
/// # Returns
///
/// `(plan, source, table, job_type, shared_source_id)`, where
/// - `source` is whatever `gen_create_table_plan_with_source` produced in the first case and
///   `None` in the other cases;
/// - `job_type` is `TableJobType::SharedCdcSource` for the CDC case and
///   `TableJobType::General` otherwise;
/// - `shared_source_id` is `Some(id of the looked-up source)` only in the CDC case.
///
/// # Errors
///
/// Propagates any error from the validation, binding, catalog lookup and plan generation
/// helpers that are called with `?` below.
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
    handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    format_encode: Option<FormatEncodeOptions>,
    cdc_table_info: Option<CdcTableInfo>,
    table_name: &ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    engine: Engine,
) -> Result<(
    PlanRef,
    Option<SourceCatalog>,
    TableCatalog,
    TableJobType,
    Option<SourceId>,
)> {
    let col_id_gen = ColumnIdGenerator::new_initial();
    // The returned value replaces the caller-supplied `format_encode` from here on.
    let format_encode = check_create_table_with_source(
        &handler_args.with_options,
        format_encode,
        &include_column_options,
        &cdc_table_info,
    )?;
    // Bind the webhook info (if any) against the session and the declared columns.
    let webhook_info = webhook_info
        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
        .transpose()?;

    // `with_version_columns` is cloned because the CDC arm below also passes the original on.
    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_columns: with_version_columns.clone(),
        webhook_info,
        engine,
    };

    let ((plan, source, table), job_type, shared_shource_id) = match (
        format_encode,
        cdc_table_info.as_ref(),
    ) {
        // Format/encode options are present and there is no CDC table info.
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                explain_options,
                table_name.clone(),
                column_defs,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                SqlColumnStrategy::FollowChecked,
            )
            .await?,
            TableJobType::General,
            None,
        ),
        // Neither format/encode options nor CDC table info.
        (None, None) => {
            let context = OptimizerContext::new(handler_args, explain_options);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name.clone(),
                column_defs,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                false,
            )?;

            ((plan, None, table), TableJobType::General, None)
        }

        // Table created from a CDC source.
        (None, Some(cdc_table)) => {
            // Validate the statement before touching the catalog or the external database.
            sanity_check_for_table_on_cdc_source(
                append_only,
                &column_defs,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            generated_columns_check_for_cdc_table(&column_defs)?;
            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;

            let session = &handler_args.session;
            let db_name = &session.database();
            let user_name = &session.user_name();
            let search_path = session.config().search_path();
            // Resolve where the new table is going to be created.
            let (schema_name, resolved_table_name) =
                Binder::resolve_schema_qualified_name(db_name, table_name)?;
            let (database_id, schema_id) =
                session.get_database_and_schema_id_for_create(schema_name.clone())?;

            // Resolve the name of the source the table is created from.
            let (source_schema, source_name) =
                Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;

            // The catalog read guard lives only inside this block, so it is released before
            // the `.await` further down; only a clone of the looked-up source leaves the block.
            let source = {
                let catalog_reader = session.env().catalog_reader().read_guard();
                let schema_path =
                    SchemaPath::new(source_schema.as_deref(), &search_path, user_name);

                let (source, _) = catalog_reader.get_source_by_name(
                    db_name,
                    schema_path,
                    source_name.as_str(),
                )?;
                source.clone()
            };
            // Derive the connector options for this table from the source's properties and
            // the external table name.
            let (cdc_with_options, normalized_external_table_name) =
                derive_with_options_for_cdc_table(
                    &source.with_properties,
                    cdc_table.external_table_name.clone(),
                )?;

            let (columns, pk_names) = match wildcard_idx {
                // Wildcard: columns and primary key come from the external table.
                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
                // Explicit column definitions.
                None => {
                    // Default values are rejected for user-defined columns of a CDC table.
                    for column_def in &column_defs {
                        for option_def in &column_def.options {
                            if let ColumnOption::DefaultValue(_)
                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
                            {
                                return Err(ErrorCode::NotSupported(
                                    "Default value for columns defined on the table created from a CDC source".into(),
                                    "Remove the default value expression in the column definitions".into(),
                                )
                                .into());
                            }
                        }
                    }

                    let (columns, pk_names) =
                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
                    // The config is built only to surface extraction errors; the value itself
                    // is not used.
                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
                        .context("failed to extract external table config")?;

                    (columns, pk_names)
                }
            };

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, explain_options).into();
            // Read the id before `source` is moved into the plan generator.
            let shared_source_id = source.id;
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                normalized_external_table_name,
                column_defs,
                source_watermarks,
                columns,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_columns,
                include_column_options,
                table_name.clone(),
                resolved_table_name,
                database_id,
                schema_id,
                TableId::placeholder(),
                engine,
            )?;

            (
                (plan, None, table),
                TableJobType::SharedCdcSource,
                Some(shared_source_id),
            )
        }
        // Format/encode options cannot be combined with a CDC table.
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format doesn't apply to table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };
    Ok((plan, source, table, job_type, shared_shource_id))
}
1304
1305fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1307 let mut found_generated_column = false;
1308 for column in columns {
1309 let mut is_generated = false;
1310
1311 for option_def in &column.options {
1312 if let ColumnOption::GeneratedColumns(_) = option_def.option {
1313 is_generated = true;
1314 break;
1315 }
1316 }
1317
1318 if is_generated {
1319 found_generated_column = true;
1320 } else if found_generated_column {
1321 return Err(ErrorCode::NotSupported(
1322 "Non-generated column found after a generated column.".into(),
1323 "Ensure that all generated columns appear at the end of the cdc table definition."
1324 .into(),
1325 )
1326 .into());
1327 }
1328 }
1329 Ok(())
1330}
1331
1332fn not_null_check_for_cdc_table(
1334 wildcard_idx: &Option<usize>,
1335 column_defs: &Vec<ColumnDef>,
1336) -> Result<()> {
1337 if !wildcard_idx.is_some()
1338 && column_defs.iter().any(|col| {
1339 col.options
1340 .iter()
1341 .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1342 })
1343 {
1344 return Err(ErrorCode::NotSupported(
1345 "CDC table with NOT NULL constraint is not supported".to_owned(),
1346 "Please remove the NOT NULL constraint for columns".to_owned(),
1347 )
1348 .into());
1349 }
1350 Ok(())
1351}
1352
1353fn sanity_check_for_table_on_cdc_source(
1355 append_only: bool,
1356 column_defs: &Vec<ColumnDef>,
1357 wildcard_idx: &Option<usize>,
1358 constraints: &Vec<TableConstraint>,
1359 source_watermarks: &Vec<SourceWatermark>,
1360) -> Result<()> {
1361 if wildcard_idx.is_some() && !column_defs.is_empty() {
1363 return Err(ErrorCode::NotSupported(
1364 "wildcard(*) and column definitions cannot be used together".to_owned(),
1365 "Remove the wildcard or column definitions".to_owned(),
1366 )
1367 .into());
1368 }
1369
1370 if !wildcard_idx.is_some()
1372 && !constraints.iter().any(|c| {
1373 matches!(
1374 c,
1375 TableConstraint::Unique {
1376 is_primary: true,
1377 ..
1378 }
1379 )
1380 })
1381 && !column_defs.iter().any(|col| {
1382 col.options
1383 .iter()
1384 .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1385 })
1386 {
1387 return Err(ErrorCode::NotSupported(
1388 "CDC table without primary key constraint is not supported".to_owned(),
1389 "Please define a primary key".to_owned(),
1390 )
1391 .into());
1392 }
1393
1394 if append_only {
1395 return Err(ErrorCode::NotSupported(
1396 "append only modifier on the table created from a CDC source".into(),
1397 "Remove the APPEND ONLY clause".into(),
1398 )
1399 .into());
1400 }
1401
1402 if !source_watermarks.is_empty()
1403 && source_watermarks
1404 .iter()
1405 .any(|watermark| !watermark.with_ttl)
1406 {
1407 return Err(ErrorCode::NotSupported(
1408 "non-TTL watermark defined on the table created from a CDC source".into(),
1409 "Use `WATERMARK ... WITH TTL` instead.".into(),
1410 )
1411 .into());
1412 }
1413
1414 Ok(())
1415}
1416
1417async fn bind_cdc_table_schema_externally(
1419 cdc_with_options: WithOptionsSecResolved,
1420) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1421 let (options, secret_refs) = cdc_with_options.into_parts();
1423 let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1424 .context("failed to extract external table config")?;
1425
1426 let table = ExternalTableImpl::connect(config)
1427 .await
1428 .context("failed to auto derive table schema")?;
1429
1430 Ok((
1431 table
1432 .column_descs()
1433 .iter()
1434 .cloned()
1435 .map(|column_desc| ColumnCatalog {
1436 column_desc,
1437 is_hidden: false,
1438 })
1439 .collect(),
1440 table.pk_names().clone(),
1441 ))
1442}
1443
1444fn bind_cdc_table_schema(
1446 column_defs: &Vec<ColumnDef>,
1447 constraints: &Vec<TableConstraint>,
1448 is_for_replace_plan: bool,
1449) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1450 let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1451
1452 let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1453 Ok((columns, pk_names))
1454}
1455
1456#[allow(clippy::too_many_arguments)]
1457pub async fn handle_create_table(
1458 handler_args: HandlerArgs,
1459 table_name: ObjectName,
1460 column_defs: Vec<ColumnDef>,
1461 wildcard_idx: Option<usize>,
1462 constraints: Vec<TableConstraint>,
1463 if_not_exists: bool,
1464 format_encode: Option<FormatEncodeOptions>,
1465 source_watermarks: Vec<SourceWatermark>,
1466 append_only: bool,
1467 on_conflict: Option<OnConflict>,
1468 with_version_columns: Vec<String>,
1469 cdc_table_info: Option<CdcTableInfo>,
1470 include_column_options: IncludeOption,
1471 webhook_info: Option<WebhookSourceInfo>,
1472 ast_engine: risingwave_sqlparser::ast::Engine,
1473) -> Result<RwPgResponse> {
1474 let session = handler_args.session.clone();
1475
1476 if append_only {
1477 session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1478 }
1479
1480 session.check_cluster_limits().await?;
1481
1482 let engine = match ast_engine {
1483 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1484 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1485 };
1486
1487 if let Either::Right(resp) = session.check_relation_name_duplicated(
1488 table_name.clone(),
1489 StatementType::CREATE_TABLE,
1490 if_not_exists,
1491 )? {
1492 return Ok(resp);
1493 }
1494
1495 let (graph, source, hummock_table, job_type, shared_source_id) = {
1496 let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
1497 handler_args.clone(),
1498 ExplainOptions::default(),
1499 format_encode,
1500 cdc_table_info,
1501 &table_name,
1502 column_defs.clone(),
1503 wildcard_idx,
1504 constraints.clone(),
1505 source_watermarks,
1506 append_only,
1507 on_conflict,
1508 with_version_columns,
1509 include_column_options,
1510 webhook_info,
1511 engine,
1512 )
1513 .await?;
1514 tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1515
1516 let graph = build_graph(plan, Some(GraphJobType::Table))?;
1517
1518 (graph, source, table, job_type, shared_source_id)
1519 };
1520
1521 tracing::trace!(
1522 "name={}, graph=\n{}",
1523 table_name,
1524 serde_json::to_string_pretty(&graph).unwrap()
1525 );
1526
1527 let dependencies = shared_source_id
1528 .map(|id| HashSet::from([id.as_object_id()]))
1529 .unwrap_or_default();
1530
1531 match engine {
1533 Engine::Hummock => {
1534 let catalog_writer = session.catalog_writer()?;
1535 let action = match job_type {
1536 TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
1537 _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
1538 };
1539 execute_with_long_running_notification(
1540 catalog_writer.create_table(
1541 source.map(|s| s.to_prost()),
1542 hummock_table.to_prost(),
1543 graph,
1544 job_type,
1545 if_not_exists,
1546 dependencies,
1547 ),
1548 &session,
1549 "CREATE TABLE",
1550 action,
1551 )
1552 .await?;
1553 }
1554 Engine::Iceberg => {
1555 let hummock_table_name = hummock_table.name.clone();
1556 session.create_staging_table(hummock_table.clone());
1557 let res = Box::pin(create_iceberg_engine_table(
1558 session.clone(),
1559 handler_args,
1560 source.map(|s| s.to_prost()),
1561 hummock_table,
1562 graph,
1563 table_name,
1564 job_type,
1565 if_not_exists,
1566 ))
1567 .await;
1568 session.drop_staging_table(&hummock_table_name);
1569 res?
1570 }
1571 }
1572
1573 Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1574}
1575
1576#[allow(clippy::too_many_arguments)]
1585pub async fn create_iceberg_engine_table(
1586 session: Arc<SessionImpl>,
1587 handler_args: HandlerArgs,
1588 mut source: Option<PbSource>,
1589 table: TableCatalog,
1590 graph: StreamFragmentGraph,
1591 table_name: ObjectName,
1592 job_type: PbTableJobType,
1593 if_not_exists: bool,
1594) -> Result<()> {
1595 let rw_db_name = session
1596 .env()
1597 .catalog_reader()
1598 .read_guard()
1599 .get_database_by_id(table.database_id)?
1600 .name()
1601 .to_owned();
1602 let rw_schema_name = session
1603 .env()
1604 .catalog_reader()
1605 .read_guard()
1606 .get_schema_by_id(table.database_id, table.schema_id)?
1607 .name()
1608 .clone();
1609 let iceberg_catalog_name = rw_db_name.clone();
1610 let iceberg_database_name = rw_schema_name.clone();
1611 let iceberg_table_name = table_name.0.last().unwrap().real_value();
1612
1613 let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1614 let sink_decouple = session.config().sink_decouple();
1615 if matches!(sink_decouple, SinkDecouple::Disable) {
1616 bail!(
1617 "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1618 );
1619 }
1620
1621 let mut connection_ref = BTreeMap::new();
1622 let with_common = if iceberg_engine_connection.is_empty() {
1623 bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1624 } else {
1625 let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1626 assert_eq!(parts.len(), 2);
1627 let connection_catalog =
1628 session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1629 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1630 if params.connection_type == ConnectionType::Iceberg as i32 {
1631 connection_ref.insert(
1633 "connection".to_owned(),
1634 ConnectionRefValue {
1635 connection_name: ObjectName::from(vec![
1636 Ident::from(parts[0]),
1637 Ident::from(parts[1]),
1638 ]),
1639 },
1640 );
1641
1642 let mut with_common = BTreeMap::new();
1643 with_common.insert("connector".to_owned(), "iceberg".to_owned());
1644 with_common.insert("database.name".to_owned(), iceberg_database_name);
1645 with_common.insert("table.name".to_owned(), iceberg_table_name);
1646
1647 let hosted_catalog = params
1648 .properties
1649 .get("hosted_catalog")
1650 .map(|s| s.eq_ignore_ascii_case("true"))
1651 .unwrap_or(false);
1652 if hosted_catalog {
1653 let meta_client = session.env().meta_client();
1654 let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1655
1656 let meta_store_endpoint =
1657 url::Url::parse(&meta_store_endpoint).map_err(|_| {
1658 ErrorCode::InternalError(
1659 "failed to parse the meta store endpoint".to_owned(),
1660 )
1661 })?;
1662 let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1663 let meta_store_user = meta_store_endpoint.username().to_owned();
1664 let meta_store_password = match meta_store_endpoint.password() {
1665 Some(password) => percent_decode_str(password)
1666 .decode_utf8()
1667 .map_err(|_| {
1668 ErrorCode::InternalError(
1669 "failed to parse password from meta store endpoint".to_owned(),
1670 )
1671 })?
1672 .into_owned(),
1673 None => "".to_owned(),
1674 };
1675 let meta_store_host = meta_store_endpoint
1676 .host_str()
1677 .ok_or_else(|| {
1678 ErrorCode::InternalError(
1679 "failed to parse host from meta store endpoint".to_owned(),
1680 )
1681 })?
1682 .to_owned();
1683 let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1684 ErrorCode::InternalError(
1685 "failed to parse port from meta store endpoint".to_owned(),
1686 )
1687 })?;
1688 let meta_store_database = meta_store_endpoint
1689 .path()
1690 .trim_start_matches('/')
1691 .to_owned();
1692
1693 let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1694 bail!("failed to parse meta backend: {}", meta_store_backend);
1695 };
1696
1697 let catalog_uri = match meta_backend {
1698 MetaBackend::Postgres => {
1699 format!(
1700 "jdbc:postgresql://{}:{}/{}",
1701 meta_store_host, meta_store_port, meta_store_database
1702 )
1703 }
1704 MetaBackend::Mysql => {
1705 format!(
1706 "jdbc:mysql://{}:{}/{}",
1707 meta_store_host, meta_store_port, meta_store_database
1708 )
1709 }
1710 MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1711 bail!(
1712 "Unsupported meta backend for iceberg engine table: {}",
1713 meta_store_backend
1714 );
1715 }
1716 };
1717
1718 with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1719 with_common.insert("catalog.uri".to_owned(), catalog_uri);
1720 with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
1721 with_common.insert("catalog.jdbc.password".to_owned(), meta_store_password);
1722 with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
1723 }
1724
1725 with_common
1726 } else {
1727 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1728 "Only iceberg connection could be used in iceberg engine".to_owned(),
1729 )));
1730 }
1731 } else {
1732 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1733 "Private Link Service has been deprecated. Please create a new connection instead."
1734 .to_owned(),
1735 )));
1736 }
1737 };
1738
1739 let mut pks = table
1742 .pk_column_names()
1743 .iter()
1744 .map(|c| c.to_string())
1745 .collect::<Vec<String>>();
1746
1747 if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1749 pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1750 }
1751
1752 let sink_from = CreateSink::From(table_name.clone());
1753
1754 let mut sink_name = table_name.clone();
1755 *sink_name.0.last_mut().unwrap() = Ident::from(
1756 (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1757 );
1758 let create_sink_stmt = CreateSinkStatement {
1759 if_not_exists: false,
1760 sink_name,
1761 with_properties: WithProperties(vec![]),
1762 sink_from,
1763 columns: vec![],
1764 emit_mode: None,
1765 sink_schema: None,
1766 into_table_name: None,
1767 };
1768
1769 let mut sink_handler_args = handler_args.clone();
1770
1771 let mut sink_with = with_common.clone();
1772
1773 if !handler_args
1775 .with_options
1776 .get(ENABLE_COMPACTION)
1777 .is_some_and(|val| val.eq_ignore_ascii_case("true"))
1778 {
1779 sink_with.insert(AUTO_SCHEMA_CHANGE_KEY.to_owned(), "true".to_owned());
1780 }
1781
1782 if table.append_only {
1783 sink_with.insert("type".to_owned(), "append-only".to_owned());
1784 } else {
1785 sink_with.insert("primary_key".to_owned(), pks.join(","));
1786 sink_with.insert("type".to_owned(), "upsert".to_owned());
1787 }
1788 let commit_checkpoint_interval = handler_args
1807 .with_options
1808 .get(COMMIT_CHECKPOINT_INTERVAL)
1809 .map(|v| v.to_owned())
1810 .unwrap_or_else(|| "60".to_owned());
1811 let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1812 ErrorCode::InvalidInputSyntax(format!(
1813 "commit_checkpoint_interval must be greater than 0: {}",
1814 commit_checkpoint_interval
1815 ))
1816 })?;
1817
1818 if commit_checkpoint_interval == 0 {
1819 bail!("commit_checkpoint_interval must be greater than 0");
1820 }
1821
1822 source
1824 .as_mut()
1825 .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1826
1827 let sink_decouple = session.config().sink_decouple();
1828 if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1829 bail!(
1830 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1831 )
1832 }
1833
1834 sink_with.insert(
1835 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1836 commit_checkpoint_interval.to_string(),
1837 );
1838 sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1839
1840 sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1841
1842 if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1843 match enable_compaction.to_lowercase().as_str() {
1844 "true" => {
1845 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1846 }
1847 "false" => {
1848 sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1849 }
1850 _ => {
1851 return Err(ErrorCode::InvalidInputSyntax(format!(
1852 "enable_compaction must be true or false: {}",
1853 enable_compaction
1854 ))
1855 .into());
1856 }
1857 }
1858
1859 source
1861 .as_mut()
1862 .map(|x| x.with_properties.remove("enable_compaction"));
1863 } else {
1864 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1865 }
1866
1867 if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1868 let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1869 ErrorCode::InvalidInputSyntax(format!(
1870 "compaction_interval_sec must be greater than 0: {}",
1871 commit_checkpoint_interval
1872 ))
1873 })?;
1874 if compaction_interval_sec == 0 {
1875 bail!("compaction_interval_sec must be greater than 0");
1876 }
1877 sink_with.insert(
1878 "compaction_interval_sec".to_owned(),
1879 compaction_interval_sec.to_string(),
1880 );
1881 source
1883 .as_mut()
1884 .map(|x| x.with_properties.remove("compaction_interval_sec"));
1885 }
1886
1887 let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1888 handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1889 {
1890 source
1892 .as_mut()
1893 .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1894 match enable_snapshot_expiration.to_lowercase().as_str() {
1895 "true" => {
1896 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1897 true
1898 }
1899 "false" => {
1900 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1901 false
1902 }
1903 _ => {
1904 return Err(ErrorCode::InvalidInputSyntax(format!(
1905 "enable_snapshot_expiration must be true or false: {}",
1906 enable_snapshot_expiration
1907 ))
1908 .into());
1909 }
1910 }
1911 } else {
1912 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1913 true
1914 };
1915
1916 if has_enabled_snapshot_expiration {
1917 if let Some(snapshot_expiration_retain_last) = handler_args
1919 .with_options
1920 .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
1921 {
1922 sink_with.insert(
1923 SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
1924 snapshot_expiration_retain_last.to_owned(),
1925 );
1926 source
1928 .as_mut()
1929 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
1930 }
1931
1932 if let Some(snapshot_expiration_max_age) = handler_args
1933 .with_options
1934 .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
1935 {
1936 sink_with.insert(
1937 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
1938 snapshot_expiration_max_age.to_owned(),
1939 );
1940 source
1942 .as_mut()
1943 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
1944 }
1945
1946 if let Some(snapshot_expiration_clear_expired_files) = handler_args
1947 .with_options
1948 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1949 {
1950 sink_with.insert(
1951 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
1952 snapshot_expiration_clear_expired_files.to_owned(),
1953 );
1954 source.as_mut().map(|x| {
1956 x.with_properties
1957 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1958 });
1959 }
1960
1961 if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
1962 .with_options
1963 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1964 {
1965 sink_with.insert(
1966 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
1967 snapshot_expiration_clear_expired_meta_data.to_owned(),
1968 );
1969 source.as_mut().map(|x| {
1971 x.with_properties
1972 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1973 });
1974 }
1975 }
1976
1977 if let Some(format_version) = handler_args.with_options.get(FORMAT_VERSION) {
1978 let format_version = format_version.parse::<u8>().map_err(|_| {
1979 ErrorCode::InvalidInputSyntax(format!(
1980 "format_version must be 1, 2 or 3: {}",
1981 format_version
1982 ))
1983 })?;
1984 if format_version != 1 && format_version != 2 && format_version != 3 {
1985 bail!("format_version must be 1, 2 or 3");
1986 }
1987 sink_with.insert(FORMAT_VERSION.to_owned(), format_version.to_string());
1988
1989 source
1991 .as_mut()
1992 .map(|x| x.with_properties.remove(FORMAT_VERSION));
1993 }
1994
1995 if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1996 let write_mode = IcebergWriteMode::try_from(write_mode.as_str()).map_err(|_| {
1997 ErrorCode::InvalidInputSyntax(format!(
1998 "invalid write_mode: {}, must be one of: {}, {}",
1999 write_mode, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
2000 ))
2001 })?;
2002
2003 match write_mode {
2004 IcebergWriteMode::MergeOnRead => {
2005 sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
2006 }
2007
2008 IcebergWriteMode::CopyOnWrite => {
2009 if table.append_only {
2010 return Err(ErrorCode::NotSupported(
2011 "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
2012 "Please use MERGE ON READ instead".to_owned(),
2013 )
2014 .into());
2015 }
2016
2017 sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
2018 }
2019 }
2020
2021 source
2023 .as_mut()
2024 .map(|x| x.with_properties.remove("write_mode"));
2025 } else {
2026 sink_with.insert(
2027 WRITE_MODE.to_owned(),
2028 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2029 );
2030 }
2031
2032 if let Some(max_snapshots_num_before_compaction) =
2033 handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
2034 {
2035 let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
2036 .parse::<u32>()
2037 .map_err(|_| {
2038 ErrorCode::InvalidInputSyntax(format!(
2039 "{} must be greater than 0: {}",
2040 COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
2041 ))
2042 })?;
2043
2044 if max_snapshots_num_before_compaction == 0 {
2045 bail!(format!(
2046 "{} must be greater than 0",
2047 COMPACTION_MAX_SNAPSHOTS_NUM
2048 ));
2049 }
2050
2051 sink_with.insert(
2052 COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
2053 max_snapshots_num_before_compaction.to_string(),
2054 );
2055
2056 source
2058 .as_mut()
2059 .map(|x| x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM));
2060 }
2061
2062 if let Some(small_files_threshold_mb) = handler_args
2063 .with_options
2064 .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2065 {
2066 let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
2067 ErrorCode::InvalidInputSyntax(format!(
2068 "{} must be greater than 0: {}",
2069 COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
2070 ))
2071 })?;
2072 if small_files_threshold_mb == 0 {
2073 bail!(format!(
2074 "{} must be a greater than 0",
2075 COMPACTION_SMALL_FILES_THRESHOLD_MB
2076 ));
2077 }
2078 sink_with.insert(
2079 COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
2080 small_files_threshold_mb.to_string(),
2081 );
2082
2083 source.as_mut().map(|x| {
2085 x.with_properties
2086 .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2087 });
2088 }
2089
2090 if let Some(delete_files_count_threshold) = handler_args
2091 .with_options
2092 .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2093 {
2094 let delete_files_count_threshold =
2095 delete_files_count_threshold.parse::<usize>().map_err(|_| {
2096 ErrorCode::InvalidInputSyntax(format!(
2097 "{} must be greater than 0: {}",
2098 COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
2099 ))
2100 })?;
2101 if delete_files_count_threshold == 0 {
2102 bail!(format!(
2103 "{} must be greater than 0",
2104 COMPACTION_DELETE_FILES_COUNT_THRESHOLD
2105 ));
2106 }
2107 sink_with.insert(
2108 COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
2109 delete_files_count_threshold.to_string(),
2110 );
2111
2112 source.as_mut().map(|x| {
2114 x.with_properties
2115 .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2116 });
2117 }
2118
2119 if let Some(trigger_snapshot_count) = handler_args
2120 .with_options
2121 .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
2122 {
2123 let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
2124 ErrorCode::InvalidInputSyntax(format!(
2125 "{} must be greater than 0: {}",
2126 COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
2127 ))
2128 })?;
2129 if trigger_snapshot_count == 0 {
2130 bail!(format!(
2131 "{} must be greater than 0",
2132 COMPACTION_TRIGGER_SNAPSHOT_COUNT
2133 ));
2134 }
2135 sink_with.insert(
2136 COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
2137 trigger_snapshot_count.to_string(),
2138 );
2139
2140 source
2142 .as_mut()
2143 .map(|x| x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT));
2144 }
2145
2146 if let Some(target_file_size_mb) = handler_args
2147 .with_options
2148 .get(COMPACTION_TARGET_FILE_SIZE_MB)
2149 {
2150 let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
2151 ErrorCode::InvalidInputSyntax(format!(
2152 "{} must be greater than 0: {}",
2153 COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
2154 ))
2155 })?;
2156 if target_file_size_mb == 0 {
2157 bail!(format!(
2158 "{} must be greater than 0",
2159 COMPACTION_TARGET_FILE_SIZE_MB
2160 ));
2161 }
2162 sink_with.insert(
2163 COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
2164 target_file_size_mb.to_string(),
2165 );
2166 source
2168 .as_mut()
2169 .map(|x| x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB));
2170 }
2171
2172 if let Some(compaction_type) = handler_args.with_options.get(COMPACTION_TYPE) {
2173 let compaction_type = CompactionType::try_from(compaction_type.as_str()).map_err(|_| {
2174 ErrorCode::InvalidInputSyntax(format!(
2175 "invalid compaction_type: {}, must be one of {:?}",
2176 compaction_type,
2177 &[
2178 CompactionType::Full,
2179 CompactionType::SmallFiles,
2180 CompactionType::FilesWithDelete
2181 ]
2182 ))
2183 })?;
2184
2185 sink_with.insert(
2186 COMPACTION_TYPE.to_owned(),
2187 compaction_type.as_str().to_owned(),
2188 );
2189
2190 source
2192 .as_mut()
2193 .map(|x| x.with_properties.remove(COMPACTION_TYPE));
2194 }
2195
2196 if let Some(write_parquet_compression) = handler_args
2197 .with_options
2198 .get(COMPACTION_WRITE_PARQUET_COMPRESSION)
2199 {
2200 sink_with.insert(
2201 COMPACTION_WRITE_PARQUET_COMPRESSION.to_owned(),
2202 write_parquet_compression.to_owned(),
2203 );
2204 source.as_mut().map(|x| {
2206 x.with_properties
2207 .remove(COMPACTION_WRITE_PARQUET_COMPRESSION)
2208 });
2209 }
2210
2211 if let Some(write_parquet_max_row_group_rows) = handler_args
2212 .with_options
2213 .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2214 {
2215 let write_parquet_max_row_group_rows = write_parquet_max_row_group_rows
2216 .parse::<usize>()
2217 .map_err(|_| {
2218 ErrorCode::InvalidInputSyntax(format!(
2219 "{} must be a positive integer: {}",
2220 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS, write_parquet_max_row_group_rows
2221 ))
2222 })?;
2223 if write_parquet_max_row_group_rows == 0 {
2224 bail!(format!(
2225 "{} must be greater than 0",
2226 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS
2227 ));
2228 }
2229 sink_with.insert(
2230 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS.to_owned(),
2231 write_parquet_max_row_group_rows.to_string(),
2232 );
2233 source.as_mut().map(|x| {
2235 x.with_properties
2236 .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2237 });
2238 }
2239
2240 if let Some(write_parquet_max_row_group_bytes) = handler_args
2241 .with_options
2242 .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
2243 {
2244 let write_parquet_max_row_group_bytes = write_parquet_max_row_group_bytes
2245 .parse::<usize>()
2246 .map_err(|_| {
2247 ErrorCode::InvalidInputSyntax(format!(
2248 "{} must be a positive integer: {}",
2249 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES, write_parquet_max_row_group_bytes
2250 ))
2251 })?;
2252 if write_parquet_max_row_group_bytes == 0 {
2253 bail!(format!(
2254 "{} must be greater than 0",
2255 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES
2256 ));
2257 }
2258 sink_with.insert(
2259 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES.to_owned(),
2260 write_parquet_max_row_group_bytes.to_string(),
2261 );
2262 source.as_mut().map(|x| {
2263 x.with_properties
2264 .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
2265 });
2266 }
2267
2268 let partition_by = handler_args
2269 .with_options
2270 .get("partition_by")
2271 .map(|v| v.to_owned());
2272
2273 if let Some(partition_by) = &partition_by {
2274 let mut partition_columns = vec![];
2275 for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
2276 table
2277 .columns()
2278 .iter()
2279 .find(|col| col.name().eq_ignore_ascii_case(&column))
2280 .ok_or_else(|| {
2281 ErrorCode::InvalidInputSyntax(format!(
2282 "Partition source column does not exist in schema: {}",
2283 column
2284 ))
2285 })?;
2286
2287 partition_columns.push(column);
2288 }
2289
2290 ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2291 |_| {
2292 ErrorCode::InvalidInputSyntax(
2293 "The partition columns should be the prefix of the primary key".to_owned(),
2294 )
2295 },
2296 )?;
2297
2298 sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2299
2300 source
2302 .as_mut()
2303 .map(|x| x.with_properties.remove("partition_by"));
2304 }
2305
2306 let order_key = handler_args
2307 .with_options
2308 .get(ORDER_KEY)
2309 .map(|v| v.to_owned());
2310 if let Some(order_key) = &order_key {
2311 validate_order_key_columns(order_key, table.columns().iter().map(|col| col.name()))
2312 .map_err(|err| ErrorCode::InvalidInputSyntax(err.to_report_string()))?;
2313
2314 sink_with.insert(ORDER_KEY.to_owned(), order_key.to_owned());
2315
2316 source.as_mut().map(|x| x.with_properties.remove(ORDER_KEY));
2317 }
2318
2319 sink_handler_args.with_options =
2320 WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2321 let SinkPlanContext {
2322 sink_plan,
2323 sink_catalog,
2324 ..
2325 } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2326 let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2327
2328 let mut source_name = table_name.clone();
2329 *source_name.0.last_mut().unwrap() = Ident::from(
2330 (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2331 );
2332 let create_source_stmt = CreateSourceStatement {
2333 temporary: false,
2334 if_not_exists: false,
2335 columns: vec![],
2336 source_name,
2337 wildcard_idx: Some(0),
2338 constraints: vec![],
2339 with_properties: WithProperties(vec![]),
2340 format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2341 source_watermarks: vec![],
2342 include_column_options: vec![],
2343 };
2344
2345 let mut source_handler_args = handler_args.clone();
2346 let source_with = with_common;
2347 source_handler_args.with_options =
2348 WithOptions::new(source_with, Default::default(), connection_ref);
2349
2350 let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2351 let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2352 let (with_properties, refresh_mode) =
2353 bind_connector_props(&source_handler_args, &format_encode, true)?;
2354
2355 let (iceberg_catalog, table_identifier) = {
2358 let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2359 let iceberg_sink = IcebergSink::try_from(sink_param)?;
2360 iceberg_sink.create_table_if_not_exists().await?;
2361
2362 let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2363 let table_identifier = iceberg_sink.config.full_table_name()?;
2364 (iceberg_catalog, table_identifier)
2365 };
2366
2367 let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2368 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2369 &session,
2370 &format_encode,
2371 Either::Left(&with_properties),
2372 create_source_type,
2373 )
2374 .await?;
2375 let mut col_id_gen = ColumnIdGenerator::new_initial();
2376
2377 let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2378 source_handler_args,
2379 create_source_stmt.source_name,
2380 format_encode,
2381 with_properties,
2382 &create_source_stmt.columns,
2383 create_source_stmt.constraints,
2384 create_source_stmt.wildcard_idx,
2385 create_source_stmt.source_watermarks,
2386 columns_from_resolve_source,
2387 source_info,
2388 create_source_stmt.include_column_options,
2389 &mut col_id_gen,
2390 create_source_type,
2391 overwrite_options.source_rate_limit,
2392 SqlColumnStrategy::FollowChecked,
2393 refresh_mode,
2394 )
2395 .await?;
2396
2397 let _ = Jvm::get_or_init()?;
2400
2401 let catalog_writer = session.catalog_writer()?;
2402 let action = match job_type {
2403 TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
2404 _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
2405 };
2406 let res = execute_with_long_running_notification(
2407 catalog_writer.create_iceberg_table(
2408 PbTableJobInfo {
2409 source,
2410 table: Some(table.to_prost()),
2411 fragment_graph: Some(graph),
2412 job_type: job_type as _,
2413 },
2414 PbSinkJobInfo {
2415 sink: Some(sink_catalog.to_proto()),
2416 fragment_graph: Some(sink_graph),
2417 },
2418 iceberg_source_catalog.to_prost(),
2419 if_not_exists,
2420 ),
2421 &session,
2422 "CREATE TABLE",
2423 action,
2424 )
2425 .await;
2426
2427 if res.is_err() {
2428 let _ = iceberg_catalog
2429 .drop_table(&table_identifier)
2430 .await
2431 .inspect_err(|err| {
2432 tracing::error!(
2433 "failed to drop iceberg table {} after create iceberg engine table failed: {}",
2434 table_identifier,
2435 err.as_report()
2436 );
2437 });
2438 res?
2439 }
2440
2441 Ok(())
2442}
2443
2444pub fn check_create_table_with_source(
2445 with_options: &WithOptions,
2446 format_encode: Option<FormatEncodeOptions>,
2447 include_column_options: &IncludeOption,
2448 cdc_table_info: &Option<CdcTableInfo>,
2449) -> Result<Option<FormatEncodeOptions>> {
2450 if cdc_table_info.is_some() {
2452 return Ok(format_encode);
2453 }
2454 let defined_source = with_options.is_source_connector();
2455
2456 if !include_column_options.is_empty() && !defined_source {
2457 return Err(ErrorCode::InvalidInputSyntax(
2458 "INCLUDE should be used with a connector".to_owned(),
2459 )
2460 .into());
2461 }
2462 if defined_source {
2463 format_encode.as_ref().ok_or_else(|| {
2464 ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2465 })?;
2466 }
2467 Ok(format_encode)
2468}
2469
/// Checks that `partition_columns` is a leading prefix of `primary_key_columns`.
///
/// Names are compared exactly (case-sensitively) and position by position. An empty
/// partition list is always accepted.
///
/// # Errors
///
/// Returns a human-readable message when there are more partition columns than primary key
/// columns, or naming the first partition column that differs from the primary key column
/// at the same position.
fn ensure_partition_columns_are_prefix_of_primary_key(
    partition_columns: &[String],
    primary_key_columns: &[String],
) -> std::result::Result<(), String> {
    if primary_key_columns.len() < partition_columns.len() {
        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
    }

    // The length check above guarantees `zip` visits every partition column.
    let first_mismatch = partition_columns
        .iter()
        .zip(primary_key_columns)
        .find(|(partition_col, pk_col)| partition_col != pk_col);

    match first_mismatch {
        Some((partition_col, _)) => Err(format!(
            "Partition column '{}' is not a prefix of the primary key.",
            partition_col
        )),
        None => Ok(()),
    }
}
2489
2490#[allow(clippy::too_many_arguments)]
2491pub async fn generate_stream_graph_for_replace_table(
2492 _session: &Arc<SessionImpl>,
2493 table_name: ObjectName,
2494 original_catalog: &Arc<TableCatalog>,
2495 handler_args: HandlerArgs,
2496 statement: Statement,
2497 col_id_gen: ColumnIdGenerator,
2498 sql_column_strategy: SqlColumnStrategy,
2499) -> Result<(
2500 StreamFragmentGraph,
2501 TableCatalog,
2502 Option<SourceCatalog>,
2503 TableJobType,
2504)> {
2505 let Statement::CreateTable {
2506 columns,
2507 constraints,
2508 source_watermarks,
2509 append_only,
2510 on_conflict,
2511 with_version_columns,
2512 wildcard_idx,
2513 cdc_table_info,
2514 format_encode,
2515 include_column_options,
2516 engine,
2517 with_options,
2518 ..
2519 } = statement
2520 else {
2521 panic!("unexpected statement type: {:?}", statement);
2522 };
2523
2524 let format_encode = format_encode
2525 .clone()
2526 .map(|format_encode| format_encode.into_v2_with_warning());
2527
2528 let engine = match engine {
2529 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2530 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2531 };
2532
2533 let is_drop_connector =
2534 original_catalog.associated_source_id().is_some() && format_encode.is_none();
2535 if is_drop_connector {
2536 debug_assert!(
2537 source_watermarks.is_empty()
2538 && include_column_options.is_empty()
2539 && with_options
2540 .iter()
2541 .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2542 );
2543 }
2544
2545 let props = CreateTableProps {
2546 definition: handler_args.normalized_sql.clone(),
2547 append_only,
2548 on_conflict: on_conflict.into(),
2549 with_version_columns: with_version_columns
2550 .iter()
2551 .map(|col| col.real_value())
2552 .collect(),
2553 webhook_info: original_catalog.webhook_info.clone(),
2554 engine,
2555 };
2556
2557 let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2558 (Some(format_encode), None) => (
2559 gen_create_table_plan_with_source(
2560 handler_args,
2561 ExplainOptions::default(),
2562 table_name,
2563 columns,
2564 wildcard_idx,
2565 constraints,
2566 format_encode,
2567 source_watermarks,
2568 col_id_gen,
2569 include_column_options,
2570 props,
2571 sql_column_strategy,
2572 )
2573 .await?,
2574 TableJobType::General,
2575 ),
2576 (None, None) => {
2577 let context = OptimizerContext::from_handler_args(handler_args);
2578 let (plan, table) = gen_create_table_plan(
2579 context,
2580 table_name,
2581 columns,
2582 constraints,
2583 col_id_gen,
2584 source_watermarks,
2585 props,
2586 true,
2587 )?;
2588 ((plan, None, table), TableJobType::General)
2589 }
2590 (None, Some(cdc_table)) => {
2591 sanity_check_for_table_on_cdc_source(
2592 append_only,
2593 &columns,
2594 &wildcard_idx,
2595 &constraints,
2596 &source_watermarks,
2597 )?;
2598
2599 let session = &handler_args.session;
2600 let (source, resolved_table_name) =
2601 get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2602
2603 let (cdc_with_options, normalized_external_table_name) =
2604 derive_with_options_for_cdc_table(
2605 &source.with_properties,
2606 cdc_table.external_table_name.clone(),
2607 )?;
2608
2609 let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2610
2611 let context: OptimizerContextRef =
2612 OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2613 let (plan, table) = gen_create_table_plan_for_cdc_table(
2614 context,
2615 source,
2616 normalized_external_table_name,
2617 columns,
2618 source_watermarks,
2619 column_catalogs,
2620 pk_names,
2621 cdc_with_options,
2622 col_id_gen,
2623 on_conflict,
2624 with_version_columns
2625 .iter()
2626 .map(|col| col.real_value())
2627 .collect(),
2628 include_column_options,
2629 table_name,
2630 resolved_table_name,
2631 original_catalog.database_id,
2632 original_catalog.schema_id,
2633 original_catalog.id(),
2634 engine,
2635 )?;
2636
2637 ((plan, None, table), TableJobType::SharedCdcSource)
2638 }
2639 (Some(_), Some(_)) => {
2640 return Err(ErrorCode::NotSupported(
2641 "Data format and encoding format doesn't apply to table created from a CDC source"
2642 .into(),
2643 "Remove the FORMAT and ENCODE specification".into(),
2644 )
2645 .into());
2646 }
2647 };
2648
2649 if table.pk_column_ids() != original_catalog.pk_column_ids() {
2650 Err(ErrorCode::InvalidInputSyntax(
2651 "alter primary key of table is not supported".to_owned(),
2652 ))?
2653 }
2654
2655 let graph = build_graph(plan, Some(GraphJobType::Table))?;
2656
2657 table.id = original_catalog.id();
2659 if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2660 table.associated_source_id = Some(source_id);
2661
2662 let source = source.as_mut().unwrap();
2663 source.id = source_id;
2664 source.associated_table_id = Some(table.id());
2665 }
2666
2667 Ok((graph, table, source, job_type))
2668}
2669
2670fn get_source_and_resolved_table_name(
2671 session: &Arc<SessionImpl>,
2672 cdc_table: CdcTableInfo,
2673 table_name: ObjectName,
2674) -> Result<(Arc<SourceCatalog>, String)> {
2675 let db_name = &session.database();
2676 let (_, resolved_table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2677
2678 let (source_schema, source_name) =
2679 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2680
2681 let source = {
2682 let catalog_reader = session.env().catalog_reader().read_guard();
2683 let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2684 let (source, _) = catalog_reader.get_source_by_name(
2685 db_name,
2686 SchemaPath::Name(schema_name.as_str()),
2687 source_name.as_str(),
2688 )?;
2689 source.clone()
2690 };
2691
2692 Ok((source, resolved_table_name))
2693}
2694
2695fn bind_webhook_info(
2697 session: &Arc<SessionImpl>,
2698 column_defs: &[ColumnDef],
2699 webhook_info: WebhookSourceInfo,
2700) -> Result<PbWebhookSourceInfo> {
2701 let WebhookSourceInfo {
2702 secret_ref,
2703 signature_expr,
2704 wait_for_persistence,
2705 is_batched,
2706 } = webhook_info;
2707
2708 for column in column_defs {
2709 for option_def in &column.options {
2710 match option_def.option {
2711 ColumnOption::Null => {}
2712 ColumnOption::GeneratedColumns(_) => {
2713 return Err(ErrorCode::InvalidInputSyntax(
2714 "generated columns are not supported for webhook tables".to_owned(),
2715 )
2716 .into());
2717 }
2718 ColumnOption::DefaultValue(_) | ColumnOption::DefaultValueInternal { .. } => {
2719 return Err(ErrorCode::InvalidInputSyntax(
2720 "default values are not supported for webhook tables".to_owned(),
2721 )
2722 .into());
2723 }
2724 ColumnOption::NotNull
2725 | ColumnOption::Unique { .. }
2726 | ColumnOption::ForeignKey { .. }
2727 | ColumnOption::Check(_)
2728 | ColumnOption::DialectSpecific(_) => {
2729 return Err(ErrorCode::InvalidInputSyntax(
2730 "only NULL column option is supported for webhook tables".to_owned(),
2731 )
2732 .into());
2733 }
2734 }
2735 }
2736 }
2737
2738 let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2740 let db_name = &session.database();
2741 let (schema_name, secret_name) =
2742 Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2743 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2744 (
2745 Some(PbSecretRef {
2746 secret_id: secret_catalog.id,
2747 ref_as: match secret_ref.ref_as {
2748 SecretRefAsType::Text => PbRefAsType::Text,
2749 SecretRefAsType::File => PbRefAsType::File,
2750 }
2751 .into(),
2752 }),
2753 Some(secret_name),
2754 )
2755 } else {
2756 (None, None)
2757 };
2758
2759 let signature_expr = if let Some(signature_expr) = signature_expr {
2760 let payload_name = if column_defs.len() == 1
2761 && column_defs[0].data_type.as_ref() == Some(&AstDataType::Jsonb)
2762 {
2763 column_defs[0].name.real_value()
2764 } else {
2765 WEBHOOK_PAYLOAD_FIELD_NAME.to_owned()
2766 };
2767 let secure_compare_context = SecureCompareContext {
2768 payload_name,
2769 secret_name,
2770 };
2771 let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
2772 let expr = binder.bind_expr(&signature_expr)?;
2773
2774 if expr.as_function_call().is_none()
2776 || expr.as_function_call().unwrap().func_type()
2777 != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2778 {
2779 return Err(ErrorCode::InvalidInputSyntax(
2780 "The signature verification function must be SECURE_COMPARE()".to_owned(),
2781 )
2782 .into());
2783 }
2784
2785 Some(expr.to_expr_proto())
2786 } else {
2787 session.notice_to_user(
2788 "VALIDATE clause is strongly recommended for safety or production usages",
2789 );
2790 None
2791 };
2792
2793 let pb_webhook_info = PbWebhookSourceInfo {
2794 secret_ref: pb_secret_ref,
2795 signature_expr,
2796 wait_for_persistence,
2797 is_batched,
2798 };
2799
2800 Ok(pb_webhook_info)
2801}
2802
// Unit tests for the `CREATE TABLE` handler: end-to-end statements run through a
// `LocalFrontend`, plus a pure binding test for primary-key resolution.
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use super::*;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    // Creating an append-only table registers it in the catalog with the declared columns
    // plus the two system columns listed in `expected_columns`.
    #[tokio::test]
    async fn test_create_table_handler() {
        let sql =
            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // The table must be visible in the default database and schema.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(table.name(), "t");

        // Compare by name so that column order does not matter.
        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "v1" => DataType::Int16,
            "v2" => StructType::new(
                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
            )
            .with_ids([3, 4, 5].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };

        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    // A webhook table may declare several typed columns in a non-default schema; the catalog
    // entry keeps the webhook info and exposes exactly the three declared DML columns.
    #[tokio::test]
    async fn test_create_webhook_table_with_arbitrary_columns() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql("create schema ingest_schema;")
            .await
            .unwrap();
        frontend
            .run_sql(
                r#"
                create table ingest_schema.orders (
                    id int,
                    customer_name varchar,
                    amount double precision,
                    primary key (id)
                ) with (
                    connector = 'webhook'
                ) validate as secure_compare(
                    headers->>'x-rw-signature',
                    'sha256=' || encode(hmac('webhook-secret', payload, 'sha256'), 'hex')
                );
                "#,
            )
            .await
            .unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (table, _) = catalog_reader
            .get_created_table_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name("ingest_schema"),
                "orders",
            )
            .unwrap();

        assert!(table.webhook_info.is_some());
        assert_eq!(
            table
                .columns
                .iter()
                .filter(|column| column.can_dml())
                .count(),
            3
        );
    }

    // With a single JSONB column, the VALIDATE expression may refer to the payload by that
    // column's own name (`body` here).
    #[tokio::test]
    async fn test_create_webhook_table_uses_single_jsonb_column_name_in_validate() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql(
                r#"
                create table webhook_single_column (
                    body jsonb
                ) with (
                    connector = 'webhook'
                ) validate as secure_compare(
                    headers->>'x-rw-signature',
                    'sha256=' || encode(hmac('webhook-secret', body, 'sha256'), 'hex')
                );
                "#,
            )
            .await
            .unwrap();
    }

    // Generated columns are rejected on webhook tables.
    #[tokio::test]
    async fn test_create_webhook_table_with_generated_columns() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
                create table webhook_generated_columns (
                    id int,
                    amount double precision,
                    amount_with_fee double precision as amount + 1.0
                ) with (
                    connector = 'webhook'
                );
                "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("generated columns are not supported for webhook tables"),
            "{err:?}"
        );
    }

    // Column default values are rejected on webhook tables.
    #[tokio::test]
    async fn test_create_webhook_table_with_default_value() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
                create table webhook_default_value (
                    id int default 42,
                    amount double precision
                ) with (
                    connector = 'webhook'
                );
                "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("default values are not supported for webhook tables"),
            "{err:?}"
        );
    }

    // Any column option other than an explicit NULL (here: NOT NULL) is rejected on webhook
    // tables.
    #[tokio::test]
    async fn test_create_webhook_table_with_not_null_option() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
                create table webhook_not_null (
                    id int not null,
                    amount double precision
                ) with (
                    connector = 'webhook'
                );
                "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("only NULL column option is supported for webhook tables"),
            "{err:?}"
        );
    }

    // Table-driven check of primary-key binding: each case pairs a statement with either the
    // expected primary-key column ids or a substring of the expected error message.
    #[test]
    fn test_bind_primary_key() {
        // NOTE(review): the expectations show user columns numbered from 1 and a table without
        // a declared key being keyed on id 0 -- presumably the hidden row-id column; confirm.
        for (sql, expected) in [
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ] {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be create table")
            };
            // Run the binding steps in a closure so that `?` can be used and the outcome is
            // captured as a `Result` for comparison below.
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }

    // The same key may appear both in WITH (...) and in the FORMAT ... ENCODE ... (...)
    // options; each value must stay in its own option set.
    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // The table's associated source must exist under the same name.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // Credentials given in the FORMAT ... ENCODE options are kept there.
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // Credentials given in WITH (...) are kept in the connector properties.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        // Format/encode options are not merged into the connector properties.
        assert!(!source.with_properties.contains_key("schema.location"));
    }
}