1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use percent_encoding::percent_decode_str;
25use pgwire::pg_response::{PgResponse, StatementType};
26use prost::Message as _;
27use risingwave_common::catalog::{
28 CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29 ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
30 TableId,
31};
32use risingwave_common::config::MetaBackend;
33use risingwave_common::global_jvm::Jvm;
34use risingwave_common::session_config::sink_decouple::SinkDecouple;
35use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
36use risingwave_common::util::value_encoding::DatumToProtoExt;
37use risingwave_common::{bail, bail_not_implemented};
38use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
39use risingwave_connector::source::cdc::external::{
40 DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
41 SCHEMA_NAME_KEY, TABLE_NAME_KEY,
42};
43use risingwave_connector::source::cdc::{
44 build_cdc_table_id, normalize_simple_postgres_quoted_table_name,
45};
46use risingwave_connector::{
47 AUTO_SCHEMA_CHANGE_KEY, WithOptionsSecResolved, WithPropertiesExt, source,
48};
49use risingwave_pb::catalog::connection::Info as ConnectionInfo;
50use risingwave_pb::catalog::connection_params::ConnectionType;
51use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
52use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
53use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
54use risingwave_pb::plan_common::{
55 AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
56};
57use risingwave_pb::secret::PbSecretRef;
58use risingwave_pb::secret::secret_ref::PbRefAsType;
59use risingwave_pb::stream_plan::StreamFragmentGraph;
60use risingwave_sqlparser::ast::{
61 CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
62 CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
63 FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
64 Statement, TableConstraint, WebhookSourceInfo, WithProperties,
65};
66use risingwave_sqlparser::parser::IncludeOption;
67use thiserror_ext::AsReport;
68
69use super::RwPgResponse;
70use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
71use crate::binder::{Clause, SecureCompareContext, WEBHOOK_PAYLOAD_FIELD_NAME, bind_data_type};
72use crate::catalog::root_catalog::SchemaPath;
73use crate::catalog::source_catalog::SourceCatalog;
74use crate::catalog::table_catalog::TableVersion;
75use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
76use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
77use crate::expr::{Expr, ExprImpl, ExprRewriter};
78use crate::handler::HandlerArgs;
79use crate::handler::create_source::{
80 UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
81 bind_source_watermark, handle_addition_columns,
82};
83use crate::handler::util::{
84 LongRunningNotificationAction, SourceSchemaCompatExt, execute_with_long_running_notification,
85};
86use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
87use crate::optimizer::plan_node::{
88 LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
89 ensure_sync_log_store_fragment_root,
90};
91use crate::optimizer::property::{Order, RequiredDist};
92use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
93use crate::session::SessionImpl;
94use crate::session::current::notice_to_user;
95use crate::stream_fragmenter::{GraphJobType, build_graph};
96use crate::utils::OverwriteOptions;
97use crate::{Binder, Explain, TableCatalog, WithOptions};
98
99mod col_id_gen;
100pub use col_id_gen::*;
101use risingwave_connector::sink::SinkParam;
102use risingwave_connector::sink::iceberg::{
103 COMPACTION_DELETE_FILES_COUNT_THRESHOLD, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
104 COMPACTION_SMALL_FILES_THRESHOLD_MB, COMPACTION_TARGET_FILE_SIZE_MB,
105 COMPACTION_TRIGGER_SNAPSHOT_COUNT, COMPACTION_TYPE, COMPACTION_WRITE_PARQUET_COMPRESSION,
106 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES, COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS,
107 CompactionType, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION, FORMAT_VERSION,
108 ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink,
109 IcebergWriteMode, ORDER_KEY, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
110 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA, SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
111 SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE, parse_partition_by_exprs,
112 validate_order_key_columns,
113};
114use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
115
116use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};
117
118fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
119 for option_def in &c.options {
120 match option_def.option {
121 ColumnOption::GeneratedColumns(_) => {}
122 ColumnOption::DefaultValue(_) => {}
123 ColumnOption::DefaultValueInternal { .. } => {}
124 ColumnOption::Unique { is_primary: true } => {}
125 ColumnOption::Null => {}
126 ColumnOption::NotNull => {}
127 _ => bail_not_implemented!("column constraints \"{}\"", option_def),
128 }
129 }
130 Ok(())
131}
132
133pub fn bind_sql_columns(
137 column_defs: &[ColumnDef],
138 is_for_drop_table_connector: bool,
139) -> Result<Vec<ColumnCatalog>> {
140 let mut columns = Vec::with_capacity(column_defs.len());
141
142 for column in column_defs {
143 ensure_column_options_supported(column)?;
144 let ColumnDef {
148 name,
149 data_type,
150 collation,
151 options,
152 ..
153 } = column;
154
155 let data_type = data_type
156 .clone()
157 .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
158 if let Some(collation) = collation {
159 if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
165 bail_not_implemented!(
166 "Collate collation other than `C` or `POSIX` is not implemented"
167 );
168 }
169
170 match data_type {
171 AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
172 _ => {
173 return Err(ErrorCode::NotSupported(
174 format!("{} is not a collatable data type", data_type),
175 "The only built-in collatable data types are `varchar`, please check your type".into(),
176 ).into());
177 }
178 }
179 }
180
181 if !is_for_drop_table_connector {
182 check_column_name_not_reserved(&name.real_value())?;
186 }
187
188 let nullable: bool = !options
189 .iter()
190 .any(|def| matches!(def.option, ColumnOption::NotNull));
191
192 columns.push(ColumnCatalog {
193 column_desc: ColumnDesc {
194 data_type: bind_data_type(&data_type)?,
195 column_id: ColumnId::placeholder(),
196 name: name.real_value(),
197 generated_or_default_column: None,
198 description: None,
199 additional_column: AdditionalColumn { column_type: None },
200 version: ColumnDescVersion::LATEST,
201 system_column: None,
202 nullable,
203 },
204 is_hidden: false,
205 });
206 }
207
208 Ok(columns)
209}
210
211fn check_generated_column_constraints(
212 column_name: &String,
213 column_id: ColumnId,
214 expr: &ExprImpl,
215 column_catalogs: &[ColumnCatalog],
216 generated_column_names: &[String],
217 pk_column_ids: &[ColumnId],
218) -> Result<()> {
219 let input_refs = expr.collect_input_refs(column_catalogs.len());
220 for idx in input_refs.ones() {
221 let referred_generated_column = &column_catalogs[idx].column_desc.name;
222 if generated_column_names
223 .iter()
224 .any(|c| c == referred_generated_column)
225 {
226 return Err(ErrorCode::BindError(format!(
227 "Generated can not reference another generated column. \
228 But here generated column \"{}\" referenced another generated column \"{}\"",
229 column_name, referred_generated_column
230 ))
231 .into());
232 }
233 }
234
235 if pk_column_ids.contains(&column_id) && expr.is_impure() {
236 return Err(ErrorCode::BindError(format!(
237 "Generated columns with impure expressions should not be part of the primary key. \
238 Here column \"{}\" is defined as part of the primary key.",
239 column_name
240 ))
241 .into());
242 }
243
244 Ok(())
245}
246
247pub fn bind_sql_column_constraints(
250 session: &SessionImpl,
251 table_name: String,
252 column_catalogs: &mut [ColumnCatalog],
253 columns: &[ColumnDef],
254 pk_column_ids: &[ColumnId],
255) -> Result<()> {
256 let generated_column_names = {
257 let mut names = vec![];
258 for column in columns {
259 for option_def in &column.options {
260 if let ColumnOption::GeneratedColumns(_) = option_def.option {
261 names.push(column.name.real_value());
262 break;
263 }
264 }
265 }
266 names
267 };
268
269 let mut binder = Binder::new_for_ddl(session);
270 binder.bind_columns_to_context(table_name, column_catalogs)?;
271
272 for column in columns {
273 let Some(idx) = column_catalogs
274 .iter()
275 .position(|c| c.name() == column.name.real_value())
276 else {
277 continue;
280 };
281
282 for option_def in &column.options {
283 match &option_def.option {
284 ColumnOption::GeneratedColumns(expr) => {
285 binder.set_clause(Some(Clause::GeneratedColumn));
286
287 let expr_impl = binder.bind_expr(expr).with_context(|| {
288 format!(
289 "fail to bind expression in generated column \"{}\"",
290 column.name.real_value()
291 )
292 })?;
293
294 check_generated_column_constraints(
295 &column.name.real_value(),
296 column_catalogs[idx].column_id(),
297 &expr_impl,
298 column_catalogs,
299 &generated_column_names,
300 pk_column_ids,
301 )?;
302
303 column_catalogs[idx].column_desc.generated_or_default_column = Some(
304 GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
305 expr: Some(expr_impl.to_expr_proto()),
306 }),
307 );
308 binder.set_clause(None);
309 }
310 ColumnOption::DefaultValue(expr) => {
311 let expr_impl = binder
312 .bind_expr(expr)?
313 .cast_assign(column_catalogs[idx].data_type())?;
314
315 let rewritten_expr_impl = session
323 .pinned_snapshot()
324 .inline_now_proc_time()
325 .rewrite_expr(expr_impl.clone());
326
327 if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
328 let snapshot_value = snapshot_value?;
329
330 column_catalogs[idx].column_desc.generated_or_default_column =
331 Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
332 snapshot_value: Some(snapshot_value.to_protobuf()),
333 expr: Some(expr_impl.to_expr_proto()),
334 }));
336 } else {
337 return Err(ErrorCode::BindError(format!(
338 "Default expression used in column `{}` cannot be evaluated. \
339 Use generated columns instead if you mean to reference other columns.",
340 column.name
341 ))
342 .into());
343 }
344 }
345 ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
346 if persisted.is_empty() {
351 bail_bind_error!(
352 "DEFAULT INTERNAL is only used for internal purposes, \
353 please specify a concrete default value"
354 );
355 }
356
357 let desc = DefaultColumnDesc::decode(&**persisted)
358 .expect("failed to decode persisted `DefaultColumnDesc`");
359
360 column_catalogs[idx].column_desc.generated_or_default_column =
361 Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
362 }
363 _ => {}
364 }
365 }
366 }
367 Ok(())
368}
369
370pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
372 let mut pk_column_names = vec![];
373
374 for constraint in table_constraints {
375 match constraint {
376 TableConstraint::Unique {
377 name: _,
378 columns,
379 is_primary: true,
380 } => {
381 if !pk_column_names.is_empty() {
382 return Err(multiple_pk_definition_err());
383 }
384 pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
385 }
386 _ => bail_not_implemented!("table constraint \"{}\"", constraint),
387 }
388 }
389 Ok(pk_column_names)
390}
391
392pub fn bind_sql_pk_names(
393 columns_defs: &[ColumnDef],
394 pk_names_from_table_constraints: Vec<String>,
395) -> Result<Vec<String>> {
396 let mut pk_column_names = pk_names_from_table_constraints;
397
398 for column in columns_defs {
399 for option_def in &column.options {
400 if let ColumnOption::Unique { is_primary: true } = option_def.option {
401 if !pk_column_names.is_empty() {
402 return Err(multiple_pk_definition_err());
403 }
404 pk_column_names.push(column.name.real_value());
405 };
406 }
407 }
408
409 Ok(pk_column_names)
410}
411
412fn multiple_pk_definition_err() -> RwError {
413 ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
414}
415
416pub fn bind_pk_and_row_id_on_relation(
421 mut columns: Vec<ColumnCatalog>,
422 pk_names: Vec<String>,
423 must_need_pk: bool,
424) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
425 for c in &columns {
426 assert!(c.column_id() != ColumnId::placeholder());
427 }
428
429 let name_to_id = columns
431 .iter()
432 .map(|c| (c.name(), c.column_id()))
433 .collect::<HashMap<_, _>>();
434
435 let mut pk_column_ids: Vec<_> = pk_names
436 .iter()
437 .map(|name| {
438 name_to_id.get(name.as_str()).copied().ok_or_else(|| {
439 ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
440 })
441 })
442 .try_collect()?;
443
444 let need_row_id = pk_column_ids.is_empty() && must_need_pk;
446
447 let row_id_index = need_row_id.then(|| {
448 let column = ColumnCatalog::row_id_column();
449 let index = columns.len();
450 pk_column_ids = vec![column.column_id()];
451 columns.push(column);
452 index
453 });
454
455 if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
456 Err(ErrorCode::InvalidInputSyntax(format!(
457 "column \"{col}\" specified more than once"
458 )))?;
459 }
460
461 Ok((columns, pk_column_ids, row_id_index))
462}
463
464#[allow(clippy::too_many_arguments)]
467pub(crate) async fn gen_create_table_plan_with_source(
468 mut handler_args: HandlerArgs,
469 explain_options: ExplainOptions,
470 table_name: ObjectName,
471 column_defs: Vec<ColumnDef>,
472 wildcard_idx: Option<usize>,
473 constraints: Vec<TableConstraint>,
474 format_encode: FormatEncodeOptions,
475 source_watermarks: Vec<SourceWatermark>,
476 mut col_id_gen: ColumnIdGenerator,
477 include_column_options: IncludeOption,
478 props: CreateTableProps,
479 sql_column_strategy: SqlColumnStrategy,
480) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
481 if props.append_only
482 && format_encode.format != Format::Plain
483 && format_encode.format != Format::Native
484 {
485 return Err(ErrorCode::BindError(format!(
486 "Append only table does not support format {}.",
487 format_encode.format
488 ))
489 .into());
490 }
491
492 let session = &handler_args.session;
493 let (with_properties, refresh_mode) =
494 bind_connector_props(&handler_args, &format_encode, false)?;
495 if with_properties.is_shareable_cdc_connector() {
496 generated_columns_check_for_cdc_table(&column_defs)?;
497 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
498 } else if column_defs.iter().any(|col| {
499 col.options
500 .iter()
501 .any(|def| matches!(def.option, ColumnOption::NotNull))
502 }) {
503 notice_to_user(
505 "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
506 );
507 }
508
509 let db_name: &str = &session.database();
510 let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
511
512 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
514 session,
515 &format_encode,
516 Either::Left(&with_properties),
517 CreateSourceType::Table,
518 )
519 .await?;
520
521 let overwrite_options = OverwriteOptions::new(&mut handler_args);
522 let rate_limit = overwrite_options.source_rate_limit;
523 let source = bind_create_source_or_table_with_connector(
524 handler_args.clone(),
525 table_name,
526 format_encode,
527 with_properties,
528 &column_defs,
529 constraints,
530 wildcard_idx,
531 source_watermarks,
532 columns_from_resolve_source,
533 source_info,
534 include_column_options,
535 &mut col_id_gen,
536 CreateSourceType::Table,
537 rate_limit,
538 sql_column_strategy,
539 refresh_mode,
540 )
541 .await?;
542
543 let context = OptimizerContext::new(handler_args, explain_options);
544
545 let (plan, table) = gen_table_plan_with_source(
546 context.into(),
547 schema_name,
548 source.clone(),
549 col_id_gen.into_version(),
550 props,
551 )?;
552
553 Ok((plan, Some(source), table))
554}
555
556#[allow(clippy::too_many_arguments)]
559pub(crate) fn gen_create_table_plan(
560 context: OptimizerContext,
561 table_name: ObjectName,
562 column_defs: Vec<ColumnDef>,
563 constraints: Vec<TableConstraint>,
564 mut col_id_gen: ColumnIdGenerator,
565 source_watermarks: Vec<SourceWatermark>,
566 props: CreateTableProps,
567 is_for_replace_plan: bool,
568) -> Result<(PlanRef, TableCatalog)> {
569 let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
570 for c in &mut columns {
571 col_id_gen.generate(c)?;
572 }
573
574 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
575 if !secret_refs.is_empty() || !connection_refs.is_empty() {
576 return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
577 }
578
579 gen_create_table_plan_without_source(
580 context,
581 table_name,
582 columns,
583 column_defs,
584 constraints,
585 source_watermarks,
586 col_id_gen.into_version(),
587 props,
588 )
589}
590
591#[allow(clippy::too_many_arguments)]
592pub(crate) fn gen_create_table_plan_without_source(
593 context: OptimizerContext,
594 table_name: ObjectName,
595 columns: Vec<ColumnCatalog>,
596 column_defs: Vec<ColumnDef>,
597 constraints: Vec<TableConstraint>,
598 source_watermarks: Vec<SourceWatermark>,
599 version: TableVersion,
600 props: CreateTableProps,
601) -> Result<(PlanRef, TableCatalog)> {
602 let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
604 let (mut columns, pk_column_ids, row_id_index) =
605 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
606
607 let watermark_descs = bind_source_watermark(
608 context.session_ctx(),
609 table_name.real_value(),
610 source_watermarks,
611 &columns,
612 )?;
613
614 bind_sql_column_constraints(
615 context.session_ctx(),
616 table_name.real_value(),
617 &mut columns,
618 &column_defs,
619 &pk_column_ids,
620 )?;
621 let session = context.session_ctx().clone();
622
623 let db_name = &session.database();
624 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
625
626 let info = CreateTableInfo {
627 columns,
628 pk_column_ids,
629 row_id_index,
630 watermark_descs,
631 source_catalog: None,
632 version,
633 };
634
635 gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
636}
637
638fn gen_table_plan_with_source(
639 context: OptimizerContextRef,
640 schema_name: Option<String>,
641 source_catalog: SourceCatalog,
642 version: TableVersion,
643 props: CreateTableProps,
644) -> Result<(PlanRef, TableCatalog)> {
645 let table_name = source_catalog.name.clone();
646
647 let info = CreateTableInfo {
648 columns: source_catalog.columns.clone(),
649 pk_column_ids: source_catalog.pk_col_ids.clone(),
650 row_id_index: source_catalog.row_id_index,
651 watermark_descs: source_catalog.watermark_descs.clone(),
652 source_catalog: Some(source_catalog),
653 version,
654 };
655
656 gen_table_plan_inner(context, schema_name, table_name, info, props)
657}
658
659#[derive(Clone, Copy)]
661pub enum EitherOnConflict {
662 Ast(Option<OnConflict>),
663 Resolved(ConflictBehavior),
664}
665
666impl From<Option<OnConflict>> for EitherOnConflict {
667 fn from(v: Option<OnConflict>) -> Self {
668 Self::Ast(v)
669 }
670}
671
672impl From<ConflictBehavior> for EitherOnConflict {
673 fn from(v: ConflictBehavior) -> Self {
674 Self::Resolved(v)
675 }
676}
677
678impl EitherOnConflict {
679 pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
681 let conflict_behavior = match self {
682 EitherOnConflict::Ast(on_conflict) => {
683 if append_only {
684 if row_id_as_pk {
685 ConflictBehavior::NoCheck
687 } else {
688 if let Some(on_conflict) = on_conflict
690 && on_conflict != OnConflict::Nothing
691 {
692 return Err(ErrorCode::InvalidInputSyntax(
693 "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
694 the ON CONFLICT behavior must be DO NOTHING."
695 .to_owned(),
696 )
697 .into());
698 }
699 ConflictBehavior::IgnoreConflict
700 }
701 } else {
702 match on_conflict.unwrap_or(OnConflict::UpdateFull) {
704 OnConflict::UpdateFull => ConflictBehavior::Overwrite,
705 OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
706 OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
707 }
708 }
709 }
710 EitherOnConflict::Resolved(b) => b,
711 };
712
713 Ok(conflict_behavior)
714 }
715}
716
717pub struct CreateTableInfo {
722 pub columns: Vec<ColumnCatalog>,
723 pub pk_column_ids: Vec<ColumnId>,
724 pub row_id_index: Option<usize>,
725 pub watermark_descs: Vec<WatermarkDesc>,
726 pub source_catalog: Option<SourceCatalog>,
727 pub version: TableVersion,
728}
729
730pub struct CreateTableProps {
735 pub definition: String,
736 pub append_only: bool,
737 pub on_conflict: EitherOnConflict,
738 pub with_version_columns: Vec<String>,
739 pub webhook_info: Option<PbWebhookSourceInfo>,
740 pub engine: Engine,
741}
742
743#[allow(clippy::too_many_arguments)]
744fn gen_table_plan_inner(
745 context: OptimizerContextRef,
746 schema_name: Option<String>,
747 table_name: String,
748 info: CreateTableInfo,
749 props: CreateTableProps,
750) -> Result<(PlanRef, TableCatalog)> {
751 let CreateTableInfo {
752 ref columns,
753 row_id_index,
754 ref watermark_descs,
755 ref source_catalog,
756 ..
757 } = info;
758 let CreateTableProps { append_only, .. } = props;
759
760 let (database_id, schema_id) = context
761 .session_ctx()
762 .get_database_and_schema_id_for_create(schema_name)?;
763
764 let session = context.session_ctx().clone();
765 let retention_seconds = context.with_options().retention_seconds();
766
767 let source_node: LogicalPlanRef = LogicalSource::new(
768 source_catalog.clone().map(Rc::new),
769 columns.clone(),
770 row_id_index,
771 SourceNodeKind::CreateTable,
772 context.clone(),
773 None,
774 )?
775 .into();
776
777 let required_cols = FixedBitSet::with_capacity(columns.len());
778 let plan_root = PlanRoot::new_with_logical_plan(
779 source_node,
780 RequiredDist::Any,
781 Order::any(),
782 required_cols,
783 vec![],
784 );
785
786 let has_non_ttl_watermark = watermark_descs.iter().any(|d| !d.with_ttl);
787
788 if !append_only && has_non_ttl_watermark {
789 return Err(ErrorCode::NotSupported(
790 "Defining watermarks on table requires the table to be append only.".to_owned(),
791 "Use the key words `APPEND ONLY`".to_owned(),
792 )
793 .into());
794 }
795
796 if !append_only && retention_seconds.is_some() {
797 if session
798 .config()
799 .unsafe_enable_storage_retention_for_non_append_only_tables()
800 {
801 tracing::warn!(
802 "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
803 table_name
804 );
805 const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
806 This may lead to stream inconsistency and unrecoverable \
807 node failure if there is any row INSERT/UPDATE/DELETE operation \
808 corresponding to the TTLed primary keys";
809 session.notice_to_user(NOTICE);
810 } else {
811 return Err(ErrorCode::NotSupported(
812 "Defining retention seconds on table requires the table to be append only."
813 .to_owned(),
814 "Use the key words `APPEND ONLY`".to_owned(),
815 )
816 .into());
817 }
818 }
819
820 let materialize =
821 plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;
822
823 let mut table = materialize.table().clone();
824 table.owner = session.user_id();
825
826 Ok((
827 ensure_sync_log_store_fragment_root(materialize.into()),
828 table,
829 ))
830}
831
832#[allow(clippy::too_many_arguments)]
836pub(crate) fn gen_create_table_plan_for_cdc_table(
837 context: OptimizerContextRef,
838 source: Arc<SourceCatalog>,
839 external_table_name: String,
840 column_defs: Vec<ColumnDef>,
841 source_watermarks: Vec<SourceWatermark>,
842 mut columns: Vec<ColumnCatalog>,
843 pk_names: Vec<String>,
844 cdc_with_options: WithOptionsSecResolved,
845 mut col_id_gen: ColumnIdGenerator,
846 on_conflict: Option<OnConflict>,
847 with_version_columns: Vec<String>,
848 include_column_options: IncludeOption,
849 table_name: ObjectName,
850 resolved_table_name: String, database_id: DatabaseId,
852 schema_id: SchemaId,
853 table_id: TableId,
854 engine: Engine,
855) -> Result<(PlanRef, TableCatalog)> {
856 let session = context.session_ctx().clone();
857
858 handle_addition_columns(
860 None,
861 &cdc_with_options,
862 include_column_options,
863 &mut columns,
864 true,
865 )?;
866
867 for c in &mut columns {
868 col_id_gen.generate(c)?;
869 }
870
871 let (mut columns, pk_column_ids, _row_id_index) =
872 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
873
874 let watermark_descs = bind_source_watermark(
875 context.session_ctx(),
876 table_name.real_value(),
877 source_watermarks,
878 &columns,
879 )?;
880
881 bind_sql_column_constraints(
883 context.session_ctx(),
884 table_name.real_value(),
885 &mut columns,
886 &column_defs,
887 &pk_column_ids,
888 )?;
889
890 let definition = context.normalized_sql().to_owned();
891
892 let pk_column_indices = {
893 let mut id_to_idx = HashMap::new();
894 columns.iter().enumerate().for_each(|(idx, c)| {
895 id_to_idx.insert(c.column_id(), idx);
896 });
897 pk_column_ids
899 .iter()
900 .map(|c| id_to_idx.get(c).copied().unwrap())
901 .collect_vec()
902 };
903 let table_pk = pk_column_indices
904 .iter()
905 .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
906 .collect();
907
908 let (options, secret_refs) = cdc_with_options.into_parts();
909
910 let non_generated_column_descs = columns
911 .iter()
912 .filter(|&c| !c.is_generated())
913 .map(|c| c.column_desc.clone())
914 .collect_vec();
915 let non_generated_column_num = non_generated_column_descs.len();
916 let cdc_table_type = ExternalCdcTableType::from_properties(&options);
917 let cdc_table_desc = CdcTableDesc {
918 table_id,
919 source_id: source.id, external_table_name: external_table_name.clone(),
921 pk: table_pk,
922 columns: non_generated_column_descs,
923 stream_key: pk_column_indices,
924 connect_properties: options,
925 secret_refs,
926 };
927
928 tracing::debug!(?cdc_table_desc, "create cdc table");
929 let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;
930
931 let logical_scan = LogicalCdcScan::create(
932 external_table_name.clone(),
933 Rc::new(cdc_table_desc),
934 context.clone(),
935 options,
936 );
937
938 let scan_node: LogicalPlanRef = logical_scan.into();
939 let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
940 let plan_root = PlanRoot::new_with_logical_plan(
941 scan_node,
942 RequiredDist::Any,
943 Order::any(),
944 required_cols,
945 vec![],
946 );
947
948 let cdc_table_id_external_table_name = if let ExternalCdcTableType::Postgres = cdc_table_type
949 && let Some(normalized_table_name) =
950 normalize_simple_postgres_quoted_table_name(&external_table_name)
951 {
952 normalized_table_name
953 } else {
954 external_table_name
955 };
956 let cdc_table_id = build_cdc_table_id(source.id, &cdc_table_id_external_table_name);
957 let materialize = plan_root.gen_table_plan(
958 context,
959 resolved_table_name,
960 database_id,
961 schema_id,
962 CreateTableInfo {
963 columns,
964 pk_column_ids,
965 row_id_index: None,
966 watermark_descs,
967 source_catalog: Some((*source).clone()),
968 version: col_id_gen.into_version(),
969 },
970 CreateTableProps {
971 definition,
972 append_only: false,
973 on_conflict: on_conflict.into(),
974 with_version_columns,
975 webhook_info: None,
976 engine,
977 },
978 )?;
979
980 let mut table = materialize.table().clone();
981 table.owner = session.user_id();
982 table.cdc_table_id = Some(cdc_table_id);
983 table.cdc_table_type = Some(cdc_table_type);
984 Ok((
985 ensure_sync_log_store_fragment_root(materialize.into()),
986 table,
987 ))
988}
989
990fn derive_with_options_for_cdc_table(
998 source_with_properties: &WithOptionsSecResolved,
999 external_table_name: String,
1000) -> Result<(WithOptionsSecResolved, String)> {
1001 use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
1002 let source_database_name: &str = source_with_properties
1004 .get("database.name")
1005 .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
1006 .as_str();
1007 let mut with_options = source_with_properties.clone();
1008 if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
1009 match connector.as_str() {
1010 MYSQL_CDC_CONNECTOR => {
1011 let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
1014 anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
1015 })?;
1016 if !source_database_name
1018 .split(',')
1019 .map(|s| s.trim())
1020 .any(|name| name == db_name)
1021 {
1022 return Err(anyhow!(
1023 "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
1024 db_name,
1025 source_database_name
1026 ).into());
1027 }
1028 with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
1029 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1030 return Ok((with_options, external_table_name));
1032 }
1033 POSTGRES_CDC_CONNECTOR => {
1034 let (schema_name, table_name) =
1035 parse_postgres_cdc_external_table_name(&external_table_name)?;
1036
1037 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name);
1039 with_options.insert(TABLE_NAME_KEY.into(), table_name);
1040 return Ok((with_options, external_table_name));
1042 }
1043 SQL_SERVER_CDC_CONNECTOR => {
1044 let parts: Vec<&str> = external_table_name.split('.').collect();
1052 let (schema_name, table_name) = match parts.len() {
1053 3 => {
1054 let db_name = parts[0];
1057 let schema_name = parts[1];
1058 let table_name = parts[2];
1059
1060 if db_name != source_database_name {
1061 return Err(anyhow!(
1062 "The database name '{}' in FROM clause does not match the database name '{}' specified in source definition. \
1063 You can either use 'schema.table' format (recommended) or ensure the database name matches.",
1064 db_name,
1065 source_database_name
1066 ).into());
1067 }
1068 (schema_name, table_name)
1069 }
1070 2 => {
1071 let schema_name = parts[0];
1074 let table_name = parts[1];
1075 (schema_name, table_name)
1076 }
1077 1 => {
1078 return Err(anyhow!(
1081 "Invalid table name format '{}'. For SQL Server CDC, you must specify the schema name. \
1082 Use 'schema.table' format (e.g., 'dbo.{}') or 'database.schema.table' format (e.g., '{}.dbo.{}').",
1083 external_table_name,
1084 external_table_name,
1085 source_database_name,
1086 external_table_name
1087 ).into());
1088 }
1089 _ => {
1090 return Err(anyhow!(
1092 "Invalid table name format '{}'. Expected 'schema.table' or 'database.schema.table'.",
1093 external_table_name
1094 ).into());
1095 }
1096 };
1097
1098 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1100 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1101
1102 let normalized_external_table_name = format!("{}.{}", schema_name, table_name);
1105 return Ok((with_options, normalized_external_table_name));
1106 }
1107 _ => {
1108 return Err(RwError::from(anyhow!(
1109 "connector {} is not supported for cdc table",
1110 connector
1111 )));
1112 }
1113 };
1114 }
1115 unreachable!("All valid CDC connectors should have returned by now")
1116}
1117
1118fn parse_postgres_cdc_external_table_name(external_table_name: &str) -> Result<(String, String)> {
1123 let mut parts = vec![];
1124 let mut current = String::new();
1125 let mut chars = external_table_name.chars().peekable();
1126 let mut in_quote = false;
1127 let mut just_closed_quote = false;
1128
1129 while let Some(ch) = chars.next() {
1130 if in_quote {
1131 if ch == '"' {
1132 if chars.peek() == Some(&'"') {
1133 current.push('"');
1134 chars.next();
1135 } else {
1136 in_quote = false;
1137 just_closed_quote = true;
1138 }
1139 } else {
1140 current.push(ch);
1141 }
1142 } else {
1143 match ch {
1144 '.' => {
1145 if current.is_empty() {
1146 return Err(anyhow!(
1147 "Invalid Postgres CDC table name '{}'. Expected 'schema.table'.",
1148 external_table_name
1149 )
1150 .into());
1151 }
1152 parts.push(std::mem::take(&mut current));
1153 just_closed_quote = false;
1154 }
1155 '"' if current.is_empty() => {
1156 in_quote = true;
1157 }
1158 '"' => {
1159 return Err(anyhow!(
1160 "Invalid Postgres CDC table name '{}'. Expected 'schema.table'.",
1161 external_table_name
1162 )
1163 .into());
1164 }
1165 _ if just_closed_quote => {
1166 return Err(anyhow!(
1167 "Invalid Postgres CDC table name '{}'. Expected 'schema.table'.",
1168 external_table_name
1169 )
1170 .into());
1171 }
1172 _ => current.push(ch),
1173 }
1174 }
1175 }
1176
1177 if in_quote || current.is_empty() {
1178 return Err(anyhow!(
1179 "Invalid Postgres CDC table name '{}'. Expected 'schema.table'.",
1180 external_table_name
1181 )
1182 .into());
1183 }
1184 parts.push(current);
1185
1186 if let [schema_name, table_name] = parts.as_slice() {
1187 Ok((schema_name.clone(), table_name.clone()))
1188 } else {
1189 Err(
1190 anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'")
1191 .into(),
1192 )
1193 }
1194}
1195
/// Builds the plan and catalog entries for a `CREATE TABLE` statement.
///
/// The work is dispatched on whether a format/encode clause and CDC table info are
/// present:
/// - format/encode only: delegates to `gen_create_table_plan_with_source`;
/// - neither: delegates to `gen_create_table_plan`;
/// - CDC table info only: validates the definition, looks up the referenced source in
///   the catalog and delegates to `gen_create_table_plan_for_cdc_table`;
/// - both: rejected as not supported.
///
/// Returns `(plan, source catalog, table catalog, job type, shared source id)`. The
/// shared source id is `Some` only for the CDC branch, which also reports
/// `TableJobType::SharedCdcSource`; the other branches report `TableJobType::General`.
///
/// # Errors
///
/// Propagates errors from the validation helpers, catalog lookups and plan generators,
/// and returns `ErrorCode::NotSupported` for the unsupported combinations noted below.
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
    handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    format_encode: Option<FormatEncodeOptions>,
    cdc_table_info: Option<CdcTableInfo>,
    table_name: &ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    engine: Engine,
) -> Result<(
    PlanRef,
    Option<SourceCatalog>,
    TableCatalog,
    TableJobType,
    Option<SourceId>,
)> {
    let col_id_gen = ColumnIdGenerator::new_initial();
    // The check returns the (possibly adjusted) format/encode options used for the
    // dispatch below.
    let format_encode = check_create_table_with_source(
        &handler_args.with_options,
        format_encode,
        &include_column_options,
        &cdc_table_info,
    )?;
    let webhook_info = webhook_info
        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
        .transpose()?;

    // `with_version_columns` is cloned because the CDC branch passes the original on
    // separately.
    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_columns: with_version_columns.clone(),
        webhook_info,
        engine,
    };

    let ((plan, source, table), job_type, shared_shource_id) = match (
        format_encode,
        cdc_table_info.as_ref(),
    ) {
        // Format/encode clause present, no CDC table info.
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                explain_options,
                table_name.clone(),
                column_defs,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                SqlColumnStrategy::FollowChecked,
            )
            .await?,
            TableJobType::General,
            None,
        ),
        // Neither format/encode nor CDC table info.
        (None, None) => {
            let context = OptimizerContext::new(handler_args, explain_options);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name.clone(),
                column_defs,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                false,
            )?;

            ((plan, None, table), TableJobType::General, None)
        }

        // CDC table info present, no format/encode clause.
        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &column_defs,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            generated_columns_check_for_cdc_table(&column_defs)?;
            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;

            let session = &handler_args.session;
            let db_name = &session.database();
            let user_name = &session.user_name();
            let search_path = session.config().search_path();
            let (schema_name, resolved_table_name) =
                Binder::resolve_schema_qualified_name(db_name, table_name)?;
            let (database_id, schema_id) =
                session.get_database_and_schema_id_for_create(schema_name.clone())?;

            let (source_schema, source_name) =
                Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;

            // The catalog read guard lives only inside this block; the source catalog
            // is cloned out of it.
            let source = {
                let catalog_reader = session.env().catalog_reader().read_guard();
                let schema_path =
                    SchemaPath::new(source_schema.as_deref(), &search_path, user_name);

                let (source, _) = catalog_reader.get_source_by_name(
                    db_name,
                    schema_path,
                    source_name.as_str(),
                )?;
                source.clone()
            };
            let (cdc_with_options, normalized_external_table_name) =
                derive_with_options_for_cdc_table(
                    &source.with_properties,
                    cdc_table.external_table_name.clone(),
                )?;

            let (columns, pk_names) = match wildcard_idx {
                // Wildcard: the schema comes from `bind_cdc_table_schema_externally`.
                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
                // Explicit column definitions: the schema is bound from the SQL.
                None => {
                    for column_def in &column_defs {
                        for option_def in &column_def.options {
                            if let ColumnOption::DefaultValue(_)
                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
                            {
                                return Err(ErrorCode::NotSupported(
                                    "Default value for columns defined on the table created from a CDC source".into(),
                                    "Remove the default value expression in the column definitions".into(),
                                )
                                .into());
                            }
                        }
                    }

                    let (columns, pk_names) =
                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
                    // The config is built only so that extraction errors surface here;
                    // the value itself is discarded.
                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
                        .context("failed to extract external table config")?;

                    (columns, pk_names)
                }
            };

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, explain_options).into();
            // Read the id before `source` is moved into the plan generator.
            let shared_source_id = source.id;
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                normalized_external_table_name,
                column_defs,
                source_watermarks,
                columns,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_columns,
                include_column_options,
                table_name.clone(),
                resolved_table_name,
                database_id,
                schema_id,
                TableId::placeholder(),
                engine,
            )?;

            (
                (plan, None, table),
                TableJobType::SharedCdcSource,
                Some(shared_source_id),
            )
        }
        // Both a format/encode clause and CDC table info: rejected.
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format doesn't apply to table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };
    Ok((plan, source, table, job_type, shared_shource_id))
}
1391
1392fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1394 let mut found_generated_column = false;
1395 for column in columns {
1396 let mut is_generated = false;
1397
1398 for option_def in &column.options {
1399 if let ColumnOption::GeneratedColumns(_) = option_def.option {
1400 is_generated = true;
1401 break;
1402 }
1403 }
1404
1405 if is_generated {
1406 found_generated_column = true;
1407 } else if found_generated_column {
1408 return Err(ErrorCode::NotSupported(
1409 "Non-generated column found after a generated column.".into(),
1410 "Ensure that all generated columns appear at the end of the cdc table definition."
1411 .into(),
1412 )
1413 .into());
1414 }
1415 }
1416 Ok(())
1417}
1418
1419fn not_null_check_for_cdc_table(
1421 wildcard_idx: &Option<usize>,
1422 column_defs: &Vec<ColumnDef>,
1423) -> Result<()> {
1424 if !wildcard_idx.is_some()
1425 && column_defs.iter().any(|col| {
1426 col.options
1427 .iter()
1428 .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1429 })
1430 {
1431 return Err(ErrorCode::NotSupported(
1432 "CDC table with NOT NULL constraint is not supported".to_owned(),
1433 "Please remove the NOT NULL constraint for columns".to_owned(),
1434 )
1435 .into());
1436 }
1437 Ok(())
1438}
1439
1440fn sanity_check_for_table_on_cdc_source(
1442 append_only: bool,
1443 column_defs: &Vec<ColumnDef>,
1444 wildcard_idx: &Option<usize>,
1445 constraints: &Vec<TableConstraint>,
1446 source_watermarks: &Vec<SourceWatermark>,
1447) -> Result<()> {
1448 if wildcard_idx.is_some() && !column_defs.is_empty() {
1450 return Err(ErrorCode::NotSupported(
1451 "wildcard(*) and column definitions cannot be used together".to_owned(),
1452 "Remove the wildcard or column definitions".to_owned(),
1453 )
1454 .into());
1455 }
1456
1457 if !wildcard_idx.is_some()
1459 && !constraints.iter().any(|c| {
1460 matches!(
1461 c,
1462 TableConstraint::Unique {
1463 is_primary: true,
1464 ..
1465 }
1466 )
1467 })
1468 && !column_defs.iter().any(|col| {
1469 col.options
1470 .iter()
1471 .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1472 })
1473 {
1474 return Err(ErrorCode::NotSupported(
1475 "CDC table without primary key constraint is not supported".to_owned(),
1476 "Please define a primary key".to_owned(),
1477 )
1478 .into());
1479 }
1480
1481 if append_only {
1482 return Err(ErrorCode::NotSupported(
1483 "append only modifier on the table created from a CDC source".into(),
1484 "Remove the APPEND ONLY clause".into(),
1485 )
1486 .into());
1487 }
1488
1489 if !source_watermarks.is_empty()
1490 && source_watermarks
1491 .iter()
1492 .any(|watermark| !watermark.with_ttl)
1493 {
1494 return Err(ErrorCode::NotSupported(
1495 "non-TTL watermark defined on the table created from a CDC source".into(),
1496 "Use `WATERMARK ... WITH TTL` instead.".into(),
1497 )
1498 .into());
1499 }
1500
1501 Ok(())
1502}
1503
1504async fn bind_cdc_table_schema_externally(
1506 cdc_with_options: WithOptionsSecResolved,
1507) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1508 let (options, secret_refs) = cdc_with_options.into_parts();
1510 let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1511 .context("failed to extract external table config")?;
1512
1513 let table = ExternalTableImpl::connect(config)
1514 .await
1515 .context("failed to auto derive table schema")?;
1516
1517 Ok((
1518 table
1519 .column_descs()
1520 .iter()
1521 .cloned()
1522 .map(|column_desc| ColumnCatalog {
1523 column_desc,
1524 is_hidden: false,
1525 })
1526 .collect(),
1527 table.pk_names().clone(),
1528 ))
1529}
1530
1531fn bind_cdc_table_schema(
1533 column_defs: &Vec<ColumnDef>,
1534 constraints: &Vec<TableConstraint>,
1535 is_for_replace_plan: bool,
1536) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1537 let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1538
1539 let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1540 Ok((columns, pk_names))
1541}
1542
1543#[allow(clippy::too_many_arguments)]
1544pub async fn handle_create_table(
1545 handler_args: HandlerArgs,
1546 table_name: ObjectName,
1547 column_defs: Vec<ColumnDef>,
1548 wildcard_idx: Option<usize>,
1549 constraints: Vec<TableConstraint>,
1550 if_not_exists: bool,
1551 format_encode: Option<FormatEncodeOptions>,
1552 source_watermarks: Vec<SourceWatermark>,
1553 append_only: bool,
1554 on_conflict: Option<OnConflict>,
1555 with_version_columns: Vec<String>,
1556 cdc_table_info: Option<CdcTableInfo>,
1557 include_column_options: IncludeOption,
1558 webhook_info: Option<WebhookSourceInfo>,
1559 ast_engine: risingwave_sqlparser::ast::Engine,
1560) -> Result<RwPgResponse> {
1561 let session = handler_args.session.clone();
1562
1563 if append_only {
1564 session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1565 }
1566
1567 session.check_cluster_limits().await?;
1568
1569 let engine = match ast_engine {
1570 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1571 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1572 };
1573
1574 if let Either::Right(resp) = session.check_relation_name_duplicated(
1575 table_name.clone(),
1576 StatementType::CREATE_TABLE,
1577 if_not_exists,
1578 )? {
1579 return Ok(resp);
1580 }
1581
1582 let (graph, source, hummock_table, job_type, shared_source_id) = {
1583 let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
1584 handler_args.clone(),
1585 ExplainOptions::default(),
1586 format_encode,
1587 cdc_table_info,
1588 &table_name,
1589 column_defs.clone(),
1590 wildcard_idx,
1591 constraints.clone(),
1592 source_watermarks,
1593 append_only,
1594 on_conflict,
1595 with_version_columns,
1596 include_column_options,
1597 webhook_info,
1598 engine,
1599 )
1600 .await?;
1601 tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1602
1603 let graph = build_graph(plan, Some(GraphJobType::Table))?;
1604
1605 (graph, source, table, job_type, shared_source_id)
1606 };
1607
1608 tracing::trace!(
1609 "name={}, graph=\n{}",
1610 table_name,
1611 serde_json::to_string_pretty(&graph).unwrap()
1612 );
1613
1614 let dependencies = shared_source_id
1615 .map(|id| HashSet::from([id.as_object_id()]))
1616 .unwrap_or_default();
1617
1618 match engine {
1620 Engine::Hummock => {
1621 let catalog_writer = session.catalog_writer()?;
1622 let action = match job_type {
1623 TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
1624 _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
1625 };
1626 execute_with_long_running_notification(
1627 catalog_writer.create_table(
1628 source.map(|s| s.to_prost()),
1629 hummock_table.to_prost(),
1630 graph,
1631 job_type,
1632 if_not_exists,
1633 dependencies,
1634 ),
1635 &session,
1636 "CREATE TABLE",
1637 action,
1638 )
1639 .await?;
1640 }
1641 Engine::Iceberg => {
1642 let hummock_table_name = hummock_table.name.clone();
1643 session.create_staging_table(hummock_table.clone());
1644 let res = Box::pin(create_iceberg_engine_table(
1645 session.clone(),
1646 handler_args,
1647 source.map(|s| s.to_prost()),
1648 hummock_table,
1649 graph,
1650 table_name,
1651 job_type,
1652 if_not_exists,
1653 ))
1654 .await;
1655 session.drop_staging_table(&hummock_table_name);
1656 res?
1657 }
1658 }
1659
1660 Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1661}
1662
1663#[allow(clippy::too_many_arguments)]
1672pub async fn create_iceberg_engine_table(
1673 session: Arc<SessionImpl>,
1674 handler_args: HandlerArgs,
1675 mut source: Option<PbSource>,
1676 table: TableCatalog,
1677 graph: StreamFragmentGraph,
1678 table_name: ObjectName,
1679 job_type: PbTableJobType,
1680 if_not_exists: bool,
1681) -> Result<()> {
1682 let rw_db_name = session
1683 .env()
1684 .catalog_reader()
1685 .read_guard()
1686 .get_database_by_id(table.database_id)?
1687 .name()
1688 .to_owned();
1689 let rw_schema_name = session
1690 .env()
1691 .catalog_reader()
1692 .read_guard()
1693 .get_schema_by_id(table.database_id, table.schema_id)?
1694 .name()
1695 .clone();
1696 let iceberg_catalog_name = rw_db_name.clone();
1697 let iceberg_database_name = rw_schema_name.clone();
1698 let iceberg_table_name = table_name.0.last().unwrap().real_value();
1699
1700 let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1701 let sink_decouple = session.config().sink_decouple();
1702 if matches!(sink_decouple, SinkDecouple::Disable) {
1703 bail!(
1704 "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1705 );
1706 }
1707
1708 let mut connection_ref = BTreeMap::new();
1709 let with_common = if iceberg_engine_connection.is_empty() {
1710 bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1711 } else {
1712 let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1713 assert_eq!(parts.len(), 2);
1714 let connection_catalog =
1715 session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1716 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1717 if params.connection_type == ConnectionType::Iceberg as i32 {
1718 connection_ref.insert(
1720 "connection".to_owned(),
1721 ConnectionRefValue {
1722 connection_name: ObjectName::from(vec![
1723 Ident::from(parts[0]),
1724 Ident::from(parts[1]),
1725 ]),
1726 },
1727 );
1728
1729 let mut with_common = BTreeMap::new();
1730 with_common.insert("connector".to_owned(), "iceberg".to_owned());
1731 with_common.insert("database.name".to_owned(), iceberg_database_name);
1732 with_common.insert("table.name".to_owned(), iceberg_table_name);
1733
1734 let hosted_catalog = params
1735 .properties
1736 .get("hosted_catalog")
1737 .map(|s| s.eq_ignore_ascii_case("true"))
1738 .unwrap_or(false);
1739 if hosted_catalog {
1740 let meta_client = session.env().meta_client();
1741 let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1742
1743 let meta_store_endpoint =
1744 url::Url::parse(&meta_store_endpoint).map_err(|_| {
1745 ErrorCode::InternalError(
1746 "failed to parse the meta store endpoint".to_owned(),
1747 )
1748 })?;
1749 let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1750 let meta_store_user = meta_store_endpoint.username().to_owned();
1751 let meta_store_password = match meta_store_endpoint.password() {
1752 Some(password) => percent_decode_str(password)
1753 .decode_utf8()
1754 .map_err(|_| {
1755 ErrorCode::InternalError(
1756 "failed to parse password from meta store endpoint".to_owned(),
1757 )
1758 })?
1759 .into_owned(),
1760 None => "".to_owned(),
1761 };
1762 let meta_store_host = meta_store_endpoint
1763 .host_str()
1764 .ok_or_else(|| {
1765 ErrorCode::InternalError(
1766 "failed to parse host from meta store endpoint".to_owned(),
1767 )
1768 })?
1769 .to_owned();
1770 let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1771 ErrorCode::InternalError(
1772 "failed to parse port from meta store endpoint".to_owned(),
1773 )
1774 })?;
1775 let meta_store_database = meta_store_endpoint
1776 .path()
1777 .trim_start_matches('/')
1778 .to_owned();
1779
1780 let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1781 bail!("failed to parse meta backend: {}", meta_store_backend);
1782 };
1783
1784 let catalog_uri = match meta_backend {
1785 MetaBackend::Postgres => {
1786 format!(
1787 "jdbc:postgresql://{}:{}/{}",
1788 meta_store_host, meta_store_port, meta_store_database
1789 )
1790 }
1791 MetaBackend::Mysql => {
1792 format!(
1793 "jdbc:mysql://{}:{}/{}",
1794 meta_store_host, meta_store_port, meta_store_database
1795 )
1796 }
1797 MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1798 bail!(
1799 "Unsupported meta backend for iceberg engine table: {}",
1800 meta_store_backend
1801 );
1802 }
1803 };
1804
1805 with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1806 with_common.insert("catalog.uri".to_owned(), catalog_uri);
1807 with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
1808 with_common.insert("catalog.jdbc.password".to_owned(), meta_store_password);
1809 with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
1810 }
1811
1812 with_common
1813 } else {
1814 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1815 "Only iceberg connection could be used in iceberg engine".to_owned(),
1816 )));
1817 }
1818 } else {
1819 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1820 "Private Link Service has been deprecated. Please create a new connection instead."
1821 .to_owned(),
1822 )));
1823 }
1824 };
1825
1826 let mut pks = table
1829 .pk_column_names()
1830 .iter()
1831 .map(|c| c.to_string())
1832 .collect::<Vec<String>>();
1833
1834 if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1836 pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1837 }
1838
1839 let sink_from = CreateSink::From(table_name.clone());
1840
1841 let mut sink_name = table_name.clone();
1842 *sink_name.0.last_mut().unwrap() = Ident::from(
1843 (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1844 );
1845 let create_sink_stmt = CreateSinkStatement {
1846 if_not_exists: false,
1847 sink_name,
1848 with_properties: WithProperties(vec![]),
1849 sink_from,
1850 columns: vec![],
1851 emit_mode: None,
1852 sink_schema: None,
1853 into_table_name: None,
1854 };
1855
1856 let mut sink_handler_args = handler_args.clone();
1857
1858 let mut sink_with = with_common.clone();
1859
1860 if !handler_args
1862 .with_options
1863 .get(ENABLE_COMPACTION)
1864 .is_some_and(|val| val.eq_ignore_ascii_case("true"))
1865 {
1866 sink_with.insert(AUTO_SCHEMA_CHANGE_KEY.to_owned(), "true".to_owned());
1867 }
1868
1869 if table.append_only {
1870 sink_with.insert("type".to_owned(), "append-only".to_owned());
1871 } else {
1872 sink_with.insert("primary_key".to_owned(), pks.join(","));
1873 sink_with.insert("type".to_owned(), "upsert".to_owned());
1874 }
1875 let commit_checkpoint_interval = handler_args
1894 .with_options
1895 .get(COMMIT_CHECKPOINT_INTERVAL)
1896 .map(|v| v.to_owned())
1897 .unwrap_or_else(|| "60".to_owned());
1898 let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1899 ErrorCode::InvalidInputSyntax(format!(
1900 "commit_checkpoint_interval must be greater than 0: {}",
1901 commit_checkpoint_interval
1902 ))
1903 })?;
1904
1905 if commit_checkpoint_interval == 0 {
1906 bail!("commit_checkpoint_interval must be greater than 0");
1907 }
1908
1909 source
1911 .as_mut()
1912 .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1913
1914 let sink_decouple = session.config().sink_decouple();
1915 if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1916 bail!(
1917 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1918 )
1919 }
1920
1921 sink_with.insert(
1922 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1923 commit_checkpoint_interval.to_string(),
1924 );
1925 sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1926
1927 sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1928
1929 if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1930 match enable_compaction.to_lowercase().as_str() {
1931 "true" => {
1932 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1933 }
1934 "false" => {
1935 sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1936 }
1937 _ => {
1938 return Err(ErrorCode::InvalidInputSyntax(format!(
1939 "enable_compaction must be true or false: {}",
1940 enable_compaction
1941 ))
1942 .into());
1943 }
1944 }
1945
1946 source
1948 .as_mut()
1949 .map(|x| x.with_properties.remove("enable_compaction"));
1950 } else {
1951 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1952 }
1953
1954 if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1955 let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1956 ErrorCode::InvalidInputSyntax(format!(
1957 "compaction_interval_sec must be greater than 0: {}",
1958 commit_checkpoint_interval
1959 ))
1960 })?;
1961 if compaction_interval_sec == 0 {
1962 bail!("compaction_interval_sec must be greater than 0");
1963 }
1964 sink_with.insert(
1965 "compaction_interval_sec".to_owned(),
1966 compaction_interval_sec.to_string(),
1967 );
1968 source
1970 .as_mut()
1971 .map(|x| x.with_properties.remove("compaction_interval_sec"));
1972 }
1973
1974 let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1975 handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1976 {
1977 source
1979 .as_mut()
1980 .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1981 match enable_snapshot_expiration.to_lowercase().as_str() {
1982 "true" => {
1983 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1984 true
1985 }
1986 "false" => {
1987 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1988 false
1989 }
1990 _ => {
1991 return Err(ErrorCode::InvalidInputSyntax(format!(
1992 "enable_snapshot_expiration must be true or false: {}",
1993 enable_snapshot_expiration
1994 ))
1995 .into());
1996 }
1997 }
1998 } else {
1999 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
2000 true
2001 };
2002
2003 if has_enabled_snapshot_expiration {
2004 if let Some(snapshot_expiration_retain_last) = handler_args
2006 .with_options
2007 .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
2008 {
2009 sink_with.insert(
2010 SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
2011 snapshot_expiration_retain_last.to_owned(),
2012 );
2013 source
2015 .as_mut()
2016 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
2017 }
2018
2019 if let Some(snapshot_expiration_max_age) = handler_args
2020 .with_options
2021 .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
2022 {
2023 sink_with.insert(
2024 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
2025 snapshot_expiration_max_age.to_owned(),
2026 );
2027 source
2029 .as_mut()
2030 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
2031 }
2032
2033 if let Some(snapshot_expiration_clear_expired_files) = handler_args
2034 .with_options
2035 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
2036 {
2037 sink_with.insert(
2038 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
2039 snapshot_expiration_clear_expired_files.to_owned(),
2040 );
2041 source.as_mut().map(|x| {
2043 x.with_properties
2044 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
2045 });
2046 }
2047
2048 if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
2049 .with_options
2050 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
2051 {
2052 sink_with.insert(
2053 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
2054 snapshot_expiration_clear_expired_meta_data.to_owned(),
2055 );
2056 source.as_mut().map(|x| {
2058 x.with_properties
2059 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
2060 });
2061 }
2062 }
2063
2064 if let Some(format_version) = handler_args.with_options.get(FORMAT_VERSION) {
2065 let format_version = format_version.parse::<u8>().map_err(|_| {
2066 ErrorCode::InvalidInputSyntax(format!(
2067 "format_version must be 1, 2 or 3: {}",
2068 format_version
2069 ))
2070 })?;
2071 if format_version != 1 && format_version != 2 && format_version != 3 {
2072 bail!("format_version must be 1, 2 or 3");
2073 }
2074 sink_with.insert(FORMAT_VERSION.to_owned(), format_version.to_string());
2075
2076 source
2078 .as_mut()
2079 .map(|x| x.with_properties.remove(FORMAT_VERSION));
2080 }
2081
2082 if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
2083 let write_mode = IcebergWriteMode::try_from(write_mode.as_str()).map_err(|_| {
2084 ErrorCode::InvalidInputSyntax(format!(
2085 "invalid write_mode: {}, must be one of: {}, {}",
2086 write_mode, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
2087 ))
2088 })?;
2089
2090 match write_mode {
2091 IcebergWriteMode::MergeOnRead => {
2092 sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
2093 }
2094
2095 IcebergWriteMode::CopyOnWrite => {
2096 if table.append_only {
2097 return Err(ErrorCode::NotSupported(
2098 "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
2099 "Please use MERGE ON READ instead".to_owned(),
2100 )
2101 .into());
2102 }
2103
2104 sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
2105 }
2106 }
2107
2108 source
2110 .as_mut()
2111 .map(|x| x.with_properties.remove("write_mode"));
2112 } else {
2113 sink_with.insert(
2114 WRITE_MODE.to_owned(),
2115 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2116 );
2117 }
2118
2119 if let Some(max_snapshots_num_before_compaction) =
2120 handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
2121 {
2122 let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
2123 .parse::<u32>()
2124 .map_err(|_| {
2125 ErrorCode::InvalidInputSyntax(format!(
2126 "{} must be greater than 0: {}",
2127 COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
2128 ))
2129 })?;
2130
2131 if max_snapshots_num_before_compaction == 0 {
2132 bail!(format!(
2133 "{} must be greater than 0",
2134 COMPACTION_MAX_SNAPSHOTS_NUM
2135 ));
2136 }
2137
2138 sink_with.insert(
2139 COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
2140 max_snapshots_num_before_compaction.to_string(),
2141 );
2142
2143 source
2145 .as_mut()
2146 .map(|x| x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM));
2147 }
2148
2149 if let Some(small_files_threshold_mb) = handler_args
2150 .with_options
2151 .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2152 {
2153 let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
2154 ErrorCode::InvalidInputSyntax(format!(
2155 "{} must be greater than 0: {}",
2156 COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
2157 ))
2158 })?;
2159 if small_files_threshold_mb == 0 {
2160 bail!(format!(
2161 "{} must be a greater than 0",
2162 COMPACTION_SMALL_FILES_THRESHOLD_MB
2163 ));
2164 }
2165 sink_with.insert(
2166 COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
2167 small_files_threshold_mb.to_string(),
2168 );
2169
2170 source.as_mut().map(|x| {
2172 x.with_properties
2173 .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2174 });
2175 }
2176
2177 if let Some(delete_files_count_threshold) = handler_args
2178 .with_options
2179 .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2180 {
2181 let delete_files_count_threshold =
2182 delete_files_count_threshold.parse::<usize>().map_err(|_| {
2183 ErrorCode::InvalidInputSyntax(format!(
2184 "{} must be greater than 0: {}",
2185 COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
2186 ))
2187 })?;
2188 if delete_files_count_threshold == 0 {
2189 bail!(format!(
2190 "{} must be greater than 0",
2191 COMPACTION_DELETE_FILES_COUNT_THRESHOLD
2192 ));
2193 }
2194 sink_with.insert(
2195 COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
2196 delete_files_count_threshold.to_string(),
2197 );
2198
2199 source.as_mut().map(|x| {
2201 x.with_properties
2202 .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2203 });
2204 }
2205
2206 if let Some(trigger_snapshot_count) = handler_args
2207 .with_options
2208 .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
2209 {
2210 let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
2211 ErrorCode::InvalidInputSyntax(format!(
2212 "{} must be greater than 0: {}",
2213 COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
2214 ))
2215 })?;
2216 if trigger_snapshot_count == 0 {
2217 bail!(format!(
2218 "{} must be greater than 0",
2219 COMPACTION_TRIGGER_SNAPSHOT_COUNT
2220 ));
2221 }
2222 sink_with.insert(
2223 COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
2224 trigger_snapshot_count.to_string(),
2225 );
2226
2227 source
2229 .as_mut()
2230 .map(|x| x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT));
2231 }
2232
2233 if let Some(target_file_size_mb) = handler_args
2234 .with_options
2235 .get(COMPACTION_TARGET_FILE_SIZE_MB)
2236 {
2237 let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
2238 ErrorCode::InvalidInputSyntax(format!(
2239 "{} must be greater than 0: {}",
2240 COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
2241 ))
2242 })?;
2243 if target_file_size_mb == 0 {
2244 bail!(format!(
2245 "{} must be greater than 0",
2246 COMPACTION_TARGET_FILE_SIZE_MB
2247 ));
2248 }
2249 sink_with.insert(
2250 COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
2251 target_file_size_mb.to_string(),
2252 );
2253 source
2255 .as_mut()
2256 .map(|x| x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB));
2257 }
2258
2259 if let Some(compaction_type) = handler_args.with_options.get(COMPACTION_TYPE) {
2260 let compaction_type = CompactionType::try_from(compaction_type.as_str()).map_err(|_| {
2261 ErrorCode::InvalidInputSyntax(format!(
2262 "invalid compaction_type: {}, must be one of {:?}",
2263 compaction_type,
2264 &[
2265 CompactionType::Full,
2266 CompactionType::SmallFiles,
2267 CompactionType::FilesWithDelete
2268 ]
2269 ))
2270 })?;
2271
2272 sink_with.insert(
2273 COMPACTION_TYPE.to_owned(),
2274 compaction_type.as_str().to_owned(),
2275 );
2276
2277 source
2279 .as_mut()
2280 .map(|x| x.with_properties.remove(COMPACTION_TYPE));
2281 }
2282
2283 if let Some(write_parquet_compression) = handler_args
2284 .with_options
2285 .get(COMPACTION_WRITE_PARQUET_COMPRESSION)
2286 {
2287 sink_with.insert(
2288 COMPACTION_WRITE_PARQUET_COMPRESSION.to_owned(),
2289 write_parquet_compression.to_owned(),
2290 );
2291 source.as_mut().map(|x| {
2293 x.with_properties
2294 .remove(COMPACTION_WRITE_PARQUET_COMPRESSION)
2295 });
2296 }
2297
2298 if let Some(write_parquet_max_row_group_rows) = handler_args
2299 .with_options
2300 .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2301 {
2302 let write_parquet_max_row_group_rows = write_parquet_max_row_group_rows
2303 .parse::<usize>()
2304 .map_err(|_| {
2305 ErrorCode::InvalidInputSyntax(format!(
2306 "{} must be a positive integer: {}",
2307 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS, write_parquet_max_row_group_rows
2308 ))
2309 })?;
2310 if write_parquet_max_row_group_rows == 0 {
2311 bail!(format!(
2312 "{} must be greater than 0",
2313 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS
2314 ));
2315 }
2316 sink_with.insert(
2317 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS.to_owned(),
2318 write_parquet_max_row_group_rows.to_string(),
2319 );
2320 source.as_mut().map(|x| {
2322 x.with_properties
2323 .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2324 });
2325 }
2326
2327 if let Some(write_parquet_max_row_group_bytes) = handler_args
2328 .with_options
2329 .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
2330 {
2331 let write_parquet_max_row_group_bytes = write_parquet_max_row_group_bytes
2332 .parse::<usize>()
2333 .map_err(|_| {
2334 ErrorCode::InvalidInputSyntax(format!(
2335 "{} must be a positive integer: {}",
2336 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES, write_parquet_max_row_group_bytes
2337 ))
2338 })?;
2339 if write_parquet_max_row_group_bytes == 0 {
2340 bail!(format!(
2341 "{} must be greater than 0",
2342 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES
2343 ));
2344 }
2345 sink_with.insert(
2346 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES.to_owned(),
2347 write_parquet_max_row_group_bytes.to_string(),
2348 );
2349 source.as_mut().map(|x| {
2350 x.with_properties
2351 .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
2352 });
2353 }
2354
2355 let partition_by = handler_args
2356 .with_options
2357 .get("partition_by")
2358 .map(|v| v.to_owned());
2359
2360 if let Some(partition_by) = &partition_by {
2361 let mut partition_columns = vec![];
2362 for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
2363 table
2364 .columns()
2365 .iter()
2366 .find(|col| col.name().eq_ignore_ascii_case(&column))
2367 .ok_or_else(|| {
2368 ErrorCode::InvalidInputSyntax(format!(
2369 "Partition source column does not exist in schema: {}",
2370 column
2371 ))
2372 })?;
2373
2374 partition_columns.push(column);
2375 }
2376
2377 ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2378 |_| {
2379 ErrorCode::InvalidInputSyntax(
2380 "The partition columns should be the prefix of the primary key".to_owned(),
2381 )
2382 },
2383 )?;
2384
2385 sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2386
2387 source
2389 .as_mut()
2390 .map(|x| x.with_properties.remove("partition_by"));
2391 }
2392
2393 let order_key = handler_args
2394 .with_options
2395 .get(ORDER_KEY)
2396 .map(|v| v.to_owned());
2397 if let Some(order_key) = &order_key {
2398 validate_order_key_columns(order_key, table.columns().iter().map(|col| col.name()))
2399 .map_err(|err| ErrorCode::InvalidInputSyntax(err.to_report_string()))?;
2400
2401 sink_with.insert(ORDER_KEY.to_owned(), order_key.to_owned());
2402
2403 source.as_mut().map(|x| x.with_properties.remove(ORDER_KEY));
2404 }
2405
2406 sink_handler_args.with_options =
2407 WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2408 let SinkPlanContext {
2409 sink_plan,
2410 sink_catalog,
2411 ..
2412 } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2413 let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2414
2415 let mut source_name = table_name.clone();
2416 *source_name.0.last_mut().unwrap() = Ident::from(
2417 (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2418 );
2419 let create_source_stmt = CreateSourceStatement {
2420 temporary: false,
2421 if_not_exists: false,
2422 columns: vec![],
2423 source_name,
2424 wildcard_idx: Some(0),
2425 constraints: vec![],
2426 with_properties: WithProperties(vec![]),
2427 format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2428 source_watermarks: vec![],
2429 include_column_options: vec![],
2430 };
2431
2432 let mut source_handler_args = handler_args.clone();
2433 let source_with = with_common;
2434 source_handler_args.with_options =
2435 WithOptions::new(source_with, Default::default(), connection_ref);
2436
2437 let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2438 let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2439 let (with_properties, refresh_mode) =
2440 bind_connector_props(&source_handler_args, &format_encode, true)?;
2441
2442 let (iceberg_catalog, table_identifier) = {
2445 let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2446 let iceberg_sink = IcebergSink::try_from(sink_param)?;
2447 iceberg_sink.create_table_if_not_exists().await?;
2448
2449 let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2450 let table_identifier = iceberg_sink.config.full_table_name()?;
2451 (iceberg_catalog, table_identifier)
2452 };
2453
2454 let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2455 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2456 &session,
2457 &format_encode,
2458 Either::Left(&with_properties),
2459 create_source_type,
2460 )
2461 .await?;
2462 let mut col_id_gen = ColumnIdGenerator::new_initial();
2463
2464 let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2465 source_handler_args,
2466 create_source_stmt.source_name,
2467 format_encode,
2468 with_properties,
2469 &create_source_stmt.columns,
2470 create_source_stmt.constraints,
2471 create_source_stmt.wildcard_idx,
2472 create_source_stmt.source_watermarks,
2473 columns_from_resolve_source,
2474 source_info,
2475 create_source_stmt.include_column_options,
2476 &mut col_id_gen,
2477 create_source_type,
2478 overwrite_options.source_rate_limit,
2479 SqlColumnStrategy::FollowChecked,
2480 refresh_mode,
2481 )
2482 .await?;
2483
2484 let _ = Jvm::get_or_init()?;
2487
2488 let catalog_writer = session.catalog_writer()?;
2489 let action = match job_type {
2490 TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
2491 _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
2492 };
2493 let res = execute_with_long_running_notification(
2494 catalog_writer.create_iceberg_table(
2495 PbTableJobInfo {
2496 source,
2497 table: Some(table.to_prost()),
2498 fragment_graph: Some(graph),
2499 job_type: job_type as _,
2500 },
2501 PbSinkJobInfo {
2502 sink: Some(sink_catalog.to_proto()),
2503 fragment_graph: Some(sink_graph),
2504 },
2505 iceberg_source_catalog.to_prost(),
2506 if_not_exists,
2507 ),
2508 &session,
2509 "CREATE TABLE",
2510 action,
2511 )
2512 .await;
2513
2514 if res.is_err() {
2515 let _ = iceberg_catalog
2516 .drop_table(&table_identifier)
2517 .await
2518 .inspect_err(|err| {
2519 tracing::error!(
2520 "failed to drop iceberg table {} after create iceberg engine table failed: {}",
2521 table_identifier,
2522 err.as_report()
2523 );
2524 });
2525 res?
2526 }
2527
2528 Ok(())
2529}
2530
2531pub fn check_create_table_with_source(
2532 with_options: &WithOptions,
2533 format_encode: Option<FormatEncodeOptions>,
2534 include_column_options: &IncludeOption,
2535 cdc_table_info: &Option<CdcTableInfo>,
2536) -> Result<Option<FormatEncodeOptions>> {
2537 if cdc_table_info.is_some() {
2539 return Ok(format_encode);
2540 }
2541 let defined_source = with_options.is_source_connector();
2542
2543 if !include_column_options.is_empty() && !defined_source {
2544 return Err(ErrorCode::InvalidInputSyntax(
2545 "INCLUDE should be used with a connector".to_owned(),
2546 )
2547 .into());
2548 }
2549 if defined_source {
2550 format_encode.as_ref().ok_or_else(|| {
2551 ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2552 })?;
2553 }
2554 Ok(format_encode)
2555}
2556
/// Checks that `partition_columns` is a positional prefix of `primary_key_columns`.
///
/// Names are compared exactly (case-sensitively) and in order. An empty partition list is
/// always accepted.
///
/// # Errors
///
/// Returns a human-readable message when there are more partition columns than primary key
/// columns, or naming the first partition column that differs from the primary key column at
/// the same position.
fn ensure_partition_columns_are_prefix_of_primary_key(
    partition_columns: &[String],
    primary_key_columns: &[String],
) -> std::result::Result<(), String> {
    if partition_columns.len() > primary_key_columns.len() {
        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
    }

    // The length guard above guarantees that `zip` visits every partition column.
    let first_mismatch = partition_columns
        .iter()
        .zip(primary_key_columns)
        .find(|(partition_col, pk_col)| partition_col != pk_col);

    match first_mismatch {
        Some((partition_col, _)) => Err(format!(
            "Partition column '{}' is not a prefix of the primary key.",
            partition_col
        )),
        None => Ok(()),
    }
}
2576
/// Re-plans an existing table from a (new) `CREATE TABLE` statement and builds the stream
/// fragment graph used to replace it.
///
/// # Parameters
///
/// * `_session` - not used by this function; the session is taken from `handler_args` where
///   needed.
/// * `table_name` - name of the table being replaced.
/// * `original_catalog` - catalog of the table being replaced; its ID, primary key, webhook
///   info and associated source ID are carried over or checked against.
/// * `handler_args` - handler arguments, consumed by whichever planning path is taken.
/// * `statement` - must be a `Statement::CreateTable`.
/// * `col_id_gen` - column ID generator handed to the planner.
/// * `sql_column_strategy` - only forwarded to the "table with connector" planning path.
///
/// # Returns
///
/// The fragment graph, the new table catalog (with the original table ID), the new source
/// catalog if the planner produced one, and the job type.
///
/// # Errors
///
/// * `ErrorCode::NotSupported` when both `FORMAT ... ENCODE ...` and CDC table info are given.
/// * `ErrorCode::InvalidInputSyntax` when the primary key column IDs of the new plan differ
///   from those of `original_catalog`.
/// * Any error returned by the planning helpers.
///
/// # Panics
///
/// Panics if `statement` is not `Statement::CreateTable`, and if the original table has an
/// associated source that is being kept but the planner did not produce a source catalog.
#[allow(clippy::too_many_arguments)]
pub async fn generate_stream_graph_for_replace_table(
    _session: &Arc<SessionImpl>,
    table_name: ObjectName,
    original_catalog: &Arc<TableCatalog>,
    handler_args: HandlerArgs,
    statement: Statement,
    col_id_gen: ColumnIdGenerator,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(
    StreamFragmentGraph,
    TableCatalog,
    Option<SourceCatalog>,
    TableJobType,
)> {
    let Statement::CreateTable {
        columns,
        constraints,
        source_watermarks,
        append_only,
        on_conflict,
        with_version_columns,
        wildcard_idx,
        cdc_table_info,
        format_encode,
        include_column_options,
        engine,
        with_options,
        ..
    } = statement
    else {
        panic!("unexpected statement type: {:?}", statement);
    };

    let format_encode = format_encode
        .clone()
        .map(|format_encode| format_encode.into_v2_with_warning());

    // Translate the parser's engine enum into the catalog's engine enum.
    let engine = match engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    // The original table has an associated source, but the new statement carries no
    // FORMAT clause: the connector is being dropped from the table.
    let is_drop_connector =
        original_catalog.associated_source_id().is_some() && format_encode.is_none();
    if is_drop_connector {
        // Sanity check (debug builds only): a statement that drops the connector must not
        // carry any connector-only clauses.
        debug_assert!(
            source_watermarks.is_empty()
                && include_column_options.is_empty()
                && with_options
                    .iter()
                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
        );
    }

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_columns: with_version_columns
            .iter()
            .map(|col| col.real_value())
            .collect(),
        // Webhook info is not re-derived from the statement; it is copied from the original.
        webhook_info: original_catalog.webhook_info.clone(),
        engine,
    };

    // Pick the planning path from the presence of FORMAT/ENCODE and of CDC table info.
    let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        // Table with a connector.
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                ExplainOptions::default(),
                table_name,
                columns,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                sql_column_strategy,
            )
            .await?,
            TableJobType::General,
        ),
        // Plain table without connector.
        (None, None) => {
            let context = OptimizerContext::from_handler_args(handler_args);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name,
                columns,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                true,
            )?;
            ((plan, None, table), TableJobType::General)
        }
        // Table created from a CDC source.
        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &columns,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            let session = &handler_args.session;
            let (source, resolved_table_name) =
                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;

            let (cdc_with_options, normalized_external_table_name) =
                derive_with_options_for_cdc_table(
                    &source.with_properties,
                    cdc_table.external_table_name.clone(),
                )?;

            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                normalized_external_table_name,
                columns,
                source_watermarks,
                column_catalogs,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_columns
                    .iter()
                    .map(|col| col.real_value())
                    .collect(),
                include_column_options,
                table_name,
                resolved_table_name,
                original_catalog.database_id,
                original_catalog.schema_id,
                original_catalog.id(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        // FORMAT/ENCODE together with CDC table info is rejected.
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format doesn't apply to table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };

    // Replacing a table must keep its primary key unchanged.
    if table.pk_column_ids() != original_catalog.pk_column_ids() {
        Err(ErrorCode::InvalidInputSyntax(
            "alter primary key of table is not supported".to_owned(),
        ))?
    }

    let graph = build_graph(plan, Some(GraphJobType::Table))?;

    // The replacement keeps the ID of the original table.
    table.id = original_catalog.id();
    // Unless the connector is being dropped, also keep the original associated source ID and
    // link the new source catalog and the table to each other.
    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
        table.associated_source_id = Some(source_id);

        // NOTE(review): this `unwrap` relies on the planner having produced a source catalog
        // whenever the original table had a source and FORMAT was specified — confirm.
        let source = source.as_mut().unwrap();
        source.id = source_id;
        source.associated_table_id = Some(table.id());
    }

    Ok((graph, table, source, job_type))
}
2756
2757fn get_source_and_resolved_table_name(
2758 session: &Arc<SessionImpl>,
2759 cdc_table: CdcTableInfo,
2760 table_name: ObjectName,
2761) -> Result<(Arc<SourceCatalog>, String)> {
2762 let db_name = &session.database();
2763 let (_, resolved_table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2764
2765 let (source_schema, source_name) =
2766 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2767
2768 let source = {
2769 let catalog_reader = session.env().catalog_reader().read_guard();
2770 let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2771 let (source, _) = catalog_reader.get_source_by_name(
2772 db_name,
2773 SchemaPath::Name(schema_name.as_str()),
2774 source_name.as_str(),
2775 )?;
2776 source.clone()
2777 };
2778
2779 Ok((source, resolved_table_name))
2780}
2781
2782fn bind_webhook_info(
2784 session: &Arc<SessionImpl>,
2785 column_defs: &[ColumnDef],
2786 webhook_info: WebhookSourceInfo,
2787) -> Result<PbWebhookSourceInfo> {
2788 let WebhookSourceInfo {
2789 secret_ref,
2790 signature_expr,
2791 wait_for_persistence,
2792 is_batched,
2793 } = webhook_info;
2794
2795 for column in column_defs {
2796 for option_def in &column.options {
2797 match option_def.option {
2798 ColumnOption::Null => {}
2799 ColumnOption::GeneratedColumns(_) => {
2800 return Err(ErrorCode::InvalidInputSyntax(
2801 "generated columns are not supported for webhook tables".to_owned(),
2802 )
2803 .into());
2804 }
2805 ColumnOption::DefaultValue(_) | ColumnOption::DefaultValueInternal { .. } => {
2806 return Err(ErrorCode::InvalidInputSyntax(
2807 "default values are not supported for webhook tables".to_owned(),
2808 )
2809 .into());
2810 }
2811 ColumnOption::NotNull
2812 | ColumnOption::Unique { .. }
2813 | ColumnOption::ForeignKey { .. }
2814 | ColumnOption::Check(_)
2815 | ColumnOption::DialectSpecific(_) => {
2816 return Err(ErrorCode::InvalidInputSyntax(
2817 "only NULL column option is supported for webhook tables".to_owned(),
2818 )
2819 .into());
2820 }
2821 }
2822 }
2823 }
2824
2825 let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2827 let db_name = &session.database();
2828 let (schema_name, secret_name) =
2829 Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2830 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2831 (
2832 Some(PbSecretRef {
2833 secret_id: secret_catalog.id,
2834 ref_as: match secret_ref.ref_as {
2835 SecretRefAsType::Text => PbRefAsType::Text,
2836 SecretRefAsType::File => PbRefAsType::File,
2837 }
2838 .into(),
2839 }),
2840 Some(secret_name),
2841 )
2842 } else {
2843 (None, None)
2844 };
2845
2846 let signature_expr = if let Some(signature_expr) = signature_expr {
2847 let payload_name = if column_defs.len() == 1
2848 && column_defs[0].data_type.as_ref() == Some(&AstDataType::Jsonb)
2849 {
2850 column_defs[0].name.real_value()
2851 } else {
2852 WEBHOOK_PAYLOAD_FIELD_NAME.to_owned()
2853 };
2854 let secure_compare_context = SecureCompareContext {
2855 payload_name,
2856 secret_name,
2857 };
2858 let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
2859 let expr = binder.bind_expr(&signature_expr)?;
2860
2861 if expr.as_function_call().is_none()
2863 || expr.as_function_call().unwrap().func_type()
2864 != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2865 {
2866 return Err(ErrorCode::InvalidInputSyntax(
2867 "The signature verification function must be SECURE_COMPARE()".to_owned(),
2868 )
2869 .into());
2870 }
2871
2872 Some(expr.to_expr_proto())
2873 } else {
2874 session.notice_to_user(
2875 "VALIDATE clause is strongly recommended for safety or production usages",
2876 );
2877 None
2878 };
2879
2880 let pb_webhook_info = PbWebhookSourceInfo {
2881 secret_ref: pb_secret_ref,
2882 signature_expr,
2883 wait_for_persistence,
2884 is_batched,
2885 };
2886
2887 Ok(pb_webhook_info)
2888}
2889
// Unit tests for the `CREATE TABLE` handler. Most of them run SQL through an in-process
// `LocalFrontend` and then inspect the resulting catalog.
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use super::*;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Creates an append-only table with a struct column and checks the column names and data
    /// types recorded in the catalog.
    #[tokio::test]
    async fn test_create_table_handler() {
        let sql =
            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check that the table was created in the default database and schema.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(table.name(), "t");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        // Besides the two declared columns, the catalog is expected to contain the row-id
        // column and the `RW_TIMESTAMP_COLUMN_NAME` column. The struct fields are expected to
        // carry the IDs 3, 4 and 5.
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "v1" => DataType::Int16,
            "v2" => StructType::new(
                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
            )
            .with_ids([3, 4, 5].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };

        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// A webhook table may declare several typed columns in a non-default schema; the
    /// `VALIDATE` expression here refers to `payload`, which is not one of the declared columns.
    #[tokio::test]
    async fn test_create_webhook_table_with_arbitrary_columns() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql("create schema ingest_schema;")
            .await
            .unwrap();
        frontend
            .run_sql(
                r#"
 create table ingest_schema.orders (
 id int,
 customer_name varchar,
 amount double precision,
 primary key (id)
 ) with (
 connector = 'webhook'
 ) validate as secure_compare(
 headers->>'x-rw-signature',
 'sha256=' || encode(hmac('webhook-secret', payload, 'sha256'), 'hex')
 );
 "#,
            )
            .await
            .unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (table, _) = catalog_reader
            .get_created_table_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name("ingest_schema"),
                "orders",
            )
            .unwrap();

        assert!(table.webhook_info.is_some());
        // Exactly the three declared columns should accept DML.
        assert_eq!(
            table
                .columns
                .iter()
                .filter(|column| column.can_dml())
                .count(),
            3
        );
    }

    /// With a single JSONB column, the `VALIDATE` expression may refer to the payload by that
    /// column's name (`body` here).
    #[tokio::test]
    async fn test_create_webhook_table_uses_single_jsonb_column_name_in_validate() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql(
                r#"
 create table webhook_single_column (
 body jsonb
 ) with (
 connector = 'webhook'
 ) validate as secure_compare(
 headers->>'x-rw-signature',
 'sha256=' || encode(hmac('webhook-secret', body, 'sha256'), 'hex')
 );
 "#,
            )
            .await
            .unwrap();
    }

    /// Generated columns must be rejected on webhook tables.
    #[tokio::test]
    async fn test_create_webhook_table_with_generated_columns() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
 create table webhook_generated_columns (
 id int,
 amount double precision,
 amount_with_fee double precision as amount + 1.0
 ) with (
 connector = 'webhook'
 );
 "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("generated columns are not supported for webhook tables"),
            "{err:?}"
        );
    }

    /// Column default values must be rejected on webhook tables.
    #[tokio::test]
    async fn test_create_webhook_table_with_default_value() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
 create table webhook_default_value (
 id int default 42,
 amount double precision
 ) with (
 connector = 'webhook'
 );
 "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("default values are not supported for webhook tables"),
            "{err:?}"
        );
    }

    /// `NOT NULL` must be rejected on webhook tables.
    #[tokio::test]
    async fn test_create_webhook_table_with_not_null_option() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
 create table webhook_not_null (
 id int not null,
 amount double precision
 ) with (
 connector = 'webhook'
 );
 "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("only NULL column option is supported for webhook tables"),
            "{err:?}"
        );
    }

    /// Checks which `schema.table` spellings `parse_postgres_cdc_external_table_name` accepts
    /// and which it rejects.
    #[test]
    fn test_parse_postgres_cdc_external_table_name() {
        // Accepted inputs: unquoted and quoted parts, dots inside quoted parts, and a doubled
        // quote inside a quoted part standing for a single quote character.
        for (input, expected) in [
            ("public.Note", ("public", "Note")),
            ("public.\"Note\"", ("public", "Note")),
            (
                "\"Mixed.Schema\".\"Note.Table\"",
                ("Mixed.Schema", "Note.Table"),
            ),
            ("public.\"Note\"\"Archive\"", ("public", "Note\"Archive")),
        ] {
            assert_eq!(
                parse_postgres_cdc_external_table_name(input).unwrap(),
                (expected.0.to_owned(), expected.1.to_owned()),
                "input: {input}"
            );
        }

        // Rejected inputs: missing schema or table part, unterminated quote, text following a
        // closing quote, and more than two parts.
        for input in [
            "Note",
            "public.",
            ".Note",
            "public.\"Note",
            "public.\"Note\"Archive",
            "public.Note.Archive",
        ] {
            assert!(
                parse_postgres_cdc_external_table_name(input).is_err(),
                "input should be rejected: {input}"
            );
        }
    }

    /// Table-driven test of primary key binding: each case pairs a `CREATE TABLE` statement
    /// with either the expected primary key column IDs or a fragment of the expected error.
    #[test]
    fn test_bind_primary_key() {
        for (sql, expected) in [
            // Expected values are column IDs. In these cases `v1` maps to 1 and `v2` to 2; a
            // table without a declared key expects ID 0 — presumably the generated row-id
            // column, TODO confirm against `bind_pk_and_row_id_on_relation`.
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ] {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be create table")
            };
            // Run the binding steps inside a closure so that `?` can be used and the outcome
            // is captured as a `Result`.
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
                // For error cases only a substring of the message is compared.
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }

    /// The same AWS credential keys appear both in `WITH (...)` and in the `ENCODE` options
    /// with different values; each set must be stored separately and must not leak into the
    /// other.
    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
 WITH (
 connector = 'kinesis',
 aws.region='user_test_topic',
 endpoint='172.10.1.1:9090,172.10.1.2:9090',
 aws.credentials.access_key_id = 'your_access_key_1',
 aws.credentials.secret_access_key = 'your_secret_key_1'
 )
 FORMAT PLAIN ENCODE PROTOBUF (
 message = '.test.TestRecord',
 aws.credentials.access_key_id = 'your_access_key_2',
 aws.credentials.secret_access_key = 'your_secret_key_2',
 schema.location = 'file://{}',
 )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check that the source associated with the table was created.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // The credentials given in the ENCODE clause are kept in the format/encode options.
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // The credentials given in the WITH clause are kept in the connector properties.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        // ENCODE-only options are not merged into the connector properties.
        assert!(!source.with_properties.contains_key("schema.location"));
    }
}