1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use percent_encoding::percent_decode_str;
25use pgwire::pg_response::{PgResponse, StatementType};
26use prost::Message as _;
27use risingwave_common::catalog::{
28 CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29 ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
30 TableId,
31};
32use risingwave_common::config::MetaBackend;
33use risingwave_common::global_jvm::Jvm;
34use risingwave_common::session_config::sink_decouple::SinkDecouple;
35use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
36use risingwave_common::util::value_encoding::DatumToProtoExt;
37use risingwave_common::{bail, bail_not_implemented};
38use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
39use risingwave_connector::source::cdc::external::{
40 DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
41 SCHEMA_NAME_KEY, TABLE_NAME_KEY,
42};
43use risingwave_connector::source::cdc::{
44 build_cdc_table_id, normalize_simple_postgres_quoted_table_name,
45};
46use risingwave_connector::{
47 AUTO_SCHEMA_CHANGE_KEY, WithOptionsSecResolved, WithPropertiesExt, source,
48};
49use risingwave_pb::catalog::connection::Info as ConnectionInfo;
50use risingwave_pb::catalog::connection_params::ConnectionType;
51use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
52use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
53use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
54use risingwave_pb::plan_common::{
55 AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
56};
57use risingwave_pb::secret::PbSecretRef;
58use risingwave_pb::secret::secret_ref::PbRefAsType;
59use risingwave_pb::stream_plan::StreamFragmentGraph;
60use risingwave_sqlparser::ast::{
61 CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
62 CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
63 FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
64 Statement, TableConstraint, WebhookSourceInfo, WithProperties,
65};
66use risingwave_sqlparser::parser::IncludeOption;
67use thiserror_ext::AsReport;
68
69use super::RwPgResponse;
70use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
71use crate::binder::{Clause, SecureCompareContext, WEBHOOK_PAYLOAD_FIELD_NAME, bind_data_type};
72use crate::catalog::root_catalog::SchemaPath;
73use crate::catalog::source_catalog::SourceCatalog;
74use crate::catalog::table_catalog::TableVersion;
75use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
76use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
77use crate::expr::{Expr, ExprImpl, ExprRewriter};
78use crate::handler::HandlerArgs;
79use crate::handler::create_source::{
80 UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
81 bind_source_watermark, handle_addition_columns,
82};
83use crate::handler::util::{
84 LongRunningNotificationAction, SourceSchemaCompatExt, execute_with_long_running_notification,
85};
86use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
87use crate::optimizer::plan_node::{
88 LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
89 ensure_sync_log_store_fragment_root,
90};
91use crate::optimizer::property::{Order, RequiredDist};
92use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
93use crate::session::SessionImpl;
94use crate::session::current::notice_to_user;
95use crate::stream_fragmenter::{GraphJobType, build_graph};
96use crate::utils::OverwriteOptions;
97use crate::{Binder, Explain, TableCatalog, WithOptions};
98
99mod col_id_gen;
100pub use col_id_gen::*;
101use risingwave_connector::sink::SinkParam;
102use risingwave_connector::sink::iceberg::{
103 COMPACTION_DELETE_FILES_COUNT_THRESHOLD, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
104 COMPACTION_SMALL_FILES_THRESHOLD_MB, COMPACTION_TARGET_FILE_SIZE_MB,
105 COMPACTION_TRIGGER_SNAPSHOT_COUNT, COMPACTION_TYPE, COMPACTION_WRITE_PARQUET_COMPRESSION,
106 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES, COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS,
107 CompactionType, ENABLE_COMPACTION, ENABLE_PK_INDEX, ENABLE_SNAPSHOT_EXPIRATION, FORMAT_VERSION,
108 ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink,
109 IcebergWriteMode, ORDER_KEY, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
110 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA, SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
111 SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE, parse_partition_by_exprs,
112 validate_order_key_columns,
113};
114use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
115
116use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};
117
118fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
119 for option_def in &c.options {
120 match option_def.option {
121 ColumnOption::GeneratedColumns(_) => {}
122 ColumnOption::DefaultValue(_) => {}
123 ColumnOption::DefaultValueInternal { .. } => {}
124 ColumnOption::Unique { is_primary: true } => {}
125 ColumnOption::Null => {}
126 ColumnOption::NotNull => {}
127 _ => bail_not_implemented!("column constraints \"{}\"", option_def),
128 }
129 }
130 Ok(())
131}
132
133pub fn bind_sql_columns(
137 column_defs: &[ColumnDef],
138 is_for_drop_table_connector: bool,
139) -> Result<Vec<ColumnCatalog>> {
140 let mut columns = Vec::with_capacity(column_defs.len());
141
142 for column in column_defs {
143 ensure_column_options_supported(column)?;
144 let ColumnDef {
148 name,
149 data_type,
150 collation,
151 options,
152 ..
153 } = column;
154
155 let data_type = data_type
156 .clone()
157 .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
158 if let Some(collation) = collation {
159 if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
165 bail_not_implemented!(
166 "Collate collation other than `C` or `POSIX` is not implemented"
167 );
168 }
169
170 match data_type {
171 AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
172 _ => {
173 return Err(ErrorCode::NotSupported(
174 format!("{} is not a collatable data type", data_type),
175 "The only built-in collatable data types are `varchar`, please check your type".into(),
176 ).into());
177 }
178 }
179 }
180
181 if !is_for_drop_table_connector {
182 check_column_name_not_reserved(&name.real_value())?;
186 }
187
188 let nullable: bool = !options
189 .iter()
190 .any(|def| matches!(def.option, ColumnOption::NotNull));
191
192 columns.push(ColumnCatalog {
193 column_desc: ColumnDesc {
194 data_type: bind_data_type(&data_type)?,
195 column_id: ColumnId::placeholder(),
196 name: name.real_value(),
197 generated_or_default_column: None,
198 description: None,
199 additional_column: AdditionalColumn { column_type: None },
200 version: ColumnDescVersion::LATEST,
201 system_column: None,
202 nullable,
203 },
204 is_hidden: false,
205 });
206 }
207
208 Ok(columns)
209}
210
211fn check_generated_column_constraints(
212 column_name: &String,
213 column_id: ColumnId,
214 expr: &ExprImpl,
215 column_catalogs: &[ColumnCatalog],
216 generated_column_names: &[String],
217 pk_column_ids: &[ColumnId],
218) -> Result<()> {
219 let input_refs = expr.collect_input_refs(column_catalogs.len());
220 for idx in input_refs.ones() {
221 let referred_generated_column = &column_catalogs[idx].column_desc.name;
222 if generated_column_names
223 .iter()
224 .any(|c| c == referred_generated_column)
225 {
226 return Err(ErrorCode::BindError(format!(
227 "Generated can not reference another generated column. \
228 But here generated column \"{}\" referenced another generated column \"{}\"",
229 column_name, referred_generated_column
230 ))
231 .into());
232 }
233 }
234
235 if pk_column_ids.contains(&column_id) && expr.is_impure() {
236 return Err(ErrorCode::BindError(format!(
237 "Generated columns with impure expressions should not be part of the primary key. \
238 Here column \"{}\" is defined as part of the primary key.",
239 column_name
240 ))
241 .into());
242 }
243
244 Ok(())
245}
246
247pub fn bind_sql_column_constraints(
250 session: &SessionImpl,
251 table_name: String,
252 column_catalogs: &mut [ColumnCatalog],
253 columns: &[ColumnDef],
254 pk_column_ids: &[ColumnId],
255) -> Result<()> {
256 let generated_column_names = {
257 let mut names = vec![];
258 for column in columns {
259 for option_def in &column.options {
260 if let ColumnOption::GeneratedColumns(_) = option_def.option {
261 names.push(column.name.real_value());
262 break;
263 }
264 }
265 }
266 names
267 };
268
269 let mut binder = Binder::new_for_ddl(session);
270 binder.bind_columns_to_context(table_name, column_catalogs)?;
271
272 for column in columns {
273 let Some(idx) = column_catalogs
274 .iter()
275 .position(|c| c.name() == column.name.real_value())
276 else {
277 continue;
280 };
281
282 for option_def in &column.options {
283 match &option_def.option {
284 ColumnOption::GeneratedColumns(expr) => {
285 binder.set_clause(Some(Clause::GeneratedColumn));
286
287 let expr_impl = binder.bind_expr(expr).with_context(|| {
288 format!(
289 "fail to bind expression in generated column \"{}\"",
290 column.name.real_value()
291 )
292 })?;
293
294 check_generated_column_constraints(
295 &column.name.real_value(),
296 column_catalogs[idx].column_id(),
297 &expr_impl,
298 column_catalogs,
299 &generated_column_names,
300 pk_column_ids,
301 )?;
302
303 column_catalogs[idx].column_desc.generated_or_default_column = Some(
304 GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
305 expr: Some(expr_impl.to_expr_proto()),
306 }),
307 );
308 binder.set_clause(None);
309 }
310 ColumnOption::DefaultValue(expr) => {
311 let expr_impl = binder
312 .bind_expr(expr)?
313 .cast_assign(column_catalogs[idx].data_type())?;
314
315 let rewritten_expr_impl = session
323 .pinned_snapshot()
324 .inline_now_proc_time()
325 .rewrite_expr(expr_impl.clone());
326
327 if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
328 let snapshot_value = snapshot_value?;
329
330 column_catalogs[idx].column_desc.generated_or_default_column =
331 Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
332 snapshot_value: Some(snapshot_value.to_protobuf()),
333 expr: Some(expr_impl.to_expr_proto()),
334 }));
336 } else {
337 return Err(ErrorCode::BindError(format!(
338 "Default expression used in column `{}` cannot be evaluated. \
339 Use generated columns instead if you mean to reference other columns.",
340 column.name
341 ))
342 .into());
343 }
344 }
345 ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
346 if persisted.is_empty() {
351 bail_bind_error!(
352 "DEFAULT INTERNAL is only used for internal purposes, \
353 please specify a concrete default value"
354 );
355 }
356
357 let desc = DefaultColumnDesc::decode(&**persisted)
358 .expect("failed to decode persisted `DefaultColumnDesc`");
359
360 column_catalogs[idx].column_desc.generated_or_default_column =
361 Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
362 }
363 _ => {}
364 }
365 }
366 }
367 Ok(())
368}
369
370pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
372 let mut pk_column_names = vec![];
373
374 for constraint in table_constraints {
375 match constraint {
376 TableConstraint::Unique {
377 name: _,
378 columns,
379 is_primary: true,
380 } => {
381 if !pk_column_names.is_empty() {
382 return Err(multiple_pk_definition_err());
383 }
384 pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
385 }
386 _ => bail_not_implemented!("table constraint \"{}\"", constraint),
387 }
388 }
389 Ok(pk_column_names)
390}
391
392pub fn bind_sql_pk_names(
393 columns_defs: &[ColumnDef],
394 pk_names_from_table_constraints: Vec<String>,
395) -> Result<Vec<String>> {
396 let mut pk_column_names = pk_names_from_table_constraints;
397
398 for column in columns_defs {
399 for option_def in &column.options {
400 if let ColumnOption::Unique { is_primary: true } = option_def.option {
401 if !pk_column_names.is_empty() {
402 return Err(multiple_pk_definition_err());
403 }
404 pk_column_names.push(column.name.real_value());
405 };
406 }
407 }
408
409 Ok(pk_column_names)
410}
411
412fn multiple_pk_definition_err() -> RwError {
413 ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
414}
415
416pub fn bind_pk_and_row_id_on_relation(
421 mut columns: Vec<ColumnCatalog>,
422 pk_names: Vec<String>,
423 must_need_pk: bool,
424) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
425 for c in &columns {
426 assert!(c.column_id() != ColumnId::placeholder());
427 }
428
429 let name_to_id = columns
431 .iter()
432 .map(|c| (c.name(), c.column_id()))
433 .collect::<HashMap<_, _>>();
434
435 let mut pk_column_ids: Vec<_> = pk_names
436 .iter()
437 .map(|name| {
438 name_to_id.get(name.as_str()).copied().ok_or_else(|| {
439 ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
440 })
441 })
442 .try_collect()?;
443
444 let need_row_id = pk_column_ids.is_empty() && must_need_pk;
446
447 let row_id_index = need_row_id.then(|| {
448 let column = ColumnCatalog::row_id_column();
449 let index = columns.len();
450 pk_column_ids = vec![column.column_id()];
451 columns.push(column);
452 index
453 });
454
455 if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
456 Err(ErrorCode::InvalidInputSyntax(format!(
457 "column \"{col}\" specified more than once"
458 )))?;
459 }
460
461 Ok((columns, pk_column_ids, row_id_index))
462}
463
/// Generates the stream plan for a `CREATE TABLE` backed by an external connector.
///
/// The flow is:
/// 1. reject `APPEND ONLY` combined with a format other than `PLAIN` / `NATIVE`;
/// 2. bind the connector properties and the refresh mode from the statement;
/// 3. for shareable CDC connectors, run the CDC checks on generated columns and `NOT NULL`
///    constraints. For other connectors, only notify the user when `NOT NULL` columns exist,
///    because violating upstream rows are ignored silently;
/// 4. resolve columns from the format/encode and bind the source catalog, assigning column
///    ids through `col_id_gen`;
/// 5. build the table plan on top of the bound source.
///
/// Returns the plan, the bound source catalog (always `Some` on this path), and the table
/// catalog.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append only table does not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let (with_properties, refresh_mode) =
        bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        // Non-CDC connectors do not reject violating rows; warn the user up front.
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    // Only the schema part is needed here; the table name is taken from the bound source.
    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    // `OverwriteOptions::new` borrows `handler_args` mutably, so it has to come after the
    // shared borrows above. Only its source rate limit is used here.
    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
        refresh_mode,
    )
    .await?;

    // `handler_args` was cloned for the source binder above, so it can be moved here.
    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source.clone(),
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(source), table))
}
555
556#[allow(clippy::too_many_arguments)]
559pub(crate) fn gen_create_table_plan(
560 context: OptimizerContext,
561 table_name: ObjectName,
562 column_defs: Vec<ColumnDef>,
563 constraints: Vec<TableConstraint>,
564 mut col_id_gen: ColumnIdGenerator,
565 source_watermarks: Vec<SourceWatermark>,
566 props: CreateTableProps,
567 is_for_replace_plan: bool,
568) -> Result<(PlanRef, TableCatalog)> {
569 let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
570 for c in &mut columns {
571 col_id_gen.generate(c)?;
572 }
573
574 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
575 if !secret_refs.is_empty() || !connection_refs.is_empty() {
576 return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
577 }
578
579 gen_create_table_plan_without_source(
580 context,
581 table_name,
582 columns,
583 column_defs,
584 constraints,
585 source_watermarks,
586 col_id_gen.into_version(),
587 props,
588 )
589}
590
591#[allow(clippy::too_many_arguments)]
592pub(crate) fn gen_create_table_plan_without_source(
593 context: OptimizerContext,
594 table_name: ObjectName,
595 columns: Vec<ColumnCatalog>,
596 column_defs: Vec<ColumnDef>,
597 constraints: Vec<TableConstraint>,
598 source_watermarks: Vec<SourceWatermark>,
599 version: TableVersion,
600 props: CreateTableProps,
601) -> Result<(PlanRef, TableCatalog)> {
602 let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
604 let (mut columns, pk_column_ids, row_id_index) =
605 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
606
607 let watermark_descs = bind_source_watermark(
608 context.session_ctx(),
609 table_name.real_value(),
610 source_watermarks,
611 &columns,
612 )?;
613
614 bind_sql_column_constraints(
615 context.session_ctx(),
616 table_name.real_value(),
617 &mut columns,
618 &column_defs,
619 &pk_column_ids,
620 )?;
621 let session = context.session_ctx().clone();
622
623 let db_name = &session.database();
624 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
625
626 let info = CreateTableInfo {
627 columns,
628 pk_column_ids,
629 row_id_index,
630 watermark_descs,
631 source_catalog: None,
632 version,
633 };
634
635 gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
636}
637
638fn gen_table_plan_with_source(
639 context: OptimizerContextRef,
640 schema_name: Option<String>,
641 source_catalog: SourceCatalog,
642 version: TableVersion,
643 props: CreateTableProps,
644) -> Result<(PlanRef, TableCatalog)> {
645 let table_name = source_catalog.name.clone();
646
647 let info = CreateTableInfo {
648 columns: source_catalog.columns.clone(),
649 pk_column_ids: source_catalog.pk_col_ids.clone(),
650 row_id_index: source_catalog.row_id_index,
651 watermark_descs: source_catalog.watermark_descs.clone(),
652 source_catalog: Some(source_catalog),
653 version,
654 };
655
656 gen_table_plan_inner(context, schema_name, table_name, info, props)
657}
658
/// `ON CONFLICT` setting of a table, either as written in the statement or already resolved.
///
/// [`EitherOnConflict::to_behavior`] turns either form into a [`ConflictBehavior`].
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    /// Clause taken from the AST; `None` when the statement has no `ON CONFLICT`.
    Ast(Option<OnConflict>),
    /// An already-resolved behavior, returned unchanged by `to_behavior`.
    Resolved(ConflictBehavior),
}
665
666impl From<Option<OnConflict>> for EitherOnConflict {
667 fn from(v: Option<OnConflict>) -> Self {
668 Self::Ast(v)
669 }
670}
671
672impl From<ConflictBehavior> for EitherOnConflict {
673 fn from(v: ConflictBehavior) -> Self {
674 Self::Resolved(v)
675 }
676}
677
678impl EitherOnConflict {
679 pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
681 let conflict_behavior = match self {
682 EitherOnConflict::Ast(on_conflict) => {
683 if append_only {
684 if row_id_as_pk {
685 ConflictBehavior::NoCheck
687 } else {
688 if let Some(on_conflict) = on_conflict
690 && on_conflict != OnConflict::Nothing
691 {
692 return Err(ErrorCode::InvalidInputSyntax(
693 "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
694 the ON CONFLICT behavior must be DO NOTHING."
695 .to_owned(),
696 )
697 .into());
698 }
699 ConflictBehavior::IgnoreConflict
700 }
701 } else {
702 match on_conflict.unwrap_or(OnConflict::UpdateFull) {
704 OnConflict::UpdateFull => ConflictBehavior::Overwrite,
705 OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
706 OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
707 }
708 }
709 }
710 EitherOnConflict::Resolved(b) => b,
711 };
712
713 Ok(conflict_behavior)
714 }
715}
716
/// Column-level information used to plan a `CREATE TABLE`.
pub struct CreateTableInfo {
    /// Columns of the table. On the plain-table path, this includes the row-id column
    /// appended by `bind_pk_and_row_id_on_relation` when no primary key is declared.
    pub columns: Vec<ColumnCatalog>,
    /// Ids of the primary key columns, in key order.
    pub pk_column_ids: Vec<ColumnId>,
    /// Position of the row-id column within `columns`, if the table has one.
    pub row_id_index: Option<usize>,
    /// Bound watermark definitions.
    pub watermark_descs: Vec<WatermarkDesc>,
    /// Catalog of the backing source; `None` for tables created without a connector.
    pub source_catalog: Option<SourceCatalog>,
    /// Column-id version, as produced by `ColumnIdGenerator::into_version`.
    pub version: TableVersion,
}
729
/// Table-level properties used to plan a `CREATE TABLE`.
pub struct CreateTableProps {
    /// SQL definition of the table. The CDC path fills this from `normalized_sql()`.
    pub definition: String,
    /// Whether the table is declared `APPEND ONLY`.
    pub append_only: bool,
    /// `ON CONFLICT` setting, either from the AST or already resolved.
    pub on_conflict: EitherOnConflict,
    /// Names of the version columns (NOTE(review): presumably from the statement's
    /// version-column clause — confirm).
    pub with_version_columns: Vec<String>,
    /// Webhook source information; `None` on the CDC path.
    pub webhook_info: Option<PbWebhookSourceInfo>,
    /// Storage engine of the table.
    pub engine: Engine,
}
742
/// Shared tail of the plain and with-source `CREATE TABLE` planning paths.
///
/// It resolves the database and schema ids and builds a `LogicalSource` of kind
/// `CreateTable` over `info.columns`, including the optional source catalog. That source is
/// wrapped in a `PlanRoot` with any distribution and order.
///
/// Before materializing, it enforces that:
/// - watermarks without TTL require an `APPEND ONLY` table;
/// - a retention period requires an `APPEND ONLY` table, unless the session's
///   `unsafe_enable_storage_retention_for_non_append_only_tables` option is set. In that case
///   only a warning is logged and a notice is sent.
///
/// Finally it generates the materialize plan, sets the session user as the table owner, and
/// returns the plan wrapped by `ensure_sync_log_store_fragment_root`.
#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    // Borrow what is needed here; `info` itself is moved into `gen_table_plan` below.
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    let source_node: LogicalPlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    // An all-zero bitset over the table's columns.
    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    // Watermarks declared with TTL are allowed on non-append-only tables; the others are not.
    let has_non_ttl_watermark = watermark_descs.iter().any(|d| !d.with_ttl);

    if !append_only && has_non_ttl_watermark {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on table requires the table to be append only.".to_owned(),
            "Use the key words `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    if !append_only && retention_seconds.is_some() {
        // Escape hatch: the session option allows it, with a warning and a user notice.
        if session
            .config()
            .unsafe_enable_storage_retention_for_non_append_only_tables()
        {
            tracing::warn!(
                "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
                table_name
            );
            const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
                                  This may lead to stream inconsistency and unrecoverable \
                                  node failure if there is any row INSERT/UPDATE/DELETE operation \
                                  corresponding to the TTLed primary keys";
            session.notice_to_user(NOTICE);
        } else {
            return Err(ErrorCode::NotSupported(
                "Defining retention seconds on table requires the table to be append only."
                    .to_owned(),
                "Use the key words `APPEND ONLY`".to_owned(),
            )
            .into());
        }
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    Ok((
        ensure_sync_log_store_fragment_root(materialize.into()),
        table,
    ))
}
831
/// Generates the stream plan and table catalog for a CDC table created from a shared CDC
/// source.
///
/// Steps:
/// 1. add the columns requested by `include_column_options` (`handle_addition_columns`), then
///    assign column ids with `col_id_gen`;
/// 2. bind the primary key from `pk_names`; a row-id column is appended if `pk_names` is
///    empty, but the table is still created with `row_id_index: None`;
/// 3. bind watermarks and generated/default column expressions;
/// 4. build a `CdcTableDesc` over the non-generated columns, keyed by the primary key, and
///    scan it with a `LogicalCdcScan` using the scan options derived from the `WITH` options;
/// 5. materialize it via `PlanRoot::gen_table_plan` as a non-append-only table, then record
///    the owner, the CDC table id and the CDC table type on the catalog.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    source_watermarks: Vec<SourceWatermark>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String, database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, TableCatalog)> {
    let session = context.session_ctx().clone();

    // Append the `INCLUDE ...` columns. NOTE(review): the meaning of the `None` and `true`
    // arguments is defined by `handle_addition_columns` — confirm there.
    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    // The row-id index is discarded; the table is created with `row_id_index: None` below.
    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    // Positions of the key columns within `columns`. The `unwrap` cannot fail: every id in
    // `pk_column_ids` was resolved against these same columns by
    // `bind_pk_and_row_id_on_relation`.
    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    // Only non-generated columns are part of the upstream table description; the scan's
    // `required_cols` bitset below is sized by their count.
    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| !c.is_generated())
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();
    let cdc_table_type = ExternalCdcTableType::from_properties(&options);
    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id, external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");
    let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: LogicalPlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    // For Postgres, a name accepted by `normalize_simple_postgres_quoted_table_name` is
    // normalized before building the CDC table id. Otherwise the external name is used as-is.
    let cdc_table_id_external_table_name = if let ExternalCdcTableType::Postgres = cdc_table_type
        && let Some(normalized_table_name) =
            normalize_simple_postgres_quoted_table_name(&external_table_name)
    {
        normalized_table_name
    } else {
        external_table_name
    };
    let cdc_table_id = build_cdc_table_id(source.id, &cdc_table_id_external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs,
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_columns,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.cdc_table_type = Some(cdc_table_type);
    Ok((
        ensure_sync_log_store_fragment_root(materialize.into()),
        table,
    ))
}
989
990fn derive_with_options_for_cdc_table(
998 source_with_properties: &WithOptionsSecResolved,
999 external_table_name: String,
1000) -> Result<(WithOptionsSecResolved, String)> {
1001 use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
1002 let source_database_name: &str = source_with_properties
1004 .get("database.name")
1005 .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
1006 .as_str();
1007 let mut with_options = source_with_properties.clone();
1008 if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
1009 match connector.as_str() {
1010 MYSQL_CDC_CONNECTOR => {
1011 let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
1014 anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
1015 })?;
1016 if !source_database_name
1018 .split(',')
1019 .map(|s| s.trim())
1020 .any(|name| name == db_name)
1021 {
1022 return Err(anyhow!(
1023 "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
1024 db_name,
1025 source_database_name
1026 ).into());
1027 }
1028 with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
1029 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1030 return Ok((with_options, external_table_name));
1032 }
1033 POSTGRES_CDC_CONNECTOR => {
1034 let (schema_name, table_name) =
1035 parse_postgres_cdc_external_table_name(&external_table_name)?;
1036
1037 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name);
1039 with_options.insert(TABLE_NAME_KEY.into(), table_name);
1040 return Ok((with_options, external_table_name));
1042 }
1043 SQL_SERVER_CDC_CONNECTOR => {
1044 let parts: Vec<&str> = external_table_name.split('.').collect();
1052 let (schema_name, table_name) = match parts.len() {
1053 3 => {
1054 let db_name = parts[0];
1057 let schema_name = parts[1];
1058 let table_name = parts[2];
1059
1060 if db_name != source_database_name {
1061 return Err(anyhow!(
1062 "The database name '{}' in FROM clause does not match the database name '{}' specified in source definition. \
1063 You can either use 'schema.table' format (recommended) or ensure the database name matches.",
1064 db_name,
1065 source_database_name
1066 ).into());
1067 }
1068 (schema_name, table_name)
1069 }
1070 2 => {
1071 let schema_name = parts[0];
1074 let table_name = parts[1];
1075 (schema_name, table_name)
1076 }
1077 1 => {
1078 return Err(anyhow!(
1081 "Invalid table name format '{}'. For SQL Server CDC, you must specify the schema name. \
1082 Use 'schema.table' format (e.g., 'dbo.{}') or 'database.schema.table' format (e.g., '{}.dbo.{}').",
1083 external_table_name,
1084 external_table_name,
1085 source_database_name,
1086 external_table_name
1087 ).into());
1088 }
1089 _ => {
1090 return Err(anyhow!(
1092 "Invalid table name format '{}'. Expected 'schema.table' or 'database.schema.table'.",
1093 external_table_name
1094 ).into());
1095 }
1096 };
1097
1098 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1100 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1101
1102 let normalized_external_table_name = format!("{}.{}", schema_name, table_name);
1105 return Ok((with_options, normalized_external_table_name));
1106 }
1107 _ => {
1108 return Err(RwError::from(anyhow!(
1109 "connector {} is not supported for cdc table",
1110 connector
1111 )));
1112 }
1113 };
1114 }
1115 unreachable!("All valid CDC connectors should have returned by now")
1116}
1117
1118fn parse_postgres_cdc_external_table_name(external_table_name: &str) -> Result<(String, String)> {
1123 let mut parts = vec![];
1124 let mut current = String::new();
1125 let mut chars = external_table_name.chars().peekable();
1126 let mut in_quote = false;
1127 let mut just_closed_quote = false;
1128
1129 while let Some(ch) = chars.next() {
1130 if in_quote {
1131 if ch == '"' {
1132 if chars.peek() == Some(&'"') {
1133 current.push('"');
1134 chars.next();
1135 } else {
1136 in_quote = false;
1137 just_closed_quote = true;
1138 }
1139 } else {
1140 current.push(ch);
1141 }
1142 } else {
1143 match ch {
1144 '.' => {
1145 if current.is_empty() {
1146 return Err(anyhow!(
1147 "Invalid Postgres CDC table name '{}'. Expected 'schema.table'.",
1148 external_table_name
1149 )
1150 .into());
1151 }
1152 parts.push(std::mem::take(&mut current));
1153 just_closed_quote = false;
1154 }
1155 '"' if current.is_empty() => {
1156 in_quote = true;
1157 }
1158 '"' => {
1159 return Err(anyhow!(
1160 "Invalid Postgres CDC table name '{}'. Expected 'schema.table'.",
1161 external_table_name
1162 )
1163 .into());
1164 }
1165 _ if just_closed_quote => {
1166 return Err(anyhow!(
1167 "Invalid Postgres CDC table name '{}'. Expected 'schema.table'.",
1168 external_table_name
1169 )
1170 .into());
1171 }
1172 _ => current.push(ch),
1173 }
1174 }
1175 }
1176
1177 if in_quote || current.is_empty() {
1178 return Err(anyhow!(
1179 "Invalid Postgres CDC table name '{}'. Expected 'schema.table'.",
1180 external_table_name
1181 )
1182 .into());
1183 }
1184 parts.push(current);
1185
1186 if let [schema_name, table_name] = parts.as_slice() {
1187 Ok((schema_name.clone(), table_name.clone()))
1188 } else {
1189 Err(
1190 anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'")
1191 .into(),
1192 )
1193 }
1194}
1195
1196#[allow(clippy::too_many_arguments)]
1197pub(super) async fn handle_create_table_plan(
1198 handler_args: HandlerArgs,
1199 explain_options: ExplainOptions,
1200 format_encode: Option<FormatEncodeOptions>,
1201 cdc_table_info: Option<CdcTableInfo>,
1202 table_name: &ObjectName,
1203 column_defs: Vec<ColumnDef>,
1204 wildcard_idx: Option<usize>,
1205 constraints: Vec<TableConstraint>,
1206 source_watermarks: Vec<SourceWatermark>,
1207 append_only: bool,
1208 on_conflict: Option<OnConflict>,
1209 with_version_columns: Vec<String>,
1210 include_column_options: IncludeOption,
1211 webhook_info: Option<WebhookSourceInfo>,
1212 engine: Engine,
1213) -> Result<(
1214 PlanRef,
1215 Option<SourceCatalog>,
1216 TableCatalog,
1217 TableJobType,
1218 Option<SourceId>,
1219)> {
1220 let col_id_gen = ColumnIdGenerator::new_initial();
1221 let format_encode = check_create_table_with_source(
1222 &handler_args.with_options,
1223 format_encode,
1224 &include_column_options,
1225 &cdc_table_info,
1226 )?;
1227 let webhook_info = webhook_info
1228 .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1229 .transpose()?;
1230
1231 let props = CreateTableProps {
1232 definition: handler_args.normalized_sql.clone(),
1233 append_only,
1234 on_conflict: on_conflict.into(),
1235 with_version_columns: with_version_columns.clone(),
1236 webhook_info,
1237 engine,
1238 };
1239
1240 let ((plan, source, table), job_type, shared_shource_id) = match (
1241 format_encode,
1242 cdc_table_info.as_ref(),
1243 ) {
1244 (Some(format_encode), None) => (
1245 gen_create_table_plan_with_source(
1246 handler_args,
1247 explain_options,
1248 table_name.clone(),
1249 column_defs,
1250 wildcard_idx,
1251 constraints,
1252 format_encode,
1253 source_watermarks,
1254 col_id_gen,
1255 include_column_options,
1256 props,
1257 SqlColumnStrategy::FollowChecked,
1258 )
1259 .await?,
1260 TableJobType::General,
1261 None,
1262 ),
1263 (None, None) => {
1264 let context = OptimizerContext::new(handler_args, explain_options);
1265 let (plan, table) = gen_create_table_plan(
1266 context,
1267 table_name.clone(),
1268 column_defs,
1269 constraints,
1270 col_id_gen,
1271 source_watermarks,
1272 props,
1273 false,
1274 )?;
1275
1276 ((plan, None, table), TableJobType::General, None)
1277 }
1278
1279 (None, Some(cdc_table)) => {
1280 sanity_check_for_table_on_cdc_source(
1281 append_only,
1282 &column_defs,
1283 &wildcard_idx,
1284 &constraints,
1285 &source_watermarks,
1286 )?;
1287
1288 generated_columns_check_for_cdc_table(&column_defs)?;
1289 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1290
1291 let session = &handler_args.session;
1292 let db_name = &session.database();
1293 let user_name = &session.user_name();
1294 let search_path = session.config().search_path();
1295 let (schema_name, resolved_table_name) =
1296 Binder::resolve_schema_qualified_name(db_name, table_name)?;
1297 let (database_id, schema_id) =
1298 session.get_database_and_schema_id_for_create(schema_name.clone())?;
1299
1300 let (source_schema, source_name) =
1302 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
1303
1304 let source = {
1305 let catalog_reader = session.env().catalog_reader().read_guard();
1306 let schema_path =
1307 SchemaPath::new(source_schema.as_deref(), &search_path, user_name);
1308
1309 let (source, _) = catalog_reader.get_source_by_name(
1310 db_name,
1311 schema_path,
1312 source_name.as_str(),
1313 )?;
1314 source.clone()
1315 };
1316 let (cdc_with_options, normalized_external_table_name) =
1317 derive_with_options_for_cdc_table(
1318 &source.with_properties,
1319 cdc_table.external_table_name.clone(),
1320 )?;
1321
1322 let (columns, pk_names) = match wildcard_idx {
1323 Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1324 None => {
1325 for column_def in &column_defs {
1326 for option_def in &column_def.options {
1327 if let ColumnOption::DefaultValue(_)
1328 | ColumnOption::DefaultValueInternal { .. } = option_def.option
1329 {
1330 return Err(ErrorCode::NotSupported(
1331 "Default value for columns defined on the table created from a CDC source".into(),
1332 "Remove the default value expression in the column definitions".into(),
1333 )
1334 .into());
1335 }
1336 }
1337 }
1338
1339 let (columns, pk_names) =
1340 bind_cdc_table_schema(&column_defs, &constraints, false)?;
1341 let (options, secret_refs) = cdc_with_options.clone().into_parts();
1343 let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1344 .context("failed to extract external table config")?;
1345
1346 (columns, pk_names)
1347 }
1348 };
1349
1350 let context: OptimizerContextRef =
1351 OptimizerContext::new(handler_args, explain_options).into();
1352 let shared_source_id = source.id;
1353 let (plan, table) = gen_create_table_plan_for_cdc_table(
1354 context,
1355 source,
1356 normalized_external_table_name,
1357 column_defs,
1358 source_watermarks,
1359 columns,
1360 pk_names,
1361 cdc_with_options,
1362 col_id_gen,
1363 on_conflict,
1364 with_version_columns,
1365 include_column_options,
1366 table_name.clone(),
1367 resolved_table_name,
1368 database_id,
1369 schema_id,
1370 TableId::placeholder(),
1371 engine,
1372 )?;
1373
1374 (
1375 (plan, None, table),
1376 TableJobType::SharedCdcSource,
1377 Some(shared_source_id),
1378 )
1379 }
1380 (Some(_), Some(_)) => {
1381 return Err(ErrorCode::NotSupported(
1382 "Data format and encoding format doesn't apply to table created from a CDC source"
1383 .into(),
1384 "Remove the FORMAT and ENCODE specification".into(),
1385 )
1386 .into());
1387 }
1388 };
1389 Ok((plan, source, table, job_type, shared_shource_id))
1390}
1391
1392fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1394 let mut found_generated_column = false;
1395 for column in columns {
1396 let mut is_generated = false;
1397
1398 for option_def in &column.options {
1399 if let ColumnOption::GeneratedColumns(_) = option_def.option {
1400 is_generated = true;
1401 break;
1402 }
1403 }
1404
1405 if is_generated {
1406 found_generated_column = true;
1407 } else if found_generated_column {
1408 return Err(ErrorCode::NotSupported(
1409 "Non-generated column found after a generated column.".into(),
1410 "Ensure that all generated columns appear at the end of the cdc table definition."
1411 .into(),
1412 )
1413 .into());
1414 }
1415 }
1416 Ok(())
1417}
1418
1419fn not_null_check_for_cdc_table(
1421 wildcard_idx: &Option<usize>,
1422 column_defs: &Vec<ColumnDef>,
1423) -> Result<()> {
1424 if !wildcard_idx.is_some()
1425 && column_defs.iter().any(|col| {
1426 col.options
1427 .iter()
1428 .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1429 })
1430 {
1431 return Err(ErrorCode::NotSupported(
1432 "CDC table with NOT NULL constraint is not supported".to_owned(),
1433 "Please remove the NOT NULL constraint for columns".to_owned(),
1434 )
1435 .into());
1436 }
1437 Ok(())
1438}
1439
1440fn sanity_check_for_table_on_cdc_source(
1442 append_only: bool,
1443 column_defs: &Vec<ColumnDef>,
1444 wildcard_idx: &Option<usize>,
1445 constraints: &Vec<TableConstraint>,
1446 source_watermarks: &Vec<SourceWatermark>,
1447) -> Result<()> {
1448 if wildcard_idx.is_some() && !column_defs.is_empty() {
1450 return Err(ErrorCode::NotSupported(
1451 "wildcard(*) and column definitions cannot be used together".to_owned(),
1452 "Remove the wildcard or column definitions".to_owned(),
1453 )
1454 .into());
1455 }
1456
1457 if !wildcard_idx.is_some()
1459 && !constraints.iter().any(|c| {
1460 matches!(
1461 c,
1462 TableConstraint::Unique {
1463 is_primary: true,
1464 ..
1465 }
1466 )
1467 })
1468 && !column_defs.iter().any(|col| {
1469 col.options
1470 .iter()
1471 .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1472 })
1473 {
1474 return Err(ErrorCode::NotSupported(
1475 "CDC table without primary key constraint is not supported".to_owned(),
1476 "Please define a primary key".to_owned(),
1477 )
1478 .into());
1479 }
1480
1481 if append_only {
1482 return Err(ErrorCode::NotSupported(
1483 "append only modifier on the table created from a CDC source".into(),
1484 "Remove the APPEND ONLY clause".into(),
1485 )
1486 .into());
1487 }
1488
1489 if !source_watermarks.is_empty()
1490 && source_watermarks
1491 .iter()
1492 .any(|watermark| !watermark.with_ttl)
1493 {
1494 return Err(ErrorCode::NotSupported(
1495 "non-TTL watermark defined on the table created from a CDC source".into(),
1496 "Use `WATERMARK ... WITH TTL` instead.".into(),
1497 )
1498 .into());
1499 }
1500
1501 Ok(())
1502}
1503
1504async fn bind_cdc_table_schema_externally(
1506 cdc_with_options: WithOptionsSecResolved,
1507) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1508 let (options, secret_refs) = cdc_with_options.into_parts();
1510 let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1511 .context("failed to extract external table config")?;
1512
1513 let table = ExternalTableImpl::connect(config)
1514 .await
1515 .context("failed to auto derive table schema")?;
1516
1517 Ok((
1518 table
1519 .column_descs()
1520 .iter()
1521 .cloned()
1522 .map(|column_desc| ColumnCatalog {
1523 column_desc,
1524 is_hidden: false,
1525 })
1526 .collect(),
1527 table.pk_names().clone(),
1528 ))
1529}
1530
1531fn bind_cdc_table_schema(
1533 column_defs: &Vec<ColumnDef>,
1534 constraints: &Vec<TableConstraint>,
1535 is_for_replace_plan: bool,
1536) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1537 let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1538
1539 let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1540 Ok((columns, pk_names))
1541}
1542
1543#[allow(clippy::too_many_arguments)]
1544pub async fn handle_create_table(
1545 handler_args: HandlerArgs,
1546 table_name: ObjectName,
1547 column_defs: Vec<ColumnDef>,
1548 wildcard_idx: Option<usize>,
1549 constraints: Vec<TableConstraint>,
1550 if_not_exists: bool,
1551 format_encode: Option<FormatEncodeOptions>,
1552 source_watermarks: Vec<SourceWatermark>,
1553 append_only: bool,
1554 on_conflict: Option<OnConflict>,
1555 with_version_columns: Vec<String>,
1556 cdc_table_info: Option<CdcTableInfo>,
1557 include_column_options: IncludeOption,
1558 webhook_info: Option<WebhookSourceInfo>,
1559 ast_engine: risingwave_sqlparser::ast::Engine,
1560) -> Result<RwPgResponse> {
1561 let session = handler_args.session.clone();
1562
1563 if append_only {
1564 session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1565 }
1566
1567 session.check_cluster_limits().await?;
1568
1569 let engine = match ast_engine {
1570 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1571 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1572 };
1573
1574 if let Either::Right(resp) = session.check_relation_name_duplicated(
1575 table_name.clone(),
1576 StatementType::CREATE_TABLE,
1577 if_not_exists,
1578 )? {
1579 return Ok(resp);
1580 }
1581
1582 let (graph, source, hummock_table, job_type, shared_source_id) = {
1583 let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
1584 handler_args.clone(),
1585 ExplainOptions::default(),
1586 format_encode,
1587 cdc_table_info,
1588 &table_name,
1589 column_defs.clone(),
1590 wildcard_idx,
1591 constraints.clone(),
1592 source_watermarks,
1593 append_only,
1594 on_conflict,
1595 with_version_columns,
1596 include_column_options,
1597 webhook_info,
1598 engine,
1599 )
1600 .await?;
1601 tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1602
1603 let graph = build_graph(plan, Some(GraphJobType::Table))?;
1604
1605 (graph, source, table, job_type, shared_source_id)
1606 };
1607
1608 tracing::trace!(
1609 "name={}, graph=\n{}",
1610 table_name,
1611 serde_json::to_string_pretty(&graph).unwrap()
1612 );
1613
1614 let dependencies = shared_source_id
1615 .map(|id| HashSet::from([id.as_object_id()]))
1616 .unwrap_or_default();
1617
1618 match engine {
1620 Engine::Hummock => {
1621 let catalog_writer = session.catalog_writer()?;
1622 let action = match job_type {
1623 TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
1624 _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
1625 };
1626 execute_with_long_running_notification(
1627 catalog_writer.create_table(
1628 source.map(|s| s.to_prost()),
1629 hummock_table.to_prost(),
1630 graph,
1631 job_type,
1632 if_not_exists,
1633 dependencies,
1634 ),
1635 &session,
1636 "CREATE TABLE",
1637 action,
1638 )
1639 .await?;
1640 }
1641 Engine::Iceberg => {
1642 let hummock_table_name = hummock_table.name.clone();
1643 session.create_staging_table(hummock_table.clone());
1644 let res = Box::pin(create_iceberg_engine_table(
1645 session.clone(),
1646 handler_args,
1647 source.map(|s| s.to_prost()),
1648 hummock_table,
1649 graph,
1650 table_name,
1651 job_type,
1652 if_not_exists,
1653 ))
1654 .await;
1655 session.drop_staging_table(&hummock_table_name);
1656 res?
1657 }
1658 }
1659
1660 Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1661}
1662
1663#[allow(clippy::too_many_arguments)]
1672pub async fn create_iceberg_engine_table(
1673 session: Arc<SessionImpl>,
1674 handler_args: HandlerArgs,
1675 mut source: Option<PbSource>,
1676 table: TableCatalog,
1677 graph: StreamFragmentGraph,
1678 table_name: ObjectName,
1679 job_type: PbTableJobType,
1680 if_not_exists: bool,
1681) -> Result<()> {
1682 let rw_db_name = session
1683 .env()
1684 .catalog_reader()
1685 .read_guard()
1686 .get_database_by_id(table.database_id)?
1687 .name()
1688 .to_owned();
1689 let rw_schema_name = session
1690 .env()
1691 .catalog_reader()
1692 .read_guard()
1693 .get_schema_by_id(table.database_id, table.schema_id)?
1694 .name()
1695 .clone();
1696 let iceberg_catalog_name = rw_db_name.clone();
1697 let iceberg_database_name = rw_schema_name.clone();
1698 let iceberg_table_name = table_name.0.last().unwrap().real_value();
1699
1700 let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1701 let sink_decouple = session.config().sink_decouple();
1702 if matches!(sink_decouple, SinkDecouple::Disable) {
1703 bail!(
1704 "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1705 );
1706 }
1707
1708 let mut connection_ref = BTreeMap::new();
1709 let with_common = if iceberg_engine_connection.is_empty() {
1710 bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1711 } else {
1712 let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1713 assert_eq!(parts.len(), 2);
1714 let connection_catalog =
1715 session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1716 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1717 if params.connection_type == ConnectionType::Iceberg as i32 {
1718 connection_ref.insert(
1720 "connection".to_owned(),
1721 ConnectionRefValue {
1722 connection_name: ObjectName::from(vec![
1723 Ident::from(parts[0]),
1724 Ident::from(parts[1]),
1725 ]),
1726 },
1727 );
1728
1729 let mut with_common = BTreeMap::new();
1730 with_common.insert("connector".to_owned(), "iceberg".to_owned());
1731 with_common.insert("database.name".to_owned(), iceberg_database_name);
1732 with_common.insert("table.name".to_owned(), iceberg_table_name);
1733
1734 let hosted_catalog = params
1735 .properties
1736 .get("hosted_catalog")
1737 .map(|s| s.eq_ignore_ascii_case("true"))
1738 .unwrap_or(false);
1739 if hosted_catalog {
1740 let meta_client = session.env().meta_client();
1741 let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1742
1743 let meta_store_endpoint =
1744 url::Url::parse(&meta_store_endpoint).map_err(|_| {
1745 ErrorCode::InternalError(
1746 "failed to parse the meta store endpoint".to_owned(),
1747 )
1748 })?;
1749 let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1750 let meta_store_user = meta_store_endpoint.username().to_owned();
1751 let meta_store_password = match meta_store_endpoint.password() {
1752 Some(password) => percent_decode_str(password)
1753 .decode_utf8()
1754 .map_err(|_| {
1755 ErrorCode::InternalError(
1756 "failed to parse password from meta store endpoint".to_owned(),
1757 )
1758 })?
1759 .into_owned(),
1760 None => "".to_owned(),
1761 };
1762 let meta_store_host = meta_store_endpoint
1763 .host_str()
1764 .ok_or_else(|| {
1765 ErrorCode::InternalError(
1766 "failed to parse host from meta store endpoint".to_owned(),
1767 )
1768 })?
1769 .to_owned();
1770 let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1771 ErrorCode::InternalError(
1772 "failed to parse port from meta store endpoint".to_owned(),
1773 )
1774 })?;
1775 let meta_store_database = meta_store_endpoint
1776 .path()
1777 .trim_start_matches('/')
1778 .to_owned();
1779
1780 let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1781 bail!("failed to parse meta backend: {}", meta_store_backend);
1782 };
1783
1784 let catalog_uri = match meta_backend {
1785 MetaBackend::Postgres => {
1786 format!(
1787 "jdbc:postgresql://{}:{}/{}",
1788 meta_store_host, meta_store_port, meta_store_database
1789 )
1790 }
1791 MetaBackend::Mysql => {
1792 format!(
1793 "jdbc:mysql://{}:{}/{}",
1794 meta_store_host, meta_store_port, meta_store_database
1795 )
1796 }
1797 MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1798 bail!(
1799 "Unsupported meta backend for iceberg engine table: {}",
1800 meta_store_backend
1801 );
1802 }
1803 };
1804
1805 with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1806 with_common.insert("catalog.uri".to_owned(), catalog_uri);
1807 with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
1808 with_common.insert("catalog.jdbc.password".to_owned(), meta_store_password);
1809 with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
1810 }
1811
1812 with_common
1813 } else {
1814 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1815 "Only iceberg connection could be used in iceberg engine".to_owned(),
1816 )));
1817 }
1818 } else {
1819 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1820 "Private Link Service has been deprecated. Please create a new connection instead."
1821 .to_owned(),
1822 )));
1823 }
1824 };
1825
1826 let mut pks = table
1829 .pk_column_names()
1830 .iter()
1831 .map(|c| c.to_string())
1832 .collect::<Vec<String>>();
1833
1834 if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1836 pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1837 }
1838
1839 let sink_from = CreateSink::From(table_name.clone());
1840
1841 let mut sink_name = table_name.clone();
1842 *sink_name.0.last_mut().unwrap() = Ident::from(
1843 (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1844 );
1845 let create_sink_stmt = CreateSinkStatement {
1846 if_not_exists: false,
1847 sink_name,
1848 with_properties: WithProperties(vec![]),
1849 sink_from,
1850 columns: vec![],
1851 emit_mode: None,
1852 sink_schema: None,
1853 into_table_name: None,
1854 };
1855
1856 let mut sink_handler_args = handler_args.clone();
1857
1858 let mut sink_with = with_common.clone();
1859
1860 let enable_pk_index = handler_args
1861 .with_options
1862 .get(ENABLE_PK_INDEX)
1863 .is_some_and(|val| val.eq_ignore_ascii_case("true"));
1864
1865 if !enable_pk_index {
1867 sink_with.insert(AUTO_SCHEMA_CHANGE_KEY.to_owned(), "true".to_owned());
1868 }
1869
1870 if table.append_only {
1871 sink_with.insert("type".to_owned(), "append-only".to_owned());
1872 } else {
1873 sink_with.insert("type".to_owned(), "upsert".to_owned());
1874 if !enable_pk_index {
1875 sink_with.insert("primary_key".to_owned(), pks.join(","));
1876 }
1877 }
1878 let commit_checkpoint_interval = handler_args
1897 .with_options
1898 .get(COMMIT_CHECKPOINT_INTERVAL)
1899 .map(|v| v.to_owned())
1900 .unwrap_or_else(|| "60".to_owned());
1901 let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1902 ErrorCode::InvalidInputSyntax(format!(
1903 "commit_checkpoint_interval must be greater than 0: {}",
1904 commit_checkpoint_interval
1905 ))
1906 })?;
1907
1908 if commit_checkpoint_interval == 0 {
1909 bail!("commit_checkpoint_interval must be greater than 0");
1910 }
1911
1912 source
1914 .as_mut()
1915 .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1916
1917 let sink_decouple = session.config().sink_decouple();
1918 if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1919 bail!(
1920 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1921 )
1922 }
1923
1924 sink_with.insert(
1925 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1926 commit_checkpoint_interval.to_string(),
1927 );
1928 sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1929
1930 sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1931
1932 if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1933 match enable_compaction.to_lowercase().as_str() {
1934 "true" => {
1935 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1936 }
1937 "false" => {
1938 sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1939 }
1940 _ => {
1941 return Err(ErrorCode::InvalidInputSyntax(format!(
1942 "enable_compaction must be true or false: {}",
1943 enable_compaction
1944 ))
1945 .into());
1946 }
1947 }
1948
1949 source
1951 .as_mut()
1952 .map(|x| x.with_properties.remove("enable_compaction"));
1953 } else {
1954 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1955 }
1956
1957 if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1958 let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1959 ErrorCode::InvalidInputSyntax(format!(
1960 "compaction_interval_sec must be greater than 0: {}",
1961 commit_checkpoint_interval
1962 ))
1963 })?;
1964 if compaction_interval_sec == 0 {
1965 bail!("compaction_interval_sec must be greater than 0");
1966 }
1967 sink_with.insert(
1968 "compaction_interval_sec".to_owned(),
1969 compaction_interval_sec.to_string(),
1970 );
1971 source
1973 .as_mut()
1974 .map(|x| x.with_properties.remove("compaction_interval_sec"));
1975 }
1976
1977 let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1978 handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1979 {
1980 source
1982 .as_mut()
1983 .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1984 match enable_snapshot_expiration.to_lowercase().as_str() {
1985 "true" => {
1986 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1987 true
1988 }
1989 "false" => {
1990 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1991 false
1992 }
1993 _ => {
1994 return Err(ErrorCode::InvalidInputSyntax(format!(
1995 "enable_snapshot_expiration must be true or false: {}",
1996 enable_snapshot_expiration
1997 ))
1998 .into());
1999 }
2000 }
2001 } else {
2002 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
2003 true
2004 };
2005
2006 if has_enabled_snapshot_expiration {
2007 if let Some(snapshot_expiration_retain_last) = handler_args
2009 .with_options
2010 .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
2011 {
2012 sink_with.insert(
2013 SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
2014 snapshot_expiration_retain_last.to_owned(),
2015 );
2016 source
2018 .as_mut()
2019 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
2020 }
2021
2022 if let Some(snapshot_expiration_max_age) = handler_args
2023 .with_options
2024 .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
2025 {
2026 sink_with.insert(
2027 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
2028 snapshot_expiration_max_age.to_owned(),
2029 );
2030 source
2032 .as_mut()
2033 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
2034 }
2035
2036 if let Some(snapshot_expiration_clear_expired_files) = handler_args
2037 .with_options
2038 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
2039 {
2040 sink_with.insert(
2041 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
2042 snapshot_expiration_clear_expired_files.to_owned(),
2043 );
2044 source.as_mut().map(|x| {
2046 x.with_properties
2047 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
2048 });
2049 }
2050
2051 if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
2052 .with_options
2053 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
2054 {
2055 sink_with.insert(
2056 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
2057 snapshot_expiration_clear_expired_meta_data.to_owned(),
2058 );
2059 source.as_mut().map(|x| {
2061 x.with_properties
2062 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
2063 });
2064 }
2065 }
2066
2067 if let Some(format_version) = handler_args.with_options.get(FORMAT_VERSION) {
2068 let format_version = format_version.parse::<u8>().map_err(|_| {
2069 ErrorCode::InvalidInputSyntax(format!(
2070 "format_version must be 1, 2 or 3: {}",
2071 format_version
2072 ))
2073 })?;
2074 if format_version != 1 && format_version != 2 && format_version != 3 {
2075 bail!("format_version must be 1, 2 or 3");
2076 }
2077 sink_with.insert(FORMAT_VERSION.to_owned(), format_version.to_string());
2078
2079 source
2081 .as_mut()
2082 .map(|x| x.with_properties.remove(FORMAT_VERSION));
2083 }
2084
2085 if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
2086 let write_mode = IcebergWriteMode::try_from(write_mode.as_str()).map_err(|_| {
2087 ErrorCode::InvalidInputSyntax(format!(
2088 "invalid write_mode: {}, must be one of: {}, {}",
2089 write_mode, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
2090 ))
2091 })?;
2092
2093 match write_mode {
2094 IcebergWriteMode::MergeOnRead => {
2095 sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
2096 }
2097
2098 IcebergWriteMode::CopyOnWrite => {
2099 if table.append_only {
2100 return Err(ErrorCode::NotSupported(
2101 "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
2102 "Please use MERGE ON READ instead".to_owned(),
2103 )
2104 .into());
2105 }
2106
2107 sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
2108 }
2109 }
2110
2111 source
2113 .as_mut()
2114 .map(|x| x.with_properties.remove("write_mode"));
2115 } else {
2116 sink_with.insert(
2117 WRITE_MODE.to_owned(),
2118 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2119 );
2120 }
2121
2122 if let Some(enable_pk_index) = handler_args.with_options.get(ENABLE_PK_INDEX) {
2123 match enable_pk_index.to_lowercase().as_str() {
2124 "true" => {
2125 sink_with.insert(ENABLE_PK_INDEX.to_owned(), "true".to_owned());
2126 }
2127 "false" => {
2128 sink_with.insert(ENABLE_PK_INDEX.to_owned(), "false".to_owned());
2129 }
2130 _ => {
2131 return Err(ErrorCode::InvalidInputSyntax(format!(
2132 "enable_pk_index must be true or false: {}",
2133 enable_pk_index
2134 ))
2135 .into());
2136 }
2137 }
2138
2139 source
2141 .as_mut()
2142 .map(|x| x.with_properties.remove(ENABLE_PK_INDEX));
2143 }
2144
2145 if let Some(max_snapshots_num_before_compaction) =
2146 handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
2147 {
2148 let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
2149 .parse::<u32>()
2150 .map_err(|_| {
2151 ErrorCode::InvalidInputSyntax(format!(
2152 "{} must be greater than 0: {}",
2153 COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
2154 ))
2155 })?;
2156
2157 if max_snapshots_num_before_compaction == 0 {
2158 bail!(format!(
2159 "{} must be greater than 0",
2160 COMPACTION_MAX_SNAPSHOTS_NUM
2161 ));
2162 }
2163
2164 sink_with.insert(
2165 COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
2166 max_snapshots_num_before_compaction.to_string(),
2167 );
2168
2169 source
2171 .as_mut()
2172 .map(|x| x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM));
2173 }
2174
2175 if let Some(small_files_threshold_mb) = handler_args
2176 .with_options
2177 .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2178 {
2179 let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
2180 ErrorCode::InvalidInputSyntax(format!(
2181 "{} must be greater than 0: {}",
2182 COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
2183 ))
2184 })?;
2185 if small_files_threshold_mb == 0 {
2186 bail!(format!(
2187 "{} must be a greater than 0",
2188 COMPACTION_SMALL_FILES_THRESHOLD_MB
2189 ));
2190 }
2191 sink_with.insert(
2192 COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
2193 small_files_threshold_mb.to_string(),
2194 );
2195
2196 source.as_mut().map(|x| {
2198 x.with_properties
2199 .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2200 });
2201 }
2202
2203 if let Some(delete_files_count_threshold) = handler_args
2204 .with_options
2205 .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2206 {
2207 let delete_files_count_threshold =
2208 delete_files_count_threshold.parse::<usize>().map_err(|_| {
2209 ErrorCode::InvalidInputSyntax(format!(
2210 "{} must be greater than 0: {}",
2211 COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
2212 ))
2213 })?;
2214 if delete_files_count_threshold == 0 {
2215 bail!(format!(
2216 "{} must be greater than 0",
2217 COMPACTION_DELETE_FILES_COUNT_THRESHOLD
2218 ));
2219 }
2220 sink_with.insert(
2221 COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
2222 delete_files_count_threshold.to_string(),
2223 );
2224
2225 source.as_mut().map(|x| {
2227 x.with_properties
2228 .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2229 });
2230 }
2231
2232 if let Some(trigger_snapshot_count) = handler_args
2233 .with_options
2234 .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
2235 {
2236 let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
2237 ErrorCode::InvalidInputSyntax(format!(
2238 "{} must be greater than 0: {}",
2239 COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
2240 ))
2241 })?;
2242 if trigger_snapshot_count == 0 {
2243 bail!(format!(
2244 "{} must be greater than 0",
2245 COMPACTION_TRIGGER_SNAPSHOT_COUNT
2246 ));
2247 }
2248 sink_with.insert(
2249 COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
2250 trigger_snapshot_count.to_string(),
2251 );
2252
2253 source
2255 .as_mut()
2256 .map(|x| x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT));
2257 }
2258
2259 if let Some(target_file_size_mb) = handler_args
2260 .with_options
2261 .get(COMPACTION_TARGET_FILE_SIZE_MB)
2262 {
2263 let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
2264 ErrorCode::InvalidInputSyntax(format!(
2265 "{} must be greater than 0: {}",
2266 COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
2267 ))
2268 })?;
2269 if target_file_size_mb == 0 {
2270 bail!(format!(
2271 "{} must be greater than 0",
2272 COMPACTION_TARGET_FILE_SIZE_MB
2273 ));
2274 }
2275 sink_with.insert(
2276 COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
2277 target_file_size_mb.to_string(),
2278 );
2279 source
2281 .as_mut()
2282 .map(|x| x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB));
2283 }
2284
2285 if let Some(compaction_type) = handler_args.with_options.get(COMPACTION_TYPE) {
2286 let compaction_type = CompactionType::try_from(compaction_type.as_str()).map_err(|_| {
2287 ErrorCode::InvalidInputSyntax(format!(
2288 "invalid compaction_type: {}, must be one of {:?}",
2289 compaction_type,
2290 [
2291 CompactionType::Full,
2292 CompactionType::SmallFiles,
2293 CompactionType::FilesWithDelete
2294 ]
2295 ))
2296 })?;
2297
2298 sink_with.insert(
2299 COMPACTION_TYPE.to_owned(),
2300 compaction_type.as_str().to_owned(),
2301 );
2302
2303 source
2305 .as_mut()
2306 .map(|x| x.with_properties.remove(COMPACTION_TYPE));
2307 }
2308
2309 if let Some(write_parquet_compression) = handler_args
2310 .with_options
2311 .get(COMPACTION_WRITE_PARQUET_COMPRESSION)
2312 {
2313 sink_with.insert(
2314 COMPACTION_WRITE_PARQUET_COMPRESSION.to_owned(),
2315 write_parquet_compression.to_owned(),
2316 );
2317 source.as_mut().map(|x| {
2319 x.with_properties
2320 .remove(COMPACTION_WRITE_PARQUET_COMPRESSION)
2321 });
2322 }
2323
2324 if let Some(write_parquet_max_row_group_rows) = handler_args
2325 .with_options
2326 .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2327 {
2328 let write_parquet_max_row_group_rows = write_parquet_max_row_group_rows
2329 .parse::<usize>()
2330 .map_err(|_| {
2331 ErrorCode::InvalidInputSyntax(format!(
2332 "{} must be a positive integer: {}",
2333 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS, write_parquet_max_row_group_rows
2334 ))
2335 })?;
2336 if write_parquet_max_row_group_rows == 0 {
2337 bail!(format!(
2338 "{} must be greater than 0",
2339 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS
2340 ));
2341 }
2342 sink_with.insert(
2343 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS.to_owned(),
2344 write_parquet_max_row_group_rows.to_string(),
2345 );
2346 source.as_mut().map(|x| {
2348 x.with_properties
2349 .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2350 });
2351 }
2352
2353 if let Some(write_parquet_max_row_group_bytes) = handler_args
2354 .with_options
2355 .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
2356 {
2357 let write_parquet_max_row_group_bytes = write_parquet_max_row_group_bytes
2358 .parse::<usize>()
2359 .map_err(|_| {
2360 ErrorCode::InvalidInputSyntax(format!(
2361 "{} must be a positive integer: {}",
2362 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES, write_parquet_max_row_group_bytes
2363 ))
2364 })?;
2365 if write_parquet_max_row_group_bytes == 0 {
2366 bail!(format!(
2367 "{} must be greater than 0",
2368 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES
2369 ));
2370 }
2371 sink_with.insert(
2372 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES.to_owned(),
2373 write_parquet_max_row_group_bytes.to_string(),
2374 );
2375 source.as_mut().map(|x| {
2376 x.with_properties
2377 .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
2378 });
2379 }
2380
2381 let partition_by = handler_args
2382 .with_options
2383 .get("partition_by")
2384 .map(|v| v.to_owned());
2385
2386 if let Some(partition_by) = &partition_by {
2387 let mut partition_columns = vec![];
2388 for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
2389 table
2390 .columns()
2391 .iter()
2392 .find(|col| col.name().eq_ignore_ascii_case(&column))
2393 .ok_or_else(|| {
2394 ErrorCode::InvalidInputSyntax(format!(
2395 "Partition source column does not exist in schema: {}",
2396 column
2397 ))
2398 })?;
2399
2400 partition_columns.push(column);
2401 }
2402
2403 ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2404 |_| {
2405 ErrorCode::InvalidInputSyntax(
2406 "The partition columns should be the prefix of the primary key".to_owned(),
2407 )
2408 },
2409 )?;
2410
2411 sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2412
2413 source
2415 .as_mut()
2416 .map(|x| x.with_properties.remove("partition_by"));
2417 }
2418
2419 let order_key = handler_args
2420 .with_options
2421 .get(ORDER_KEY)
2422 .map(|v| v.to_owned());
2423 if let Some(order_key) = &order_key {
2424 validate_order_key_columns(order_key, table.columns().iter().map(|col| col.name()))
2425 .map_err(|err| ErrorCode::InvalidInputSyntax(err.to_report_string()))?;
2426
2427 sink_with.insert(ORDER_KEY.to_owned(), order_key.to_owned());
2428
2429 source.as_mut().map(|x| x.with_properties.remove(ORDER_KEY));
2430 }
2431
2432 sink_handler_args.with_options =
2433 WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2434 let SinkPlanContext {
2435 sink_plan,
2436 sink_catalog,
2437 ..
2438 } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2439 let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2440
2441 let mut source_name = table_name.clone();
2442 *source_name.0.last_mut().unwrap() = Ident::from(
2443 (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2444 );
2445 let create_source_stmt = CreateSourceStatement {
2446 temporary: false,
2447 if_not_exists: false,
2448 columns: vec![],
2449 source_name,
2450 wildcard_idx: Some(0),
2451 constraints: vec![],
2452 with_properties: WithProperties(vec![]),
2453 format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2454 source_watermarks: vec![],
2455 include_column_options: vec![],
2456 };
2457
2458 let mut source_handler_args = handler_args.clone();
2459 let source_with = with_common;
2460 source_handler_args.with_options =
2461 WithOptions::new(source_with, Default::default(), connection_ref);
2462
2463 let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2464 let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2465 let (with_properties, refresh_mode) =
2466 bind_connector_props(&source_handler_args, &format_encode, true)?;
2467
2468 let (iceberg_catalog, table_identifier) = {
2471 let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2472 let iceberg_sink = IcebergSink::try_from(sink_param)?;
2473 iceberg_sink.create_table_if_not_exists().await?;
2474
2475 let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2476 let table_identifier = iceberg_sink.config.full_table_name()?;
2477 (iceberg_catalog, table_identifier)
2478 };
2479
2480 let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2481 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2482 &session,
2483 &format_encode,
2484 Either::Left(&with_properties),
2485 create_source_type,
2486 )
2487 .await?;
2488 let mut col_id_gen = ColumnIdGenerator::new_initial();
2489
2490 let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2491 source_handler_args,
2492 create_source_stmt.source_name,
2493 format_encode,
2494 with_properties,
2495 &create_source_stmt.columns,
2496 create_source_stmt.constraints,
2497 create_source_stmt.wildcard_idx,
2498 create_source_stmt.source_watermarks,
2499 columns_from_resolve_source,
2500 source_info,
2501 create_source_stmt.include_column_options,
2502 &mut col_id_gen,
2503 create_source_type,
2504 overwrite_options.source_rate_limit,
2505 SqlColumnStrategy::FollowChecked,
2506 refresh_mode,
2507 )
2508 .await?;
2509
2510 let _ = Jvm::get_or_init()?;
2513
2514 let catalog_writer = session.catalog_writer()?;
2515 let action = match job_type {
2516 TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
2517 _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
2518 };
2519 let res = execute_with_long_running_notification(
2520 catalog_writer.create_iceberg_table(
2521 PbTableJobInfo {
2522 source,
2523 table: Some(table.to_prost()),
2524 fragment_graph: Some(graph),
2525 job_type: job_type as _,
2526 },
2527 PbSinkJobInfo {
2528 sink: Some(sink_catalog.to_proto()),
2529 fragment_graph: Some(sink_graph),
2530 },
2531 iceberg_source_catalog.to_prost(),
2532 if_not_exists,
2533 ),
2534 &session,
2535 "CREATE TABLE",
2536 action,
2537 )
2538 .await;
2539
2540 if res.is_err() {
2541 let _ = iceberg_catalog
2542 .drop_table(&table_identifier)
2543 .await
2544 .inspect_err(|err| {
2545 tracing::error!(
2546 "failed to drop iceberg table {} after create iceberg engine table failed: {}",
2547 table_identifier,
2548 err.as_report()
2549 );
2550 });
2551 res?
2552 }
2553
2554 Ok(())
2555}
2556
2557pub fn check_create_table_with_source(
2558 with_options: &WithOptions,
2559 format_encode: Option<FormatEncodeOptions>,
2560 include_column_options: &IncludeOption,
2561 cdc_table_info: &Option<CdcTableInfo>,
2562) -> Result<Option<FormatEncodeOptions>> {
2563 if cdc_table_info.is_some() {
2565 return Ok(format_encode);
2566 }
2567 let defined_source = with_options.is_source_connector();
2568
2569 if !include_column_options.is_empty() && !defined_source {
2570 return Err(ErrorCode::InvalidInputSyntax(
2571 "INCLUDE should be used with a connector".to_owned(),
2572 )
2573 .into());
2574 }
2575 if defined_source {
2576 format_encode.as_ref().ok_or_else(|| {
2577 ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2578 })?;
2579 }
2580 Ok(format_encode)
2581}
2582
/// Checks that `partition_columns` is a prefix of `primary_key_columns`.
///
/// Column names are compared exactly (case-sensitively).
///
/// # Errors
/// Returns a human-readable message when there are more partition columns than
/// primary-key columns, or naming the first partition column that does not match the
/// primary-key column at the same position.
fn ensure_partition_columns_are_prefix_of_primary_key(
    partition_columns: &[String],
    primary_key_columns: &[String],
) -> std::result::Result<(), String> {
    if primary_key_columns.len() < partition_columns.len() {
        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
    }

    // The length check above guarantees every partition column is paired with the
    // primary-key column at the same position.
    let first_mismatch = partition_columns
        .iter()
        .zip(primary_key_columns)
        .find(|(part, pk)| part != pk);

    match first_mismatch {
        Some((part, _)) => Err(format!(
            "Partition column '{}' is not a prefix of the primary key.",
            part
        )),
        None => Ok(()),
    }
}
2602
2603#[allow(clippy::too_many_arguments)]
2604pub async fn generate_stream_graph_for_replace_table(
2605 _session: &Arc<SessionImpl>,
2606 table_name: ObjectName,
2607 original_catalog: &Arc<TableCatalog>,
2608 handler_args: HandlerArgs,
2609 statement: Statement,
2610 col_id_gen: ColumnIdGenerator,
2611 sql_column_strategy: SqlColumnStrategy,
2612) -> Result<(
2613 StreamFragmentGraph,
2614 TableCatalog,
2615 Option<SourceCatalog>,
2616 TableJobType,
2617)> {
2618 let Statement::CreateTable {
2619 columns,
2620 constraints,
2621 source_watermarks,
2622 append_only,
2623 on_conflict,
2624 with_version_columns,
2625 wildcard_idx,
2626 cdc_table_info,
2627 format_encode,
2628 include_column_options,
2629 engine,
2630 with_options,
2631 ..
2632 } = statement
2633 else {
2634 panic!("unexpected statement type: {:?}", statement);
2635 };
2636
2637 let format_encode = format_encode
2638 .clone()
2639 .map(|format_encode| format_encode.into_v2_with_warning());
2640
2641 let engine = match engine {
2642 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2643 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2644 };
2645
2646 let is_drop_connector =
2647 original_catalog.associated_source_id().is_some() && format_encode.is_none();
2648 if is_drop_connector {
2649 debug_assert!(
2650 source_watermarks.is_empty()
2651 && include_column_options.is_empty()
2652 && with_options
2653 .iter()
2654 .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2655 );
2656 }
2657
2658 let props = CreateTableProps {
2659 definition: handler_args.normalized_sql.clone(),
2660 append_only,
2661 on_conflict: on_conflict.into(),
2662 with_version_columns: with_version_columns
2663 .iter()
2664 .map(|col| col.real_value())
2665 .collect(),
2666 webhook_info: original_catalog.webhook_info.clone(),
2667 engine,
2668 };
2669
2670 let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2671 (Some(format_encode), None) => (
2672 gen_create_table_plan_with_source(
2673 handler_args,
2674 ExplainOptions::default(),
2675 table_name,
2676 columns,
2677 wildcard_idx,
2678 constraints,
2679 format_encode,
2680 source_watermarks,
2681 col_id_gen,
2682 include_column_options,
2683 props,
2684 sql_column_strategy,
2685 )
2686 .await?,
2687 TableJobType::General,
2688 ),
2689 (None, None) => {
2690 let context = OptimizerContext::from_handler_args(handler_args);
2691 let (plan, table) = gen_create_table_plan(
2692 context,
2693 table_name,
2694 columns,
2695 constraints,
2696 col_id_gen,
2697 source_watermarks,
2698 props,
2699 true,
2700 )?;
2701 ((plan, None, table), TableJobType::General)
2702 }
2703 (None, Some(cdc_table)) => {
2704 sanity_check_for_table_on_cdc_source(
2705 append_only,
2706 &columns,
2707 &wildcard_idx,
2708 &constraints,
2709 &source_watermarks,
2710 )?;
2711
2712 let session = &handler_args.session;
2713 let (source, resolved_table_name) =
2714 get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2715
2716 let (cdc_with_options, normalized_external_table_name) =
2717 derive_with_options_for_cdc_table(
2718 &source.with_properties,
2719 cdc_table.external_table_name.clone(),
2720 )?;
2721
2722 let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2723
2724 let context: OptimizerContextRef =
2725 OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2726 let (plan, table) = gen_create_table_plan_for_cdc_table(
2727 context,
2728 source,
2729 normalized_external_table_name,
2730 columns,
2731 source_watermarks,
2732 column_catalogs,
2733 pk_names,
2734 cdc_with_options,
2735 col_id_gen,
2736 on_conflict,
2737 with_version_columns
2738 .iter()
2739 .map(|col| col.real_value())
2740 .collect(),
2741 include_column_options,
2742 table_name,
2743 resolved_table_name,
2744 original_catalog.database_id,
2745 original_catalog.schema_id,
2746 original_catalog.id(),
2747 engine,
2748 )?;
2749
2750 ((plan, None, table), TableJobType::SharedCdcSource)
2751 }
2752 (Some(_), Some(_)) => {
2753 return Err(ErrorCode::NotSupported(
2754 "Data format and encoding format doesn't apply to table created from a CDC source"
2755 .into(),
2756 "Remove the FORMAT and ENCODE specification".into(),
2757 )
2758 .into());
2759 }
2760 };
2761
2762 if table.pk_column_ids() != original_catalog.pk_column_ids() {
2763 Err(ErrorCode::InvalidInputSyntax(
2764 "alter primary key of table is not supported".to_owned(),
2765 ))?
2766 }
2767
2768 let graph = build_graph(plan, Some(GraphJobType::Table))?;
2769
2770 table.id = original_catalog.id();
2772 if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2773 table.associated_source_id = Some(source_id);
2774
2775 let source = source.as_mut().unwrap();
2776 source.id = source_id;
2777 source.associated_table_id = Some(table.id());
2778 }
2779
2780 Ok((graph, table, source, job_type))
2781}
2782
2783fn get_source_and_resolved_table_name(
2784 session: &Arc<SessionImpl>,
2785 cdc_table: CdcTableInfo,
2786 table_name: ObjectName,
2787) -> Result<(Arc<SourceCatalog>, String)> {
2788 let db_name = &session.database();
2789 let (_, resolved_table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2790
2791 let (source_schema, source_name) =
2792 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2793
2794 let source = {
2795 let catalog_reader = session.env().catalog_reader().read_guard();
2796 let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2797 let (source, _) = catalog_reader.get_source_by_name(
2798 db_name,
2799 SchemaPath::Name(schema_name.as_str()),
2800 source_name.as_str(),
2801 )?;
2802 source.clone()
2803 };
2804
2805 Ok((source, resolved_table_name))
2806}
2807
2808fn bind_webhook_info(
2810 session: &Arc<SessionImpl>,
2811 column_defs: &[ColumnDef],
2812 webhook_info: WebhookSourceInfo,
2813) -> Result<PbWebhookSourceInfo> {
2814 let WebhookSourceInfo {
2815 secret_ref,
2816 signature_expr,
2817 wait_for_persistence,
2818 is_batched,
2819 } = webhook_info;
2820
2821 for column in column_defs {
2822 for option_def in &column.options {
2823 match option_def.option {
2824 ColumnOption::Null => {}
2825 ColumnOption::GeneratedColumns(_) => {
2826 return Err(ErrorCode::InvalidInputSyntax(
2827 "generated columns are not supported for webhook tables".to_owned(),
2828 )
2829 .into());
2830 }
2831 ColumnOption::DefaultValue(_) | ColumnOption::DefaultValueInternal { .. } => {
2832 return Err(ErrorCode::InvalidInputSyntax(
2833 "default values are not supported for webhook tables".to_owned(),
2834 )
2835 .into());
2836 }
2837 ColumnOption::NotNull
2838 | ColumnOption::Unique { .. }
2839 | ColumnOption::ForeignKey { .. }
2840 | ColumnOption::Check(_)
2841 | ColumnOption::DialectSpecific(_) => {
2842 return Err(ErrorCode::InvalidInputSyntax(
2843 "only NULL column option is supported for webhook tables".to_owned(),
2844 )
2845 .into());
2846 }
2847 }
2848 }
2849 }
2850
2851 let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2853 let db_name = &session.database();
2854 let (schema_name, secret_name) =
2855 Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2856 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2857 (
2858 Some(PbSecretRef {
2859 secret_id: secret_catalog.id,
2860 ref_as: match secret_ref.ref_as {
2861 SecretRefAsType::Text => PbRefAsType::Text,
2862 SecretRefAsType::File => PbRefAsType::File,
2863 }
2864 .into(),
2865 }),
2866 Some(secret_name),
2867 )
2868 } else {
2869 (None, None)
2870 };
2871
2872 let signature_expr = if let Some(signature_expr) = signature_expr {
2873 let payload_name = if column_defs.len() == 1
2874 && column_defs[0].data_type.as_ref() == Some(&AstDataType::Jsonb)
2875 {
2876 column_defs[0].name.real_value()
2877 } else {
2878 WEBHOOK_PAYLOAD_FIELD_NAME.to_owned()
2879 };
2880 let secure_compare_context = SecureCompareContext {
2881 payload_name,
2882 secret_name,
2883 };
2884 let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
2885 let expr = binder.bind_expr(&signature_expr)?;
2886
2887 if expr.as_function_call().is_none()
2889 || expr.as_function_call().unwrap().func_type()
2890 != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2891 {
2892 return Err(ErrorCode::InvalidInputSyntax(
2893 "The signature verification function must be SECURE_COMPARE()".to_owned(),
2894 )
2895 .into());
2896 }
2897
2898 Some(expr.to_expr_proto())
2899 } else {
2900 session.notice_to_user(
2901 "VALIDATE clause is strongly recommended for safety or production usages",
2902 );
2903 None
2904 };
2905
2906 let pb_webhook_info = PbWebhookSourceInfo {
2907 secret_ref: pb_secret_ref,
2908 signature_expr,
2909 wait_for_persistence,
2910 is_batched,
2911 };
2912
2913 Ok(pb_webhook_info)
2914}
2915
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use super::*;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// A plain append-only table is registered with the row-id and timestamp columns in
    /// addition to the user columns; nested struct fields carry their own column ids.
    #[tokio::test]
    async fn test_create_table_handler() {
        let sql =
            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // The table is looked up in the default database and schema.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(table.name(), "t");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        // The struct fields `v3`, `v4` and `v5` are expected to get column ids 3, 4 and 5.
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "v1" => DataType::Int16,
            "v2" => StructType::new(
                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
            )
            .with_ids([3, 4, 5].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };

        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// A webhook table in a non-default schema may declare arbitrary columns (with a
    /// table-level primary key); all three user columns must accept DML.
    #[tokio::test]
    async fn test_create_webhook_table_with_arbitrary_columns() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql("create schema ingest_schema;")
            .await
            .unwrap();
        frontend
            .run_sql(
                r#"
                create table ingest_schema.orders (
                    id int,
                    customer_name varchar,
                    amount double precision,
                    primary key (id)
                ) with (
                    connector = 'webhook'
                ) validate as secure_compare(
                    headers->>'x-rw-signature',
                    'sha256=' || encode(hmac('webhook-secret', payload, 'sha256'), 'hex')
                );
                "#,
            )
            .await
            .unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (table, _) = catalog_reader
            .get_created_table_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name("ingest_schema"),
                "orders",
            )
            .unwrap();

        assert!(table.webhook_info.is_some());
        assert_eq!(
            table
                .columns
                .iter()
                .filter(|column| column.can_dml())
                .count(),
            3
        );
    }

    /// With a single JSONB column, the VALIDATE expression refers to the payload by that
    /// column's name (`body`) and must bind successfully.
    #[tokio::test]
    async fn test_create_webhook_table_uses_single_jsonb_column_name_in_validate() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql(
                r#"
                create table webhook_single_column (
                    body jsonb
                ) with (
                    connector = 'webhook'
                ) validate as secure_compare(
                    headers->>'x-rw-signature',
                    'sha256=' || encode(hmac('webhook-secret', body, 'sha256'), 'hex')
                );
                "#,
            )
            .await
            .unwrap();
    }

    /// Generated columns are rejected on webhook tables.
    #[tokio::test]
    async fn test_create_webhook_table_with_generated_columns() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
                create table webhook_generated_columns (
                    id int,
                    amount double precision,
                    amount_with_fee double precision as amount + 1.0
                ) with (
                    connector = 'webhook'
                );
                "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("generated columns are not supported for webhook tables"),
            "{err:?}"
        );
    }

    /// Column default values are rejected on webhook tables.
    #[tokio::test]
    async fn test_create_webhook_table_with_default_value() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
                create table webhook_default_value (
                    id int default 42,
                    amount double precision
                ) with (
                    connector = 'webhook'
                );
                "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("default values are not supported for webhook tables"),
            "{err:?}"
        );
    }

    /// Column options other than `NULL` (here `NOT NULL`) are rejected on webhook tables.
    #[tokio::test]
    async fn test_create_webhook_table_with_not_null_option() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
                create table webhook_not_null (
                    id int not null,
                    amount double precision
                ) with (
                    connector = 'webhook'
                );
                "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("only NULL column option is supported for webhook tables"),
            "{err:?}"
        );
    }

    /// Postgres CDC external table names must be exactly `schema.table`.
    ///
    /// - Names keep their case whether quoted or not.
    /// - Quoted parts may contain dots, and a doubled quote inside a quoted part is
    ///   unescaped.
    /// - The following are rejected: unqualified names, empty parts, unterminated
    ///   quotes, text after a closing quote, and extra name parts.
    #[test]
    fn test_parse_postgres_cdc_external_table_name() {
        for (input, expected) in [
            ("public.Note", ("public", "Note")),
            ("public.\"Note\"", ("public", "Note")),
            (
                "\"Mixed.Schema\".\"Note.Table\"",
                ("Mixed.Schema", "Note.Table"),
            ),
            ("public.\"Note\"\"Archive\"", ("public", "Note\"Archive")),
        ] {
            assert_eq!(
                parse_postgres_cdc_external_table_name(input).unwrap(),
                (expected.0.to_owned(), expected.1.to_owned()),
                "input: {input}"
            );
        }

        for input in [
            "Note",
            "public.",
            ".Note",
            "public.\"Note",
            "public.\"Note\"Archive",
            "public.Note.Archive",
        ] {
            assert!(
                parse_postgres_cdc_external_table_name(input).is_err(),
                "input should be rejected: {input}"
            );
        }
    }

    /// Primary-key binding from column-level and table-level constraints.
    #[test]
    fn test_bind_primary_key() {
        // Expected values are column ids. A table without a primary key is keyed by the
        // row-id column (id 0); user columns are numbered from 1 in declaration order.
        for (sql, expected) in [
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ] {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be create table")
            };
            // Replays the binding steps: bind columns, assign ids, bind pk names, then
            // resolve the primary key (adding a row-id column when needed).
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }

    /// Keys given both in `WITH` and in `FORMAT ... ENCODE` keep separate values in the
    /// source catalog: each clause retains its own value.
    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // The associated source is registered under the table's name.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // `format_encode_options` holds the values given inside FORMAT ... ENCODE.
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // `with_properties` holds the values given in the WITH clause.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        // `schema.location` was only given as an ENCODE option and must not appear in
        // `with_properties`.
        assert!(!source.with_properties.contains_key("schema.location"));
    }
}