1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use percent_encoding::percent_decode_str;
25use pgwire::pg_response::{PgResponse, StatementType};
26use prost::Message as _;
27use risingwave_common::catalog::{
28 CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29 ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
30 TableId,
31};
32use risingwave_common::config::MetaBackend;
33use risingwave_common::global_jvm::Jvm;
34use risingwave_common::session_config::sink_decouple::SinkDecouple;
35use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
36use risingwave_common::util::value_encoding::DatumToProtoExt;
37use risingwave_common::{bail, bail_not_implemented};
38use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
39use risingwave_connector::source::cdc::build_cdc_table_id;
40use risingwave_connector::source::cdc::external::{
41 DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
42 SCHEMA_NAME_KEY, TABLE_NAME_KEY,
43};
44use risingwave_connector::{
45 AUTO_SCHEMA_CHANGE_KEY, WithOptionsSecResolved, WithPropertiesExt, source,
46};
47use risingwave_pb::catalog::connection::Info as ConnectionInfo;
48use risingwave_pb::catalog::connection_params::ConnectionType;
49use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
50use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
51use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
52use risingwave_pb::plan_common::{
53 AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
54};
55use risingwave_pb::secret::PbSecretRef;
56use risingwave_pb::secret::secret_ref::PbRefAsType;
57use risingwave_pb::stream_plan::StreamFragmentGraph;
58use risingwave_sqlparser::ast::{
59 CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
60 CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
61 FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
62 Statement, TableConstraint, WebhookSourceInfo, WithProperties,
63};
64use risingwave_sqlparser::parser::IncludeOption;
65use thiserror_ext::AsReport;
66
67use super::RwPgResponse;
68use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
69use crate::binder::{Clause, SecureCompareContext, WEBHOOK_PAYLOAD_FIELD_NAME, bind_data_type};
70use crate::catalog::root_catalog::SchemaPath;
71use crate::catalog::source_catalog::SourceCatalog;
72use crate::catalog::table_catalog::TableVersion;
73use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
74use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
75use crate::expr::{Expr, ExprImpl, ExprRewriter};
76use crate::handler::HandlerArgs;
77use crate::handler::create_source::{
78 UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
79 bind_source_watermark, handle_addition_columns,
80};
81use crate::handler::util::{
82 LongRunningNotificationAction, SourceSchemaCompatExt, execute_with_long_running_notification,
83};
84use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
85use crate::optimizer::plan_node::{
86 LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
87};
88use crate::optimizer::property::{Order, RequiredDist};
89use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
90use crate::session::SessionImpl;
91use crate::session::current::notice_to_user;
92use crate::stream_fragmenter::{GraphJobType, build_graph};
93use crate::utils::OverwriteOptions;
94use crate::{Binder, Explain, TableCatalog, WithOptions};
95
96mod col_id_gen;
97pub use col_id_gen::*;
98use risingwave_connector::sink::SinkParam;
99use risingwave_connector::sink::iceberg::{
100 COMPACTION_DELETE_FILES_COUNT_THRESHOLD, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
101 COMPACTION_SMALL_FILES_THRESHOLD_MB, COMPACTION_TARGET_FILE_SIZE_MB,
102 COMPACTION_TRIGGER_SNAPSHOT_COUNT, COMPACTION_TYPE, COMPACTION_WRITE_PARQUET_COMPRESSION,
103 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES, COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS,
104 CompactionType, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION, FORMAT_VERSION,
105 ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink,
106 IcebergWriteMode, ORDER_KEY, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
107 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA, SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
108 SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE, parse_partition_by_exprs,
109 validate_order_key_columns,
110};
111use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
112
113use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};
114
115fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
116 for option_def in &c.options {
117 match option_def.option {
118 ColumnOption::GeneratedColumns(_) => {}
119 ColumnOption::DefaultValue(_) => {}
120 ColumnOption::DefaultValueInternal { .. } => {}
121 ColumnOption::Unique { is_primary: true } => {}
122 ColumnOption::Null => {}
123 ColumnOption::NotNull => {}
124 _ => bail_not_implemented!("column constraints \"{}\"", option_def),
125 }
126 }
127 Ok(())
128}
129
130pub fn bind_sql_columns(
134 column_defs: &[ColumnDef],
135 is_for_drop_table_connector: bool,
136) -> Result<Vec<ColumnCatalog>> {
137 let mut columns = Vec::with_capacity(column_defs.len());
138
139 for column in column_defs {
140 ensure_column_options_supported(column)?;
141 let ColumnDef {
145 name,
146 data_type,
147 collation,
148 options,
149 ..
150 } = column;
151
152 let data_type = data_type
153 .clone()
154 .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
155 if let Some(collation) = collation {
156 if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
162 bail_not_implemented!(
163 "Collate collation other than `C` or `POSIX` is not implemented"
164 );
165 }
166
167 match data_type {
168 AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
169 _ => {
170 return Err(ErrorCode::NotSupported(
171 format!("{} is not a collatable data type", data_type),
172 "The only built-in collatable data types are `varchar`, please check your type".into(),
173 ).into());
174 }
175 }
176 }
177
178 if !is_for_drop_table_connector {
179 check_column_name_not_reserved(&name.real_value())?;
183 }
184
185 let nullable: bool = !options
186 .iter()
187 .any(|def| matches!(def.option, ColumnOption::NotNull));
188
189 columns.push(ColumnCatalog {
190 column_desc: ColumnDesc {
191 data_type: bind_data_type(&data_type)?,
192 column_id: ColumnId::placeholder(),
193 name: name.real_value(),
194 generated_or_default_column: None,
195 description: None,
196 additional_column: AdditionalColumn { column_type: None },
197 version: ColumnDescVersion::LATEST,
198 system_column: None,
199 nullable,
200 },
201 is_hidden: false,
202 });
203 }
204
205 Ok(columns)
206}
207
208fn check_generated_column_constraints(
209 column_name: &String,
210 column_id: ColumnId,
211 expr: &ExprImpl,
212 column_catalogs: &[ColumnCatalog],
213 generated_column_names: &[String],
214 pk_column_ids: &[ColumnId],
215) -> Result<()> {
216 let input_refs = expr.collect_input_refs(column_catalogs.len());
217 for idx in input_refs.ones() {
218 let referred_generated_column = &column_catalogs[idx].column_desc.name;
219 if generated_column_names
220 .iter()
221 .any(|c| c == referred_generated_column)
222 {
223 return Err(ErrorCode::BindError(format!(
224 "Generated can not reference another generated column. \
225 But here generated column \"{}\" referenced another generated column \"{}\"",
226 column_name, referred_generated_column
227 ))
228 .into());
229 }
230 }
231
232 if pk_column_ids.contains(&column_id) && expr.is_impure() {
233 return Err(ErrorCode::BindError(format!(
234 "Generated columns with impure expressions should not be part of the primary key. \
235 Here column \"{}\" is defined as part of the primary key.",
236 column_name
237 ))
238 .into());
239 }
240
241 Ok(())
242}
243
/// Bind column-level options (generated columns and default values) into the
/// already-bound `column_catalogs`, in place.
///
/// * `columns` — the original SQL column definitions carrying the options.
/// * `pk_column_ids` — used to reject impure generated expressions on
///   primary-key columns.
///
/// Returns an error if an expression fails to bind, a generated column
/// violates its constraints, or a default expression cannot be folded to a
/// constant.
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: &[ColumnDef],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    // Collect the names of all generated columns up front, so each generated
    // expression can be checked against references to other generated columns.
    let generated_column_names = {
        let mut names = vec![];
        for column in columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name, column_catalogs)?;

    for column in columns {
        // Locate the bound catalog entry for this SQL column; definitions with
        // no matching catalog entry are skipped.
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            continue;
        };

        for option_def in &column.options {
            match &option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    // Bind under the dedicated clause so the binder applies
                    // generated-column restrictions; reset afterwards.
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "fail to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    // Cast the default to the column's type with assignment
                    // semantics.
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type())?;

                    // Inline `now`/proctime against the pinned snapshot so the
                    // default can be constant-folded below.
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        // Persist both the folded snapshot value and the
                        // original (unfolded) expression.
                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                expr: Some(expr_impl.to_expr_proto()),
                            }));
                    } else {
                        // Non-constant defaults are rejected; only generated
                        // columns may reference other columns.
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                            Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // Internal defaults carry a pre-encoded `DefaultColumnDesc`;
                    // an empty payload is rejected as user-written DDL.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                            please specify a concrete default value"
                        );
                    }

                    // Decoding failure indicates corrupted persisted metadata,
                    // hence `expect` rather than a user-facing error.
                    let desc = DefaultColumnDesc::decode(&**persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                _ => {}
            }
        }
    }
    Ok(())
}
366
367pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
369 let mut pk_column_names = vec![];
370
371 for constraint in table_constraints {
372 match constraint {
373 TableConstraint::Unique {
374 name: _,
375 columns,
376 is_primary: true,
377 } => {
378 if !pk_column_names.is_empty() {
379 return Err(multiple_pk_definition_err());
380 }
381 pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
382 }
383 _ => bail_not_implemented!("table constraint \"{}\"", constraint),
384 }
385 }
386 Ok(pk_column_names)
387}
388
389pub fn bind_sql_pk_names(
390 columns_defs: &[ColumnDef],
391 pk_names_from_table_constraints: Vec<String>,
392) -> Result<Vec<String>> {
393 let mut pk_column_names = pk_names_from_table_constraints;
394
395 for column in columns_defs {
396 for option_def in &column.options {
397 if let ColumnOption::Unique { is_primary: true } = option_def.option {
398 if !pk_column_names.is_empty() {
399 return Err(multiple_pk_definition_err());
400 }
401 pk_column_names.push(column.name.real_value());
402 };
403 }
404 }
405
406 Ok(pk_column_names)
407}
408
409fn multiple_pk_definition_err() -> RwError {
410 ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
411}
412
413pub fn bind_pk_and_row_id_on_relation(
418 mut columns: Vec<ColumnCatalog>,
419 pk_names: Vec<String>,
420 must_need_pk: bool,
421) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
422 for c in &columns {
423 assert!(c.column_id() != ColumnId::placeholder());
424 }
425
426 let name_to_id = columns
428 .iter()
429 .map(|c| (c.name(), c.column_id()))
430 .collect::<HashMap<_, _>>();
431
432 let mut pk_column_ids: Vec<_> = pk_names
433 .iter()
434 .map(|name| {
435 name_to_id.get(name.as_str()).copied().ok_or_else(|| {
436 ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
437 })
438 })
439 .try_collect()?;
440
441 let need_row_id = pk_column_ids.is_empty() && must_need_pk;
443
444 let row_id_index = need_row_id.then(|| {
445 let column = ColumnCatalog::row_id_column();
446 let index = columns.len();
447 pk_column_ids = vec![column.column_id()];
448 columns.push(column);
449 index
450 });
451
452 if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
453 Err(ErrorCode::InvalidInputSyntax(format!(
454 "column \"{col}\" specified more than once"
455 )))?;
456 }
457
458 Ok((columns, pk_column_ids, row_id_index))
459}
460
/// Generate a `CREATE TABLE` plan for a table backed by a connector
/// (i.e. `CREATE TABLE ... WITH (connector = ...) FORMAT ... ENCODE ...`).
///
/// Binds connector properties, resolves columns from both SQL and the
/// external schema, builds the bound source catalog, and produces the
/// streaming plan plus the table catalog.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
    // Append-only tables only accept PLAIN / NATIVE formats.
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append only table does not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let (with_properties, refresh_mode) =
        bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        // CDC tables get stricter checks on generated / NOT NULL columns.
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        // Non-CDC connectors drop violating rows silently; warn the user.
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    // Resolve columns from the external schema (e.g. schema registry),
    // according to the format/encode options.
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
        refresh_mode,
    )
    .await?;

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source.clone(),
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(source), table))
}
552
553#[allow(clippy::too_many_arguments)]
556pub(crate) fn gen_create_table_plan(
557 context: OptimizerContext,
558 table_name: ObjectName,
559 column_defs: Vec<ColumnDef>,
560 constraints: Vec<TableConstraint>,
561 mut col_id_gen: ColumnIdGenerator,
562 source_watermarks: Vec<SourceWatermark>,
563 props: CreateTableProps,
564 is_for_replace_plan: bool,
565) -> Result<(PlanRef, TableCatalog)> {
566 let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
567 for c in &mut columns {
568 col_id_gen.generate(c)?;
569 }
570
571 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
572 if !secret_refs.is_empty() || !connection_refs.is_empty() {
573 return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
574 }
575
576 gen_create_table_plan_without_source(
577 context,
578 table_name,
579 columns,
580 column_defs,
581 constraints,
582 source_watermarks,
583 col_id_gen.into_version(),
584 props,
585 )
586}
587
/// Generate the plan for a table without a connector, from already-bound
/// columns.
///
/// Binds the primary key (falling back to the hidden row-id column),
/// watermarks, and column constraints, then hands off to
/// [`gen_table_plan_inner`].
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    // Primary key names come from table-level constraints merged with
    // column-level PRIMARY KEY options.
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    // Bind generated/default column options in place (may mutate `columns`).
    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}
634
635fn gen_table_plan_with_source(
636 context: OptimizerContextRef,
637 schema_name: Option<String>,
638 source_catalog: SourceCatalog,
639 version: TableVersion,
640 props: CreateTableProps,
641) -> Result<(PlanRef, TableCatalog)> {
642 let table_name = source_catalog.name.clone();
643
644 let info = CreateTableInfo {
645 columns: source_catalog.columns.clone(),
646 pk_column_ids: source_catalog.pk_col_ids.clone(),
647 row_id_index: source_catalog.row_id_index,
648 watermark_descs: source_catalog.watermark_descs.clone(),
649 source_catalog: Some(source_catalog),
650 version,
651 };
652
653 gen_table_plan_inner(context, schema_name, table_name, info, props)
654}
655
/// An `ON CONFLICT` specification that is either the raw clause from the AST
/// (possibly absent) or an already-resolved [`ConflictBehavior`].
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    /// Unresolved clause as parsed; `None` means the user did not specify one.
    Ast(Option<OnConflict>),
    /// A behavior that has already been resolved.
    Resolved(ConflictBehavior),
}
662
impl From<Option<OnConflict>> for EitherOnConflict {
    /// Wraps a raw (possibly absent) AST `ON CONFLICT` clause.
    fn from(v: Option<OnConflict>) -> Self {
        Self::Ast(v)
    }
}
668
impl From<ConflictBehavior> for EitherOnConflict {
    /// Wraps an already-resolved conflict behavior.
    fn from(v: ConflictBehavior) -> Self {
        Self::Resolved(v)
    }
}
674
675impl EitherOnConflict {
676 pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
678 let conflict_behavior = match self {
679 EitherOnConflict::Ast(on_conflict) => {
680 if append_only {
681 if row_id_as_pk {
682 ConflictBehavior::NoCheck
684 } else {
685 if let Some(on_conflict) = on_conflict
687 && on_conflict != OnConflict::Nothing
688 {
689 return Err(ErrorCode::InvalidInputSyntax(
690 "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
691 the ON CONFLICT behavior must be DO NOTHING."
692 .to_owned(),
693 )
694 .into());
695 }
696 ConflictBehavior::IgnoreConflict
697 }
698 } else {
699 match on_conflict.unwrap_or(OnConflict::UpdateFull) {
701 OnConflict::UpdateFull => ConflictBehavior::Overwrite,
702 OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
703 OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
704 }
705 }
706 }
707 EitherOnConflict::Resolved(b) => b,
708 };
709
710 Ok(conflict_behavior)
711 }
712}
713
/// Column-binding-derived information about a table being created, as opposed
/// to the user-specified properties in `CreateTableProps`.
pub struct CreateTableInfo {
    /// Bound columns, including any hidden columns such as the row id.
    pub columns: Vec<ColumnCatalog>,
    /// Ids of the primary-key columns.
    pub pk_column_ids: Vec<ColumnId>,
    /// Index of the hidden row-id column, if one was appended as the pk.
    pub row_id_index: Option<usize>,
    /// Bound watermark definitions.
    pub watermark_descs: Vec<WatermarkDesc>,
    /// The associated source catalog when the table has a connector.
    pub source_catalog: Option<SourceCatalog>,
    /// Column-id versioning produced by the `ColumnIdGenerator`.
    pub version: TableVersion,
}
726
/// User-specified properties of a table being created, as opposed to the
/// binding-derived information in `CreateTableInfo`.
pub struct CreateTableProps {
    /// Normalized SQL definition of the table.
    pub definition: String,
    /// Whether the table is declared `APPEND ONLY`.
    pub append_only: bool,
    /// The `ON CONFLICT` behavior, possibly still an unresolved AST clause.
    pub on_conflict: EitherOnConflict,
    /// Version column names specified for the table, if any.
    pub with_version_columns: Vec<String>,
    /// Webhook source info when the table is backed by a webhook endpoint.
    pub webhook_info: Option<PbWebhookSourceInfo>,
    /// Storage engine the table is created with.
    pub engine: Engine,
}
739
/// Shared tail of the `CREATE TABLE` planning paths: build the logical source
/// node, enforce watermark / retention restrictions, and produce the final
/// materialize plan together with the table catalog (owned by the session
/// user).
#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    // Borrow the fields needed here; `info`/`props` themselves are passed on
    // to `gen_table_plan` below.
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    let source_node: LogicalPlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    // No specific output columns are required from the root.
    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let has_non_ttl_watermark = watermark_descs.iter().any(|d| !d.with_ttl);

    // Non-TTL watermarks are only allowed on append-only tables.
    if !append_only && has_non_ttl_watermark {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on table requires the table to be append only.".to_owned(),
            "Use the key words `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    // Retention (TTL) on a non-append-only table is unsafe; allow it only
    // behind an explicit session config, with a warning and notice.
    if !append_only && retention_seconds.is_some() {
        if session
            .config()
            .unsafe_enable_storage_retention_for_non_append_only_tables()
        {
            tracing::warn!(
                "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
                table_name
            );
            const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
                This may lead to stream inconsistency and unrecoverable \
                node failure if there is any row INSERT/UPDATE/DELETE operation \
                corresponding to the TTLed primary keys";
            session.notice_to_user(NOTICE);
        } else {
            return Err(ErrorCode::NotSupported(
                "Defining retention seconds on table requires the table to be append only."
                    .to_owned(),
                "Use the key words `APPEND ONLY`".to_owned(),
            )
            .into());
        }
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    Ok((materialize.into(), table))
}
825
/// Generate the plan and table catalog for a CDC table created from a shared
/// CDC source (`CREATE TABLE ... FROM <source> TABLE '<external_table>'`).
///
/// Binds additional (INCLUDE) columns, the primary key, watermarks and column
/// constraints, builds the `CdcTableDesc` describing the upstream table
/// (generated columns excluded), and plans a `LogicalCdcScan` over it. The
/// returned table catalog carries the derived `cdc_table_id` and CDC table
/// type.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    source_watermarks: Vec<SourceWatermark>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String,
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, TableCatalog)> {
    let session = context.session_ctx().clone();

    // Append INCLUDE-option columns to the bound columns.
    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    // CDC tables always need a pk; `true` forces a row-id fallback, though
    // the resulting row-id index is not used here.
    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    // Translate pk column ids into positional indices.
    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    // Generated columns are computed locally, so the external table
    // description only carries the non-generated ones.
    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| !c.is_generated())
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();
    let cdc_table_type = ExternalCdcTableType::from_properties(&options);
    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id,
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");
    let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: LogicalPlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            // CDC tables always have an explicit pk; no row-id column.
            row_id_index: None,
            watermark_descs,
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            // CDC tables receive updates/deletes and are never append-only.
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_columns,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.cdc_table_type = Some(cdc_table_type);
    Ok((materialize.into(), table))
}
972
973fn derive_with_options_for_cdc_table(
981 source_with_properties: &WithOptionsSecResolved,
982 external_table_name: String,
983) -> Result<(WithOptionsSecResolved, String)> {
984 use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
985 let source_database_name: &str = source_with_properties
987 .get("database.name")
988 .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
989 .as_str();
990 let mut with_options = source_with_properties.clone();
991 if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
992 match connector.as_str() {
993 MYSQL_CDC_CONNECTOR => {
994 let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
997 anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
998 })?;
999 if !source_database_name
1001 .split(',')
1002 .map(|s| s.trim())
1003 .any(|name| name == db_name)
1004 {
1005 return Err(anyhow!(
1006 "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
1007 db_name,
1008 source_database_name
1009 ).into());
1010 }
1011 with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
1012 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1013 return Ok((with_options, external_table_name));
1015 }
1016 POSTGRES_CDC_CONNECTOR => {
1017 let (schema_name, table_name) = external_table_name
1018 .split_once('.')
1019 .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;
1020
1021 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1023 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1024 return Ok((with_options, external_table_name));
1026 }
1027 SQL_SERVER_CDC_CONNECTOR => {
1028 let parts: Vec<&str> = external_table_name.split('.').collect();
1036 let (schema_name, table_name) = match parts.len() {
1037 3 => {
1038 let db_name = parts[0];
1041 let schema_name = parts[1];
1042 let table_name = parts[2];
1043
1044 if db_name != source_database_name {
1045 return Err(anyhow!(
1046 "The database name '{}' in FROM clause does not match the database name '{}' specified in source definition. \
1047 You can either use 'schema.table' format (recommended) or ensure the database name matches.",
1048 db_name,
1049 source_database_name
1050 ).into());
1051 }
1052 (schema_name, table_name)
1053 }
1054 2 => {
1055 let schema_name = parts[0];
1058 let table_name = parts[1];
1059 (schema_name, table_name)
1060 }
1061 1 => {
1062 return Err(anyhow!(
1065 "Invalid table name format '{}'. For SQL Server CDC, you must specify the schema name. \
1066 Use 'schema.table' format (e.g., 'dbo.{}') or 'database.schema.table' format (e.g., '{}.dbo.{}').",
1067 external_table_name,
1068 external_table_name,
1069 source_database_name,
1070 external_table_name
1071 ).into());
1072 }
1073 _ => {
1074 return Err(anyhow!(
1076 "Invalid table name format '{}'. Expected 'schema.table' or 'database.schema.table'.",
1077 external_table_name
1078 ).into());
1079 }
1080 };
1081
1082 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1084 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1085
1086 let normalized_external_table_name = format!("{}.{}", schema_name, table_name);
1089 return Ok((with_options, normalized_external_table_name));
1090 }
1091 _ => {
1092 return Err(RwError::from(anyhow!(
1093 "connector {} is not supported for cdc table",
1094 connector
1095 )));
1096 }
1097 };
1098 }
1099 unreachable!("All valid CDC connectors should have returned by now")
1100}
1101
1102#[allow(clippy::too_many_arguments)]
1103pub(super) async fn handle_create_table_plan(
1104 handler_args: HandlerArgs,
1105 explain_options: ExplainOptions,
1106 format_encode: Option<FormatEncodeOptions>,
1107 cdc_table_info: Option<CdcTableInfo>,
1108 table_name: &ObjectName,
1109 column_defs: Vec<ColumnDef>,
1110 wildcard_idx: Option<usize>,
1111 constraints: Vec<TableConstraint>,
1112 source_watermarks: Vec<SourceWatermark>,
1113 append_only: bool,
1114 on_conflict: Option<OnConflict>,
1115 with_version_columns: Vec<String>,
1116 include_column_options: IncludeOption,
1117 webhook_info: Option<WebhookSourceInfo>,
1118 engine: Engine,
1119) -> Result<(
1120 PlanRef,
1121 Option<SourceCatalog>,
1122 TableCatalog,
1123 TableJobType,
1124 Option<SourceId>,
1125)> {
1126 let col_id_gen = ColumnIdGenerator::new_initial();
1127 let format_encode = check_create_table_with_source(
1128 &handler_args.with_options,
1129 format_encode,
1130 &include_column_options,
1131 &cdc_table_info,
1132 )?;
1133 let webhook_info = webhook_info
1134 .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1135 .transpose()?;
1136
1137 let props = CreateTableProps {
1138 definition: handler_args.normalized_sql.clone(),
1139 append_only,
1140 on_conflict: on_conflict.into(),
1141 with_version_columns: with_version_columns.clone(),
1142 webhook_info,
1143 engine,
1144 };
1145
1146 let ((plan, source, table), job_type, shared_shource_id) = match (
1147 format_encode,
1148 cdc_table_info.as_ref(),
1149 ) {
1150 (Some(format_encode), None) => (
1151 gen_create_table_plan_with_source(
1152 handler_args,
1153 explain_options,
1154 table_name.clone(),
1155 column_defs,
1156 wildcard_idx,
1157 constraints,
1158 format_encode,
1159 source_watermarks,
1160 col_id_gen,
1161 include_column_options,
1162 props,
1163 SqlColumnStrategy::FollowChecked,
1164 )
1165 .await?,
1166 TableJobType::General,
1167 None,
1168 ),
1169 (None, None) => {
1170 let context = OptimizerContext::new(handler_args, explain_options);
1171 let (plan, table) = gen_create_table_plan(
1172 context,
1173 table_name.clone(),
1174 column_defs,
1175 constraints,
1176 col_id_gen,
1177 source_watermarks,
1178 props,
1179 false,
1180 )?;
1181
1182 ((plan, None, table), TableJobType::General, None)
1183 }
1184
1185 (None, Some(cdc_table)) => {
1186 sanity_check_for_table_on_cdc_source(
1187 append_only,
1188 &column_defs,
1189 &wildcard_idx,
1190 &constraints,
1191 &source_watermarks,
1192 )?;
1193
1194 generated_columns_check_for_cdc_table(&column_defs)?;
1195 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1196
1197 let session = &handler_args.session;
1198 let db_name = &session.database();
1199 let user_name = &session.user_name();
1200 let search_path = session.config().search_path();
1201 let (schema_name, resolved_table_name) =
1202 Binder::resolve_schema_qualified_name(db_name, table_name)?;
1203 let (database_id, schema_id) =
1204 session.get_database_and_schema_id_for_create(schema_name.clone())?;
1205
1206 let (source_schema, source_name) =
1208 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
1209
1210 let source = {
1211 let catalog_reader = session.env().catalog_reader().read_guard();
1212 let schema_path =
1213 SchemaPath::new(source_schema.as_deref(), &search_path, user_name);
1214
1215 let (source, _) = catalog_reader.get_source_by_name(
1216 db_name,
1217 schema_path,
1218 source_name.as_str(),
1219 )?;
1220 source.clone()
1221 };
1222 let (cdc_with_options, normalized_external_table_name) =
1223 derive_with_options_for_cdc_table(
1224 &source.with_properties,
1225 cdc_table.external_table_name.clone(),
1226 )?;
1227
1228 let (columns, pk_names) = match wildcard_idx {
1229 Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1230 None => {
1231 for column_def in &column_defs {
1232 for option_def in &column_def.options {
1233 if let ColumnOption::DefaultValue(_)
1234 | ColumnOption::DefaultValueInternal { .. } = option_def.option
1235 {
1236 return Err(ErrorCode::NotSupported(
1237 "Default value for columns defined on the table created from a CDC source".into(),
1238 "Remove the default value expression in the column definitions".into(),
1239 )
1240 .into());
1241 }
1242 }
1243 }
1244
1245 let (columns, pk_names) =
1246 bind_cdc_table_schema(&column_defs, &constraints, false)?;
1247 let (options, secret_refs) = cdc_with_options.clone().into_parts();
1249 let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1250 .context("failed to extract external table config")?;
1251
1252 (columns, pk_names)
1253 }
1254 };
1255
1256 let context: OptimizerContextRef =
1257 OptimizerContext::new(handler_args, explain_options).into();
1258 let shared_source_id = source.id;
1259 let (plan, table) = gen_create_table_plan_for_cdc_table(
1260 context,
1261 source,
1262 normalized_external_table_name,
1263 column_defs,
1264 source_watermarks,
1265 columns,
1266 pk_names,
1267 cdc_with_options,
1268 col_id_gen,
1269 on_conflict,
1270 with_version_columns,
1271 include_column_options,
1272 table_name.clone(),
1273 resolved_table_name,
1274 database_id,
1275 schema_id,
1276 TableId::placeholder(),
1277 engine,
1278 )?;
1279
1280 (
1281 (plan, None, table),
1282 TableJobType::SharedCdcSource,
1283 Some(shared_source_id),
1284 )
1285 }
1286 (Some(_), Some(_)) => {
1287 return Err(ErrorCode::NotSupported(
1288 "Data format and encoding format doesn't apply to table created from a CDC source"
1289 .into(),
1290 "Remove the FORMAT and ENCODE specification".into(),
1291 )
1292 .into());
1293 }
1294 };
1295 Ok((plan, source, table, job_type, shared_shource_id))
1296}
1297
1298fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1300 let mut found_generated_column = false;
1301 for column in columns {
1302 let mut is_generated = false;
1303
1304 for option_def in &column.options {
1305 if let ColumnOption::GeneratedColumns(_) = option_def.option {
1306 is_generated = true;
1307 break;
1308 }
1309 }
1310
1311 if is_generated {
1312 found_generated_column = true;
1313 } else if found_generated_column {
1314 return Err(ErrorCode::NotSupported(
1315 "Non-generated column found after a generated column.".into(),
1316 "Ensure that all generated columns appear at the end of the cdc table definition."
1317 .into(),
1318 )
1319 .into());
1320 }
1321 }
1322 Ok(())
1323}
1324
1325fn not_null_check_for_cdc_table(
1327 wildcard_idx: &Option<usize>,
1328 column_defs: &Vec<ColumnDef>,
1329) -> Result<()> {
1330 if !wildcard_idx.is_some()
1331 && column_defs.iter().any(|col| {
1332 col.options
1333 .iter()
1334 .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1335 })
1336 {
1337 return Err(ErrorCode::NotSupported(
1338 "CDC table with NOT NULL constraint is not supported".to_owned(),
1339 "Please remove the NOT NULL constraint for columns".to_owned(),
1340 )
1341 .into());
1342 }
1343 Ok(())
1344}
1345
1346fn sanity_check_for_table_on_cdc_source(
1348 append_only: bool,
1349 column_defs: &Vec<ColumnDef>,
1350 wildcard_idx: &Option<usize>,
1351 constraints: &Vec<TableConstraint>,
1352 source_watermarks: &Vec<SourceWatermark>,
1353) -> Result<()> {
1354 if wildcard_idx.is_some() && !column_defs.is_empty() {
1356 return Err(ErrorCode::NotSupported(
1357 "wildcard(*) and column definitions cannot be used together".to_owned(),
1358 "Remove the wildcard or column definitions".to_owned(),
1359 )
1360 .into());
1361 }
1362
1363 if !wildcard_idx.is_some()
1365 && !constraints.iter().any(|c| {
1366 matches!(
1367 c,
1368 TableConstraint::Unique {
1369 is_primary: true,
1370 ..
1371 }
1372 )
1373 })
1374 && !column_defs.iter().any(|col| {
1375 col.options
1376 .iter()
1377 .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1378 })
1379 {
1380 return Err(ErrorCode::NotSupported(
1381 "CDC table without primary key constraint is not supported".to_owned(),
1382 "Please define a primary key".to_owned(),
1383 )
1384 .into());
1385 }
1386
1387 if append_only {
1388 return Err(ErrorCode::NotSupported(
1389 "append only modifier on the table created from a CDC source".into(),
1390 "Remove the APPEND ONLY clause".into(),
1391 )
1392 .into());
1393 }
1394
1395 if !source_watermarks.is_empty()
1396 && source_watermarks
1397 .iter()
1398 .any(|watermark| !watermark.with_ttl)
1399 {
1400 return Err(ErrorCode::NotSupported(
1401 "non-TTL watermark defined on the table created from a CDC source".into(),
1402 "Use `WATERMARK ... WITH TTL` instead.".into(),
1403 )
1404 .into());
1405 }
1406
1407 Ok(())
1408}
1409
1410async fn bind_cdc_table_schema_externally(
1412 cdc_with_options: WithOptionsSecResolved,
1413) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1414 let (options, secret_refs) = cdc_with_options.into_parts();
1416 let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1417 .context("failed to extract external table config")?;
1418
1419 let table = ExternalTableImpl::connect(config)
1420 .await
1421 .context("failed to auto derive table schema")?;
1422
1423 Ok((
1424 table
1425 .column_descs()
1426 .iter()
1427 .cloned()
1428 .map(|column_desc| ColumnCatalog {
1429 column_desc,
1430 is_hidden: false,
1431 })
1432 .collect(),
1433 table.pk_names().clone(),
1434 ))
1435}
1436
1437fn bind_cdc_table_schema(
1439 column_defs: &Vec<ColumnDef>,
1440 constraints: &Vec<TableConstraint>,
1441 is_for_replace_plan: bool,
1442) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1443 let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1444
1445 let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1446 Ok((columns, pk_names))
1447}
1448
/// Handles a `CREATE TABLE` statement end-to-end: plans the table, builds the
/// stream fragment graph, and submits the DDL job to the meta service.
///
/// For `ENGINE = iceberg` tables, delegates job submission to
/// `create_iceberg_engine_table`, temporarily registering the table in the
/// session as a staging table for the duration of that call.
#[allow(clippy::too_many_arguments)]
pub async fn handle_create_table(
    handler_args: HandlerArgs,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    if_not_exists: bool,
    format_encode: Option<FormatEncodeOptions>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    cdc_table_info: Option<CdcTableInfo>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    ast_engine: risingwave_sqlparser::ast::Engine,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    if append_only {
        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
    }

    session.check_cluster_limits().await?;

    // Translate the parser-level engine enum into the catalog-level one.
    let engine = match ast_engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    // With IF NOT EXISTS, a duplicate name yields an early success response
    // instead of an error.
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        table_name.clone(),
        StatementType::CREATE_TABLE,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (graph, source, hummock_table, job_type, shared_source_id) = {
        let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
            handler_args.clone(),
            ExplainOptions::default(),
            format_encode,
            cdc_table_info,
            &table_name,
            column_defs.clone(),
            wildcard_idx,
            constraints.clone(),
            source_watermarks,
            append_only,
            on_conflict,
            with_version_columns,
            include_column_options,
            webhook_info,
            engine,
        )
        .await?;
        tracing::trace!("table_plan: {:?}", plan.explain_to_string());

        let graph = build_graph(plan, Some(GraphJobType::Table))?;

        (graph, source, table, job_type, shared_source_id)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        table_name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

    // A table on a shared CDC source depends on that source object.
    let dependencies = shared_source_id
        .map(|id| HashSet::from([id.as_object_id()]))
        .unwrap_or_default();

    match engine {
        Engine::Hummock => {
            let catalog_writer = session.catalog_writer()?;
            // Tailor the long-running-DDL notification to the job kind: CDC
            // backfill jobs are monitored differently from ordinary tables.
            let action = match job_type {
                TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
                _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
            };
            execute_with_long_running_notification(
                catalog_writer.create_table(
                    source.map(|s| s.to_prost()),
                    hummock_table.to_prost(),
                    graph,
                    job_type,
                    if_not_exists,
                    dependencies,
                ),
                &session,
                "CREATE TABLE",
                action,
            )
            .await?;
        }
        Engine::Iceberg => {
            let hummock_table_name = hummock_table.name.clone();
            // Stage the table in the session for the duration of the iceberg
            // DDL; always unstage afterwards, even if creation fails.
            session.create_staging_table(hummock_table.clone());
            let res = Box::pin(create_iceberg_engine_table(
                session.clone(),
                handler_args,
                source.map(|s| s.to_prost()),
                hummock_table,
                graph,
                table_name,
                job_type,
                if_not_exists,
            ))
            .await;
            session.drop_staging_table(&hummock_table_name);
            res?
        }
    }

    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
}
1568
1569#[allow(clippy::too_many_arguments)]
1578pub async fn create_iceberg_engine_table(
1579 session: Arc<SessionImpl>,
1580 handler_args: HandlerArgs,
1581 mut source: Option<PbSource>,
1582 table: TableCatalog,
1583 graph: StreamFragmentGraph,
1584 table_name: ObjectName,
1585 job_type: PbTableJobType,
1586 if_not_exists: bool,
1587) -> Result<()> {
1588 let rw_db_name = session
1589 .env()
1590 .catalog_reader()
1591 .read_guard()
1592 .get_database_by_id(table.database_id)?
1593 .name()
1594 .to_owned();
1595 let rw_schema_name = session
1596 .env()
1597 .catalog_reader()
1598 .read_guard()
1599 .get_schema_by_id(table.database_id, table.schema_id)?
1600 .name()
1601 .clone();
1602 let iceberg_catalog_name = rw_db_name.clone();
1603 let iceberg_database_name = rw_schema_name.clone();
1604 let iceberg_table_name = table_name.0.last().unwrap().real_value();
1605
1606 let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1607 let sink_decouple = session.config().sink_decouple();
1608 if matches!(sink_decouple, SinkDecouple::Disable) {
1609 bail!(
1610 "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1611 );
1612 }
1613
1614 let mut connection_ref = BTreeMap::new();
1615 let with_common = if iceberg_engine_connection.is_empty() {
1616 bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1617 } else {
1618 let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1619 assert_eq!(parts.len(), 2);
1620 let connection_catalog =
1621 session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1622 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1623 if params.connection_type == ConnectionType::Iceberg as i32 {
1624 connection_ref.insert(
1626 "connection".to_owned(),
1627 ConnectionRefValue {
1628 connection_name: ObjectName::from(vec![
1629 Ident::from(parts[0]),
1630 Ident::from(parts[1]),
1631 ]),
1632 },
1633 );
1634
1635 let mut with_common = BTreeMap::new();
1636 with_common.insert("connector".to_owned(), "iceberg".to_owned());
1637 with_common.insert("database.name".to_owned(), iceberg_database_name);
1638 with_common.insert("table.name".to_owned(), iceberg_table_name);
1639
1640 let hosted_catalog = params
1641 .properties
1642 .get("hosted_catalog")
1643 .map(|s| s.eq_ignore_ascii_case("true"))
1644 .unwrap_or(false);
1645 if hosted_catalog {
1646 let meta_client = session.env().meta_client();
1647 let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1648
1649 let meta_store_endpoint =
1650 url::Url::parse(&meta_store_endpoint).map_err(|_| {
1651 ErrorCode::InternalError(
1652 "failed to parse the meta store endpoint".to_owned(),
1653 )
1654 })?;
1655 let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1656 let meta_store_user = meta_store_endpoint.username().to_owned();
1657 let meta_store_password = match meta_store_endpoint.password() {
1658 Some(password) => percent_decode_str(password)
1659 .decode_utf8()
1660 .map_err(|_| {
1661 ErrorCode::InternalError(
1662 "failed to parse password from meta store endpoint".to_owned(),
1663 )
1664 })?
1665 .into_owned(),
1666 None => "".to_owned(),
1667 };
1668 let meta_store_host = meta_store_endpoint
1669 .host_str()
1670 .ok_or_else(|| {
1671 ErrorCode::InternalError(
1672 "failed to parse host from meta store endpoint".to_owned(),
1673 )
1674 })?
1675 .to_owned();
1676 let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1677 ErrorCode::InternalError(
1678 "failed to parse port from meta store endpoint".to_owned(),
1679 )
1680 })?;
1681 let meta_store_database = meta_store_endpoint
1682 .path()
1683 .trim_start_matches('/')
1684 .to_owned();
1685
1686 let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1687 bail!("failed to parse meta backend: {}", meta_store_backend);
1688 };
1689
1690 let catalog_uri = match meta_backend {
1691 MetaBackend::Postgres => {
1692 format!(
1693 "jdbc:postgresql://{}:{}/{}",
1694 meta_store_host, meta_store_port, meta_store_database
1695 )
1696 }
1697 MetaBackend::Mysql => {
1698 format!(
1699 "jdbc:mysql://{}:{}/{}",
1700 meta_store_host, meta_store_port, meta_store_database
1701 )
1702 }
1703 MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1704 bail!(
1705 "Unsupported meta backend for iceberg engine table: {}",
1706 meta_store_backend
1707 );
1708 }
1709 };
1710
1711 with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1712 with_common.insert("catalog.uri".to_owned(), catalog_uri);
1713 with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
1714 with_common.insert("catalog.jdbc.password".to_owned(), meta_store_password);
1715 with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
1716 }
1717
1718 with_common
1719 } else {
1720 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1721 "Only iceberg connection could be used in iceberg engine".to_owned(),
1722 )));
1723 }
1724 } else {
1725 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1726 "Private Link Service has been deprecated. Please create a new connection instead."
1727 .to_owned(),
1728 )));
1729 }
1730 };
1731
1732 let mut pks = table
1735 .pk_column_names()
1736 .iter()
1737 .map(|c| c.to_string())
1738 .collect::<Vec<String>>();
1739
1740 if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1742 pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1743 }
1744
1745 let sink_from = CreateSink::From(table_name.clone());
1746
1747 let mut sink_name = table_name.clone();
1748 *sink_name.0.last_mut().unwrap() = Ident::from(
1749 (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1750 );
1751 let create_sink_stmt = CreateSinkStatement {
1752 if_not_exists: false,
1753 sink_name,
1754 with_properties: WithProperties(vec![]),
1755 sink_from,
1756 columns: vec![],
1757 emit_mode: None,
1758 sink_schema: None,
1759 into_table_name: None,
1760 };
1761
1762 let mut sink_handler_args = handler_args.clone();
1763
1764 let mut sink_with = with_common.clone();
1765
1766 if !handler_args
1768 .with_options
1769 .get(ENABLE_COMPACTION)
1770 .is_some_and(|val| val.eq_ignore_ascii_case("true"))
1771 {
1772 sink_with.insert(AUTO_SCHEMA_CHANGE_KEY.to_owned(), "true".to_owned());
1773 }
1774
1775 if table.append_only {
1776 sink_with.insert("type".to_owned(), "append-only".to_owned());
1777 } else {
1778 sink_with.insert("primary_key".to_owned(), pks.join(","));
1779 sink_with.insert("type".to_owned(), "upsert".to_owned());
1780 }
1781 let commit_checkpoint_interval = handler_args
1800 .with_options
1801 .get(COMMIT_CHECKPOINT_INTERVAL)
1802 .map(|v| v.to_owned())
1803 .unwrap_or_else(|| "60".to_owned());
1804 let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1805 ErrorCode::InvalidInputSyntax(format!(
1806 "commit_checkpoint_interval must be greater than 0: {}",
1807 commit_checkpoint_interval
1808 ))
1809 })?;
1810
1811 if commit_checkpoint_interval == 0 {
1812 bail!("commit_checkpoint_interval must be greater than 0");
1813 }
1814
1815 source
1817 .as_mut()
1818 .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1819
1820 let sink_decouple = session.config().sink_decouple();
1821 if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1822 bail!(
1823 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1824 )
1825 }
1826
1827 sink_with.insert(
1828 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1829 commit_checkpoint_interval.to_string(),
1830 );
1831 sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1832
1833 sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1834
1835 if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1836 match enable_compaction.to_lowercase().as_str() {
1837 "true" => {
1838 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1839 }
1840 "false" => {
1841 sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1842 }
1843 _ => {
1844 return Err(ErrorCode::InvalidInputSyntax(format!(
1845 "enable_compaction must be true or false: {}",
1846 enable_compaction
1847 ))
1848 .into());
1849 }
1850 }
1851
1852 source
1854 .as_mut()
1855 .map(|x| x.with_properties.remove("enable_compaction"));
1856 } else {
1857 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1858 }
1859
1860 if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1861 let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1862 ErrorCode::InvalidInputSyntax(format!(
1863 "compaction_interval_sec must be greater than 0: {}",
1864 commit_checkpoint_interval
1865 ))
1866 })?;
1867 if compaction_interval_sec == 0 {
1868 bail!("compaction_interval_sec must be greater than 0");
1869 }
1870 sink_with.insert(
1871 "compaction_interval_sec".to_owned(),
1872 compaction_interval_sec.to_string(),
1873 );
1874 source
1876 .as_mut()
1877 .map(|x| x.with_properties.remove("compaction_interval_sec"));
1878 }
1879
1880 let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1881 handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1882 {
1883 source
1885 .as_mut()
1886 .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1887 match enable_snapshot_expiration.to_lowercase().as_str() {
1888 "true" => {
1889 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1890 true
1891 }
1892 "false" => {
1893 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1894 false
1895 }
1896 _ => {
1897 return Err(ErrorCode::InvalidInputSyntax(format!(
1898 "enable_snapshot_expiration must be true or false: {}",
1899 enable_snapshot_expiration
1900 ))
1901 .into());
1902 }
1903 }
1904 } else {
1905 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1906 true
1907 };
1908
1909 if has_enabled_snapshot_expiration {
1910 if let Some(snapshot_expiration_retain_last) = handler_args
1912 .with_options
1913 .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
1914 {
1915 sink_with.insert(
1916 SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
1917 snapshot_expiration_retain_last.to_owned(),
1918 );
1919 source
1921 .as_mut()
1922 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
1923 }
1924
1925 if let Some(snapshot_expiration_max_age) = handler_args
1926 .with_options
1927 .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
1928 {
1929 sink_with.insert(
1930 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
1931 snapshot_expiration_max_age.to_owned(),
1932 );
1933 source
1935 .as_mut()
1936 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
1937 }
1938
1939 if let Some(snapshot_expiration_clear_expired_files) = handler_args
1940 .with_options
1941 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1942 {
1943 sink_with.insert(
1944 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
1945 snapshot_expiration_clear_expired_files.to_owned(),
1946 );
1947 source.as_mut().map(|x| {
1949 x.with_properties
1950 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1951 });
1952 }
1953
1954 if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
1955 .with_options
1956 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1957 {
1958 sink_with.insert(
1959 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
1960 snapshot_expiration_clear_expired_meta_data.to_owned(),
1961 );
1962 source.as_mut().map(|x| {
1964 x.with_properties
1965 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1966 });
1967 }
1968 }
1969
1970 if let Some(format_version) = handler_args.with_options.get(FORMAT_VERSION) {
1971 let format_version = format_version.parse::<u8>().map_err(|_| {
1972 ErrorCode::InvalidInputSyntax(format!(
1973 "format_version must be 1, 2 or 3: {}",
1974 format_version
1975 ))
1976 })?;
1977 if format_version != 1 && format_version != 2 && format_version != 3 {
1978 bail!("format_version must be 1, 2 or 3");
1979 }
1980 sink_with.insert(FORMAT_VERSION.to_owned(), format_version.to_string());
1981
1982 source
1984 .as_mut()
1985 .map(|x| x.with_properties.remove(FORMAT_VERSION));
1986 }
1987
1988 if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1989 let write_mode = IcebergWriteMode::try_from(write_mode.as_str()).map_err(|_| {
1990 ErrorCode::InvalidInputSyntax(format!(
1991 "invalid write_mode: {}, must be one of: {}, {}",
1992 write_mode, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
1993 ))
1994 })?;
1995
1996 match write_mode {
1997 IcebergWriteMode::MergeOnRead => {
1998 sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
1999 }
2000
2001 IcebergWriteMode::CopyOnWrite => {
2002 if table.append_only {
2003 return Err(ErrorCode::NotSupported(
2004 "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
2005 "Please use MERGE ON READ instead".to_owned(),
2006 )
2007 .into());
2008 }
2009
2010 sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
2011 }
2012 }
2013
2014 source
2016 .as_mut()
2017 .map(|x| x.with_properties.remove("write_mode"));
2018 } else {
2019 sink_with.insert(
2020 WRITE_MODE.to_owned(),
2021 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2022 );
2023 }
2024
2025 if let Some(max_snapshots_num_before_compaction) =
2026 handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
2027 {
2028 let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
2029 .parse::<u32>()
2030 .map_err(|_| {
2031 ErrorCode::InvalidInputSyntax(format!(
2032 "{} must be greater than 0: {}",
2033 COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
2034 ))
2035 })?;
2036
2037 if max_snapshots_num_before_compaction == 0 {
2038 bail!(format!(
2039 "{} must be greater than 0",
2040 COMPACTION_MAX_SNAPSHOTS_NUM
2041 ));
2042 }
2043
2044 sink_with.insert(
2045 COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
2046 max_snapshots_num_before_compaction.to_string(),
2047 );
2048
2049 source
2051 .as_mut()
2052 .map(|x| x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM));
2053 }
2054
2055 if let Some(small_files_threshold_mb) = handler_args
2056 .with_options
2057 .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2058 {
2059 let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
2060 ErrorCode::InvalidInputSyntax(format!(
2061 "{} must be greater than 0: {}",
2062 COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
2063 ))
2064 })?;
2065 if small_files_threshold_mb == 0 {
2066 bail!(format!(
2067 "{} must be a greater than 0",
2068 COMPACTION_SMALL_FILES_THRESHOLD_MB
2069 ));
2070 }
2071 sink_with.insert(
2072 COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
2073 small_files_threshold_mb.to_string(),
2074 );
2075
2076 source.as_mut().map(|x| {
2078 x.with_properties
2079 .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2080 });
2081 }
2082
2083 if let Some(delete_files_count_threshold) = handler_args
2084 .with_options
2085 .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2086 {
2087 let delete_files_count_threshold =
2088 delete_files_count_threshold.parse::<usize>().map_err(|_| {
2089 ErrorCode::InvalidInputSyntax(format!(
2090 "{} must be greater than 0: {}",
2091 COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
2092 ))
2093 })?;
2094 if delete_files_count_threshold == 0 {
2095 bail!(format!(
2096 "{} must be greater than 0",
2097 COMPACTION_DELETE_FILES_COUNT_THRESHOLD
2098 ));
2099 }
2100 sink_with.insert(
2101 COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
2102 delete_files_count_threshold.to_string(),
2103 );
2104
2105 source.as_mut().map(|x| {
2107 x.with_properties
2108 .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2109 });
2110 }
2111
2112 if let Some(trigger_snapshot_count) = handler_args
2113 .with_options
2114 .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
2115 {
2116 let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
2117 ErrorCode::InvalidInputSyntax(format!(
2118 "{} must be greater than 0: {}",
2119 COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
2120 ))
2121 })?;
2122 if trigger_snapshot_count == 0 {
2123 bail!(format!(
2124 "{} must be greater than 0",
2125 COMPACTION_TRIGGER_SNAPSHOT_COUNT
2126 ));
2127 }
2128 sink_with.insert(
2129 COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
2130 trigger_snapshot_count.to_string(),
2131 );
2132
2133 source
2135 .as_mut()
2136 .map(|x| x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT));
2137 }
2138
2139 if let Some(target_file_size_mb) = handler_args
2140 .with_options
2141 .get(COMPACTION_TARGET_FILE_SIZE_MB)
2142 {
2143 let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
2144 ErrorCode::InvalidInputSyntax(format!(
2145 "{} must be greater than 0: {}",
2146 COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
2147 ))
2148 })?;
2149 if target_file_size_mb == 0 {
2150 bail!(format!(
2151 "{} must be greater than 0",
2152 COMPACTION_TARGET_FILE_SIZE_MB
2153 ));
2154 }
2155 sink_with.insert(
2156 COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
2157 target_file_size_mb.to_string(),
2158 );
2159 source
2161 .as_mut()
2162 .map(|x| x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB));
2163 }
2164
2165 if let Some(compaction_type) = handler_args.with_options.get(COMPACTION_TYPE) {
2166 let compaction_type = CompactionType::try_from(compaction_type.as_str()).map_err(|_| {
2167 ErrorCode::InvalidInputSyntax(format!(
2168 "invalid compaction_type: {}, must be one of {:?}",
2169 compaction_type,
2170 &[
2171 CompactionType::Full,
2172 CompactionType::SmallFiles,
2173 CompactionType::FilesWithDelete
2174 ]
2175 ))
2176 })?;
2177
2178 sink_with.insert(
2179 COMPACTION_TYPE.to_owned(),
2180 compaction_type.as_str().to_owned(),
2181 );
2182
2183 source
2185 .as_mut()
2186 .map(|x| x.with_properties.remove(COMPACTION_TYPE));
2187 }
2188
2189 if let Some(write_parquet_compression) = handler_args
2190 .with_options
2191 .get(COMPACTION_WRITE_PARQUET_COMPRESSION)
2192 {
2193 sink_with.insert(
2194 COMPACTION_WRITE_PARQUET_COMPRESSION.to_owned(),
2195 write_parquet_compression.to_owned(),
2196 );
2197 source.as_mut().map(|x| {
2199 x.with_properties
2200 .remove(COMPACTION_WRITE_PARQUET_COMPRESSION)
2201 });
2202 }
2203
2204 if let Some(write_parquet_max_row_group_rows) = handler_args
2205 .with_options
2206 .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2207 {
2208 let write_parquet_max_row_group_rows = write_parquet_max_row_group_rows
2209 .parse::<usize>()
2210 .map_err(|_| {
2211 ErrorCode::InvalidInputSyntax(format!(
2212 "{} must be a positive integer: {}",
2213 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS, write_parquet_max_row_group_rows
2214 ))
2215 })?;
2216 if write_parquet_max_row_group_rows == 0 {
2217 bail!(format!(
2218 "{} must be greater than 0",
2219 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS
2220 ));
2221 }
2222 sink_with.insert(
2223 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS.to_owned(),
2224 write_parquet_max_row_group_rows.to_string(),
2225 );
2226 source.as_mut().map(|x| {
2228 x.with_properties
2229 .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2230 });
2231 }
2232
2233 if let Some(write_parquet_max_row_group_bytes) = handler_args
2234 .with_options
2235 .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
2236 {
2237 let write_parquet_max_row_group_bytes = write_parquet_max_row_group_bytes
2238 .parse::<usize>()
2239 .map_err(|_| {
2240 ErrorCode::InvalidInputSyntax(format!(
2241 "{} must be a positive integer: {}",
2242 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES, write_parquet_max_row_group_bytes
2243 ))
2244 })?;
2245 if write_parquet_max_row_group_bytes == 0 {
2246 bail!(format!(
2247 "{} must be greater than 0",
2248 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES
2249 ));
2250 }
2251 sink_with.insert(
2252 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES.to_owned(),
2253 write_parquet_max_row_group_bytes.to_string(),
2254 );
2255 source.as_mut().map(|x| {
2256 x.with_properties
2257 .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
2258 });
2259 }
2260
2261 let partition_by = handler_args
2262 .with_options
2263 .get("partition_by")
2264 .map(|v| v.to_owned());
2265
2266 if let Some(partition_by) = &partition_by {
2267 let mut partition_columns = vec![];
2268 for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
2269 table
2270 .columns()
2271 .iter()
2272 .find(|col| col.name().eq_ignore_ascii_case(&column))
2273 .ok_or_else(|| {
2274 ErrorCode::InvalidInputSyntax(format!(
2275 "Partition source column does not exist in schema: {}",
2276 column
2277 ))
2278 })?;
2279
2280 partition_columns.push(column);
2281 }
2282
2283 ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2284 |_| {
2285 ErrorCode::InvalidInputSyntax(
2286 "The partition columns should be the prefix of the primary key".to_owned(),
2287 )
2288 },
2289 )?;
2290
2291 sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2292
2293 source
2295 .as_mut()
2296 .map(|x| x.with_properties.remove("partition_by"));
2297 }
2298
2299 let order_key = handler_args
2300 .with_options
2301 .get(ORDER_KEY)
2302 .map(|v| v.to_owned());
2303 if let Some(order_key) = &order_key {
2304 validate_order_key_columns(order_key, table.columns().iter().map(|col| col.name()))
2305 .map_err(|err| ErrorCode::InvalidInputSyntax(err.to_report_string()))?;
2306
2307 sink_with.insert(ORDER_KEY.to_owned(), order_key.to_owned());
2308
2309 source.as_mut().map(|x| x.with_properties.remove(ORDER_KEY));
2310 }
2311
2312 sink_handler_args.with_options =
2313 WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2314 let SinkPlanContext {
2315 sink_plan,
2316 sink_catalog,
2317 ..
2318 } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2319 let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2320
2321 let mut source_name = table_name.clone();
2322 *source_name.0.last_mut().unwrap() = Ident::from(
2323 (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2324 );
2325 let create_source_stmt = CreateSourceStatement {
2326 temporary: false,
2327 if_not_exists: false,
2328 columns: vec![],
2329 source_name,
2330 wildcard_idx: Some(0),
2331 constraints: vec![],
2332 with_properties: WithProperties(vec![]),
2333 format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2334 source_watermarks: vec![],
2335 include_column_options: vec![],
2336 };
2337
2338 let mut source_handler_args = handler_args.clone();
2339 let source_with = with_common;
2340 source_handler_args.with_options =
2341 WithOptions::new(source_with, Default::default(), connection_ref);
2342
2343 let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2344 let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2345 let (with_properties, refresh_mode) =
2346 bind_connector_props(&source_handler_args, &format_encode, true)?;
2347
2348 let (iceberg_catalog, table_identifier) = {
2351 let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2352 let iceberg_sink = IcebergSink::try_from(sink_param)?;
2353 iceberg_sink.create_table_if_not_exists().await?;
2354
2355 let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2356 let table_identifier = iceberg_sink.config.full_table_name()?;
2357 (iceberg_catalog, table_identifier)
2358 };
2359
2360 let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2361 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2362 &session,
2363 &format_encode,
2364 Either::Left(&with_properties),
2365 create_source_type,
2366 )
2367 .await?;
2368 let mut col_id_gen = ColumnIdGenerator::new_initial();
2369
2370 let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2371 source_handler_args,
2372 create_source_stmt.source_name,
2373 format_encode,
2374 with_properties,
2375 &create_source_stmt.columns,
2376 create_source_stmt.constraints,
2377 create_source_stmt.wildcard_idx,
2378 create_source_stmt.source_watermarks,
2379 columns_from_resolve_source,
2380 source_info,
2381 create_source_stmt.include_column_options,
2382 &mut col_id_gen,
2383 create_source_type,
2384 overwrite_options.source_rate_limit,
2385 SqlColumnStrategy::FollowChecked,
2386 refresh_mode,
2387 )
2388 .await?;
2389
2390 let _ = Jvm::get_or_init()?;
2393
2394 let catalog_writer = session.catalog_writer()?;
2395 let action = match job_type {
2396 TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
2397 _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
2398 };
2399 let res = execute_with_long_running_notification(
2400 catalog_writer.create_iceberg_table(
2401 PbTableJobInfo {
2402 source,
2403 table: Some(table.to_prost()),
2404 fragment_graph: Some(graph),
2405 job_type: job_type as _,
2406 },
2407 PbSinkJobInfo {
2408 sink: Some(sink_catalog.to_proto()),
2409 fragment_graph: Some(sink_graph),
2410 },
2411 iceberg_source_catalog.to_prost(),
2412 if_not_exists,
2413 ),
2414 &session,
2415 "CREATE TABLE",
2416 action,
2417 )
2418 .await;
2419
2420 if res.is_err() {
2421 let _ = iceberg_catalog
2422 .drop_table(&table_identifier)
2423 .await
2424 .inspect_err(|err| {
2425 tracing::error!(
2426 "failed to drop iceberg table {} after create iceberg engine table failed: {}",
2427 table_identifier,
2428 err.as_report()
2429 );
2430 });
2431 res?
2432 }
2433
2434 Ok(())
2435}
2436
2437pub fn check_create_table_with_source(
2438 with_options: &WithOptions,
2439 format_encode: Option<FormatEncodeOptions>,
2440 include_column_options: &IncludeOption,
2441 cdc_table_info: &Option<CdcTableInfo>,
2442) -> Result<Option<FormatEncodeOptions>> {
2443 if cdc_table_info.is_some() {
2445 return Ok(format_encode);
2446 }
2447 let defined_source = with_options.is_source_connector();
2448
2449 if !include_column_options.is_empty() && !defined_source {
2450 return Err(ErrorCode::InvalidInputSyntax(
2451 "INCLUDE should be used with a connector".to_owned(),
2452 )
2453 .into());
2454 }
2455 if defined_source {
2456 format_encode.as_ref().ok_or_else(|| {
2457 ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2458 })?;
2459 }
2460 Ok(format_encode)
2461}
2462
/// Check that `partition_columns` is an exact prefix of `primary_key_columns`.
///
/// Returns a human-readable error message (not an `ErrorCode`) on mismatch;
/// the caller wraps it into the appropriate error type.
fn ensure_partition_columns_are_prefix_of_primary_key(
    partition_columns: &[String],
    primary_key_columns: &[String],
) -> std::result::Result<(), String> {
    // More partition columns than pk columns can never be a prefix.
    if partition_columns.len() > primary_key_columns.len() {
        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
    }

    // Compare position by position; report the first column that diverges.
    match partition_columns
        .iter()
        .zip(primary_key_columns.iter())
        .find(|(part, pk)| part != pk)
    {
        Some((part, _)) => Err(format!(
            "Partition column '{}' is not a prefix of the primary key.",
            part
        )),
        None => Ok(()),
    }
}
2482
/// Re-plan an existing table's definition and produce the artifacts needed to
/// replace it in place: the new stream fragment graph, the updated table
/// catalog, the updated associated source catalog (if any), and the job type.
///
/// The new `table`/`source` are patched to reuse the ids of `original_catalog`
/// so the meta service treats this as a replacement rather than a new job.
///
/// # Panics
/// Panics if `statement` is not `Statement::CreateTable` — the caller must
/// guarantee this.
#[allow(clippy::too_many_arguments)]
pub async fn generate_stream_graph_for_replace_table(
    _session: &Arc<SessionImpl>,
    table_name: ObjectName,
    original_catalog: &Arc<TableCatalog>,
    handler_args: HandlerArgs,
    statement: Statement,
    col_id_gen: ColumnIdGenerator,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(
    StreamFragmentGraph,
    TableCatalog,
    Option<SourceCatalog>,
    TableJobType,
)> {
    let Statement::CreateTable {
        columns,
        constraints,
        source_watermarks,
        append_only,
        on_conflict,
        with_version_columns,
        wildcard_idx,
        cdc_table_info,
        format_encode,
        include_column_options,
        engine,
        with_options,
        ..
    } = statement
    else {
        panic!("unexpected statement type: {:?}", statement);
    };

    let format_encode = format_encode
        .clone()
        .map(|format_encode| format_encode.into_v2_with_warning());

    // Map the parser-level engine enum to the catalog-level one.
    let engine = match engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    // The original table had an associated connector source but the new
    // definition has no FORMAT/ENCODE clause: the connector is being dropped.
    let is_drop_connector =
        original_catalog.associated_source_id().is_some() && format_encode.is_none();
    if is_drop_connector {
        // When dropping the connector, no connector-only clauses may remain.
        debug_assert!(
            source_watermarks.is_empty()
                && include_column_options.is_empty()
                && with_options
                    .iter()
                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
        );
    }

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_columns: with_version_columns
            .iter()
            .map(|col| col.real_value())
            .collect(),
        // Carry the original webhook info over unchanged.
        webhook_info: original_catalog.webhook_info.clone(),
        engine,
    };

    // Dispatch on (FORMAT/ENCODE clause, CDC info):
    //   (Some, None) -> connector-backed table
    //   (None, None) -> plain table
    //   (None, Some) -> table on a shared CDC source
    //   (Some, Some) -> rejected
    let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                ExplainOptions::default(),
                table_name,
                columns,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                sql_column_strategy,
            )
            .await?,
            TableJobType::General,
        ),
        (None, None) => {
            let context = OptimizerContext::from_handler_args(handler_args);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name,
                columns,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                true,
            )?;
            ((plan, None, table), TableJobType::General)
        }
        (None, Some(cdc_table)) => {
            // Reject clause combinations that are invalid for CDC tables.
            sanity_check_for_table_on_cdc_source(
                append_only,
                &columns,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            let session = &handler_args.session;
            let (source, resolved_table_name) =
                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;

            let (cdc_with_options, normalized_external_table_name) =
                derive_with_options_for_cdc_table(
                    &source.with_properties,
                    cdc_table.external_table_name.clone(),
                )?;

            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                normalized_external_table_name,
                columns,
                source_watermarks,
                column_catalogs,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_columns
                    .iter()
                    .map(|col| col.real_value())
                    .collect(),
                include_column_options,
                table_name,
                resolved_table_name,
                original_catalog.database_id,
                original_catalog.schema_id,
                original_catalog.id(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format doesn't apply to table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };

    // Altering the primary key is not supported for replacement.
    if table.pk_column_ids() != original_catalog.pk_column_ids() {
        Err(ErrorCode::InvalidInputSyntax(
            "alter primary key of table is not supported".to_owned(),
        ))?
    }

    let graph = build_graph(plan, Some(GraphJobType::Table))?;

    // Reuse the original ids so the replacement targets the existing job.
    table.id = original_catalog.id();
    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
        table.associated_source_id = Some(source_id);

        let source = source.as_mut().unwrap();
        source.id = source_id;
        source.associated_table_id = Some(table.id());
    }

    Ok((graph, table, source, job_type))
}
2662
2663fn get_source_and_resolved_table_name(
2664 session: &Arc<SessionImpl>,
2665 cdc_table: CdcTableInfo,
2666 table_name: ObjectName,
2667) -> Result<(Arc<SourceCatalog>, String)> {
2668 let db_name = &session.database();
2669 let (_, resolved_table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2670
2671 let (source_schema, source_name) =
2672 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2673
2674 let source = {
2675 let catalog_reader = session.env().catalog_reader().read_guard();
2676 let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2677 let (source, _) = catalog_reader.get_source_by_name(
2678 db_name,
2679 SchemaPath::Name(schema_name.as_str()),
2680 source_name.as_str(),
2681 )?;
2682 source.clone()
2683 };
2684
2685 Ok((source, resolved_table_name))
2686}
2687
/// Validate a webhook table definition and convert its `WebhookSourceInfo`
/// into the protobuf form stored in the catalog.
///
/// Checks that column options are limited to `NULL`, resolves the optional
/// secret reference, and binds the `VALIDATE AS` signature expression, which
/// must be a `SECURE_COMPARE()` call.
fn bind_webhook_info(
    session: &Arc<SessionImpl>,
    column_defs: &[ColumnDef],
    webhook_info: WebhookSourceInfo,
) -> Result<PbWebhookSourceInfo> {
    let WebhookSourceInfo {
        secret_ref,
        signature_expr,
        wait_for_persistence,
        is_batched,
    } = webhook_info;

    // Webhook tables only accept plain (optionally NULL) columns: generated
    // columns, default values and every other column option are rejected.
    for column in column_defs {
        for option_def in &column.options {
            match option_def.option {
                ColumnOption::Null => {}
                ColumnOption::GeneratedColumns(_) => {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "generated columns are not supported for webhook tables".to_owned(),
                    )
                    .into());
                }
                ColumnOption::DefaultValue(_) | ColumnOption::DefaultValueInternal { .. } => {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "default values are not supported for webhook tables".to_owned(),
                    )
                    .into());
                }
                ColumnOption::NotNull
                | ColumnOption::Unique { .. }
                | ColumnOption::ForeignKey { .. }
                | ColumnOption::Check(_)
                | ColumnOption::DialectSpecific(_) => {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "only NULL column option is supported for webhook tables".to_owned(),
                    )
                    .into());
                }
            }
        }
    }

    // Resolve the optional secret into a catalog reference; keep its name so
    // the SECURE_COMPARE binding below can refer to it.
    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
        let db_name = &session.database();
        let (schema_name, secret_name) =
            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
        (
            Some(PbSecretRef {
                secret_id: secret_catalog.id,
                ref_as: match secret_ref.ref_as {
                    SecretRefAsType::Text => PbRefAsType::Text,
                    SecretRefAsType::File => PbRefAsType::File,
                }
                .into(),
            }),
            Some(secret_name),
        )
    } else {
        (None, None)
    };

    let signature_expr = if let Some(signature_expr) = signature_expr {
        // If the table is a single JSONB column, that column's own name is the
        // payload name; otherwise the reserved payload field name is used.
        let payload_name = if column_defs.len() == 1
            && column_defs[0].data_type.as_ref() == Some(&AstDataType::Jsonb)
        {
            column_defs[0].name.real_value()
        } else {
            WEBHOOK_PAYLOAD_FIELD_NAME.to_owned()
        };
        let secure_compare_context = SecureCompareContext {
            payload_name,
            secret_name,
        };
        let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
        let expr = binder.bind_expr(&signature_expr)?;

        // Only a top-level SECURE_COMPARE(...) call is accepted as the
        // validation expression.
        if expr.as_function_call().is_none()
            || expr.as_function_call().unwrap().func_type()
                != crate::optimizer::plan_node::generic::ExprType::SecureCompare
        {
            return Err(ErrorCode::InvalidInputSyntax(
                "The signature verification function must be SECURE_COMPARE()".to_owned(),
            )
            .into());
        }

        Some(expr.to_expr_proto())
    } else {
        // Omitting VALIDATE is allowed, but warn the user.
        session.notice_to_user(
            "VALIDATE clause is strongly recommended for safety or production usages",
        );
        None
    };

    let pb_webhook_info = PbWebhookSourceInfo {
        secret_ref: pb_secret_ref,
        signature_expr,
        wait_for_persistence,
        is_batched,
    };

    Ok(pb_webhook_info)
}
2795
2796#[cfg(test)]
2797mod tests {
2798 use risingwave_common::catalog::{
2799 DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2800 };
2801 use risingwave_common::types::{DataType, StructType};
2802
2803 use super::*;
2804 use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2805
    /// End-to-end check that `CREATE TABLE` materializes the expected column
    /// set, including the hidden row-id and `_rw_timestamp` columns.
    #[tokio::test]
    async fn test_create_table_handler() {
        let sql =
            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(table.name(), "t");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        // Both `float` (no precision) and `double` bind to Float64 here;
        // struct fields receive column ids 3..=5.
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "v1" => DataType::Int16,
            "v2" => StructType::new(
                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
            )
            .with_ids([3, 4, 5].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };

        assert_eq!(columns, expected_columns, "{columns:#?}");
    }
2842
    /// A webhook table may declare arbitrary user columns (not just a single
    /// JSONB payload); all three user columns should remain insertable.
    #[tokio::test]
    async fn test_create_webhook_table_with_arbitrary_columns() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql("create schema ingest_schema;")
            .await
            .unwrap();
        frontend
            .run_sql(
                r#"
            create table ingest_schema.orders (
                id int,
                customer_name varchar,
                amount double precision,
                primary key (id)
            ) with (
                connector = 'webhook'
            ) validate as secure_compare(
                headers->>'x-rw-signature',
                'sha256=' || encode(hmac('webhook-secret', payload, 'sha256'), 'hex')
            );
            "#,
            )
            .await
            .unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (table, _) = catalog_reader
            .get_created_table_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name("ingest_schema"),
                "orders",
            )
            .unwrap();

        assert!(table.webhook_info.is_some());
        // Only the three user-declared columns accept DML.
        assert_eq!(
            table
                .columns
                .iter()
                .filter(|column| column.can_dml())
                .count(),
            3
        );
    }
2889
    /// When the table has exactly one JSONB column, the VALIDATE expression
    /// may reference that column by its own name as the payload.
    #[tokio::test]
    async fn test_create_webhook_table_uses_single_jsonb_column_name_in_validate() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql(
                r#"
            create table webhook_single_column (
                body jsonb
            ) with (
                connector = 'webhook'
            ) validate as secure_compare(
                headers->>'x-rw-signature',
                'sha256=' || encode(hmac('webhook-secret', body, 'sha256'), 'hex')
            );
            "#,
            )
            .await
            .unwrap();
    }
2909
    /// Generated columns must be rejected on webhook tables.
    #[tokio::test]
    async fn test_create_webhook_table_with_generated_columns() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
            create table webhook_generated_columns (
                id int,
                amount double precision,
                amount_with_fee double precision as amount + 1.0
            ) with (
                connector = 'webhook'
            );
            "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("generated columns are not supported for webhook tables"),
            "{err:?}"
        );
    }
2934
    /// Default values must be rejected on webhook tables.
    #[tokio::test]
    async fn test_create_webhook_table_with_default_value() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
            create table webhook_default_value (
                id int default 42,
                amount double precision
            ) with (
                connector = 'webhook'
            );
            "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("default values are not supported for webhook tables"),
            "{err:?}"
        );
    }
2958
    /// Any column option other than NULL (here NOT NULL) must be rejected on
    /// webhook tables.
    #[tokio::test]
    async fn test_create_webhook_table_with_not_null_option() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
            create table webhook_not_null (
                id int not null,
                amount double precision
            ) with (
                connector = 'webhook'
            );
            "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("only NULL column option is supported for webhook tables"),
            "{err:?}"
        );
    }
2982
    /// Exercises primary-key binding over column-level and table-level
    /// PRIMARY KEY declarations. `Ok` holds the expected pk column ids (a
    /// table without an explicit pk gets the hidden row-id column, id 0);
    /// `Err` holds a substring of the expected error message.
    #[test]
    fn test_bind_primary_key() {
        for (sql, expected) in [
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ] {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be create table")
            };
            // Bind columns, assign ids, then resolve the pk (adding a row-id
            // column when no pk is declared) — mirroring the handler's steps.
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }
3061
    /// Options duplicated between WITH and FORMAT/ENCODE are kept separately:
    /// FORMAT/ENCODE values land in `format_encode_options` while WITH values
    /// stay in `with_properties`, and `schema.location` does not leak into the
    /// connector properties.
    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // FORMAT/ENCODE options take the `_2` values...
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // ...while WITH properties keep the `_1` values.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        assert!(!source.with_properties.contains_key("schema.location"));
    }
3132}