use std::collections::{BTreeMap, HashMap, HashSet};
use std::rc::Rc;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use clap::ValueEnum;
use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use percent_encoding::percent_decode_str;
use pgwire::pg_response::{PgResponse, StatementType};
use prost::Message as _;
use risingwave_common::catalog::{
    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
    ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
    TableId,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::global_jvm::Jvm;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
    DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
    SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::{
    AUTO_SCHEMA_CHANGE_KEY, WithOptionsSecResolved, WithPropertiesExt, source,
};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::connection_params::ConnectionType;
use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::secret::secret_ref::PbRefAsType;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::IncludeOption;
use thiserror_ext::AsReport;

use super::RwPgResponse;
use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
use crate::binder::{Clause, SecureCompareContext, WEBHOOK_PAYLOAD_FIELD_NAME, bind_data_type};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::TableVersion;
use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::HandlerArgs;
use crate::handler::create_source::{
    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
    bind_source_watermark, handle_addition_columns,
};
use crate::handler::util::{
    LongRunningNotificationAction, SourceSchemaCompatExt, execute_with_long_running_notification,
};
use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
use crate::optimizer::plan_node::{
    LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::OverwriteOptions;
use crate::{Binder, Explain, TableCatalog, WithOptions};

mod col_id_gen;
pub use col_id_gen::*;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::{
    COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB, COMPACTION_DELETE_FILES_COUNT_THRESHOLD,
    COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, COMPACTION_SMALL_FILES_THRESHOLD_MB,
    COMPACTION_TARGET_FILE_SIZE_MB, COMPACTION_TRIGGER_SNAPSHOT_COUNT, COMPACTION_TYPE,
    COMPACTION_WRITE_PARQUET_COMPRESSION, COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES,
    COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS, CompactionType, ENABLE_COMPACTION,
    ENABLE_SNAPSHOT_EXPIRATION, FORMAT_VERSION,
    ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB, ICEBERG_WRITE_MODE_COPY_ON_WRITE,
    ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink, IcebergWriteMode, ORDER_KEY,
    SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
    SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
    parse_partition_by_exprs, validate_order_key_columns,
};
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};

use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};

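/// Reject column options that CREATE TABLE does not support: only generated columns,
/// default values, column-level PRIMARY KEY, and NULL/NOT NULL are accepted.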
fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
    for option_def in &c.options {
        match option_def.option {
            ColumnOption::GeneratedColumns(_) => {}
            ColumnOption::DefaultValue(_) => {}
            ColumnOption::DefaultValueInternal { .. } => {}
            ColumnOption::Unique { is_primary: true } => {}
            ColumnOption::Null => {}
            ColumnOption::NotNull => {}
            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
        }
    }
    Ok(())
}

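/// Bind the column definitions of a CREATE TABLE statement into `ColumnCatalog`s,
/// leaving the column IDs as placeholders to be filled in by a `ColumnIdGenerator`.
/// Only the `C` and `POSIX` collations are accepted, and only on character types.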
pub fn bind_sql_columns(
    column_defs: &[ColumnDef],
    is_for_drop_table_connector: bool,
) -> Result<Vec<ColumnCatalog>> {
    let mut columns = Vec::with_capacity(column_defs.len());

    for column in column_defs {
        ensure_column_options_supported(column)?;
        let ColumnDef {
            name,
            data_type,
            collation,
            options,
            ..
        } = column;

        let data_type = data_type
            .clone()
            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
        if let Some(collation) = collation {
            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
                bail_not_implemented!("Collation other than `C` or `POSIX` is not implemented");
            }

            match data_type {
                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
                _ => {
                    return Err(ErrorCode::NotSupported(
                        format!("{} is not a collatable data type", data_type),
                        "The built-in collatable data types are `varchar`, `text`, and `char`; \
                         please check your type"
                            .into(),
                    )
                    .into());
                }
            }
        }

        if !is_for_drop_table_connector {
            check_column_name_not_reserved(&name.real_value())?;
        }

        let nullable: bool = !options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull));

        columns.push(ColumnCatalog {
            column_desc: ColumnDesc {
                data_type: bind_data_type(&data_type)?,
                column_id: ColumnId::placeholder(),
                name: name.real_value(),
                generated_or_default_column: None,
                description: None,
                additional_column: AdditionalColumn { column_type: None },
                version: ColumnDescVersion::LATEST,
                system_column: None,
                nullable,
            },
            is_hidden: false,
        });
    }

    Ok(columns)
}

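/// Validate a generated column expression: it must not reference another generated
/// column, and if it is part of the primary key it must be pure.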
fn check_generated_column_constraints(
    column_name: &String,
    column_id: ColumnId,
    expr: &ExprImpl,
    column_catalogs: &[ColumnCatalog],
    generated_column_names: &[String],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let input_refs = expr.collect_input_refs(column_catalogs.len());
    for idx in input_refs.ones() {
        let referred_generated_column = &column_catalogs[idx].column_desc.name;
        if generated_column_names
            .iter()
            .any(|c| c == referred_generated_column)
        {
            return Err(ErrorCode::BindError(format!(
                "A generated column cannot reference another generated column, \
                 but generated column \"{}\" references generated column \"{}\"",
                column_name, referred_generated_column
            ))
            .into());
        }
    }

    if pk_column_ids.contains(&column_id) && expr.is_impure() {
        return Err(ErrorCode::BindError(format!(
            "Generated columns with impure expressions should not be part of the primary key. \
             Here column \"{}\" is defined as part of the primary key.",
            column_name
        ))
        .into());
    }

    Ok(())
}

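/// Bind generated-column and default-value expressions from the column definitions
/// and record them into the corresponding `ColumnCatalog`s. Default expressions are
/// constant-folded against the current snapshot (with `now()`/`proctime()` inlined)
/// so that a snapshot value can be persisted alongside the original expression.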
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: &[ColumnDef],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let generated_column_names = {
        let mut names = vec![];
        for column in columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name, column_catalogs)?;

    for column in columns {
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            continue;
        };

        for option_def in &column.options {
            match &option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "failed to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type())?;

                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                expr: Some(expr_impl.to_expr_proto()),
                            }));
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                             Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                             please specify a concrete default value"
                        );
                    }

                    let desc = DefaultColumnDesc::decode(&**persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                _ => {}
            }
        }
    }
    Ok(())
}

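/// Extract the primary key column names from table-level constraints. Only a single
/// PRIMARY KEY constraint is supported; other table constraints are rejected.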
pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
    let mut pk_column_names = vec![];

    for constraint in table_constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary: true,
            } => {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
            }
            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
        }
    }
    Ok(pk_column_names)
}

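/// Merge the primary key names from table-level constraints with column-level
/// `PRIMARY KEY` options, rejecting multiple primary key definitions.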
pub fn bind_sql_pk_names(
    columns_defs: &[ColumnDef],
    pk_names_from_table_constraints: Vec<String>,
) -> Result<Vec<String>> {
    let mut pk_column_names = pk_names_from_table_constraints;

    for column in columns_defs {
        for option_def in &column.options {
            if let ColumnOption::Unique { is_primary: true } = option_def.option {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names.push(column.name.real_value());
            };
        }
    }

    Ok(pk_column_names)
}

fn multiple_pk_definition_err() -> RwError {
    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
}

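/// Resolve the primary key columns by name. If no primary key is specified and one is
/// required, a hidden row-id column is appended and used as the primary key. Returns
/// the (possibly extended) columns, the primary key column IDs, and the index of the
/// row-id column if one was added.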
pub fn bind_pk_and_row_id_on_relation(
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    must_need_pk: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
    for c in &columns {
        assert!(c.column_id() != ColumnId::placeholder());
    }

    let name_to_id = columns
        .iter()
        .map(|c| (c.name(), c.column_id()))
        .collect::<HashMap<_, _>>();

    let mut pk_column_ids: Vec<_> = pk_names
        .iter()
        .map(|name| {
            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
            })
        })
        .try_collect()?;

    let need_row_id = pk_column_ids.is_empty() && must_need_pk;

    let row_id_index = need_row_id.then(|| {
        let column = ColumnCatalog::row_id_column();
        let index = columns.len();
        pk_column_ids = vec![column.column_id()];
        columns.push(column);
        index
    });

    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
        Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{col}\" specified more than once"
        )))?;
    }

    Ok((columns, pk_column_ids, row_id_index))
}

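/// Generate a stream plan for a table with a connector (`CREATE TABLE ... WITH
/// (connector = ...)`), binding columns from both the SQL definition and the schema
/// resolved from the external source.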
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append only table does not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let (with_properties, refresh_mode) =
        bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
        refresh_mode,
    )
    .await?;

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source.clone(),
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(source), table))
}

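/// Generate a stream plan for a plain table without a connector. Assigns column IDs
/// and then delegates to [`gen_create_table_plan_without_source`].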
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan(
    context: OptimizerContext,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    mut col_id_gen: ColumnIdGenerator,
    source_watermarks: Vec<SourceWatermark>,
    props: CreateTableProps,
    is_for_replace_plan: bool,
) -> Result<(PlanRef, TableCatalog)> {
    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
    if !secret_refs.is_empty() || !connection_refs.is_empty() {
        return Err(crate::error::ErrorCode::InvalidParameterValue(
            "Secret reference and Connection reference are not allowed in options when creating table without external source"
                .to_owned(),
        )
        .into());
    }

    gen_create_table_plan_without_source(
        context,
        table_name,
        columns,
        column_defs,
        constraints,
        source_watermarks,
        col_id_gen.into_version(),
        props,
    )
}

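/// Generate a stream plan for a table without a connector from already-bound columns,
/// binding the primary key, watermarks, and column constraints along the way.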
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}

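/// Generate the table plan from a bound source catalog, reusing the columns, primary
/// key, and watermarks resolved when the source was bound.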
fn gen_table_plan_with_source(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    source_catalog: SourceCatalog,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let table_name = source_catalog.name.clone();

    let info = CreateTableInfo {
        columns: source_catalog.columns.clone(),
        pk_column_ids: source_catalog.pk_col_ids.clone(),
        row_id_index: source_catalog.row_id_index,
        watermark_descs: source_catalog.watermark_descs.clone(),
        source_catalog: Some(source_catalog),
        version,
    };

    gen_table_plan_inner(context, schema_name, table_name, info, props)
}

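/// The ON CONFLICT behavior, either still in AST form or already resolved to a
/// concrete [`ConflictBehavior`].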
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    Ast(Option<OnConflict>),
    Resolved(ConflictBehavior),
}

impl From<Option<OnConflict>> for EitherOnConflict {
    fn from(v: Option<OnConflict>) -> Self {
        Self::Ast(v)
    }
}

impl From<ConflictBehavior> for EitherOnConflict {
    fn from(v: ConflictBehavior) -> Self {
        Self::Resolved(v)
    }
}

impl EitherOnConflict {
    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
        let conflict_behavior = match self {
            EitherOnConflict::Ast(on_conflict) => {
                if append_only {
                    if row_id_as_pk {
                        ConflictBehavior::NoCheck
                    } else {
                        if let Some(on_conflict) = on_conflict
                            && on_conflict != OnConflict::Nothing
                        {
                            return Err(ErrorCode::InvalidInputSyntax(
                                "When the PRIMARY KEY constraint is applied to an APPEND ONLY \
                                 table, the ON CONFLICT behavior must be DO NOTHING."
                                    .to_owned(),
                            )
                            .into());
                        }
                        ConflictBehavior::IgnoreConflict
                    }
                } else {
                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
                    }
                }
            }
            EitherOnConflict::Resolved(b) => b,
        };

        Ok(conflict_behavior)
    }
}

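/// The bound schema-level information needed to create a table.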
pub struct CreateTableInfo {
    pub columns: Vec<ColumnCatalog>,
    pub pk_column_ids: Vec<ColumnId>,
    pub row_id_index: Option<usize>,
    pub watermark_descs: Vec<WatermarkDesc>,
    pub source_catalog: Option<SourceCatalog>,
    pub version: TableVersion,
}

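/// Statement-level properties of CREATE TABLE that are independent of the bound
/// schema.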
pub struct CreateTableProps {
    pub definition: String,
    pub append_only: bool,
    pub on_conflict: EitherOnConflict,
    pub with_version_columns: Vec<String>,
    pub webhook_info: Option<PbWebhookSourceInfo>,
    pub engine: Engine,
}

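/// Build the materialize plan for a table from the bound schema information,
/// enforcing that non-TTL watermarks and retention require an append-only table.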
#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    let source_node: LogicalPlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let has_non_ttl_watermark = watermark_descs.iter().any(|d| !d.with_ttl);

    if !append_only && has_non_ttl_watermark {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on table requires the table to be append only.".to_owned(),
            "Use the keywords `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    if !append_only && retention_seconds.is_some() {
        if session
            .config()
            .unsafe_enable_storage_retention_for_non_append_only_tables()
        {
            tracing::warn!(
                "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
                table_name
            );
            const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
                                  This may lead to stream inconsistency and unrecoverable \
                                  node failure if there is any row INSERT/UPDATE/DELETE operation \
                                  corresponding to the TTLed primary keys";
            session.notice_to_user(NOTICE);
        } else {
            return Err(ErrorCode::NotSupported(
                "Defining retention seconds on table requires the table to be append only."
                    .to_owned(),
                "Use the keywords `APPEND ONLY`".to_owned(),
            )
            .into());
        }
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    Ok((materialize.into(), table))
}

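/// Generate a stream plan for a CDC table on top of a shared CDC source
/// (`CREATE TABLE ... FROM <source> TABLE '<external_table>'`), which scans the
/// external table through a `LogicalCdcScan` node.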
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    source_watermarks: Vec<SourceWatermark>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String,
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, TableCatalog)> {
    let session = context.session_ctx().clone();

    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| !c.is_generated())
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();
    let cdc_table_type = ExternalCdcTableType::from_properties(&options);
    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id,
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");
    let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: LogicalPlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs,
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_columns,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.cdc_table_type = Some(cdc_table_type);
    Ok((materialize.into(), table))
}

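/// Derive the WITH options of a CDC table from its shared source, filling in the
/// database/schema/table name keys parsed from the external table name. Returns the
/// derived options together with the normalized external table name.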
fn derive_with_options_for_cdc_table(
    source_with_properties: &WithOptionsSecResolved,
    external_table_name: String,
) -> Result<(WithOptionsSecResolved, String)> {
    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
    let source_database_name: &str = source_with_properties
        .get("database.name")
        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
        .as_str();
    let mut with_options = source_with_properties.clone();
    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
        match connector.as_str() {
            MYSQL_CDC_CONNECTOR => {
                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
                })?;
                if !source_database_name
                    .split(',')
                    .map(|s| s.trim())
                    .any(|name| name == db_name)
                {
                    return Err(anyhow!(
                        "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
                        db_name,
                        source_database_name
                    )
                    .into());
                }
                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
                return Ok((with_options, external_table_name));
            }
            POSTGRES_CDC_CONNECTOR => {
                let (schema_name, table_name) = external_table_name
                    .split_once('.')
                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;

                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
                return Ok((with_options, external_table_name));
            }
            SQL_SERVER_CDC_CONNECTOR => {
                let parts: Vec<&str> = external_table_name.split('.').collect();
                let (schema_name, table_name) = match parts.len() {
                    3 => {
                        let db_name = parts[0];
                        let schema_name = parts[1];
                        let table_name = parts[2];

                        if db_name != source_database_name {
                            return Err(anyhow!(
                                "The database name '{}' in FROM clause does not match the database name '{}' specified in source definition. \
                                 You can either use 'schema.table' format (recommended) or ensure the database name matches.",
                                db_name,
                                source_database_name
                            )
                            .into());
                        }
                        (schema_name, table_name)
                    }
                    2 => {
                        let schema_name = parts[0];
                        let table_name = parts[1];
                        (schema_name, table_name)
                    }
                    1 => {
                        return Err(anyhow!(
                            "Invalid table name format '{}'. For SQL Server CDC, you must specify the schema name. \
                             Use 'schema.table' format (e.g., 'dbo.{}') or 'database.schema.table' format (e.g., '{}.dbo.{}').",
                            external_table_name,
                            external_table_name,
                            source_database_name,
                            external_table_name
                        )
                        .into());
                    }
                    _ => {
                        return Err(anyhow!(
                            "Invalid table name format '{}'. Expected 'schema.table' or 'database.schema.table'.",
                            external_table_name
                        )
                        .into());
                    }
                };

                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());

                let normalized_external_table_name = format!("{}.{}", schema_name, table_name);
                return Ok((with_options, normalized_external_table_name));
            }
            _ => {
                return Err(RwError::from(anyhow!(
                    "connector {} is not supported for cdc table",
                    connector
                )));
            }
        };
    }
    unreachable!("All valid CDC connectors should have returned by now")
}

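/// Dispatch CREATE TABLE planning: a table with a connector, a plain table, or a
/// table on a shared CDC source. Returns the plan, the optional source catalog, the
/// table catalog, the job type, and the ID of the shared source the table depends on,
/// if any.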
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
    handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    format_encode: Option<FormatEncodeOptions>,
    cdc_table_info: Option<CdcTableInfo>,
    table_name: &ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    engine: Engine,
) -> Result<(
    PlanRef,
    Option<SourceCatalog>,
    TableCatalog,
    TableJobType,
    Option<SourceId>,
)> {
    let col_id_gen = ColumnIdGenerator::new_initial();
    let format_encode = check_create_table_with_source(
        &handler_args.with_options,
        format_encode,
        &include_column_options,
        &cdc_table_info,
    )?;
    let webhook_info = webhook_info
        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
        .transpose()?;

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_columns: with_version_columns.clone(),
        webhook_info,
        engine,
    };

    let ((plan, source, table), job_type, shared_source_id) = match (
        format_encode,
        cdc_table_info.as_ref(),
    ) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                explain_options,
                table_name.clone(),
                column_defs,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                SqlColumnStrategy::FollowChecked,
            )
            .await?,
            TableJobType::General,
            None,
        ),
        (None, None) => {
            let context = OptimizerContext::new(handler_args, explain_options);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name.clone(),
                column_defs,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                false,
            )?;

            ((plan, None, table), TableJobType::General, None)
        }

        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &column_defs,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            generated_columns_check_for_cdc_table(&column_defs)?;
            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;

            let session = &handler_args.session;
            let db_name = &session.database();
            let user_name = &session.user_name();
            let search_path = session.config().search_path();
            let (schema_name, resolved_table_name) =
                Binder::resolve_schema_qualified_name(db_name, table_name)?;
            let (database_id, schema_id) =
                session.get_database_and_schema_id_for_create(schema_name.clone())?;

            let (source_schema, source_name) =
                Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;

            let source = {
                let catalog_reader = session.env().catalog_reader().read_guard();
                let schema_path =
                    SchemaPath::new(source_schema.as_deref(), &search_path, user_name);

                let (source, _) = catalog_reader.get_source_by_name(
                    db_name,
                    schema_path,
                    source_name.as_str(),
                )?;
                source.clone()
            };
            let (cdc_with_options, normalized_external_table_name) =
                derive_with_options_for_cdc_table(
                    &source.with_properties,
                    cdc_table.external_table_name.clone(),
                )?;

            let (columns, pk_names) = match wildcard_idx {
                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
                None => {
                    for column_def in &column_defs {
                        for option_def in &column_def.options {
                            if let ColumnOption::DefaultValue(_)
                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
                            {
                                return Err(ErrorCode::NotSupported(
                                    "Default value for columns defined on the table created from a CDC source".into(),
                                    "Remove the default value expression in the column definitions".into(),
                                )
                                .into());
                            }
                        }
                    }

                    let (columns, pk_names) =
                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
                        .context("failed to extract external table config")?;

                    (columns, pk_names)
                }
            };

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, explain_options).into();
            let shared_source_id = source.id;
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                normalized_external_table_name,
                column_defs,
                source_watermarks,
                columns,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_columns,
                include_column_options,
                table_name.clone(),
                resolved_table_name,
                database_id,
                schema_id,
                TableId::placeholder(),
                engine,
            )?;

            (
                (plan, None, table),
                TableJobType::SharedCdcSource,
                Some(shared_source_id),
            )
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to a table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };
    Ok((plan, source, table, job_type, shared_source_id))
}

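/// CDC tables require all generated columns to appear at the end of the column list.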
fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
    let mut found_generated_column = false;
    for column in columns {
        let mut is_generated = false;

        for option_def in &column.options {
            if let ColumnOption::GeneratedColumns(_) = option_def.option {
                is_generated = true;
                break;
            }
        }

        if is_generated {
            found_generated_column = true;
        } else if found_generated_column {
            return Err(ErrorCode::NotSupported(
                "Non-generated column found after a generated column.".into(),
                "Ensure that all generated columns appear at the end of the cdc table definition."
                    .into(),
            )
            .into());
        }
    }
    Ok(())
}

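/// CDC tables do not support NOT NULL constraints on explicitly defined columns.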
fn not_null_check_for_cdc_table(
    wildcard_idx: &Option<usize>,
    column_defs: &Vec<ColumnDef>,
) -> Result<()> {
    if wildcard_idx.is_none()
        && column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table with NOT NULL constraint is not supported".to_owned(),
            "Please remove the NOT NULL constraint for columns".to_owned(),
        )
        .into());
    }
    Ok(())
}

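/// Validate a table created on a shared CDC source: a wildcard and explicit columns
/// are mutually exclusive, a primary key is required, and APPEND ONLY as well as
/// non-TTL watermarks are rejected.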
fn sanity_check_for_table_on_cdc_source(
    append_only: bool,
    column_defs: &Vec<ColumnDef>,
    wildcard_idx: &Option<usize>,
    constraints: &Vec<TableConstraint>,
    source_watermarks: &Vec<SourceWatermark>,
) -> Result<()> {
    if wildcard_idx.is_some() && !column_defs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "wildcard(*) and column definitions cannot be used together".to_owned(),
            "Remove the wildcard or column definitions".to_owned(),
        )
        .into());
    }

    if wildcard_idx.is_none()
        && !constraints.iter().any(|c| {
            matches!(
                c,
                TableConstraint::Unique {
                    is_primary: true,
                    ..
                }
            )
        })
        && !column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table without primary key constraint is not supported".to_owned(),
            "Please define a primary key".to_owned(),
        )
        .into());
    }

    if append_only {
        return Err(ErrorCode::NotSupported(
            "append only modifier on the table created from a CDC source".into(),
            "Remove the APPEND ONLY clause".into(),
        )
        .into());
    }

    if !source_watermarks.is_empty()
        && source_watermarks
            .iter()
            .any(|watermark| !watermark.with_ttl)
    {
        return Err(ErrorCode::NotSupported(
            "non-TTL watermark defined on the table created from a CDC source".into(),
            "Use `WATERMARK ... WITH TTL` instead.".into(),
        )
        .into());
    }

    Ok(())
}

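/// Derive the table schema by connecting to the external upstream table; used when
/// the CDC table is declared with a wildcard (`*`) instead of explicit columns.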
async fn bind_cdc_table_schema_externally(
    cdc_with_options: WithOptionsSecResolved,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    let (options, secret_refs) = cdc_with_options.into_parts();
    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
        .context("failed to extract external table config")?;

    let table = ExternalTableImpl::connect(config)
        .await
        .context("failed to auto derive table schema")?;

    Ok((
        table
            .column_descs()
            .iter()
            .cloned()
            .map(|column_desc| ColumnCatalog {
                column_desc,
                is_hidden: false,
            })
            .collect(),
        table.pk_names().clone(),
    ))
}

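/// Bind the CDC table schema from user-provided column definitions and constraints.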
fn bind_cdc_table_schema(
    column_defs: &Vec<ColumnDef>,
    constraints: &Vec<TableConstraint>,
    is_for_replace_plan: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;

    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
    Ok((columns, pk_names))
}

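/// Handle CREATE TABLE end to end: plan the table, build the stream fragment graph,
/// and submit the DDL, dispatching on the table engine (Hummock or Iceberg).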
#[allow(clippy::too_many_arguments)]
pub async fn handle_create_table(
    handler_args: HandlerArgs,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    if_not_exists: bool,
    format_encode: Option<FormatEncodeOptions>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    cdc_table_info: Option<CdcTableInfo>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    ast_engine: risingwave_sqlparser::ast::Engine,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    if append_only {
        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
    }

    session.check_cluster_limits().await?;

    let engine = match ast_engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        table_name.clone(),
        StatementType::CREATE_TABLE,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (graph, source, hummock_table, job_type, shared_source_id) = {
        let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
            handler_args.clone(),
            ExplainOptions::default(),
            format_encode,
            cdc_table_info,
            &table_name,
            column_defs.clone(),
            wildcard_idx,
            constraints.clone(),
            source_watermarks,
            append_only,
            on_conflict,
            with_version_columns,
            include_column_options,
            webhook_info,
            engine,
        )
        .await?;
        tracing::trace!("table_plan: {:?}", plan.explain_to_string());

        let graph = build_graph(plan, Some(GraphJobType::Table))?;

        (graph, source, table, job_type, shared_source_id)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        table_name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

    let dependencies = shared_source_id
        .map(|id| HashSet::from([id.as_object_id()]))
        .unwrap_or_default();

    match engine {
        Engine::Hummock => {
            let catalog_writer = session.catalog_writer()?;
            let action = match job_type {
                TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
                _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
            };
            execute_with_long_running_notification(
                catalog_writer.create_table(
                    source.map(|s| s.to_prost()),
                    hummock_table.to_prost(),
                    graph,
                    job_type,
                    if_not_exists,
                    dependencies,
                ),
                &session,
                "CREATE TABLE",
                action,
            )
            .await?;
        }
        Engine::Iceberg => {
            let hummock_table_name = hummock_table.name.clone();
            session.create_staging_table(hummock_table.clone());
            let res = Box::pin(create_iceberg_engine_table(
                session.clone(),
                handler_args,
                source.map(|s| s.to_prost()),
                hummock_table,
                graph,
                table_name,
                job_type,
                if_not_exists,
            ))
            .await;
            session.drop_staging_table(&hummock_table_name);
            res?
        }
    }

    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
}

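/// Create a table with the Iceberg engine: besides the internal table, this derives
/// the WITH options for the Iceberg sink that keeps the Iceberg table in sync,
/// translating user-supplied table options (compaction, snapshot expiration, write
/// mode, etc.) into sink options. With a hosted catalog, the JDBC catalog options are
/// derived from the meta store endpoint.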
#[allow(clippy::too_many_arguments)]
pub async fn create_iceberg_engine_table(
    session: Arc<SessionImpl>,
    handler_args: HandlerArgs,
    mut source: Option<PbSource>,
    table: TableCatalog,
    graph: StreamFragmentGraph,
    table_name: ObjectName,
    job_type: PbTableJobType,
    if_not_exists: bool,
) -> Result<()> {
    let rw_db_name = session
        .env()
        .catalog_reader()
        .read_guard()
        .get_database_by_id(table.database_id)?
        .name()
        .to_owned();
    let rw_schema_name = session
        .env()
        .catalog_reader()
        .read_guard()
        .get_schema_by_id(table.database_id, table.schema_id)?
        .name()
        .clone();
    let iceberg_catalog_name = rw_db_name.clone();
    let iceberg_database_name = rw_schema_name.clone();
    let iceberg_table_name = table_name.0.last().unwrap().real_value();

    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
    let sink_decouple = session.config().sink_decouple();
    if matches!(sink_decouple, SinkDecouple::Disable) {
        bail!(
            "Iceberg engine tables require sink decouple; try `set sink_decouple = true` to resolve it"
        );
    }

    let mut connection_ref = BTreeMap::new();
    let with_common = if iceberg_engine_connection.is_empty() {
        bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
    } else {
        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
        assert_eq!(parts.len(), 2);
        let connection_catalog =
            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
            if params.connection_type == ConnectionType::Iceberg as i32 {
                connection_ref.insert(
                    "connection".to_owned(),
                    ConnectionRefValue {
                        connection_name: ObjectName::from(vec![
                            Ident::from(parts[0]),
                            Ident::from(parts[1]),
                        ]),
                    },
                );

                let mut with_common = BTreeMap::new();
                with_common.insert("connector".to_owned(), "iceberg".to_owned());
                with_common.insert("database.name".to_owned(), iceberg_database_name);
                with_common.insert("table.name".to_owned(), iceberg_table_name);

                let hosted_catalog = params
                    .properties
                    .get("hosted_catalog")
                    .map(|s| s.eq_ignore_ascii_case("true"))
                    .unwrap_or(false);
                if hosted_catalog {
                    let meta_client = session.env().meta_client();
                    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;

                    let meta_store_endpoint =
                        url::Url::parse(&meta_store_endpoint).map_err(|_| {
                            ErrorCode::InternalError(
                                "failed to parse the meta store endpoint".to_owned(),
                            )
                        })?;
                    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
                    let meta_store_user = meta_store_endpoint.username().to_owned();
                    let meta_store_password = match meta_store_endpoint.password() {
                        Some(password) => percent_decode_str(password)
                            .decode_utf8()
                            .map_err(|_| {
                                ErrorCode::InternalError(
                                    "failed to parse password from meta store endpoint".to_owned(),
                                )
                            })?
                            .into_owned(),
                        None => "".to_owned(),
                    };
                    let meta_store_host = meta_store_endpoint
                        .host_str()
                        .ok_or_else(|| {
                            ErrorCode::InternalError(
                                "failed to parse host from meta store endpoint".to_owned(),
                            )
                        })?
                        .to_owned();
                    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
                        ErrorCode::InternalError(
                            "failed to parse port from meta store endpoint".to_owned(),
                        )
                    })?;
                    let meta_store_database = meta_store_endpoint
                        .path()
                        .trim_start_matches('/')
                        .to_owned();

                    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
                        bail!("failed to parse meta backend: {}", meta_store_backend);
                    };

                    let catalog_uri = match meta_backend {
                        MetaBackend::Postgres => {
                            format!(
                                "jdbc:postgresql://{}:{}/{}",
                                meta_store_host, meta_store_port, meta_store_database
                            )
                        }
                        MetaBackend::Mysql => {
                            format!(
                                "jdbc:mysql://{}:{}/{}",
                                meta_store_host, meta_store_port, meta_store_database
                            )
                        }
                        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
                            bail!(
                                "Unsupported meta backend for iceberg engine table: {}",
                                meta_store_backend
                            );
                        }
                    };

                    with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
                    with_common.insert("catalog.uri".to_owned(), catalog_uri);
                    with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
                    with_common.insert("catalog.jdbc.password".to_owned(), meta_store_password);
                    with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
                }

                with_common
            } else {
                return Err(RwError::from(ErrorCode::InvalidParameterValue(
                    "Only an Iceberg connection can be used with the Iceberg engine".to_owned(),
                )));
            }
        } else {
            return Err(RwError::from(ErrorCode::InvalidParameterValue(
                "Private Link Service has been deprecated. Please create a new connection instead."
                    .to_owned(),
            )));
        }
    };

    let mut pks = table
        .pk_column_names()
        .iter()
        .map(|c| c.to_string())
        .collect::<Vec<String>>();

    if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
    }

    let sink_from = CreateSink::From(table_name.clone());

    let mut sink_name = table_name.clone();
    *sink_name.0.last_mut().unwrap() = Ident::from(
        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
    );
    let create_sink_stmt = CreateSinkStatement {
        if_not_exists: false,
        sink_name,
        with_properties: WithProperties(vec![]),
        sink_from,
        columns: vec![],
        emit_mode: None,
        sink_schema: None,
        into_table_name: None,
    };

    let mut sink_handler_args = handler_args.clone();

    let mut sink_with = with_common.clone();

    if !handler_args
        .with_options
        .get(ENABLE_COMPACTION)
        .is_some_and(|val| val.eq_ignore_ascii_case("true"))
    {
        sink_with.insert(AUTO_SCHEMA_CHANGE_KEY.to_owned(), "true".to_owned());
    }

    if table.append_only {
        sink_with.insert("type".to_owned(), "append-only".to_owned());
    } else {
        sink_with.insert("primary_key".to_owned(), pks.join(","));
        sink_with.insert("type".to_owned(), "upsert".to_owned());
    }
    let commit_checkpoint_interval = handler_args
        .with_options
        .get(COMMIT_CHECKPOINT_INTERVAL)
        .map(|v| v.to_owned())
        .unwrap_or_else(|| "60".to_owned());
    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
        ErrorCode::InvalidInputSyntax(format!(
            "commit_checkpoint_interval must be greater than 0: {}",
            commit_checkpoint_interval
        ))
    })?;

    if commit_checkpoint_interval == 0 {
        bail!("commit_checkpoint_interval must be greater than 0");
    }

    source
        .as_mut()
        .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));

    let sink_decouple = session.config().sink_decouple();
    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
        bail!(
            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
        )
    }

    sink_with.insert(
        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
        commit_checkpoint_interval.to_string(),
    );

    let commit_checkpoint_size_threshold_mb = handler_args
        .with_options
        .get(COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
        .map(|v| v.to_owned())
        .unwrap_or_else(|| ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB.to_string());
    let commit_checkpoint_size_threshold_mb = commit_checkpoint_size_threshold_mb
        .parse::<u64>()
        .map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "{} must be greater than 0: {}",
                COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB, commit_checkpoint_size_threshold_mb
            ))
        })?;
    if commit_checkpoint_size_threshold_mb == 0 {
        bail!("{COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB} must be greater than 0");
    }

    source.as_mut().map(|x| {
        x.with_properties
            .remove(COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
    });
    sink_with.insert(
        COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB.to_owned(),
        commit_checkpoint_size_threshold_mb.to_string(),
    );
    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());

    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());

    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
        match enable_compaction.to_lowercase().as_str() {
            "true" => {
                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
            }
            "false" => {
                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
            }
            _ => {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "enable_compaction must be true or false: {}",
                    enable_compaction
                ))
                .into());
            }
        }

        source
            .as_mut()
            .map(|x| x.with_properties.remove("enable_compaction"));
    } else {
        sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
    }

    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "compaction_interval_sec must be greater than 0: {}",
                compaction_interval_sec
            ))
        })?;
        if compaction_interval_sec == 0 {
            bail!("compaction_interval_sec must be greater than 0");
        }
1898 sink_with.insert(
1899 "compaction_interval_sec".to_owned(),
1900 compaction_interval_sec.to_string(),
1901 );
1902 source
1904 .as_mut()
1905 .map(|x| x.with_properties.remove("compaction_interval_sec"));
1906 }
1907
1908 let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1909 handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1910 {
1911 source
1913 .as_mut()
1914 .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1915 match enable_snapshot_expiration.to_lowercase().as_str() {
1916 "true" => {
1917 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1918 true
1919 }
1920 "false" => {
1921 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1922 false
1923 }
1924 _ => {
1925 return Err(ErrorCode::InvalidInputSyntax(format!(
1926 "enable_snapshot_expiration must be true or false: {}",
1927 enable_snapshot_expiration
1928 ))
1929 .into());
1930 }
1931 }
1932 } else {
1933 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1934 true
1935 };
1936
    if has_enabled_snapshot_expiration {
        if let Some(snapshot_expiration_retain_last) = handler_args
            .with_options
            .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
        {
            sink_with.insert(
                SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
                snapshot_expiration_retain_last.to_owned(),
            );
            source
                .as_mut()
                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
        }

        if let Some(snapshot_expiration_max_age) = handler_args
            .with_options
            .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
        {
            sink_with.insert(
                SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
                snapshot_expiration_max_age.to_owned(),
            );
            source
                .as_mut()
                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
        }

        if let Some(snapshot_expiration_clear_expired_files) = handler_args
            .with_options
            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
        {
            sink_with.insert(
                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
                snapshot_expiration_clear_expired_files.to_owned(),
            );
            source.as_mut().map(|x| {
                x.with_properties
                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
            });
        }

        if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
            .with_options
            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
        {
            sink_with.insert(
                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
                snapshot_expiration_clear_expired_meta_data.to_owned(),
            );
            source.as_mut().map(|x| {
                x.with_properties
                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
            });
        }
    }

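    // Pin the Iceberg table format version (1, 2, or 3) on the sink.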
    if let Some(format_version) = handler_args.with_options.get(FORMAT_VERSION) {
        let format_version = format_version.parse::<u8>().map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "format_version must be 1, 2 or 3: {}",
                format_version
            ))
        })?;
        if !(1..=3).contains(&format_version) {
            bail!("format_version must be 1, 2 or 3");
        }
        sink_with.insert(FORMAT_VERSION.to_owned(), format_version.to_string());

        source
            .as_mut()
            .map(|x| x.with_properties.remove(FORMAT_VERSION));
    }

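    // Write mode: with merge-on-read (the default) updates are recorded as delete files,
    // while copy-on-write rewrites data files on commit; the latter is rejected for
    // append-only tables below.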
    if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
        let write_mode = IcebergWriteMode::try_from(write_mode.as_str()).map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "invalid write_mode: {}, must be one of: {}, {}",
                write_mode, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
            ))
        })?;

        match write_mode {
            IcebergWriteMode::MergeOnRead => {
                sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
            }

            IcebergWriteMode::CopyOnWrite => {
                if table.append_only {
                    return Err(ErrorCode::NotSupported(
                        "COPY ON WRITE is not supported for append-only Iceberg tables".to_owned(),
                        "Please use MERGE ON READ instead".to_owned(),
                    )
                    .into());
                }

                sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
            }
        }

        source
            .as_mut()
            .map(|x| x.with_properties.remove(WRITE_MODE));
    } else {
        sink_with.insert(
            WRITE_MODE.to_owned(),
            ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
        );
    }

    if let Some(max_snapshots_num_before_compaction) =
        handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
    {
        let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
            .parse::<u32>()
            .map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "{} must be greater than 0: {}",
                    COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
                ))
            })?;

        if max_snapshots_num_before_compaction == 0 {
            bail!("{} must be greater than 0", COMPACTION_MAX_SNAPSHOTS_NUM);
        }

        sink_with.insert(
            COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
            max_snapshots_num_before_compaction.to_string(),
        );

        source
            .as_mut()
            .map(|x| x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM));
    }

    if let Some(small_files_threshold_mb) = handler_args
        .with_options
        .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
    {
        let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "{} must be greater than 0: {}",
                COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
            ))
        })?;
        if small_files_threshold_mb == 0 {
            bail!("{} must be greater than 0", COMPACTION_SMALL_FILES_THRESHOLD_MB);
        }
        sink_with.insert(
            COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
            small_files_threshold_mb.to_string(),
        );

        source.as_mut().map(|x| {
            x.with_properties
                .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB)
        });
    }

    if let Some(delete_files_count_threshold) = handler_args
        .with_options
        .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
    {
        let delete_files_count_threshold =
            delete_files_count_threshold.parse::<usize>().map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "{} must be greater than 0: {}",
                    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
                ))
            })?;
        if delete_files_count_threshold == 0 {
            bail!(
                "{} must be greater than 0",
                COMPACTION_DELETE_FILES_COUNT_THRESHOLD
            );
        }
        sink_with.insert(
            COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
            delete_files_count_threshold.to_string(),
        );

        source.as_mut().map(|x| {
            x.with_properties
                .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
        });
    }

    if let Some(trigger_snapshot_count) = handler_args
        .with_options
        .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
    {
        let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "{} must be greater than 0: {}",
                COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
            ))
        })?;
        if trigger_snapshot_count == 0 {
            bail!("{} must be greater than 0", COMPACTION_TRIGGER_SNAPSHOT_COUNT);
        }
        sink_with.insert(
            COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
            trigger_snapshot_count.to_string(),
        );

        source
            .as_mut()
            .map(|x| x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT));
    }

    if let Some(target_file_size_mb) = handler_args
        .with_options
        .get(COMPACTION_TARGET_FILE_SIZE_MB)
    {
        let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "{} must be greater than 0: {}",
                COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
            ))
        })?;
        if target_file_size_mb == 0 {
            bail!("{} must be greater than 0", COMPACTION_TARGET_FILE_SIZE_MB);
        }
        sink_with.insert(
            COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
            target_file_size_mb.to_string(),
        );
        source
            .as_mut()
            .map(|x| x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB));
    }

    if let Some(compaction_type) = handler_args.with_options.get(COMPACTION_TYPE) {
        let compaction_type = CompactionType::try_from(compaction_type.as_str()).map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "invalid compaction_type: {}, must be one of {:?}",
                compaction_type,
                &[
                    CompactionType::Full,
                    CompactionType::SmallFiles,
                    CompactionType::FilesWithDelete
                ]
            ))
        })?;

        sink_with.insert(COMPACTION_TYPE.to_owned(), compaction_type.as_str().to_owned());

        source
            .as_mut()
            .map(|x| x.with_properties.remove(COMPACTION_TYPE));
    }

    if let Some(write_parquet_compression) = handler_args
        .with_options
        .get(COMPACTION_WRITE_PARQUET_COMPRESSION)
    {
        sink_with.insert(
            COMPACTION_WRITE_PARQUET_COMPRESSION.to_owned(),
            write_parquet_compression.to_owned(),
        );
        source.as_mut().map(|x| {
            x.with_properties
                .remove(COMPACTION_WRITE_PARQUET_COMPRESSION)
        });
    }

    if let Some(write_parquet_max_row_group_rows) = handler_args
        .with_options
        .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
    {
        let write_parquet_max_row_group_rows = write_parquet_max_row_group_rows
            .parse::<usize>()
            .map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "{} must be a positive integer: {}",
                    COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS, write_parquet_max_row_group_rows
                ))
            })?;
        if write_parquet_max_row_group_rows == 0 {
            bail!(
                "{} must be greater than 0",
                COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS
            );
        }
        sink_with.insert(
            COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS.to_owned(),
            write_parquet_max_row_group_rows.to_string(),
        );
        source.as_mut().map(|x| {
            x.with_properties
                .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
        });
    }

    if let Some(write_parquet_max_row_group_bytes) = handler_args
        .with_options
        .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
    {
        let write_parquet_max_row_group_bytes = write_parquet_max_row_group_bytes
            .parse::<usize>()
            .map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "{} must be a positive integer: {}",
                    COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES, write_parquet_max_row_group_bytes
                ))
            })?;
        if write_parquet_max_row_group_bytes == 0 {
            bail!(
                "{} must be greater than 0",
                COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES
            );
        }
        sink_with.insert(
            COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES.to_owned(),
            write_parquet_max_row_group_bytes.to_string(),
        );
        source.as_mut().map(|x| {
            x.with_properties
                .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
        });
    }

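    // Validate `partition_by`: every referenced column must exist in the table schema, and
    // the partition columns must form a prefix of the primary key.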
    let partition_by = handler_args
        .with_options
        .get("partition_by")
        .map(|v| v.to_owned());

    if let Some(partition_by) = &partition_by {
        let mut partition_columns = vec![];
        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
            table
                .columns()
                .iter()
                .find(|col| col.name().eq_ignore_ascii_case(&column))
                .ok_or_else(|| {
                    ErrorCode::InvalidInputSyntax(format!(
                        "Partition source column does not exist in schema: {}",
                        column
                    ))
                })?;

            partition_columns.push(column);
        }

        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
            |_| {
                ErrorCode::InvalidInputSyntax(
                    "The partition columns should be a prefix of the primary key".to_owned(),
                )
            },
        )?;

        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());

        source
            .as_mut()
            .map(|x| x.with_properties.remove("partition_by"));
    }

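    // Optional order key for the sink, validated against the table's column names.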
    let order_key = handler_args
        .with_options
        .get(ORDER_KEY)
        .map(|v| v.to_owned());
    if let Some(order_key) = &order_key {
        validate_order_key_columns(order_key, table.columns().iter().map(|col| col.name()))
            .map_err(|err| ErrorCode::InvalidInputSyntax(err.to_report_string()))?;

        sink_with.insert(ORDER_KEY.to_owned(), order_key.to_owned());

        source.as_mut().map(|x| x.with_properties.remove(ORDER_KEY));
    }

    sink_handler_args.with_options =
        WithOptions::new(sink_with, Default::default(), connection_ref.clone());
    let SinkPlanContext {
        sink_plan,
        sink_catalog,
        ..
    } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
    let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;

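    // The hidden Iceberg source reuses the table's name with ICEBERG_SOURCE_PREFIX prepended.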
    let mut source_name = table_name.clone();
    *source_name.0.last_mut().unwrap() = Ident::from(
        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
    );
    let create_source_stmt = CreateSourceStatement {
        temporary: false,
        if_not_exists: false,
        columns: vec![],
        source_name,
        wildcard_idx: Some(0),
        constraints: vec![],
        with_properties: WithProperties(vec![]),
        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
        source_watermarks: vec![],
        include_column_options: vec![],
    };

    let mut source_handler_args = handler_args.clone();
    let source_with = with_common;
    source_handler_args.with_options =
        WithOptions::new(source_with, Default::default(), connection_ref);

    let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
    let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
    let (with_properties, refresh_mode) =
        bind_connector_props(&source_handler_args, &format_encode, true)?;

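    // Create the physical Iceberg table up front so the source below can be bound against it,
    // and keep the catalog handle and table identifier around for cleanup if the DDL fails.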
    let (iceberg_catalog, table_identifier) = {
        let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
        let iceberg_sink = IcebergSink::try_from(sink_param)?;
        iceberg_sink.create_table_if_not_exists().await?;

        let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
        let table_identifier = iceberg_sink.config.full_table_name()?;
        (iceberg_catalog, table_identifier)
    };

    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        &session,
        &format_encode,
        Either::Left(&with_properties),
        create_source_type,
    )
    .await?;
    let mut col_id_gen = ColumnIdGenerator::new_initial();

    let iceberg_source_catalog = bind_create_source_or_table_with_connector(
        source_handler_args,
        create_source_stmt.source_name,
        format_encode,
        with_properties,
        &create_source_stmt.columns,
        create_source_stmt.constraints,
        create_source_stmt.wildcard_idx,
        create_source_stmt.source_watermarks,
        columns_from_resolve_source,
        source_info,
        create_source_stmt.include_column_options,
        &mut col_id_gen,
        create_source_type,
        overwrite_options.source_rate_limit,
        SqlColumnStrategy::FollowChecked,
        refresh_mode,
    )
    .await?;

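    // Eagerly initialize the embedded JVM (presumably needed by JNI-based Iceberg catalogs).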
    let _ = Jvm::get_or_init()?;

    let catalog_writer = session.catalog_writer()?;
    let action = match job_type {
        TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
        _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
    };
    let res = execute_with_long_running_notification(
        catalog_writer.create_iceberg_table(
            PbTableJobInfo {
                source,
                table: Some(table.to_prost()),
                fragment_graph: Some(graph),
                job_type: job_type as _,
            },
            PbSinkJobInfo {
                sink: Some(sink_catalog.to_proto()),
                fragment_graph: Some(sink_graph),
            },
            iceberg_source_catalog.to_prost(),
            if_not_exists,
        ),
        &session,
        "CREATE TABLE",
        action,
    )
    .await;

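    // Best-effort cleanup: if the DDL failed, drop the Iceberg table that was created above.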
    if res.is_err() {
        let _ = iceberg_catalog
            .drop_table(&table_identifier)
            .await
            .inspect_err(|err| {
                tracing::error!(
                    "failed to drop iceberg table {} after create iceberg engine table failed: {}",
                    table_identifier,
                    err.as_report()
                );
            });
        res?
    }

    Ok(())
}

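/// Checks the connector-related clauses of `CREATE TABLE`: `INCLUDE` requires a connector,
/// and a connector requires a `FORMAT ... ENCODE ...` schema. Tables created from a CDC
/// source are exempt, since their schema is derived from the upstream table.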
pub fn check_create_table_with_source(
    with_options: &WithOptions,
    format_encode: Option<FormatEncodeOptions>,
    include_column_options: &IncludeOption,
    cdc_table_info: &Option<CdcTableInfo>,
) -> Result<Option<FormatEncodeOptions>> {
    if cdc_table_info.is_some() {
        return Ok(format_encode);
    }
    let defined_source = with_options.is_source_connector();

    if !include_column_options.is_empty() && !defined_source {
        return Err(ErrorCode::InvalidInputSyntax(
            "INCLUDE should be used with a connector".to_owned(),
        )
        .into());
    }
    if defined_source {
        format_encode.as_ref().ok_or_else(|| {
            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
        })?;
    }
    Ok(format_encode)
}

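/// Checks that `partition_columns` is a (possibly empty) prefix of `primary_key_columns`,
/// e.g. `["a", "b"]` is a valid prefix of `["a", "b", "c"]`, while `["b"]` is not.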
fn ensure_partition_columns_are_prefix_of_primary_key(
    partition_columns: &[String],
    primary_key_columns: &[String],
) -> std::result::Result<(), String> {
    if partition_columns.len() > primary_key_columns.len() {
        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
    }

    for (i, partition_col) in partition_columns.iter().enumerate() {
        if primary_key_columns.get(i) != Some(partition_col) {
            return Err(format!(
                "Partition column '{}' is not a prefix of the primary key.",
                partition_col
            ));
        }
    }

    Ok(())
}

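/// Re-plans a table definition when the table is being replaced (e.g. by `ALTER TABLE`),
/// returning the new stream fragment graph together with the updated table catalog, the
/// optional associated source catalog, and the table job type. The primary key must stay
/// unchanged, and the original table and source ids are preserved.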
#[allow(clippy::too_many_arguments)]
pub async fn generate_stream_graph_for_replace_table(
    _session: &Arc<SessionImpl>,
    table_name: ObjectName,
    original_catalog: &Arc<TableCatalog>,
    handler_args: HandlerArgs,
    statement: Statement,
    col_id_gen: ColumnIdGenerator,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(
    StreamFragmentGraph,
    TableCatalog,
    Option<SourceCatalog>,
    TableJobType,
)> {
    let Statement::CreateTable {
        columns,
        constraints,
        source_watermarks,
        append_only,
        on_conflict,
        with_version_columns,
        wildcard_idx,
        cdc_table_info,
        format_encode,
        include_column_options,
        engine,
        with_options,
        ..
    } = statement
    else {
        panic!("unexpected statement type: {:?}", statement);
    };

    let format_encode = format_encode
        .clone()
        .map(|format_encode| format_encode.into_v2_with_warning());

    let engine = match engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    let is_drop_connector =
        original_catalog.associated_source_id().is_some() && format_encode.is_none();
    if is_drop_connector {
        debug_assert!(
            source_watermarks.is_empty()
                && include_column_options.is_empty()
                && with_options
                    .iter()
                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
        );
    }

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_columns: with_version_columns
            .iter()
            .map(|col| col.real_value())
            .collect(),
        webhook_info: original_catalog.webhook_info.clone(),
        engine,
    };

    let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                ExplainOptions::default(),
                table_name,
                columns,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                sql_column_strategy,
            )
            .await?,
            TableJobType::General,
        ),
        (None, None) => {
            let context = OptimizerContext::from_handler_args(handler_args);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name,
                columns,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                true,
            )?;
            ((plan, None, table), TableJobType::General)
        }
        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &columns,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            let session = &handler_args.session;
            let (source, resolved_table_name) =
                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;

            let (cdc_with_options, normalized_external_table_name) =
                derive_with_options_for_cdc_table(
                    &source.with_properties,
                    cdc_table.external_table_name.clone(),
                )?;

            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                normalized_external_table_name,
                columns,
                source_watermarks,
                column_catalogs,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_columns
                    .iter()
                    .map(|col| col.real_value())
                    .collect(),
                include_column_options,
                table_name,
                resolved_table_name,
                original_catalog.database_id,
                original_catalog.schema_id,
                original_catalog.id(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to tables created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };

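    // Replacing a table must not change its primary key.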
    if table.pk_column_ids() != original_catalog.pk_column_ids() {
        Err(ErrorCode::InvalidInputSyntax(
            "altering the primary key of a table is not supported".to_owned(),
        ))?
    }

    let graph = build_graph(plan, Some(GraphJobType::Table))?;

    table.id = original_catalog.id();
    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
        table.associated_source_id = Some(source_id);

        let source = source.as_mut().unwrap();
        source.id = source_id;
        source.associated_table_id = Some(table.id());
    }

    Ok((graph, table, source, job_type))
}

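/// Resolves the CDC source referenced by `cdc_table` along with the normalized name of the
/// table being created, both relative to the current database.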
fn get_source_and_resolved_table_name(
    session: &Arc<SessionImpl>,
    cdc_table: CdcTableInfo,
    table_name: ObjectName,
) -> Result<(Arc<SourceCatalog>, String)> {
    let db_name = &session.database();
    let (_, resolved_table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let (source_schema, source_name) =
        Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;

    let source = {
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
        let (source, _) = catalog_reader.get_source_by_name(
            db_name,
            SchemaPath::Name(schema_name.as_str()),
            source_name.as_str(),
        )?;
        source.clone()
    };

    Ok((source, resolved_table_name))
}

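/// Validates the column options of a webhook table and binds its
/// `VALIDATE ... AS secure_compare(...)` clause into a `PbWebhookSourceInfo`, e.g.
/// `CREATE TABLE t (body JSONB) WITH (connector = 'webhook') VALIDATE AS
/// secure_compare(headers->>'x-rw-signature', ...)` (see the tests below).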
fn bind_webhook_info(
    session: &Arc<SessionImpl>,
    column_defs: &[ColumnDef],
    webhook_info: WebhookSourceInfo,
) -> Result<PbWebhookSourceInfo> {
    let WebhookSourceInfo {
        secret_ref,
        signature_expr,
        wait_for_persistence,
        is_batched,
    } = webhook_info;

    for column in column_defs {
        for option_def in &column.options {
            match option_def.option {
                ColumnOption::Null => {}
                ColumnOption::GeneratedColumns(_) => {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "generated columns are not supported for webhook tables".to_owned(),
                    )
                    .into());
                }
                ColumnOption::DefaultValue(_) | ColumnOption::DefaultValueInternal { .. } => {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "default values are not supported for webhook tables".to_owned(),
                    )
                    .into());
                }
                ColumnOption::NotNull
                | ColumnOption::Unique { .. }
                | ColumnOption::ForeignKey { .. }
                | ColumnOption::Check(_)
                | ColumnOption::DialectSpecific(_) => {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "only NULL column option is supported for webhook tables".to_owned(),
                    )
                    .into());
                }
            }
        }
    }

    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
        let db_name = &session.database();
        let (schema_name, secret_name) =
            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
        (
            Some(PbSecretRef {
                secret_id: secret_catalog.id,
                ref_as: match secret_ref.ref_as {
                    SecretRefAsType::Text => PbRefAsType::Text,
                    SecretRefAsType::File => PbRefAsType::File,
                }
                .into(),
            }),
            Some(secret_name),
        )
    } else {
        (None, None)
    };

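    // Bind the VALIDATE expression. If the table consists of exactly one JSONB column, that
    // column is treated as the payload; otherwise the reserved payload field name is used.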
    let signature_expr = if let Some(signature_expr) = signature_expr {
        let payload_name = if column_defs.len() == 1
            && column_defs[0].data_type.as_ref() == Some(&AstDataType::Jsonb)
        {
            column_defs[0].name.real_value()
        } else {
            WEBHOOK_PAYLOAD_FIELD_NAME.to_owned()
        };
        let secure_compare_context = SecureCompareContext {
            payload_name,
            secret_name,
        };
        let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
        let expr = binder.bind_expr(&signature_expr)?;

        if expr.as_function_call().is_none()
            || expr.as_function_call().unwrap().func_type()
                != crate::optimizer::plan_node::generic::ExprType::SecureCompare
        {
            return Err(ErrorCode::InvalidInputSyntax(
                "The signature verification function must be SECURE_COMPARE()".to_owned(),
            )
            .into());
        }

        Some(expr.to_expr_proto())
    } else {
        session.notice_to_user(
            "A VALIDATE clause is strongly recommended for safety in production use",
        );
        None
    };

    let pb_webhook_info = PbWebhookSourceInfo {
        secret_ref: pb_secret_ref,
        signature_expr,
        wait_for_persistence,
        is_batched,
    };

    Ok(pb_webhook_info)
}

#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use super::*;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_table_handler() {
        let sql =
            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(table.name(), "t");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "v1" => DataType::Int16,
            "v2" => StructType::new(
                vec![("v3", DataType::Int64), ("v4", DataType::Float64), ("v5", DataType::Float64)],
            )
            .with_ids([3, 4, 5].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };

        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    #[tokio::test]
    async fn test_create_webhook_table_with_arbitrary_columns() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql("create schema ingest_schema;")
            .await
            .unwrap();
        frontend
            .run_sql(
                r#"
                create table ingest_schema.orders (
                    id int,
                    customer_name varchar,
                    amount double precision,
                    primary key (id)
                ) with (
                    connector = 'webhook'
                ) validate as secure_compare(
                    headers->>'x-rw-signature',
                    'sha256=' || encode(hmac('webhook-secret', payload, 'sha256'), 'hex')
                );
                "#,
            )
            .await
            .unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (table, _) = catalog_reader
            .get_created_table_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name("ingest_schema"),
                "orders",
            )
            .unwrap();

        assert!(table.webhook_info.is_some());
        assert_eq!(
            table
                .columns
                .iter()
                .filter(|column| column.can_dml())
                .count(),
            3
        );
    }

    #[tokio::test]
    async fn test_create_webhook_table_uses_single_jsonb_column_name_in_validate() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql(
                r#"
                create table webhook_single_column (
                    body jsonb
                ) with (
                    connector = 'webhook'
                ) validate as secure_compare(
                    headers->>'x-rw-signature',
                    'sha256=' || encode(hmac('webhook-secret', body, 'sha256'), 'hex')
                );
                "#,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_create_webhook_table_with_generated_columns() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
                create table webhook_generated_columns (
                    id int,
                    amount double precision,
                    amount_with_fee double precision as amount + 1.0
                ) with (
                    connector = 'webhook'
                );
                "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("generated columns are not supported for webhook tables"),
            "{err:?}"
        );
    }

    #[tokio::test]
    async fn test_create_webhook_table_with_default_value() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
                create table webhook_default_value (
                    id int default 42,
                    amount double precision
                ) with (
                    connector = 'webhook'
                );
                "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("default values are not supported for webhook tables"),
            "{err:?}"
        );
    }

    #[tokio::test]
    async fn test_create_webhook_table_with_not_null_option() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
                create table webhook_not_null (
                    id int not null,
                    amount double precision
                ) with (
                    connector = 'webhook'
                );
                "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("only NULL column option is supported for webhook tables"),
            "{err:?}"
        );
    }

    #[test]
    fn test_bind_primary_key() {
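        // Each case maps a CREATE TABLE statement to the expected primary key column ids;
        // the hidden row id column takes id 0 and user columns are assigned ids starting at 1.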
        for (sql, expected) in [
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ] {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be create table")
            };
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }

    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
            WITH (
                connector = 'kinesis',
                aws.region='user_test_topic',
                endpoint='172.10.1.1:9090,172.10.1.2:9090',
                aws.credentials.access_key_id = 'your_access_key_1',
                aws.credentials.secret_access_key = 'your_secret_key_1'
            )
            FORMAT PLAIN ENCODE PROTOBUF (
                message = '.test.TestRecord',
                aws.credentials.access_key_id = 'your_access_key_2',
                aws.credentials.secret_access_key = 'your_secret_key_2',
                schema.location = 'file://{}',
            )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

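        // The duplicated AWS credentials are kept separately: the FORMAT ... ENCODE copies
        // land in `format_encode_options`, while the WITH clause copies stay in
        // `with_properties`.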
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        assert!(!source.with_properties.contains_key("schema.location"));
    }
}