use std::collections::{BTreeMap, HashMap, HashSet};
use std::rc::Rc;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use clap::ValueEnum;
use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use percent_encoding::percent_decode_str;
use pgwire::pg_response::{PgResponse, StatementType};
use prost::Message as _;
use risingwave_common::catalog::{
    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
    ObjectId, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, TableId,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::global_jvm::Jvm;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
    DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
    SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::connection_params::ConnectionType;
use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::secret::secret_ref::PbRefAsType;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::{IncludeOption, Parser};
use thiserror_ext::AsReport;

use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
use super::{RwPgResponse, alter_streaming_rate_limit};
use crate::binder::{Clause, SecureCompareContext, bind_data_type};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableVersion};
use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::HandlerArgs;
use crate::handler::create_source::{
    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
    bind_source_watermark, handle_addition_columns,
};
use crate::handler::util::SourceSchemaCompatExt;
use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
use crate::optimizer::plan_node::{
    LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::OverwriteOptions;
use crate::{Binder, Explain, TableCatalog, WithOptions};

mod col_id_gen;
pub use col_id_gen::*;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::{
    COMPACTION_INTERVAL_SEC, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
    ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink,
    MAX_SNAPSHOTS_NUM, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
    SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA, SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
    SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE, parse_partition_by_exprs,
};
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::meta::PbThrottleTarget;

use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};

fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
    for option_def in &c.options {
        match option_def.option {
            ColumnOption::GeneratedColumns(_) => {}
            ColumnOption::DefaultValue(_) => {}
            ColumnOption::DefaultValueInternal { .. } => {}
            ColumnOption::Unique { is_primary: true } => {}
            ColumnOption::Null => {}
            ColumnOption::NotNull => {}
            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
        }
    }
    Ok(())
}

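/// Binds the column definitions of a `CREATE TABLE` statement into `ColumnCatalog`s.
/// Column IDs are left as placeholders here and assigned later by `ColumnIdGenerator`.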
pub fn bind_sql_columns(
    column_defs: &[ColumnDef],
    is_for_drop_table_connector: bool,
) -> Result<Vec<ColumnCatalog>> {
    let mut columns = Vec::with_capacity(column_defs.len());

    for column in column_defs {
        ensure_column_options_supported(column)?;
        let ColumnDef {
            name,
            data_type,
            collation,
            options,
            ..
        } = column;

        let data_type = data_type
            .clone()
            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
        if let Some(collation) = collation {
            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
                bail_not_implemented!(
                    "Collation other than `C` or `POSIX` is not implemented"
                );
            }

            match data_type {
                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
                _ => {
                    return Err(ErrorCode::NotSupported(
                        format!("{} is not a collatable data type", data_type),
                        "The only built-in collatable data types are `text`, `varchar` and `char`, please check your type".into(),
                    ).into());
                }
            }
        }

        if !is_for_drop_table_connector {
            check_column_name_not_reserved(&name.real_value())?;
        }

        let nullable: bool = !options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull));

        columns.push(ColumnCatalog {
            column_desc: ColumnDesc {
                data_type: bind_data_type(&data_type)?,
                column_id: ColumnId::placeholder(),
                name: name.real_value(),
                generated_or_default_column: None,
                description: None,
                additional_column: AdditionalColumn { column_type: None },
                version: ColumnDescVersion::LATEST,
                system_column: None,
                nullable,
            },
            is_hidden: false,
        });
    }

    Ok(columns)
}

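/// Validates a generated column expression: it must not reference another generated
/// column, and an impure expression (e.g. one calling `now()`) must not be part of
/// the primary key.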
fn check_generated_column_constraints(
    column_name: &String,
    column_id: ColumnId,
    expr: &ExprImpl,
    column_catalogs: &[ColumnCatalog],
    generated_column_names: &[String],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let input_refs = expr.collect_input_refs(column_catalogs.len());
    for idx in input_refs.ones() {
        let referred_generated_column = &column_catalogs[idx].column_desc.name;
        if generated_column_names
            .iter()
            .any(|c| c == referred_generated_column)
        {
            return Err(ErrorCode::BindError(format!(
                "Generated columns can not reference another generated column. \
                 But here generated column \"{}\" referenced another generated column \"{}\"",
                column_name, referred_generated_column
            ))
            .into());
        }
    }

    if pk_column_ids.contains(&column_id) && expr.is_impure() {
        return Err(ErrorCode::BindError(format!(
            "Generated columns with impure expressions should not be part of the primary key. \
             Here column \"{}\" is defined as part of the primary key.",
            column_name
        ))
        .into());
    }

    Ok(())
}

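/// Binds generated-column and default-value options onto the bound columns, filling
/// `generated_or_default_column` of the corresponding `ColumnCatalog`s in place.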
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: &[ColumnDef],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let generated_column_names = {
        let mut names = vec![];
        for column in columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name, column_catalogs)?;

    for column in columns {
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            continue;
        };

        for option_def in &column.options {
            match &option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "fail to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type())?;

                    // Rewrite `now()`/`proctime()` against the pinned snapshot so that the
                    // default expression can be folded to a constant at binding time.
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                expr: Some(expr_impl.to_expr_proto()),
                            }));
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                             Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // `DEFAULT INTERNAL` must carry an already-encoded `DefaultColumnDesc`;
                    // reject it when nothing was persisted, i.e. when it was written by hand.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                             please specify a concrete default value"
                        );
                    }

                    let desc = DefaultColumnDesc::decode(&**persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                _ => {}
            }
        }
    }
    Ok(())
}

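/// Extracts primary-key column names from table-level constraints.
/// Only `PRIMARY KEY (...)` constraints are supported; anything else bails.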
pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
    let mut pk_column_names = vec![];

    for constraint in table_constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary: true,
            } => {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
            }
            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
        }
    }
    Ok(pk_column_names)
}

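/// Merges primary-key names from table-level constraints with column-level
/// `PRIMARY KEY` options; e.g. both `v INT PRIMARY KEY` and `PRIMARY KEY (v)`
/// resolve to `["v"]`. Declaring more than one primary key is an error.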
pub fn bind_sql_pk_names(
    columns_defs: &[ColumnDef],
    pk_names_from_table_constraints: Vec<String>,
) -> Result<Vec<String>> {
    let mut pk_column_names = pk_names_from_table_constraints;

    for column in columns_defs {
        for option_def in &column.options {
            if let ColumnOption::Unique { is_primary: true } = option_def.option {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names.push(column.name.real_value());
            };
        }
    }

    Ok(pk_column_names)
}

fn multiple_pk_definition_err() -> RwError {
    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
}

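/// Resolves the primary-key column IDs of a relation. If no primary key is given and
/// `must_need_pk` is true, a hidden row-id column is appended and used as the key;
/// its index in the extended column list is returned as the third element.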
pub fn bind_pk_and_row_id_on_relation(
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    must_need_pk: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
    for c in &columns {
        assert!(c.column_id() != ColumnId::placeholder());
    }

    let name_to_id = columns
        .iter()
        .map(|c| (c.name(), c.column_id()))
        .collect::<HashMap<_, _>>();

    let mut pk_column_ids: Vec<_> = pk_names
        .iter()
        .map(|name| {
            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
            })
        })
        .try_collect()?;

    let need_row_id = pk_column_ids.is_empty() && must_need_pk;

    let row_id_index = need_row_id.then(|| {
        let column = ColumnCatalog::row_id_column();
        let index = columns.len();
        pk_column_ids = vec![column.column_id()];
        columns.push(column);
        index
    });

    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
        Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{col}\" specified more than once"
        )))?;
    }

    Ok((columns, pk_column_ids, row_id_index))
}

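/// Generates the plan for `CREATE TABLE ... WITH (connector = ...)`, i.e. a table
/// whose data is ingested from an external connector source.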
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append only table does not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
    )
    .await?;

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source.clone(),
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(source), table))
}

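/// Generates the plan for a plain table without a connector. Secret and connection
/// references are rejected here since there is no external source to consume them.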
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan(
    context: OptimizerContext,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    mut col_id_gen: ColumnIdGenerator,
    source_watermarks: Vec<SourceWatermark>,
    props: CreateTableProps,
    is_for_replace_plan: bool,
) -> Result<(PlanRef, TableCatalog)> {
    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
    if !secret_refs.is_empty() || !connection_refs.is_empty() {
        return Err(crate::error::ErrorCode::InvalidParameterValue(
            "Secret reference and Connection reference are not allowed in options when creating table without external source"
                .to_owned(),
        )
        .into());
    }

    gen_create_table_plan_without_source(
        context,
        table_name,
        columns,
        column_defs,
        constraints,
        source_watermarks,
        col_id_gen.into_version(),
        props,
    )
}

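/// Generates the plan for a table without a connector from already-bound columns,
/// binding the primary key, watermarks and column constraints along the way.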
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}

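/// Generates the table plan from a bound `SourceCatalog`, reusing its columns,
/// primary key, row-id index and watermarks.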
fn gen_table_plan_with_source(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    source_catalog: SourceCatalog,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let table_name = source_catalog.name.clone();

    let info = CreateTableInfo {
        columns: source_catalog.columns.clone(),
        pk_column_ids: source_catalog.pk_col_ids.clone(),
        row_id_index: source_catalog.row_id_index,
        watermark_descs: source_catalog.watermark_descs.clone(),
        source_catalog: Some(source_catalog),
        version,
    };

    gen_table_plan_inner(context, schema_name, table_name, info, props)
}

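/// The `ON CONFLICT` behavior, either as parsed from the AST or already resolved.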
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    Ast(Option<OnConflict>),
    Resolved(ConflictBehavior),
}

impl From<Option<OnConflict>> for EitherOnConflict {
    fn from(v: Option<OnConflict>) -> Self {
        Self::Ast(v)
    }
}

impl From<ConflictBehavior> for EitherOnConflict {
    fn from(v: ConflictBehavior) -> Self {
        Self::Resolved(v)
    }
}

impl EitherOnConflict {
    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
        let conflict_behavior = match self {
            EitherOnConflict::Ast(on_conflict) => {
                if append_only {
                    if row_id_as_pk {
                        ConflictBehavior::NoCheck
                    } else {
                        if let Some(on_conflict) = on_conflict
                            && on_conflict != OnConflict::Nothing
                        {
                            return Err(ErrorCode::InvalidInputSyntax(
                                "When a PRIMARY KEY constraint is applied to an APPEND ONLY table, \
                                 the ON CONFLICT behavior must be DO NOTHING."
                                    .to_owned(),
                            )
                            .into());
                        }
                        ConflictBehavior::IgnoreConflict
                    }
                } else {
                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
                    }
                }
            }
            EitherOnConflict::Resolved(b) => b,
        };

        Ok(conflict_behavior)
    }
}

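/// Bound schema information used to generate a table plan.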
pub struct CreateTableInfo {
    pub columns: Vec<ColumnCatalog>,
    pub pk_column_ids: Vec<ColumnId>,
    pub row_id_index: Option<usize>,
    pub watermark_descs: Vec<WatermarkDesc>,
    pub source_catalog: Option<SourceCatalog>,
    pub version: TableVersion,
}

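/// Table properties carried through from the `CREATE TABLE` statement.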
pub struct CreateTableProps {
    pub definition: String,
    pub append_only: bool,
    pub on_conflict: EitherOnConflict,
    pub with_version_columns: Vec<String>,
    pub webhook_info: Option<PbWebhookSourceInfo>,
    pub engine: Engine,
}

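/// Shared tail of table-plan generation: builds the logical source node, checks the
/// append-only/watermark/retention combinations, and materializes the plan.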
#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    let source_node: LogicalPlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    if !append_only && !watermark_descs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on table requires the table to be append only.".to_owned(),
            "Use the key words `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    if !append_only && retention_seconds.is_some() {
        if session
            .config()
            .unsafe_enable_storage_retention_for_non_append_only_tables()
        {
            tracing::warn!(
                "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
                table_name
            );
            const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
                                  This may lead to stream inconsistency and unrecoverable \
                                  node failure if there is any row INSERT/UPDATE/DELETE operation \
                                  corresponding to the TTLed primary keys";
            session.notice_to_user(NOTICE);
        } else {
            return Err(ErrorCode::NotSupported(
                "Defining retention seconds on table requires the table to be append only."
                    .to_owned(),
                "Use the key words `APPEND ONLY`".to_owned(),
            )
            .into());
        }
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    Ok((materialize.into(), table))
}

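/// Generates the plan for a table created `FROM` a shared CDC source, scanning the
/// upstream external table via `LogicalCdcScan` instead of a connector source.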
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String,
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, TableCatalog)> {
    let session = context.session_ctx().clone();

    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| !c.is_generated())
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();
    let cdc_table_type = ExternalCdcTableType::from_properties(&options);
    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id.into(),
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");
    let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: LogicalPlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs: vec![],
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_columns,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.cdc_table_type = Some(cdc_table_type);
    Ok((materialize.into(), table))
}

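/// Derives the WITH options of a CDC table from its shared source by splitting the
/// upstream table name into connector-specific keys: `database.table` for MySQL,
/// `schema.table` for Postgres, and `database.schema.table` / `schema.table` /
/// `table` for SQL Server.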
fn derive_with_options_for_cdc_table(
    source_with_properties: &WithOptionsSecResolved,
    external_table_name: String,
) -> Result<WithOptionsSecResolved> {
    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
    let source_database_name: &str = source_with_properties
        .get("database.name")
        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
        .as_str();
    let mut with_options = source_with_properties.clone();
    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
        match connector.as_str() {
            MYSQL_CDC_CONNECTOR => {
                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
                })?;
                // The source may monitor multiple databases (a comma-separated list), so
                // check that the referenced database is among them.
                if !source_database_name
                    .split(',')
                    .map(|s| s.trim())
                    .any(|name| name == db_name)
                {
                    return Err(anyhow!(
                        "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
                        db_name,
                        source_database_name
                    ).into());
                }
                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            POSTGRES_CDC_CONNECTOR => {
                let (schema_name, table_name) = external_table_name
                    .split_once('.')
                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;

                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            SQL_SERVER_CDC_CONNECTOR => {
                // SQL Server allows 'database.schema.table', 'schema.table', or a bare
                // 'table' (which defaults to the `dbo` schema).
                let parts: Vec<&str> = external_table_name.split('.').collect();
                let (_, schema_name, table_name) = match parts.len() {
                    3 => {
                        let db_name = parts[0];
                        let schema_name = parts[1];
                        let table_name = parts[2];

                        if db_name != source_database_name {
                            return Err(anyhow!(
                                "The database name `{}` in the FROM clause is not the same as the database name `{}` in source definition",
                                db_name,
                                source_database_name
                            ).into());
                        }
                        (db_name, schema_name, table_name)
                    }
                    2 => {
                        let schema_name = parts[0];
                        let table_name = parts[1];
                        (source_database_name, schema_name, table_name)
                    }
                    1 => {
                        let table_name = parts[0];
                        (source_database_name, "dbo", table_name)
                    }
                    _ => {
                        return Err(anyhow!(
                            "The upstream table name must be in one of these formats: 'database.schema.table', 'schema.table', or 'table'"
                        ).into());
                    }
                };

                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            _ => {
                return Err(RwError::from(anyhow!(
                    "connector {} is not supported for cdc table",
                    connector
                )));
            }
        };
    }
    Ok(with_options)
}

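/// Dispatches `CREATE TABLE` planning based on whether a connector format is given and
/// whether the table is created from a CDC source, returning the plan, the optional
/// source catalog, the table catalog, the job type, and the shared source it depends on.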
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
    handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    format_encode: Option<FormatEncodeOptions>,
    cdc_table_info: Option<CdcTableInfo>,
    table_name: &ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    engine: Engine,
) -> Result<(
    PlanRef,
    Option<SourceCatalog>,
    TableCatalog,
    TableJobType,
    Option<SourceId>,
)> {
    let col_id_gen = ColumnIdGenerator::new_initial();
    let format_encode = check_create_table_with_source(
        &handler_args.with_options,
        format_encode,
        &include_column_options,
        &cdc_table_info,
    )?;
    let webhook_info = webhook_info
        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
        .transpose()?;

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_columns: with_version_columns.clone(),
        webhook_info,
        engine,
    };

    let ((plan, source, table), job_type, shared_source_id) = match (
        format_encode,
        cdc_table_info.as_ref(),
    ) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                explain_options,
                table_name.clone(),
                column_defs,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                SqlColumnStrategy::FollowChecked,
            )
            .await?,
            TableJobType::General,
            None,
        ),
        (None, None) => {
            let context = OptimizerContext::new(handler_args, explain_options);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name.clone(),
                column_defs,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                false,
            )?;

            ((plan, None, table), TableJobType::General, None)
        }

        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &column_defs,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            generated_columns_check_for_cdc_table(&column_defs)?;
            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;

            let session = &handler_args.session;
            let db_name = &session.database();
            let user_name = &session.user_name();
            let search_path = session.config().search_path();
            let (schema_name, resolved_table_name) =
                Binder::resolve_schema_qualified_name(db_name, table_name)?;
            let (database_id, schema_id) =
                session.get_database_and_schema_id_for_create(schema_name.clone())?;

            // Resolve the (optionally schema-qualified) name of the shared CDC source.
            let (source_schema_name, source_name) =
                Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;

            let source = {
                let catalog_reader = session.env().catalog_reader().read_guard();
                let schema_path =
                    SchemaPath::new(source_schema_name.as_deref(), &search_path, user_name);

                let (source, _) = catalog_reader.get_source_by_name(
                    db_name,
                    schema_path,
                    source_name.as_str(),
                )?;
                source.clone()
            };
            let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
                &source.with_properties,
                cdc_table.external_table_name.clone(),
            )?;

            let (columns, pk_names) = match wildcard_idx {
                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
                None => {
                    for column_def in &column_defs {
                        for option_def in &column_def.options {
                            if let ColumnOption::DefaultValue(_)
                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
                            {
                                return Err(ErrorCode::NotSupported(
                                    "Default value for columns defined on the table created from a CDC source".into(),
                                    "Remove the default value expression in the column definitions".into(),
                                )
                                .into());
                            }
                        }
                    }

                    let (columns, pk_names) =
                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
                    // Eagerly validate that the connector properties can be parsed into
                    // an external table config.
                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
                        .context("failed to extract external table config")?;

                    (columns, pk_names)
                }
            };

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, explain_options).into();
            let shared_source_id = source.id;
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                cdc_table.external_table_name.clone(),
                column_defs,
                columns,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_columns,
                include_column_options,
                table_name.clone(),
                resolved_table_name,
                database_id,
                schema_id,
                TableId::placeholder(),
                engine,
            )?;

            (
                (plan, None, table),
                TableJobType::SharedCdcSource,
                Some(shared_source_id),
            )
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format doesn't apply to table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };
    Ok((plan, source, table, job_type, shared_source_id))
}

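/// For CDC tables, all generated columns must appear at the end of the definition.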
fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
    let mut found_generated_column = false;
    for column in columns {
        let mut is_generated = false;

        for option_def in &column.options {
            if let ColumnOption::GeneratedColumns(_) = option_def.option {
                is_generated = true;
                break;
            }
        }

        if is_generated {
            found_generated_column = true;
        } else if found_generated_column {
            return Err(ErrorCode::NotSupported(
                "Non-generated column found after a generated column.".into(),
                "Ensure that all generated columns appear at the end of the cdc table definition."
                    .into(),
            )
            .into());
        }
    }
    Ok(())
}

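/// Rejects user-defined NOT NULL constraints on CDC tables (they only apply when the
/// columns are spelled out explicitly rather than derived with a wildcard).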
fn not_null_check_for_cdc_table(
    wildcard_idx: &Option<usize>,
    column_defs: &Vec<ColumnDef>,
) -> Result<()> {
    if wildcard_idx.is_none()
        && column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table with NOT NULL constraint is not supported".to_owned(),
            "Please remove the NOT NULL constraint for columns".to_owned(),
        )
        .into());
    }
    Ok(())
}

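/// Basic checks for `CREATE TABLE ... FROM source`: a wildcard and explicit columns are
/// mutually exclusive, a primary key is required, and APPEND ONLY as well as watermarks
/// are not supported.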
fn sanity_check_for_table_on_cdc_source(
    append_only: bool,
    column_defs: &Vec<ColumnDef>,
    wildcard_idx: &Option<usize>,
    constraints: &Vec<TableConstraint>,
    source_watermarks: &Vec<SourceWatermark>,
) -> Result<()> {
    // The wildcard cannot be mixed with explicit column definitions.
    if wildcard_idx.is_some() && !column_defs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "wildcard(*) and column definitions cannot be used together".to_owned(),
            "Remove the wildcard or column definitions".to_owned(),
        )
        .into());
    }

    // A primary key must be present unless the schema is derived via the wildcard.
    if wildcard_idx.is_none()
        && !constraints.iter().any(|c| {
            matches!(
                c,
                TableConstraint::Unique {
                    is_primary: true,
                    ..
                }
            )
        })
        && !column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table without primary key constraint is not supported".to_owned(),
            "Please define a primary key".to_owned(),
        )
        .into());
    }

    if append_only {
        return Err(ErrorCode::NotSupported(
            "append only modifier on the table created from a CDC source".into(),
            "Remove the APPEND ONLY clause".into(),
        )
        .into());
    }

    if !source_watermarks.is_empty() {
        return Err(ErrorCode::NotSupported(
            "watermark defined on the table created from a CDC source".into(),
            "Remove the Watermark definitions".into(),
        )
        .into());
    }

    Ok(())
}

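/// Derives the table schema by connecting to the external upstream database
/// (used when the CDC table is declared with `(*)`).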
async fn bind_cdc_table_schema_externally(
    cdc_with_options: WithOptionsSecResolved,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    let (options, secret_refs) = cdc_with_options.into_parts();
    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
        .context("failed to extract external table config")?;

    let table = ExternalTableImpl::connect(config)
        .await
        .context("failed to auto derive table schema")?;

    Ok((
        table
            .column_descs()
            .iter()
            .cloned()
            .map(|column_desc| ColumnCatalog {
                column_desc,
                is_hidden: false,
            })
            .collect(),
        table.pk_names().clone(),
    ))
}

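/// Builds the CDC table schema from user-specified column definitions and constraints.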
fn bind_cdc_table_schema(
    column_defs: &Vec<ColumnDef>,
    constraints: &Vec<TableConstraint>,
    is_for_replace_plan: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;

    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
    Ok((columns, pk_names))
}

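/// Handler for `CREATE TABLE`: plans the table, builds the stream fragment graph, and
/// submits it to the meta service, with a dedicated path for the Iceberg table engine.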
#[allow(clippy::too_many_arguments)]
pub async fn handle_create_table(
    mut handler_args: HandlerArgs,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    if_not_exists: bool,
    format_encode: Option<FormatEncodeOptions>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    cdc_table_info: Option<CdcTableInfo>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    ast_engine: risingwave_sqlparser::ast::Engine,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    if append_only {
        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
    }

    session.check_cluster_limits().await?;

    let engine = match ast_engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };
    if engine == Engine::Iceberg && handler_args.with_options.get_connector().is_some() {
        // Force `source_rate_limit` to 0 during creation; if the user already specified
        // one, reject it.
        if let Some(_rate_limit) = handler_args.with_options.insert(
            OverwriteOptions::SOURCE_RATE_LIMIT_KEY.to_owned(),
            "0".to_owned(),
        ) {
            return Err(ErrorCode::NotSupported(
                "source_rate_limit for iceberg table engine during table creation".to_owned(),
                "Please remove source_rate_limit from WITH options.".to_owned(),
            )
            .into());
        }
    }

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        table_name.clone(),
        StatementType::CREATE_TABLE,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (graph, source, hummock_table, job_type, shared_source_id) = {
        let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
            handler_args.clone(),
            ExplainOptions::default(),
            format_encode,
            cdc_table_info,
            &table_name,
            column_defs.clone(),
            wildcard_idx,
            constraints.clone(),
            source_watermarks,
            append_only,
            on_conflict,
            with_version_columns,
            include_column_options,
            webhook_info,
            engine,
        )
        .await?;
        tracing::trace!("table_plan: {:?}", plan.explain_to_string());

        let graph = build_graph(plan, Some(GraphJobType::Table))?;

        (graph, source, table, job_type, shared_source_id)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        table_name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

    let dependencies = shared_source_id
        .map(|id| HashSet::from([id as ObjectId]))
        .unwrap_or_default();

    match engine {
        Engine::Hummock => {
            let catalog_writer = session.catalog_writer()?;
            catalog_writer
                .create_table(
                    source.map(|s| s.to_prost()),
                    hummock_table.to_prost(),
                    graph,
                    job_type,
                    if_not_exists,
                    dependencies,
                )
                .await?;
        }
        Engine::Iceberg => {
            let hummock_table_name = hummock_table.name.clone();
            session.create_staging_table(hummock_table.clone());
            let res = create_iceberg_engine_table(
                session.clone(),
                handler_args,
                source.map(|s| s.to_prost()),
                hummock_table,
                graph,
                table_name,
                job_type,
                if_not_exists,
            )
            .await;
            session.drop_staging_table(&hummock_table_name);
            res?
        }
    }

    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
}

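/// Creates a table with the Iceberg table engine: alongside the internal storage table,
/// a hidden Iceberg sink and a hidden Iceberg source are derived from the connection
/// named by the `iceberg_engine_connection` session variable.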
#[allow(clippy::too_many_arguments)]
pub async fn create_iceberg_engine_table(
    session: Arc<SessionImpl>,
    handler_args: HandlerArgs,
    mut source: Option<PbSource>,
    table: TableCatalog,
    graph: StreamFragmentGraph,
    table_name: ObjectName,
    job_type: PbTableJobType,
    if_not_exists: bool,
) -> Result<()> {
    let meta_client = session.env().meta_client();
    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;

    let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
        ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
    })?;
    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
    let meta_store_user = meta_store_endpoint.username().to_owned();
    let meta_store_password = match meta_store_endpoint.password() {
        Some(password) => percent_decode_str(password)
            .decode_utf8()
            .map_err(|_| {
                ErrorCode::InternalError(
                    "failed to parse password from meta store endpoint".to_owned(),
                )
            })?
            .into_owned(),
        None => "".to_owned(),
    };
    let meta_store_host = meta_store_endpoint
        .host_str()
        .ok_or_else(|| {
            ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
        })?
        .to_owned();
    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
        ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
    })?;
    let meta_store_database = meta_store_endpoint
        .path()
        .trim_start_matches('/')
        .to_owned();

    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
        bail!("failed to parse meta backend: {}", meta_store_backend);
    };

    let catalog_uri = match meta_backend {
        MetaBackend::Postgres => {
            format!(
                "jdbc:postgresql://{}:{}/{}",
                meta_store_host.clone(),
                meta_store_port.clone(),
                meta_store_database.clone()
            )
        }
        MetaBackend::Mysql => {
            format!(
                "jdbc:mysql://{}:{}/{}",
                meta_store_host.clone(),
                meta_store_port.clone(),
                meta_store_database.clone()
            )
        }
        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
            bail!(
                "Unsupported meta backend for iceberg engine table: {}",
                meta_store_backend
            );
        }
    };

    let rw_db_name = session
        .env()
        .catalog_reader()
        .read_guard()
        .get_database_by_id(table.database_id)?
        .name()
        .to_owned();
    let rw_schema_name = session
        .env()
        .catalog_reader()
        .read_guard()
        .get_schema_by_id(table.database_id, table.schema_id)?
        .name()
        .clone();
    let iceberg_catalog_name = rw_db_name.clone();
    let iceberg_database_name = rw_schema_name.clone();
    let iceberg_table_name = table_name.0.last().unwrap().real_value();

    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
    let sink_decouple = session.config().sink_decouple();
    if matches!(sink_decouple, SinkDecouple::Disable) {
        bail!(
            "Iceberg engine table is only supported with sink decouple enabled, try `set sink_decouple = true` to resolve it"
        );
    }

    let mut connection_ref = BTreeMap::new();
    let with_common = if iceberg_engine_connection.is_empty() {
        bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
    } else {
        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
        assert_eq!(parts.len(), 2);
        let connection_catalog =
            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
            if params.connection_type == ConnectionType::Iceberg as i32 {
                connection_ref.insert(
                    "connection".to_owned(),
                    ConnectionRefValue {
                        connection_name: ObjectName::from(vec![
                            Ident::from(parts[0]),
                            Ident::from(parts[1]),
                        ]),
                    },
                );

                let mut with_common = BTreeMap::new();
                with_common.insert("connector".to_owned(), "iceberg".to_owned());
                with_common.insert("database.name".to_owned(), iceberg_database_name);
                with_common.insert("table.name".to_owned(), iceberg_table_name);

                if let Some(s) = params.properties.get("hosted_catalog")
                    && s.eq_ignore_ascii_case("true")
                {
                    // Hosted catalog: the RisingWave meta store doubles as a JDBC Iceberg catalog.
                    with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
                    with_common.insert("catalog.uri".to_owned(), catalog_uri);
                    with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
                    with_common.insert(
                        "catalog.jdbc.password".to_owned(),
                        meta_store_password.clone(),
                    );
                    with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
                }

                with_common
            } else {
                return Err(RwError::from(ErrorCode::InvalidParameterValue(
                    "Only iceberg connection could be used in iceberg engine".to_owned(),
                )));
            }
        } else {
            return Err(RwError::from(ErrorCode::InvalidParameterValue(
                "Private Link Service has been deprecated. Please create a new connection instead."
                    .to_owned(),
            )));
        }
    };

    let mut pks = table
        .pk_column_names()
        .iter()
        .map(|c| c.to_string())
        .collect::<Vec<String>>();

    // If the table keys on the hidden `_row_id`, expose it to the sink as
    // `_risingwave_iceberg_row_id` and select it explicitly in the sink query.
    let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
            "select {} as {}, * from {}",
            ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
        ))
        .context("unable to parse query")?
        .try_into()
        .unwrap();

        let Statement::Query(query) = &stmt else {
            panic!("unexpected statement: {:?}", stmt);
        };
        CreateSink::AsQuery(query.clone())
    } else {
        CreateSink::From(table_name.clone())
    };

    let mut sink_name = table_name.clone();
    *sink_name.0.last_mut().unwrap() = Ident::from(
        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
    );
    let create_sink_stmt = CreateSinkStatement {
        if_not_exists: false,
        sink_name,
        with_properties: WithProperties(vec![]),
        sink_from,
        columns: vec![],
        emit_mode: None,
        sink_schema: None,
        into_table_name: None,
    };

    let mut sink_handler_args = handler_args.clone();

    let mut sink_with = with_common.clone();

    if table.append_only {
        sink_with.insert("type".to_owned(), "append-only".to_owned());
    } else {
        sink_with.insert("primary_key".to_owned(), pks.join(","));
        sink_with.insert("type".to_owned(), "upsert".to_owned());
    }
    // Forward `commit_checkpoint_interval` to the sink, defaulting to 60 when unspecified.
    let commit_checkpoint_interval = handler_args
        .with_options
        .get(COMMIT_CHECKPOINT_INTERVAL)
        .map(|v| v.to_owned())
        .unwrap_or_else(|| "60".to_owned());
    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
        ErrorCode::InvalidInputSyntax(format!(
            "commit_checkpoint_interval must be a positive integer: {}",
            commit_checkpoint_interval
        ))
    })?;

    if commit_checkpoint_interval == 0 {
        bail!("commit_checkpoint_interval must be a positive integer: 0");
    }

    source
        .as_mut()
        .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));

    let sink_decouple = session.config().sink_decouple();
    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
        bail!(
            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
        )
    }

    sink_with.insert(
        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
        commit_checkpoint_interval.to_string(),
    );
    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());

    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());

    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
        match enable_compaction.to_lowercase().as_str() {
            "true" => {
                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
            }
            "false" => {
                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
            }
            _ => {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "enable_compaction must be true or false: {}",
                    enable_compaction
                ))
                .into());
            }
        }

        source
            .as_mut()
            .map(|x| x.with_properties.remove("enable_compaction"));
    } else {
        sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
    }

    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "compaction_interval_sec must be a positive integer: {}",
                compaction_interval_sec
            ))
        })?;
        if compaction_interval_sec == 0 {
            bail!("compaction_interval_sec must be a positive integer: 0");
        }
        sink_with.insert(
            "compaction_interval_sec".to_owned(),
            compaction_interval_sec.to_string(),
        );
        source
            .as_mut()
            .map(|x| x.with_properties.remove("compaction_interval_sec"));
    }

    let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
        handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
    {
        source
            .as_mut()
            .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
        match enable_snapshot_expiration.to_lowercase().as_str() {
            "true" => {
                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
                true
            }
            "false" => {
                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
                false
            }
            _ => {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "enable_snapshot_expiration must be true or false: {}",
                    enable_snapshot_expiration
                ))
                .into());
            }
        }
    } else {
        sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
        true
    };

    if has_enabled_snapshot_expiration {
        if let Some(snapshot_expiration_retain_last) = handler_args
            .with_options
            .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
        {
            sink_with.insert(
                SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
                snapshot_expiration_retain_last.to_owned(),
            );
            source
                .as_mut()
                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
        }

        if let Some(snapshot_expiration_max_age) = handler_args
            .with_options
            .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
        {
            sink_with.insert(
                SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
                snapshot_expiration_max_age.to_owned(),
            );
            source
                .as_mut()
                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
        }

        if let Some(snapshot_expiration_clear_expired_files) = handler_args
            .with_options
            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
        {
            sink_with.insert(
                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
                snapshot_expiration_clear_expired_files.to_owned(),
            );
            source.as_mut().map(|x| {
                x.with_properties
                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
            });
        }

        if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
            .with_options
            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
        {
            sink_with.insert(
                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
                snapshot_expiration_clear_expired_meta_data.to_owned(),
            );
            source.as_mut().map(|x| {
                x.with_properties
                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
            });
        }
    }

1921 if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1922 match write_mode.to_lowercase().as_str() {
1923 ICEBERG_WRITE_MODE_MERGE_ON_READ => {
1924 sink_with.insert(
1925 WRITE_MODE.to_owned(),
1926 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1927 );
1928 }
1929
1930 ICEBERG_WRITE_MODE_COPY_ON_WRITE => {
1931 if table.append_only {
1932 return Err(ErrorCode::NotSupported(
1933 "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
1934 "Please use MERGE ON READ instead".to_owned(),
1935 )
1936 .into());
1937 }
1938
1939 sink_with.insert(
1940 WRITE_MODE.to_owned(),
1941 ICEBERG_WRITE_MODE_COPY_ON_WRITE.to_owned(),
1942 );
1943 }
1944
1945 _ => {
1946 return Err(ErrorCode::InvalidInputSyntax(format!(
1947 "write_mode must be one of: {}, {}",
1948 ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
1949 ))
1950 .into());
1951 }
1952 }
1953
1954 source
1956 .as_mut()
1957 .map(|x| x.with_properties.remove("write_mode"));
1958 } else {
1959 sink_with.insert(
1960 WRITE_MODE.to_owned(),
1961 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1962 );
1963 }
1964
1965 if let Some(max_snapshots_num_before_compaction) =
1966 handler_args.with_options.get(MAX_SNAPSHOTS_NUM)
1967 {
1968 let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
1969 .parse::<u32>()
1970 .map_err(|_| {
1971 ErrorCode::InvalidInputSyntax(format!(
1972 "max_snapshots_num_before_compaction must be a positive integer: {}",
1973 max_snapshots_num_before_compaction
1974 ))
1975 })?;
1976 if max_snapshots_num_before_compaction == 0 {
1977 bail!("max_snapshots_num_before_compaction must be a positive integer: 0");
1978 }
1979 sink_with.insert(
1980 MAX_SNAPSHOTS_NUM.to_owned(),
1981 max_snapshots_num_before_compaction.to_string(),
1982 );
1983 }
1984
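// partition_by: every referenced column must exist in the table schema, and the partition
// columns must form a prefix of the primary key.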
1985 let partition_by = handler_args
1986 .with_options
1987 .get("partition_by")
1988 .map(|v| v.to_owned());
1989
1990 if let Some(partition_by) = &partition_by {
1991 let mut partition_columns = vec![];
1992 for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
1993 table
1994 .columns()
1995 .iter()
1996 .find(|col| col.name().eq_ignore_ascii_case(&column))
1997 .ok_or_else(|| {
1998 ErrorCode::InvalidInputSyntax(format!(
1999 "Partition source column does not exist in schema: {}",
2000 column
2001 ))
2002 })?;
2003
2004 partition_columns.push(column);
2005 }
2006
2007 ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2008 |_| {
2009 ErrorCode::InvalidInputSyntax(
2010 "The partition columns should be a prefix of the primary key".to_owned(),
2011 )
2012 },
2013 )?;
2014
2015 sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2016
2017 source
2019 .as_mut()
2020 .map(|x| x.with_properties.remove("partition_by"));
2021 }
2022
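// Plan the internal Iceberg sink that streams the table's changes into the Iceberg table,
// and build its fragment graph.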
2023 sink_handler_args.with_options =
2024 WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2025 let SinkPlanContext {
2026 sink_plan,
2027 sink_catalog,
2028 ..
2029 } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2030 let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2031
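// Derive the internal Iceberg source (table name prefixed with ICEBERG_SOURCE_PREFIX),
// which is used to read the Iceberg table back.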
2032 let mut source_name = table_name.clone();
2033 *source_name.0.last_mut().unwrap() = Ident::from(
2034 (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2035 );
2036 let create_source_stmt = CreateSourceStatement {
2037 temporary: false,
2038 if_not_exists: false,
2039 columns: vec![],
2040 source_name,
2041 wildcard_idx: Some(0),
2042 constraints: vec![],
2043 with_properties: WithProperties(vec![]),
2044 format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2045 source_watermarks: vec![],
2046 include_column_options: vec![],
2047 };
2048
2049 let mut source_handler_args = handler_args.clone();
2050 let source_with = with_common;
2051 source_handler_args.with_options =
2052 WithOptions::new(source_with, Default::default(), connection_ref);
2053
2054 let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2055 let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2056 let with_properties = bind_connector_props(&source_handler_args, &format_encode, true)?;
2057
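// Create the Iceberg table up front through the sink configuration so that it already
// exists when the source binds its columns; keep the catalog handle so the table can be
// dropped again if the DDL fails later.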
2058 let (iceberg_catalog, table_identifier) = {
2061 let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2062 let iceberg_sink = IcebergSink::try_from(sink_param)?;
2063 iceberg_sink.create_table_if_not_exists().await?;
2064
2065 let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2066 let table_identifier = iceberg_sink.config.full_table_name()?;
2067 (iceberg_catalog, table_identifier)
2068 };
2069
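// Resolve the columns of the internal Iceberg source from the newly created Iceberg table.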
2070 let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2071 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2072 &session,
2073 &format_encode,
2074 Either::Left(&with_properties),
2075 create_source_type,
2076 )
2077 .await?;
2078 let mut col_id_gen = ColumnIdGenerator::new_initial();
2079
2080 let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2081 source_handler_args,
2082 create_source_stmt.source_name,
2083 format_encode,
2084 with_properties,
2085 &create_source_stmt.columns,
2086 create_source_stmt.constraints,
2087 create_source_stmt.wildcard_idx,
2088 create_source_stmt.source_watermarks,
2089 columns_from_resolve_source,
2090 source_info,
2091 create_source_stmt.include_column_options,
2092 &mut col_id_gen,
2093 create_source_type,
2094 overwrite_options.source_rate_limit,
2095 SqlColumnStrategy::FollowChecked,
2096 )
2097 .await?;
2098
2099 let has_connector = source.is_some();
2100
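// Ensure the embedded JVM is initialized; some Iceberg catalog implementations may rely on it.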
2101 let _ = Jvm::get_or_init()?;
2104
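// Submit the table, the internal Iceberg sink, and the Iceberg source to the meta service
// as a single DDL job.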
2105 let catalog_writer = session.catalog_writer()?;
2106 let res = catalog_writer
2107 .create_iceberg_table(
2108 PbTableJobInfo {
2109 source,
2110 table: Some(table.to_prost()),
2111 fragment_graph: Some(graph),
2112 job_type: job_type as _,
2113 },
2114 PbSinkJobInfo {
2115 sink: Some(sink_catalog.to_proto()),
2116 fragment_graph: Some(sink_graph),
2117 },
2118 iceberg_source_catalog.to_prost(),
2119 if_not_exists,
2120 )
2121 .await;
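// If the DDL failed, try to drop the Iceberg table created above so it is not left behind;
// failures during the cleanup are only logged.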
2122 if res.is_err() {
2123 let _ = iceberg_catalog
2124 .drop_table(&table_identifier)
2125 .await
2126 .inspect_err(|err| {
2127 tracing::error!(
2128 "failed to drop iceberg table {} after create iceberg engine table failed: {}",
2129 table_identifier,
2130 err.as_report()
2131 );
2132 });
2133 res?
2134 }
2135
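// For tables backed by a connector, clear the source rate limit (-1 means unthrottled)
// once the whole Iceberg engine job has been created.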
2136 if has_connector {
2138 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
2139 handler_args,
2140 PbThrottleTarget::TableWithSource,
2141 table_name,
2142 -1,
2143 )
2144 .await?;
2145 }
2146
2147 Ok(())
2148}
2149
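/// Sanity checks for `CREATE TABLE` with a connector: `INCLUDE` columns require a connector,
/// and a connector requires a `FORMAT ... ENCODE ...` clause. Tables created from a CDC
/// source are exempt and pass through unchanged.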
2150pub fn check_create_table_with_source(
2151 with_options: &WithOptions,
2152 format_encode: Option<FormatEncodeOptions>,
2153 include_column_options: &IncludeOption,
2154 cdc_table_info: &Option<CdcTableInfo>,
2155) -> Result<Option<FormatEncodeOptions>> {
2156 if cdc_table_info.is_some() {
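// Tables created from a CDC source are not subject to these checks.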
2158 return Ok(format_encode);
2159 }
2160 let defined_source = with_options.is_source_connector();
2161
2162 if !include_column_options.is_empty() && !defined_source {
2163 return Err(ErrorCode::InvalidInputSyntax(
2164 "INCLUDE should be used with a connector".to_owned(),
2165 )
2166 .into());
2167 }
2168 if defined_source {
2169 format_encode.as_ref().ok_or_else(|| {
2170 ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2171 })?;
2172 }
2173 Ok(format_encode)
2174}
2175
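/// Checks that the partition columns form a prefix of the primary key columns.
/// For example, partition columns `[a, b]` are accepted for primary key `[a, b, c]`,
/// while `[b]` or `[a, c]` are rejected.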
2176fn ensure_partition_columns_are_prefix_of_primary_key(
2177 partition_columns: &[String],
2178 primary_key_columns: &[String],
2179) -> std::result::Result<(), String> {
2180 if partition_columns.len() > primary_key_columns.len() {
2181 return Err("There cannot be more partition columns than primary key columns.".to_owned());
2182 }
2183
2184 for (i, partition_col) in partition_columns.iter().enumerate() {
2185 if primary_key_columns.get(i) != Some(partition_col) {
2186 return Err(format!(
2187 "Partition column '{}' is not a prefix of the primary key.",
2188 partition_col
2189 ));
2190 }
2191 }
2192
2193 Ok(())
2194}
2195
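/// Regenerates the stream fragment graph, table catalog, and optional source catalog when an
/// existing table is replaced (e.g. by `ALTER TABLE`). The original table id and associated
/// source id are reused, and the primary key must stay unchanged.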
2196#[allow(clippy::too_many_arguments)]
2197pub async fn generate_stream_graph_for_replace_table(
2198 _session: &Arc<SessionImpl>,
2199 table_name: ObjectName,
2200 original_catalog: &Arc<TableCatalog>,
2201 handler_args: HandlerArgs,
2202 statement: Statement,
2203 col_id_gen: ColumnIdGenerator,
2204 sql_column_strategy: SqlColumnStrategy,
2205) -> Result<(
2206 StreamFragmentGraph,
2207 TableCatalog,
2208 Option<SourceCatalog>,
2209 TableJobType,
2210)> {
2211 let Statement::CreateTable {
2212 columns,
2213 constraints,
2214 source_watermarks,
2215 append_only,
2216 on_conflict,
2217 with_version_columns,
2218 wildcard_idx,
2219 cdc_table_info,
2220 format_encode,
2221 include_column_options,
2222 engine,
2223 with_options,
2224 ..
2225 } = statement
2226 else {
2227 panic!("unexpected statement type: {:?}", statement);
2228 };
2229
2230 let format_encode = format_encode
2231 .clone()
2232 .map(|format_encode| format_encode.into_v2_with_warning());
2233
2234 let engine = match engine {
2235 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2236 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2237 };
2238
2239 let is_drop_connector =
2240 original_catalog.associated_source_id().is_some() && format_encode.is_none();
2241 if is_drop_connector {
2242 debug_assert!(
2243 source_watermarks.is_empty()
2244 && include_column_options.is_empty()
2245 && with_options
2246 .iter()
2247 .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2248 );
2249 }
2250
2251 let props = CreateTableProps {
2252 definition: handler_args.normalized_sql.clone(),
2253 append_only,
2254 on_conflict: on_conflict.into(),
2255 with_version_columns: with_version_columns
2256 .iter()
2257 .map(|col| col.real_value())
2258 .collect(),
2259 webhook_info: original_catalog.webhook_info.clone(),
2260 engine,
2261 };
2262
2263 let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2264 (Some(format_encode), None) => (
2265 gen_create_table_plan_with_source(
2266 handler_args,
2267 ExplainOptions::default(),
2268 table_name,
2269 columns,
2270 wildcard_idx,
2271 constraints,
2272 format_encode,
2273 source_watermarks,
2274 col_id_gen,
2275 include_column_options,
2276 props,
2277 sql_column_strategy,
2278 )
2279 .await?,
2280 TableJobType::General,
2281 ),
2282 (None, None) => {
2283 let context = OptimizerContext::from_handler_args(handler_args);
2284 let (plan, table) = gen_create_table_plan(
2285 context,
2286 table_name,
2287 columns,
2288 constraints,
2289 col_id_gen,
2290 source_watermarks,
2291 props,
2292 true,
2293 )?;
2294 ((plan, None, table), TableJobType::General)
2295 }
2296 (None, Some(cdc_table)) => {
2297 let session = &handler_args.session;
2298 let (source, resolved_table_name, database_id, schema_id) =
2299 get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2300
2301 let cdc_with_options = derive_with_options_for_cdc_table(
2302 &source.with_properties,
2303 cdc_table.external_table_name.clone(),
2304 )?;
2305
2306 let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2307
2308 let context: OptimizerContextRef =
2309 OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2310 let (plan, table) = gen_create_table_plan_for_cdc_table(
2311 context,
2312 source,
2313 cdc_table.external_table_name.clone(),
2314 columns,
2315 column_catalogs,
2316 pk_names,
2317 cdc_with_options,
2318 col_id_gen,
2319 on_conflict,
2320 with_version_columns
2321 .iter()
2322 .map(|col| col.real_value())
2323 .collect(),
2324 include_column_options,
2325 table_name,
2326 resolved_table_name,
2327 database_id,
2328 schema_id,
2329 original_catalog.id(),
2330 engine,
2331 )?;
2332
2333 ((plan, None, table), TableJobType::SharedCdcSource)
2334 }
2335 (Some(_), Some(_)) => {
2336 return Err(ErrorCode::NotSupported(
2337 "Data format and encoding format do not apply to a table created from a CDC source"
2338 .into(),
2339 "Remove the FORMAT and ENCODE specification".into(),
2340 )
2341 .into());
2342 }
2343 };
2344
2345 if table.pk_column_ids() != original_catalog.pk_column_ids() {
2346 Err(ErrorCode::InvalidInputSyntax(
2347 "alter primary key of table is not supported".to_owned(),
2348 ))?
2349 }
2350
2351 let graph = build_graph(plan, Some(GraphJobType::Table))?;
2352
2353 table.id = original_catalog.id();
2355 if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2356 table.associated_source_id = Some(source_id);
2357
2358 let source = source.as_mut().unwrap();
2359 source.id = source_id.as_raw_id();
2360 source.associated_table_id = Some(table.id());
2361 }
2362
2363 Ok((graph, table, source, job_type))
2364}
2365
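/// Resolves the CDC source referenced by `cdc_table` along with the resolved table name and
/// the database/schema ids under which the new table will be created.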
2366fn get_source_and_resolved_table_name(
2367 session: &Arc<SessionImpl>,
2368 cdc_table: CdcTableInfo,
2369 table_name: ObjectName,
2370) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
2371 let db_name = &session.database();
2372 let (schema_name, resolved_table_name) =
2373 Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2374 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
2375
2376 let (format_encode, source_name) =
2377 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2378
2379 let source = {
2380 let catalog_reader = session.env().catalog_reader().read_guard();
2381 let schema_name = format_encode.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2382 let (source, _) = catalog_reader.get_source_by_name(
2383 db_name,
2384 SchemaPath::Name(schema_name.as_str()),
2385 source_name.as_str(),
2386 )?;
2387 source.clone()
2388 };
2389
2390 Ok((source, resolved_table_name, database_id, schema_id))
2391}
2392
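/// Binds the webhook source info of a table: the table must consist of exactly one JSONB
/// column, the optional secret reference is resolved against the catalog, and the signature
/// expression must be a `SECURE_COMPARE()` call.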
2393fn bind_webhook_info(
2395 session: &Arc<SessionImpl>,
2396 columns_defs: &[ColumnDef],
2397 webhook_info: WebhookSourceInfo,
2398) -> Result<PbWebhookSourceInfo> {
2399 if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
2401 {
2402 return Err(ErrorCode::InvalidInputSyntax(
2403 "Table with webhook source should have exactly one JSONB column".to_owned(),
2404 )
2405 .into());
2406 }
2407
2408 let WebhookSourceInfo {
2409 secret_ref,
2410 signature_expr,
2411 wait_for_persistence,
2412 is_batched,
2413 } = webhook_info;
2414
2415 let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2417 let db_name = &session.database();
2418 let (schema_name, secret_name) =
2419 Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2420 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2421 (
2422 Some(PbSecretRef {
2423 secret_id: secret_catalog.id.secret_id(),
2424 ref_as: match secret_ref.ref_as {
2425 SecretRefAsType::Text => PbRefAsType::Text,
2426 SecretRefAsType::File => PbRefAsType::File,
2427 }
2428 .into(),
2429 }),
2430 Some(secret_name),
2431 )
2432 } else {
2433 (None, None)
2434 };
2435
2436 let secure_compare_context = SecureCompareContext {
2437 column_name: columns_defs[0].name.real_value(),
2438 secret_name,
2439 };
2440 let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
2441 let expr = binder.bind_expr(&signature_expr)?;
2442
2443 if expr.as_function_call().is_none()
2445 || expr.as_function_call().unwrap().func_type()
2446 != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2447 {
2448 return Err(ErrorCode::InvalidInputSyntax(
2449 "The signature verification function must be SECURE_COMPARE()".to_owned(),
2450 )
2451 .into());
2452 }
2453
2454 let pb_webhook_info = PbWebhookSourceInfo {
2455 secret_ref: pb_secret_ref,
2456 signature_expr: Some(expr.to_expr_proto()),
2457 wait_for_persistence,
2458 is_batched,
2459 };
2460
2461 Ok(pb_webhook_info)
2462}
2463
2464#[cfg(test)]
2465mod tests {
2466 use risingwave_common::catalog::{
2467 DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2468 };
2469 use risingwave_common::types::{DataType, StructType};
2470
2471 use super::*;
2472 use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2473
2474 #[tokio::test]
2475 async fn test_create_table_handler() {
2476 let sql =
2477 "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2478 let frontend = LocalFrontend::new(Default::default()).await;
2479 frontend.run_sql(sql).await.unwrap();
2480
2481 let session = frontend.session_ref();
2482 let catalog_reader = session.env().catalog_reader().read_guard();
2483 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2484
2485 let (table, _) = catalog_reader
2487 .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2488 .unwrap();
2489 assert_eq!(table.name(), "t");
2490
2491 let columns = table
2492 .columns
2493 .iter()
2494 .map(|col| (col.name(), col.data_type().clone()))
2495 .collect::<HashMap<&str, DataType>>();
2496
2497 let expected_columns = maplit::hashmap! {
2498 ROW_ID_COLUMN_NAME => DataType::Serial,
2499 "v1" => DataType::Int16,
2500 "v2" => StructType::new(
2501 vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
2502 )
2503 .with_ids([3, 4, 5].map(ColumnId::new))
2504 .into(),
2505 RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
2506 };
2507
2508 assert_eq!(columns, expected_columns, "{columns:#?}");
2509 }
2510
2511 #[test]
2512 fn test_bind_primary_key() {
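// Each expected value lists the bound primary-key column ids. Id 0 is the implicit row-id
// column added when no primary key is declared; user-defined columns are numbered from 1.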
2513 for (sql, expected) in [
2516 ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
2517 ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
2518 ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
2519 (
2520 "create table t (v1 int primary key, v2 int primary key)",
2521 Err("multiple primary keys are not allowed"),
2522 ),
2523 (
2524 "create table t (v1 int primary key primary key, v2 int)",
2525 Err("multiple primary keys are not allowed"),
2526 ),
2527 (
2528 "create table t (v1 int, v2 int, primary key (v1))",
2529 Ok(&[1]),
2530 ),
2531 (
2532 "create table t (v1 int, primary key (v2), v2 int)",
2533 Ok(&[2]),
2534 ),
2535 (
2536 "create table t (primary key (v2, v1), v1 int, v2 int)",
2537 Ok(&[2, 1]),
2538 ),
2539 (
2540 "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
2541 Err("multiple primary keys are not allowed"),
2542 ),
2543 (
2544 "create table t (v1 int primary key, primary key (v1), v2 int)",
2545 Err("multiple primary keys are not allowed"),
2546 ),
2547 (
2548 "create table t (v1 int, primary key (V3), v2 int)",
2549 Err("column \"v3\" named in key does not exist"),
2550 ),
2551 ] {
2552 let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
2553 let risingwave_sqlparser::ast::Statement::CreateTable {
2554 columns: column_defs,
2555 constraints,
2556 ..
2557 } = ast.remove(0)
2558 else {
2559 panic!("test case should be create table")
2560 };
2561 let actual: Result<_> = (|| {
2562 let mut columns = bind_sql_columns(&column_defs, false)?;
2563 let mut col_id_gen = ColumnIdGenerator::new_initial();
2564 for c in &mut columns {
2565 col_id_gen.generate(c)?;
2566 }
2567
2568 let pk_names =
2569 bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
2570 let (_, pk_column_ids, _) =
2571 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
2572 Ok(pk_column_ids)
2573 })();
2574 match (expected, actual) {
2575 (Ok(expected), Ok(actual)) => assert_eq!(
2576 expected.iter().copied().map(ColumnId::new).collect_vec(),
2577 actual,
2578 "sql: {sql}"
2579 ),
2580 (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
2581 (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
2582 (Err(expected), Err(actual)) => assert!(
2583 actual.to_string().contains(expected),
2584 "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
2585 ),
2586 }
2587 }
2588 }
2589
2590 #[tokio::test]
2591 async fn test_duplicate_props_options() {
2592 let proto_file = create_proto_file(PROTO_FILE_DATA);
2593 let sql = format!(
2594 r#"CREATE TABLE t
2595 WITH (
2596 connector = 'kinesis',
2597 aws.region='user_test_topic',
2598 endpoint='172.10.1.1:9090,172.10.1.2:9090',
2599 aws.credentials.access_key_id = 'your_access_key_1',
2600 aws.credentials.secret_access_key = 'your_secret_key_1'
2601 )
2602 FORMAT PLAIN ENCODE PROTOBUF (
2603 message = '.test.TestRecord',
2604 aws.credentials.access_key_id = 'your_access_key_2',
2605 aws.credentials.secret_access_key = 'your_secret_key_2',
2606 schema.location = 'file://{}',
2607 )"#,
2608 proto_file.path().to_str().unwrap()
2609 );
2610 let frontend = LocalFrontend::new(Default::default()).await;
2611 frontend.run_sql(sql).await.unwrap();
2612
2613 let session = frontend.session_ref();
2614 let catalog_reader = session.env().catalog_reader().read_guard();
2615 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2616
2617 let (source, _) = catalog_reader
2619 .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2620 .unwrap();
2621 assert_eq!(source.name, "t");
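// Options given in the FORMAT ... ENCODE clause end up in `format_encode_options`.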
2622
2623 assert_eq!(
2625 source
2626 .info
2627 .format_encode_options
2628 .get("aws.credentials.access_key_id")
2629 .unwrap(),
2630 "your_access_key_2"
2631 );
2632 assert_eq!(
2633 source
2634 .info
2635 .format_encode_options
2636 .get("aws.credentials.secret_access_key")
2637 .unwrap(),
2638 "your_secret_key_2"
2639 );
2640
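// The duplicated credentials given in the WITH clause stay in `with_properties` with
// their own values.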
2641 assert_eq!(
2643 source
2644 .with_properties
2645 .get("aws.credentials.access_key_id")
2646 .unwrap(),
2647 "your_access_key_1"
2648 );
2649 assert_eq!(
2650 source
2651 .with_properties
2652 .get("aws.credentials.secret_access_key")
2653 .unwrap(),
2654 "your_secret_key_1"
2655 );
2656
2657 assert!(!source.with_properties.contains_key("schema.location"));
2659 }
2660}