1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use percent_encoding::percent_decode_str;
25use pgwire::pg_response::{PgResponse, StatementType};
26use prost::Message as _;
27use risingwave_common::catalog::{
28 CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29 ObjectId, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, TableId,
30};
31use risingwave_common::config::MetaBackend;
32use risingwave_common::global_jvm::Jvm;
33use risingwave_common::session_config::sink_decouple::SinkDecouple;
34use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
35use risingwave_common::util::value_encoding::DatumToProtoExt;
36use risingwave_common::{bail, bail_not_implemented};
37use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_connector::source::cdc::external::{
40 DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
41 SCHEMA_NAME_KEY, TABLE_NAME_KEY,
42};
43use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
44use risingwave_pb::catalog::connection::Info as ConnectionInfo;
45use risingwave_pb::catalog::connection_params::ConnectionType;
46use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
47use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
48use risingwave_pb::meta::PbThrottleTarget;
49use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
50use risingwave_pb::plan_common::{
51 AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
52};
53use risingwave_pb::secret::PbSecretRef;
54use risingwave_pb::secret::secret_ref::PbRefAsType;
55use risingwave_pb::stream_plan::StreamFragmentGraph;
56use risingwave_sqlparser::ast::{
57 CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
58 CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
59 FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
60 Statement, TableConstraint, WebhookSourceInfo, WithProperties,
61};
62use risingwave_sqlparser::parser::{IncludeOption, Parser};
63
64use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
65use super::{RwPgResponse, alter_streaming_rate_limit, create_sink, create_source};
66use crate::binder::{Clause, SecureCompareContext, bind_data_type};
67use crate::catalog::root_catalog::SchemaPath;
68use crate::catalog::source_catalog::SourceCatalog;
69use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableVersion};
70use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
71use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
72use crate::expr::{Expr, ExprImpl, ExprRewriter};
73use crate::handler::HandlerArgs;
74use crate::handler::create_source::{
75 UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
76 bind_source_watermark, handle_addition_columns,
77};
78use crate::handler::util::SourceSchemaCompatExt;
79use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
80use crate::optimizer::plan_node::{
81 LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
82};
83use crate::optimizer::property::{Order, RequiredDist};
84use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
85use crate::session::SessionImpl;
86use crate::session::current::notice_to_user;
87use crate::stream_fragmenter::{GraphJobType, build_graph};
88use crate::utils::OverwriteOptions;
89use crate::{Binder, Explain, TableCatalog, WithOptions};
90
91mod col_id_gen;
92pub use col_id_gen::*;
93use risingwave_connector::sink::iceberg::{
94 COMPACTION_INTERVAL_SEC, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
95 ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ, MAX_SNAPSHOTS_NUM,
96 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
97 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
98 parse_partition_by_exprs,
99};
100
101use crate::handler::drop_table::handle_drop_table;
102
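/// Rejects column options that are not yet supported on `CREATE TABLE`; only generated
/// columns, default values, the column-level `PRIMARY KEY` marker and `NULL`/`NOT NULL`
/// are accepted.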
103fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
104 for option_def in &c.options {
105 match option_def.option {
106 ColumnOption::GeneratedColumns(_) => {}
107 ColumnOption::DefaultValue(_) => {}
108 ColumnOption::DefaultValueInternal { .. } => {}
109 ColumnOption::Unique { is_primary: true } => {}
110 ColumnOption::Null => {}
111 ColumnOption::NotNull => {}
112 _ => bail_not_implemented!("column constraints \"{}\"", option_def),
113 }
114 }
115 Ok(())
116}
117
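/// Binds the column definitions of a `CREATE TABLE` statement into `ColumnCatalog`s:
/// checks that collations are limited to `C`/`POSIX` on character types, rejects reserved
/// column names (unless called for dropping a table connector), and derives nullability
/// from `NOT NULL`. Column ids are left as placeholders to be assigned later.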
118pub fn bind_sql_columns(
122 column_defs: &[ColumnDef],
123 is_for_drop_table_connector: bool,
124) -> Result<Vec<ColumnCatalog>> {
125 let mut columns = Vec::with_capacity(column_defs.len());
126
127 for column in column_defs {
128 ensure_column_options_supported(column)?;
129 let ColumnDef {
133 name,
134 data_type,
135 collation,
136 options,
137 ..
138 } = column;
139
140 let data_type = data_type
141 .clone()
142 .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
143 if let Some(collation) = collation {
144 if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
150 bail_not_implemented!(
                    "Collation other than `C` or `POSIX` is not implemented"
152 );
153 }
154
155 match data_type {
156 AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
157 _ => {
158 return Err(ErrorCode::NotSupported(
159 format!("{} is not a collatable data type", data_type),
                        "The built-in collatable data types are `varchar`, `text`, and `char`; please check your type".into(),
161 ).into());
162 }
163 }
164 }
165
166 if !is_for_drop_table_connector {
167 check_column_name_not_reserved(&name.real_value())?;
171 }
172
173 let nullable: bool = !options
174 .iter()
175 .any(|def| matches!(def.option, ColumnOption::NotNull));
176
177 columns.push(ColumnCatalog {
178 column_desc: ColumnDesc {
179 data_type: bind_data_type(&data_type)?,
180 column_id: ColumnId::placeholder(),
181 name: name.real_value(),
182 generated_or_default_column: None,
183 description: None,
184 additional_column: AdditionalColumn { column_type: None },
185 version: ColumnDescVersion::LATEST,
186 system_column: None,
187 nullable,
188 },
189 is_hidden: false,
190 });
191 }
192
193 Ok(columns)
194}
195
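/// Ensures a generated column neither references another generated column nor is part of
/// the primary key while using an impure expression.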
196fn check_generated_column_constraints(
197 column_name: &String,
198 column_id: ColumnId,
199 expr: &ExprImpl,
200 column_catalogs: &[ColumnCatalog],
201 generated_column_names: &[String],
202 pk_column_ids: &[ColumnId],
203) -> Result<()> {
204 let input_refs = expr.collect_input_refs(column_catalogs.len());
205 for idx in input_refs.ones() {
206 let referred_generated_column = &column_catalogs[idx].column_desc.name;
207 if generated_column_names
208 .iter()
209 .any(|c| c == referred_generated_column)
210 {
211 return Err(ErrorCode::BindError(format!(
                "A generated column can not reference another generated column. \
                 But here generated column \"{}\" referenced another generated column \"{}\"",
214 column_name, referred_generated_column
215 ))
216 .into());
217 }
218 }
219
220 if pk_column_ids.contains(&column_id) && expr.is_impure() {
221 return Err(ErrorCode::BindError(format!(
222 "Generated columns with impure expressions should not be part of the primary key. \
223 Here column \"{}\" is defined as part of the primary key.",
224 column_name
225 ))
226 .into());
227 }
228
229 Ok(())
230}
231
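/// Binds the generated-column and default-value expressions of each column and stores
/// them into the corresponding `ColumnCatalog`. A default expression must fold to a
/// constant after inlining `NOW()`/`PROCTIME()` against the pinned snapshot.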
232pub fn bind_sql_column_constraints(
235 session: &SessionImpl,
236 table_name: String,
237 column_catalogs: &mut [ColumnCatalog],
238 columns: &[ColumnDef],
239 pk_column_ids: &[ColumnId],
240) -> Result<()> {
241 let generated_column_names = {
242 let mut names = vec![];
243 for column in columns {
244 for option_def in &column.options {
245 if let ColumnOption::GeneratedColumns(_) = option_def.option {
246 names.push(column.name.real_value());
247 break;
248 }
249 }
250 }
251 names
252 };
253
254 let mut binder = Binder::new_for_ddl(session);
255 binder.bind_columns_to_context(table_name.clone(), column_catalogs)?;
256
257 for column in columns {
258 let Some(idx) = column_catalogs
259 .iter()
260 .position(|c| c.name() == column.name.real_value())
261 else {
262 continue;
265 };
266
267 for option_def in &column.options {
268 match &option_def.option {
269 ColumnOption::GeneratedColumns(expr) => {
270 binder.set_clause(Some(Clause::GeneratedColumn));
271
272 let expr_impl = binder.bind_expr(expr).with_context(|| {
273 format!(
274 "fail to bind expression in generated column \"{}\"",
275 column.name.real_value()
276 )
277 })?;
278
279 check_generated_column_constraints(
280 &column.name.real_value(),
281 column_catalogs[idx].column_id(),
282 &expr_impl,
283 column_catalogs,
284 &generated_column_names,
285 pk_column_ids,
286 )?;
287
288 column_catalogs[idx].column_desc.generated_or_default_column = Some(
289 GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
290 expr: Some(expr_impl.to_expr_proto()),
291 }),
292 );
293 binder.set_clause(None);
294 }
295 ColumnOption::DefaultValue(expr) => {
296 let expr_impl = binder
297 .bind_expr(expr)?
298 .cast_assign(column_catalogs[idx].data_type())?;
299
300 let rewritten_expr_impl = session
308 .pinned_snapshot()
309 .inline_now_proc_time()
310 .rewrite_expr(expr_impl.clone());
311
312 if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
313 let snapshot_value = snapshot_value?;
314
315 column_catalogs[idx].column_desc.generated_or_default_column =
316 Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
317 snapshot_value: Some(snapshot_value.to_protobuf()),
318 expr: Some(expr_impl.to_expr_proto()),
319 }));
321 } else {
322 return Err(ErrorCode::BindError(format!(
323 "Default expression used in column `{}` cannot be evaluated. \
324 Use generated columns instead if you mean to reference other columns.",
325 column.name
326 ))
327 .into());
328 }
329 }
330 ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
331 if persisted.is_empty() {
336 bail_bind_error!(
337 "DEFAULT INTERNAL is only used for internal purposes, \
338 please specify a concrete default value"
339 );
340 }
341
342 let desc = DefaultColumnDesc::decode(&**persisted)
343 .expect("failed to decode persisted `DefaultColumnDesc`");
344
345 column_catalogs[idx].column_desc.generated_or_default_column =
346 Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
347 }
348 _ => {}
349 }
350 }
351 }
352 Ok(())
353}
354
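/// Extracts primary-key column names from table-level constraints; only a single
/// `PRIMARY KEY` constraint is supported.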
355pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
357 let mut pk_column_names = vec![];
358
359 for constraint in table_constraints {
360 match constraint {
361 TableConstraint::Unique {
362 name: _,
363 columns,
364 is_primary: true,
365 } => {
366 if !pk_column_names.is_empty() {
367 return Err(multiple_pk_definition_err());
368 }
369 pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
370 }
371 _ => bail_not_implemented!("table constraint \"{}\"", constraint),
372 }
373 }
374 Ok(pk_column_names)
375}
376
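/// Merges table-level primary-key names with column-level `PRIMARY KEY` options,
/// erroring if the primary key is defined more than once.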
377pub fn bind_sql_pk_names(
378 columns_defs: &[ColumnDef],
379 pk_names_from_table_constraints: Vec<String>,
380) -> Result<Vec<String>> {
381 let mut pk_column_names = pk_names_from_table_constraints;
382
383 for column in columns_defs {
384 for option_def in &column.options {
385 if let ColumnOption::Unique { is_primary: true } = option_def.option {
386 if !pk_column_names.is_empty() {
387 return Err(multiple_pk_definition_err());
388 }
389 pk_column_names.push(column.name.real_value());
390 };
391 }
392 }
393
394 Ok(pk_column_names)
395}
396
397fn multiple_pk_definition_err() -> RwError {
398 ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
399}
400
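/// Resolves primary-key names to column ids. If no primary key is given but one is
/// required, the hidden row-id column is appended and used as the key. Also rejects
/// duplicated column names.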
401pub fn bind_pk_and_row_id_on_relation(
406 mut columns: Vec<ColumnCatalog>,
407 pk_names: Vec<String>,
408 must_need_pk: bool,
409) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
410 for c in &columns {
411 assert!(c.column_id() != ColumnId::placeholder());
412 }
413
414 let name_to_id = columns
416 .iter()
417 .map(|c| (c.name(), c.column_id()))
418 .collect::<HashMap<_, _>>();
419
420 let mut pk_column_ids: Vec<_> = pk_names
421 .iter()
422 .map(|name| {
423 name_to_id.get(name.as_str()).copied().ok_or_else(|| {
424 ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
425 })
426 })
427 .try_collect()?;
428
429 let need_row_id = pk_column_ids.is_empty() && must_need_pk;
431
432 let row_id_index = need_row_id.then(|| {
433 let column = ColumnCatalog::row_id_column();
434 let index = columns.len();
435 pk_column_ids = vec![column.column_id()];
436 columns.push(column);
437 index
438 });
439
440 if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
441 Err(ErrorCode::InvalidInputSyntax(format!(
442 "column \"{col}\" specified more than once"
443 )))?;
444 }
445
446 Ok((columns, pk_column_ids, row_id_index))
447}
448
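/// Generates the plan for `CREATE TABLE` with a connector, i.e. a table whose rows are
/// ingested from an external source with the given FORMAT/ENCODE.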
449#[allow(clippy::too_many_arguments)]
452pub(crate) async fn gen_create_table_plan_with_source(
453 mut handler_args: HandlerArgs,
454 explain_options: ExplainOptions,
455 table_name: ObjectName,
456 column_defs: Vec<ColumnDef>,
457 wildcard_idx: Option<usize>,
458 constraints: Vec<TableConstraint>,
459 format_encode: FormatEncodeOptions,
460 source_watermarks: Vec<SourceWatermark>,
461 mut col_id_gen: ColumnIdGenerator,
462 include_column_options: IncludeOption,
463 props: CreateTableProps,
464 sql_column_strategy: SqlColumnStrategy,
465) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
466 if props.append_only
467 && format_encode.format != Format::Plain
468 && format_encode.format != Format::Native
469 {
470 return Err(ErrorCode::BindError(format!(
471 "Append only table does not support format {}.",
472 format_encode.format
473 ))
474 .into());
475 }
476
477 let session = &handler_args.session;
478 let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
479 if with_properties.is_shareable_cdc_connector() {
480 generated_columns_check_for_cdc_table(&column_defs)?;
481 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
482 } else if column_defs.iter().any(|col| {
483 col.options
484 .iter()
485 .any(|def| matches!(def.option, ColumnOption::NotNull))
486 }) {
487 notice_to_user(
489 "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
490 );
491 }
492
493 let db_name: &str = &session.database();
494 let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
495
496 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
498 session,
499 &format_encode,
500 Either::Left(&with_properties),
501 CreateSourceType::Table,
502 )
503 .await?;
504
505 let overwrite_options = OverwriteOptions::new(&mut handler_args);
506 let rate_limit = overwrite_options.source_rate_limit;
507 let source = bind_create_source_or_table_with_connector(
508 handler_args.clone(),
509 table_name,
510 format_encode,
511 with_properties,
512 &column_defs,
513 constraints,
514 wildcard_idx,
515 source_watermarks,
516 columns_from_resolve_source,
517 source_info,
518 include_column_options,
519 &mut col_id_gen,
520 CreateSourceType::Table,
521 rate_limit,
522 sql_column_strategy,
523 )
524 .await?;
525
526 let context = OptimizerContext::new(handler_args, explain_options);
527
528 let (plan, table) = gen_table_plan_with_source(
529 context.into(),
530 schema_name,
531 source.clone(),
532 col_id_gen.into_version(),
533 props,
534 )?;
535
536 Ok((plan, Some(source), table))
537}
538
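/// Generates the plan for a plain `CREATE TABLE` without a connector. Secret and
/// connection references are rejected since there is no external source to use them.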
539#[allow(clippy::too_many_arguments)]
542pub(crate) fn gen_create_table_plan(
543 context: OptimizerContext,
544 table_name: ObjectName,
545 column_defs: Vec<ColumnDef>,
546 constraints: Vec<TableConstraint>,
547 mut col_id_gen: ColumnIdGenerator,
548 source_watermarks: Vec<SourceWatermark>,
549 props: CreateTableProps,
550 is_for_replace_plan: bool,
551) -> Result<(PlanRef, TableCatalog)> {
552 let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
553 for c in &mut columns {
554 col_id_gen.generate(c)?;
555 }
556
557 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
558 if !secret_refs.is_empty() || !connection_refs.is_empty() {
559 return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
560 }
561
562 gen_create_table_plan_without_source(
563 context,
564 table_name,
565 columns,
566 column_defs,
567 constraints,
568 source_watermarks,
569 col_id_gen.into_version(),
570 props,
571 )
572}
573
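/// Generates the plan for a table without a connector from already-bound columns,
/// binding the primary key, watermarks and column constraints along the way.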
574#[allow(clippy::too_many_arguments)]
575pub(crate) fn gen_create_table_plan_without_source(
576 context: OptimizerContext,
577 table_name: ObjectName,
578 columns: Vec<ColumnCatalog>,
579 column_defs: Vec<ColumnDef>,
580 constraints: Vec<TableConstraint>,
581 source_watermarks: Vec<SourceWatermark>,
582 version: TableVersion,
583 props: CreateTableProps,
584) -> Result<(PlanRef, TableCatalog)> {
585 let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
587 let (mut columns, pk_column_ids, row_id_index) =
588 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
589
590 let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
591 context.session_ctx(),
592 table_name.real_value(),
593 source_watermarks,
594 &columns,
595 )?;
596
597 bind_sql_column_constraints(
598 context.session_ctx(),
599 table_name.real_value(),
600 &mut columns,
601 &column_defs,
602 &pk_column_ids,
603 )?;
604 let session = context.session_ctx().clone();
605
606 let db_name = &session.database();
607 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
608
609 let info = CreateTableInfo {
610 columns,
611 pk_column_ids,
612 row_id_index,
613 watermark_descs,
614 source_catalog: None,
615 version,
616 };
617
618 gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
619}
620
621fn gen_table_plan_with_source(
622 context: OptimizerContextRef,
623 schema_name: Option<String>,
624 source_catalog: SourceCatalog,
625 version: TableVersion,
626 props: CreateTableProps,
627) -> Result<(PlanRef, TableCatalog)> {
628 let table_name = source_catalog.name.clone();
629
630 let info = CreateTableInfo {
631 columns: source_catalog.columns.clone(),
632 pk_column_ids: source_catalog.pk_col_ids.clone(),
633 row_id_index: source_catalog.row_id_index,
634 watermark_descs: source_catalog.watermark_descs.clone(),
635 source_catalog: Some(source_catalog),
636 version,
637 };
638
639 gen_table_plan_inner(context, schema_name, table_name, info, props)
640}
641
642#[derive(Clone, Copy)]
644pub enum EitherOnConflict {
645 Ast(Option<OnConflict>),
646 Resolved(ConflictBehavior),
647}
648
649impl From<Option<OnConflict>> for EitherOnConflict {
650 fn from(v: Option<OnConflict>) -> Self {
651 Self::Ast(v)
652 }
653}
654
655impl From<ConflictBehavior> for EitherOnConflict {
656 fn from(v: ConflictBehavior) -> Self {
657 Self::Resolved(v)
658 }
659}
660
661impl EitherOnConflict {
662 pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
664 let conflict_behavior = match self {
665 EitherOnConflict::Ast(on_conflict) => {
666 if append_only {
667 if row_id_as_pk {
668 ConflictBehavior::NoCheck
670 } else {
671 if let Some(on_conflict) = on_conflict
673 && on_conflict != OnConflict::Nothing
674 {
675 return Err(ErrorCode::InvalidInputSyntax(
676 "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
677 the ON CONFLICT behavior must be DO NOTHING."
678 .to_owned(),
679 )
680 .into());
681 }
682 ConflictBehavior::IgnoreConflict
683 }
684 } else {
685 match on_conflict.unwrap_or(OnConflict::UpdateFull) {
687 OnConflict::UpdateFull => ConflictBehavior::Overwrite,
688 OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
689 OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
690 }
691 }
692 }
693 EitherOnConflict::Resolved(b) => b,
694 };
695
696 Ok(conflict_behavior)
697 }
698}
699
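/// Schema-related information used to build the table plan.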
700pub struct CreateTableInfo {
705 pub columns: Vec<ColumnCatalog>,
706 pub pk_column_ids: Vec<ColumnId>,
707 pub row_id_index: Option<usize>,
708 pub watermark_descs: Vec<WatermarkDesc>,
709 pub source_catalog: Option<SourceCatalog>,
710 pub version: TableVersion,
711}
712
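/// User-specified table properties that are independent of the column schema, such as
/// append-only mode, conflict behavior and the table engine.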
713pub struct CreateTableProps {
718 pub definition: String,
719 pub append_only: bool,
720 pub on_conflict: EitherOnConflict,
721 pub with_version_columns: Vec<String>,
722 pub webhook_info: Option<PbWebhookSourceInfo>,
723 pub engine: Engine,
724}
725
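/// Shared tail of table-plan generation: builds a `LogicalSource`-rooted plan and turns
/// it into the materialize plan of the new table. Watermarks and retention require the
/// table to be append-only.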
726#[allow(clippy::too_many_arguments)]
727fn gen_table_plan_inner(
728 context: OptimizerContextRef,
729 schema_name: Option<String>,
730 table_name: String,
731 info: CreateTableInfo,
732 props: CreateTableProps,
733) -> Result<(PlanRef, TableCatalog)> {
734 let CreateTableInfo {
735 ref columns,
736 row_id_index,
737 ref watermark_descs,
738 ref source_catalog,
739 ..
740 } = info;
741 let CreateTableProps { append_only, .. } = props;
742
743 let (database_id, schema_id) = context
744 .session_ctx()
745 .get_database_and_schema_id_for_create(schema_name)?;
746
747 let session = context.session_ctx().clone();
748 let retention_seconds = context.with_options().retention_seconds();
749
750 let source_node: LogicalPlanRef = LogicalSource::new(
751 source_catalog.clone().map(Rc::new),
752 columns.clone(),
753 row_id_index,
754 SourceNodeKind::CreateTable,
755 context.clone(),
756 None,
757 )?
758 .into();
759
760 let required_cols = FixedBitSet::with_capacity(columns.len());
761 let plan_root = PlanRoot::new_with_logical_plan(
762 source_node,
763 RequiredDist::Any,
764 Order::any(),
765 required_cols,
766 vec![],
767 );
768
769 if !append_only && !watermark_descs.is_empty() {
770 return Err(ErrorCode::NotSupported(
771 "Defining watermarks on table requires the table to be append only.".to_owned(),
772 "Use the key words `APPEND ONLY`".to_owned(),
773 )
774 .into());
775 }
776
777 if !append_only && retention_seconds.is_some() {
778 return Err(ErrorCode::NotSupported(
779 "Defining retention seconds on table requires the table to be append only.".to_owned(),
780 "Use the key words `APPEND ONLY`".to_owned(),
781 )
782 .into());
783 }
784
785 let materialize =
786 plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;
787
788 let mut table = materialize.table().clone();
789 table.owner = session.user_id();
790
791 Ok((materialize.into(), table))
792}
793
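/// Generates the plan for a table on top of a shared CDC source, scanning the external
/// upstream table described by `cdc_with_options`.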
794#[allow(clippy::too_many_arguments)]
798pub(crate) fn gen_create_table_plan_for_cdc_table(
799 context: OptimizerContextRef,
800 source: Arc<SourceCatalog>,
801 external_table_name: String,
802 column_defs: Vec<ColumnDef>,
803 mut columns: Vec<ColumnCatalog>,
804 pk_names: Vec<String>,
805 cdc_with_options: WithOptionsSecResolved,
806 mut col_id_gen: ColumnIdGenerator,
807 on_conflict: Option<OnConflict>,
808 with_version_columns: Vec<String>,
809 include_column_options: IncludeOption,
810 table_name: ObjectName,
    resolved_table_name: String,
    database_id: DatabaseId,
813 schema_id: SchemaId,
814 table_id: TableId,
815 engine: Engine,
816) -> Result<(PlanRef, TableCatalog)> {
817 let session = context.session_ctx().clone();
818
819 handle_addition_columns(
821 None,
822 &cdc_with_options,
823 include_column_options,
824 &mut columns,
825 true,
826 )?;
827
828 for c in &mut columns {
829 col_id_gen.generate(c)?;
830 }
831
832 let (mut columns, pk_column_ids, _row_id_index) =
833 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
834
835 bind_sql_column_constraints(
837 context.session_ctx(),
838 table_name.real_value(),
839 &mut columns,
840 &column_defs,
841 &pk_column_ids,
842 )?;
843
844 let definition = context.normalized_sql().to_owned();
845
846 let pk_column_indices = {
847 let mut id_to_idx = HashMap::new();
848 columns.iter().enumerate().for_each(|(idx, c)| {
849 id_to_idx.insert(c.column_id(), idx);
850 });
851 pk_column_ids
853 .iter()
854 .map(|c| id_to_idx.get(c).copied().unwrap())
855 .collect_vec()
856 };
857 let table_pk = pk_column_indices
858 .iter()
859 .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
860 .collect();
861
862 let (options, secret_refs) = cdc_with_options.into_parts();
863
864 let non_generated_column_descs = columns
865 .iter()
866 .filter(|&c| (!c.is_generated()))
867 .map(|c| c.column_desc.clone())
868 .collect_vec();
869 let non_generated_column_num = non_generated_column_descs.len();
870 let cdc_table_type = ExternalCdcTableType::from_properties(&options);
871 let cdc_table_desc = CdcTableDesc {
872 table_id,
        source_id: source.id.into(),
        external_table_name: external_table_name.clone(),
875 pk: table_pk,
876 columns: non_generated_column_descs,
877 stream_key: pk_column_indices,
878 connect_properties: options,
879 secret_refs,
880 };
881
882 tracing::debug!(?cdc_table_desc, "create cdc table");
883 let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;
884
885 let logical_scan = LogicalCdcScan::create(
886 external_table_name.clone(),
887 Rc::new(cdc_table_desc),
888 context.clone(),
889 options,
890 );
891
892 let scan_node: LogicalPlanRef = logical_scan.into();
893 let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
894 let plan_root = PlanRoot::new_with_logical_plan(
895 scan_node,
896 RequiredDist::Any,
897 Order::any(),
898 required_cols,
899 vec![],
900 );
901
902 let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
903 let materialize = plan_root.gen_table_plan(
904 context,
905 resolved_table_name,
906 database_id,
907 schema_id,
908 CreateTableInfo {
909 columns,
910 pk_column_ids,
911 row_id_index: None,
912 watermark_descs: vec![],
913 source_catalog: Some((*source).clone()),
914 version: col_id_gen.into_version(),
915 },
916 CreateTableProps {
917 definition,
918 append_only: false,
919 on_conflict: on_conflict.into(),
920 with_version_columns,
921 webhook_info: None,
922 engine,
923 },
924 )?;
925
926 let mut table = materialize.table().clone();
927 table.owner = session.user_id();
928 table.cdc_table_id = Some(cdc_table_id);
929 table.cdc_table_type = Some(cdc_table_type);
930 Ok((materialize.into(), table))
931}
932
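/// Derives the WITH options of a CDC table from its shared source, splitting the
/// external table name into database/schema/table parts according to the connector.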
933fn derive_with_options_for_cdc_table(
934 source_with_properties: &WithOptionsSecResolved,
935 external_table_name: String,
936) -> Result<WithOptionsSecResolved> {
937 use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
938 let source_database_name: &str = source_with_properties
940 .get("database.name")
941 .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
942 .as_str();
943 let mut with_options = source_with_properties.clone();
944 if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
945 match connector.as_str() {
946 MYSQL_CDC_CONNECTOR => {
947 let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
950 anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
951 })?;
952 if !source_database_name
954 .split(',')
955 .map(|s| s.trim())
956 .any(|name| name == db_name)
957 {
958 return Err(anyhow!(
959 "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
960 db_name,
961 source_database_name
962 ).into());
963 }
964 with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
965 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
966 }
967 POSTGRES_CDC_CONNECTOR => {
968 let (schema_name, table_name) = external_table_name
969 .split_once('.')
970 .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;
971
972 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
974 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
975 }
976 SQL_SERVER_CDC_CONNECTOR => {
977 let parts: Vec<&str> = external_table_name.split('.').collect();
983 let (_, schema_name, table_name) = match parts.len() {
984 3 => {
985 let db_name = parts[0];
987 let schema_name = parts[1];
988 let table_name = parts[2];
989
990 if db_name != source_database_name {
992 return Err(anyhow!(
993 "The database name `{}` in the FROM clause is not the same as the database name `{}` in source definition",
994 db_name,
995 source_database_name
996 ).into());
997 }
998 (db_name, schema_name, table_name)
999 }
1000 2 => {
1001 let schema_name = parts[0];
1003 let table_name = parts[1];
1004 (source_database_name, schema_name, table_name)
1005 }
1006 1 => {
1007 let table_name = parts[0];
1009 (source_database_name, "dbo", table_name)
1010 }
1011 _ => {
1012 return Err(anyhow!(
1013 "The upstream table name must be in one of these formats: 'database.schema.table', 'schema.table', or 'table'"
1014 ).into());
1015 }
1016 };
1017
1018 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1020 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1021 }
1022 _ => {
1023 return Err(RwError::from(anyhow!(
1024 "connector {} is not supported for cdc table",
1025 connector
1026 )));
1027 }
1028 };
1029 }
1030 Ok(with_options)
1031}
1032
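/// Dispatches plan generation for `CREATE TABLE` depending on whether a connector
/// (FORMAT/ENCODE) or a CDC source is involved.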
1033#[allow(clippy::too_many_arguments)]
1034pub(super) async fn handle_create_table_plan(
1035 handler_args: HandlerArgs,
1036 explain_options: ExplainOptions,
1037 format_encode: Option<FormatEncodeOptions>,
1038 cdc_table_info: Option<CdcTableInfo>,
1039 table_name: &ObjectName,
1040 column_defs: Vec<ColumnDef>,
1041 wildcard_idx: Option<usize>,
1042 constraints: Vec<TableConstraint>,
1043 source_watermarks: Vec<SourceWatermark>,
1044 append_only: bool,
1045 on_conflict: Option<OnConflict>,
1046 with_version_columns: Vec<String>,
1047 include_column_options: IncludeOption,
1048 webhook_info: Option<WebhookSourceInfo>,
1049 engine: Engine,
1050) -> Result<(
1051 PlanRef,
1052 Option<SourceCatalog>,
1053 TableCatalog,
1054 TableJobType,
1055 Option<SourceId>,
1056)> {
1057 let col_id_gen = ColumnIdGenerator::new_initial();
1058 let format_encode = check_create_table_with_source(
1059 &handler_args.with_options,
1060 format_encode,
1061 &include_column_options,
1062 &cdc_table_info,
1063 )?;
1064 let webhook_info = webhook_info
1065 .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1066 .transpose()?;
1067
1068 let props = CreateTableProps {
1069 definition: handler_args.normalized_sql.clone(),
1070 append_only,
1071 on_conflict: on_conflict.into(),
1072 with_version_columns: with_version_columns.clone(),
1073 webhook_info,
1074 engine,
1075 };
1076
    let ((plan, source, table), job_type, shared_source_id) = match (
1078 format_encode,
1079 cdc_table_info.as_ref(),
1080 ) {
1081 (Some(format_encode), None) => (
1082 gen_create_table_plan_with_source(
1083 handler_args,
1084 explain_options,
1085 table_name.clone(),
1086 column_defs,
1087 wildcard_idx,
1088 constraints,
1089 format_encode,
1090 source_watermarks,
1091 col_id_gen,
1092 include_column_options,
1093 props,
1094 SqlColumnStrategy::FollowChecked,
1095 )
1096 .await?,
1097 TableJobType::General,
1098 None,
1099 ),
1100 (None, None) => {
1101 let context = OptimizerContext::new(handler_args, explain_options);
1102 let (plan, table) = gen_create_table_plan(
1103 context,
1104 table_name.clone(),
1105 column_defs,
1106 constraints,
1107 col_id_gen,
1108 source_watermarks,
1109 props,
1110 false,
1111 )?;
1112
1113 ((plan, None, table), TableJobType::General, None)
1114 }
1115
1116 (None, Some(cdc_table)) => {
1117 sanity_check_for_table_on_cdc_source(
1118 append_only,
1119 &column_defs,
1120 &wildcard_idx,
1121 &constraints,
1122 &source_watermarks,
1123 )?;
1124
1125 generated_columns_check_for_cdc_table(&column_defs)?;
1126 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1127
1128 let session = &handler_args.session;
1129 let db_name = &session.database();
1130 let user_name = &session.user_name();
1131 let search_path = session.config().search_path();
1132 let (schema_name, resolved_table_name) =
1133 Binder::resolve_schema_qualified_name(db_name, table_name)?;
1134 let (database_id, schema_id) =
1135 session.get_database_and_schema_id_for_create(schema_name.clone())?;
1136
1137 let (format_encode, source_name) =
1139 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
1140
1141 let source = {
1142 let catalog_reader = session.env().catalog_reader().read_guard();
1143 let schema_path =
1144 SchemaPath::new(format_encode.as_deref(), &search_path, user_name);
1145
1146 let (source, _) = catalog_reader.get_source_by_name(
1147 db_name,
1148 schema_path,
1149 source_name.as_str(),
1150 )?;
1151 source.clone()
1152 };
1153 let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
1154 &source.with_properties,
1155 cdc_table.external_table_name.clone(),
1156 )?;
1157
1158 let (columns, pk_names) = match wildcard_idx {
1159 Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1160 None => {
1161 for column_def in &column_defs {
1162 for option_def in &column_def.options {
1163 if let ColumnOption::DefaultValue(_)
1164 | ColumnOption::DefaultValueInternal { .. } = option_def.option
1165 {
1166 return Err(ErrorCode::NotSupported(
1167 "Default value for columns defined on the table created from a CDC source".into(),
1168 "Remove the default value expression in the column definitions".into(),
1169 )
1170 .into());
1171 }
1172 }
1173 }
1174
1175 let (columns, pk_names) =
1176 bind_cdc_table_schema(&column_defs, &constraints, false)?;
1177 let (options, secret_refs) = cdc_with_options.clone().into_parts();
1179 let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1180 .context("failed to extract external table config")?;
1181
1182 (columns, pk_names)
1183 }
1184 };
1185
1186 let context: OptimizerContextRef =
1187 OptimizerContext::new(handler_args, explain_options).into();
1188 let shared_source_id = source.id;
1189 let (plan, table) = gen_create_table_plan_for_cdc_table(
1190 context,
1191 source,
1192 cdc_table.external_table_name.clone(),
1193 column_defs,
1194 columns,
1195 pk_names,
1196 cdc_with_options,
1197 col_id_gen,
1198 on_conflict,
1199 with_version_columns,
1200 include_column_options,
1201 table_name.clone(),
1202 resolved_table_name,
1203 database_id,
1204 schema_id,
1205 TableId::placeholder(),
1206 engine,
1207 )?;
1208
1209 (
1210 (plan, None, table),
1211 TableJobType::SharedCdcSource,
1212 Some(shared_source_id),
1213 )
1214 }
1215 (Some(_), Some(_)) => {
1216 return Err(ErrorCode::NotSupported(
1217 "Data format and encoding format doesn't apply to table created from a CDC source"
1218 .into(),
1219 "Remove the FORMAT and ENCODE specification".into(),
1220 )
1221 .into());
1222 }
1223 };
    Ok((plan, source, table, job_type, shared_source_id))
1225}
1226
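/// For CDC tables, all generated columns must be placed after the non-generated ones.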
1227fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1229 let mut found_generated_column = false;
1230 for column in columns {
1231 let mut is_generated = false;
1232
1233 for option_def in &column.options {
1234 if let ColumnOption::GeneratedColumns(_) = option_def.option {
1235 is_generated = true;
1236 break;
1237 }
1238 }
1239
1240 if is_generated {
1241 found_generated_column = true;
1242 } else if found_generated_column {
1243 return Err(ErrorCode::NotSupported(
1244 "Non-generated column found after a generated column.".into(),
1245 "Ensure that all generated columns appear at the end of the cdc table definition."
1246 .into(),
1247 )
1248 .into());
1249 }
1250 }
1251 Ok(())
1252}
1253
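/// CDC tables with explicitly defined columns must not carry `NOT NULL` constraints.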
1254fn not_null_check_for_cdc_table(
1256 wildcard_idx: &Option<usize>,
1257 column_defs: &Vec<ColumnDef>,
1258) -> Result<()> {
    if wildcard_idx.is_none()
1260 && column_defs.iter().any(|col| {
1261 col.options
1262 .iter()
1263 .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1264 })
1265 {
1266 return Err(ErrorCode::NotSupported(
1267 "CDC table with NOT NULL constraint is not supported".to_owned(),
1268 "Please remove the NOT NULL constraint for columns".to_owned(),
1269 )
1270 .into());
1271 }
1272 Ok(())
1273}
1274
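/// Basic validations for a table created from a CDC source: wildcard and explicit column
/// definitions are mutually exclusive, a primary key is required, and `APPEND ONLY` as
/// well as watermarks are rejected.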
1275fn sanity_check_for_table_on_cdc_source(
1277 append_only: bool,
1278 column_defs: &Vec<ColumnDef>,
1279 wildcard_idx: &Option<usize>,
1280 constraints: &Vec<TableConstraint>,
1281 source_watermarks: &Vec<SourceWatermark>,
1282) -> Result<()> {
1283 if wildcard_idx.is_some() && !column_defs.is_empty() {
1285 return Err(ErrorCode::NotSupported(
1286 "wildcard(*) and column definitions cannot be used together".to_owned(),
1287 "Remove the wildcard or column definitions".to_owned(),
1288 )
1289 .into());
1290 }
1291
    if wildcard_idx.is_none()
1294 && !constraints.iter().any(|c| {
1295 matches!(
1296 c,
1297 TableConstraint::Unique {
1298 is_primary: true,
1299 ..
1300 }
1301 )
1302 })
1303 && !column_defs.iter().any(|col| {
1304 col.options
1305 .iter()
1306 .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1307 })
1308 {
1309 return Err(ErrorCode::NotSupported(
1310 "CDC table without primary key constraint is not supported".to_owned(),
1311 "Please define a primary key".to_owned(),
1312 )
1313 .into());
1314 }
1315
1316 if append_only {
1317 return Err(ErrorCode::NotSupported(
1318 "append only modifier on the table created from a CDC source".into(),
1319 "Remove the APPEND ONLY clause".into(),
1320 )
1321 .into());
1322 }
1323
1324 if !source_watermarks.is_empty() {
1325 return Err(ErrorCode::NotSupported(
1326 "watermark defined on the table created from a CDC source".into(),
1327 "Remove the Watermark definitions".into(),
1328 )
1329 .into());
1330 }
1331
1332 Ok(())
1333}
1334
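/// Connects to the external upstream table and derives its column catalog and
/// primary-key names from the remote schema.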
1335async fn bind_cdc_table_schema_externally(
1337 cdc_with_options: WithOptionsSecResolved,
1338) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1339 let (options, secret_refs) = cdc_with_options.into_parts();
1341 let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1342 .context("failed to extract external table config")?;
1343
1344 let table = ExternalTableImpl::connect(config)
1345 .await
1346 .context("failed to auto derive table schema")?;
1347
1348 Ok((
1349 table
1350 .column_descs()
1351 .iter()
1352 .cloned()
1353 .map(|column_desc| ColumnCatalog {
1354 column_desc,
1355 is_hidden: false,
1356 })
1357 .collect(),
1358 table.pk_names().clone(),
1359 ))
1360}
1361
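/// Binds the CDC table schema from the user-provided column definitions and constraints.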
1362fn bind_cdc_table_schema(
1364 column_defs: &Vec<ColumnDef>,
1365 constraints: &Vec<TableConstraint>,
1366 is_for_replace_plan: bool,
1367) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1368 let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1369
1370 let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1371 Ok((columns, pk_names))
1372}
1373
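/// Entry point of `CREATE TABLE`: generates the plan, builds the stream fragment graph
/// and submits the job to the meta service, dispatching on the table engine.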
1374#[allow(clippy::too_many_arguments)]
1375pub async fn handle_create_table(
1376 mut handler_args: HandlerArgs,
1377 table_name: ObjectName,
1378 column_defs: Vec<ColumnDef>,
1379 wildcard_idx: Option<usize>,
1380 constraints: Vec<TableConstraint>,
1381 if_not_exists: bool,
1382 format_encode: Option<FormatEncodeOptions>,
1383 source_watermarks: Vec<SourceWatermark>,
1384 append_only: bool,
1385 on_conflict: Option<OnConflict>,
1386 with_version_columns: Vec<String>,
1387 cdc_table_info: Option<CdcTableInfo>,
1388 include_column_options: IncludeOption,
1389 webhook_info: Option<WebhookSourceInfo>,
1390 ast_engine: risingwave_sqlparser::ast::Engine,
1391) -> Result<RwPgResponse> {
1392 let session = handler_args.session.clone();
1393
1394 if append_only {
1395 session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1396 }
1397
1398 session.check_cluster_limits().await?;
1399
1400 let engine = match ast_engine {
1401 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1402 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1403 };
1404 if engine == Engine::Iceberg && handler_args.with_options.get_connector().is_some() {
1405 if let Some(_rate_limit) = handler_args.with_options.insert(
1408 OverwriteOptions::SOURCE_RATE_LIMIT_KEY.to_owned(),
1409 "0".to_owned(),
1410 ) {
1411 return Err(ErrorCode::NotSupported(
1413 "source_rate_limit for iceberg table engine during table creation".to_owned(),
1414 "Please remove source_rate_limit from WITH options.".to_owned(),
1415 )
1416 .into());
1417 }
1418 }
1419
1420 if let Either::Right(resp) = session.check_relation_name_duplicated(
1421 table_name.clone(),
1422 StatementType::CREATE_TABLE,
1423 if_not_exists,
1424 )? {
1425 return Ok(resp);
1426 }
1427
1428 let (graph, source, hummock_table, job_type, shared_source_id) = {
1429 let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
1430 handler_args.clone(),
1431 ExplainOptions::default(),
1432 format_encode,
1433 cdc_table_info,
1434 &table_name,
1435 column_defs.clone(),
1436 wildcard_idx,
1437 constraints.clone(),
1438 source_watermarks,
1439 append_only,
1440 on_conflict,
1441 with_version_columns,
1442 include_column_options,
1443 webhook_info,
1444 engine,
1445 )
1446 .await?;
1447 tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1448
1449 let graph = build_graph(plan, Some(GraphJobType::Table))?;
1450
1451 (graph, source, table, job_type, shared_source_id)
1452 };
1453
1454 tracing::trace!(
1455 "name={}, graph=\n{}",
1456 table_name,
1457 serde_json::to_string_pretty(&graph).unwrap()
1458 );
1459
1460 let dependencies = shared_source_id
1461 .map(|id| HashSet::from([id as ObjectId]))
1462 .unwrap_or_default();
1463
1464 match engine {
1466 Engine::Hummock => {
1467 let catalog_writer = session.catalog_writer()?;
1468 catalog_writer
1469 .create_table(
1470 source.map(|s| s.to_prost()),
1471 hummock_table.to_prost(),
1472 graph,
1473 job_type,
1474 if_not_exists,
1475 dependencies,
1476 )
1477 .await?;
1478 }
1479 Engine::Iceberg => {
1480 create_iceberg_engine_table(
1481 session,
1482 handler_args,
1483 source.map(|s| s.to_prost()),
1484 hummock_table,
1485 graph,
1486 table_name,
1487 job_type,
1488 if_not_exists,
1489 )
1490 .await?;
1491 }
1492 }
1493
1494 Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1495}
1496
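/// Creates an Iceberg-engine table: the internal table is created first, followed by the
/// hidden Iceberg sink and source that keep it in sync. If either follow-up DDL fails,
/// the table is dropped again.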
1497#[allow(clippy::too_many_arguments)]
1506pub async fn create_iceberg_engine_table(
1507 session: Arc<SessionImpl>,
1508 handler_args: HandlerArgs,
1509 mut source: Option<PbSource>,
1510 table: TableCatalog,
1511 graph: StreamFragmentGraph,
1512 table_name: ObjectName,
1513 job_type: PbTableJobType,
1514 if_not_exists: bool,
1515) -> Result<()> {
1516 let meta_client = session.env().meta_client();
1517 let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1518
1519 let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
1520 ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
1521 })?;
1522 let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1523 let meta_store_user = meta_store_endpoint.username().to_owned();
1524 let meta_store_password = match meta_store_endpoint.password() {
1525 Some(password) => percent_decode_str(password)
1526 .decode_utf8()
1527 .map_err(|_| {
1528 ErrorCode::InternalError(
1529 "failed to parse password from meta store endpoint".to_owned(),
1530 )
1531 })?
1532 .into_owned(),
1533 None => "".to_owned(),
1534 };
1535 let meta_store_host = meta_store_endpoint
1536 .host_str()
1537 .ok_or_else(|| {
1538 ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
1539 })?
1540 .to_owned();
1541 let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1542 ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
1543 })?;
1544 let meta_store_database = meta_store_endpoint
1545 .path()
1546 .trim_start_matches('/')
1547 .to_owned();
1548
1549 let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1550 bail!("failed to parse meta backend: {}", meta_store_backend);
1551 };
1552
1553 let catalog_uri = match meta_backend {
1554 MetaBackend::Postgres => {
1555 format!(
1556 "jdbc:postgresql://{}:{}/{}",
1557 meta_store_host.clone(),
1558 meta_store_port.clone(),
1559 meta_store_database.clone()
1560 )
1561 }
1562 MetaBackend::Mysql => {
1563 format!(
1564 "jdbc:mysql://{}:{}/{}",
1565 meta_store_host.clone(),
1566 meta_store_port.clone(),
1567 meta_store_database.clone()
1568 )
1569 }
1570 MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1571 bail!(
1572 "Unsupported meta backend for iceberg engine table: {}",
1573 meta_store_backend
1574 );
1575 }
1576 };
1577
1578 let rw_db_name = session
1579 .env()
1580 .catalog_reader()
1581 .read_guard()
1582 .get_database_by_id(&table.database_id)?
1583 .name()
1584 .to_owned();
1585 let rw_schema_name = session
1586 .env()
1587 .catalog_reader()
1588 .read_guard()
1589 .get_schema_by_id(&table.database_id, &table.schema_id)?
1590 .name()
1591 .clone();
1592 let iceberg_catalog_name = rw_db_name.clone();
1593 let iceberg_database_name = rw_schema_name.clone();
1594 let iceberg_table_name = table_name.0.last().unwrap().real_value();
1595
1596 let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1597 let sink_decouple = session.config().sink_decouple();
1598 if matches!(sink_decouple, SinkDecouple::Disable) {
1599 bail!(
            "Iceberg engine table requires sink decouple to be enabled, try `set sink_decouple = true` to resolve it"
1601 );
1602 }
1603
1604 let mut connection_ref = BTreeMap::new();
1605 let with_common = if iceberg_engine_connection.is_empty() {
1606 bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1607 } else {
1608 let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1609 assert_eq!(parts.len(), 2);
1610 let connection_catalog =
1611 session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1612 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1613 if params.connection_type == ConnectionType::Iceberg as i32 {
1614 connection_ref.insert(
1616 "connection".to_owned(),
1617 ConnectionRefValue {
1618 connection_name: ObjectName::from(vec![
1619 Ident::from(parts[0]),
1620 Ident::from(parts[1]),
1621 ]),
1622 },
1623 );
1624
1625 let mut with_common = BTreeMap::new();
1626 with_common.insert("connector".to_owned(), "iceberg".to_owned());
1627 with_common.insert("database.name".to_owned(), iceberg_database_name.to_owned());
1628 with_common.insert("table.name".to_owned(), iceberg_table_name.to_owned());
1629
1630 if let Some(s) = params.properties.get("hosted_catalog")
1631 && s.eq_ignore_ascii_case("true")
1632 {
1633 with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1634 with_common.insert("catalog.uri".to_owned(), catalog_uri.to_owned());
1635 with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user.to_owned());
1636 with_common.insert(
1637 "catalog.jdbc.password".to_owned(),
1638 meta_store_password.clone(),
1639 );
1640 with_common.insert("catalog.name".to_owned(), iceberg_catalog_name.to_owned());
1641 }
1642
1643 with_common
1644 } else {
1645 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1646 "Only iceberg connection could be used in iceberg engine".to_owned(),
1647 )));
1648 }
1649 } else {
1650 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1651 "Private Link Service has been deprecated. Please create a new connection instead."
1652 .to_owned(),
1653 )));
1654 }
1655 };
1656
1657 let mut pks = table
1660 .pk_column_names()
1661 .iter()
1662 .map(|c| c.to_string())
1663 .collect::<Vec<String>>();
1664
1665 let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1667 pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1668 let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1669 "select {} as {}, * from {}",
1670 ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1671 ))
1672 .context("unable to parse query")?
1673 .try_into()
1674 .unwrap();
1675
1676 let Statement::Query(query) = &stmt else {
1677 panic!("unexpected statement: {:?}", stmt);
1678 };
1679 CreateSink::AsQuery(query.clone())
1680 } else {
1681 CreateSink::From(table_name.clone())
1682 };
1683
1684 let mut sink_name = table_name.clone();
1685 *sink_name.0.last_mut().unwrap() = Ident::from(
1686 (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1687 );
1688 let create_sink_stmt = CreateSinkStatement {
1689 if_not_exists: false,
1690 sink_name,
1691 with_properties: WithProperties(vec![]),
1692 sink_from,
1693 columns: vec![],
1694 emit_mode: None,
1695 sink_schema: None,
1696 into_table_name: None,
1697 };
1698
1699 let mut sink_handler_args = handler_args.clone();
1700
1701 let mut sink_with = with_common.clone();
1702
1703 if table.append_only {
1704 sink_with.insert("type".to_owned(), "append-only".to_owned());
1705 } else {
1706 sink_with.insert("primary_key".to_owned(), pks.join(","));
1707 sink_with.insert("type".to_owned(), "upsert".to_owned());
1708 }
1709 let commit_checkpoint_interval = handler_args
1728 .with_options
1729 .get(COMMIT_CHECKPOINT_INTERVAL)
1730 .map(|v| v.to_owned())
1731 .unwrap_or_else(|| "60".to_owned());
1732 let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1733 ErrorCode::InvalidInputSyntax(format!(
1734 "commit_checkpoint_interval must be a positive integer: {}",
1735 commit_checkpoint_interval
1736 ))
1737 })?;
1738
1739 if commit_checkpoint_interval == 0 {
1740 bail!("commit_checkpoint_interval must be a positive integer: 0");
1741 }
1742
1743 source
1745 .as_mut()
1746 .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1747
1748 let sink_decouple = session.config().sink_decouple();
1749 if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1750 bail!(
1751 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1752 )
1753 }
1754
1755 sink_with.insert(
1756 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1757 commit_checkpoint_interval.to_string(),
1758 );
1759 sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1760
1761 sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1762
1763 if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1764 match enable_compaction.to_lowercase().as_str() {
1765 "true" => {
1766 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1767 }
1768 "false" => {
1769 sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1770 }
1771 _ => {
1772 return Err(ErrorCode::InvalidInputSyntax(format!(
1773 "enable_compaction must be true or false: {}",
1774 enable_compaction
1775 ))
1776 .into());
1777 }
1778 }
1779
1780 source
1782 .as_mut()
1783 .map(|x| x.with_properties.remove("enable_compaction"));
1784 } else {
1785 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1786 }
1787
1788 if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1789 let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1790 ErrorCode::InvalidInputSyntax(format!(
1791 "compaction_interval_sec must be a positive integer: {}",
                compaction_interval_sec
1793 ))
1794 })?;
1795 if compaction_interval_sec == 0 {
1796 bail!("compaction_interval_sec must be a positive integer: 0");
1797 }
1798 sink_with.insert(
1799 "compaction_interval_sec".to_owned(),
1800 compaction_interval_sec.to_string(),
1801 );
1802 source
1804 .as_mut()
1805 .map(|x| x.with_properties.remove("compaction_interval_sec"));
1806 }
1807
1808 let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1809 handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1810 {
1811 source
1813 .as_mut()
1814 .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1815 match enable_snapshot_expiration.to_lowercase().as_str() {
1816 "true" => {
1817 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1818 true
1819 }
1820 "false" => {
1821 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1822 false
1823 }
1824 _ => {
1825 return Err(ErrorCode::InvalidInputSyntax(format!(
1826 "enable_snapshot_expiration must be true or false: {}",
1827 enable_snapshot_expiration
1828 ))
1829 .into());
1830 }
1831 }
1832 } else {
1833 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1834 true
1835 };
1836
1837 if has_enabled_snapshot_expiration {
1838 if let Some(snapshot_expiration_retain_last) = handler_args
1840 .with_options
1841 .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
1842 {
1843 sink_with.insert(
1844 SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
1845 snapshot_expiration_retain_last.to_owned(),
1846 );
1847 source
1849 .as_mut()
1850 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
1851 }
1852
1853 if let Some(snapshot_expiration_max_age) = handler_args
1854 .with_options
1855 .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
1856 {
1857 sink_with.insert(
1858 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
1859 snapshot_expiration_max_age.to_owned(),
1860 );
1861 source
1863 .as_mut()
1864 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
1865 }
1866
1867 if let Some(snapshot_expiration_clear_expired_files) = handler_args
1868 .with_options
1869 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1870 {
1871 sink_with.insert(
1872 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
1873 snapshot_expiration_clear_expired_files.to_owned(),
1874 );
1875 source.as_mut().map(|x| {
1877 x.with_properties
1878 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1879 });
1880 }
1881
1882 if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
1883 .with_options
1884 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1885 {
1886 sink_with.insert(
1887 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
1888 snapshot_expiration_clear_expired_meta_data.to_owned(),
1889 );
1890 source.as_mut().map(|x| {
1892 x.with_properties
1893 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1894 });
1895 }
1896 }
1897
1898 if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1899 match write_mode.to_lowercase().as_str() {
1900 ICEBERG_WRITE_MODE_MERGE_ON_READ => {
1901 sink_with.insert(
1902 WRITE_MODE.to_owned(),
1903 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1904 );
1905 }
1906
1907 ICEBERG_WRITE_MODE_COPY_ON_WRITE => {
1908 if table.append_only {
1909 return Err(ErrorCode::NotSupported(
1910 "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
1911 "Please use MERGE ON READ instead".to_owned(),
1912 )
1913 .into());
1914 }
1915
1916 sink_with.insert(
1917 WRITE_MODE.to_owned(),
1918 ICEBERG_WRITE_MODE_COPY_ON_WRITE.to_owned(),
1919 );
1920 }
1921
1922 _ => {
1923 return Err(ErrorCode::InvalidInputSyntax(format!(
1924 "write_mode must be one of: {}, {}",
1925 ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
1926 ))
1927 .into());
1928 }
1929 }
1930
1931 source
1933 .as_mut()
1934 .map(|x| x.with_properties.remove("write_mode"));
1935 } else {
1936 sink_with.insert(
1937 WRITE_MODE.to_owned(),
1938 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1939 );
1940 }
1941
1942 if let Some(max_snapshots_num_before_compaction) =
1943 handler_args.with_options.get(MAX_SNAPSHOTS_NUM)
1944 {
1945 let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
1946 .parse::<u32>()
1947 .map_err(|_| {
1948 ErrorCode::InvalidInputSyntax(format!(
1949 "max_snapshots_num_before_compaction must be a positive integer: {}",
1950 max_snapshots_num_before_compaction
1951 ))
1952 })?;
1953 if max_snapshots_num_before_compaction == 0 {
1954 bail!("max_snapshots_num_before_compaction must be a positive integer: 0");
1955 }
1956 sink_with.insert(
1957 MAX_SNAPSHOTS_NUM.to_owned(),
1958 max_snapshots_num_before_compaction.to_string(),
1959 );
1960 }
1961
1962 let partition_by = handler_args
1963 .with_options
1964 .get("partition_by")
1965 .map(|v| v.to_owned());
1966
1967 if let Some(partition_by) = &partition_by {
1968 let mut partition_columns = vec![];
1969 for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
1970 table
1971 .columns()
1972 .iter()
1973 .find(|col| col.name().eq_ignore_ascii_case(&column))
1974 .ok_or_else(|| {
1975 ErrorCode::InvalidInputSyntax(format!(
1976                         "Partition column does not exist in the table schema: {}",
1977 column
1978 ))
1979 })?;
1980
1981 partition_columns.push(column.to_owned());
1982 }
1983
1984 ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
1985 |_| {
1986 ErrorCode::InvalidInputSyntax(
1987 "The partition columns should be the prefix of the primary key".to_owned(),
1988 )
1989 },
1990 )?;
1991
1992 sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
1993
1994 source
1996 .as_mut()
1997 .map(|x| x.with_properties.remove("partition_by"));
1998 }
1999
2000 sink_handler_args.with_options =
2001 WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2002
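    // The internal Iceberg source reuses the table name, with the `ICEBERG_SOURCE_PREFIX`
    // prefix applied to its last identifier.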
2003 let mut source_name = table_name.clone();
2004 *source_name.0.last_mut().unwrap() = Ident::from(
2005 (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2006 );
2007 let create_source_stmt = CreateSourceStatement {
2008 temporary: false,
2009 if_not_exists: false,
2010 columns: vec![],
2011 source_name,
2012 wildcard_idx: Some(0),
2013 constraints: vec![],
2014 with_properties: WithProperties(vec![]),
2015 format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2016 source_watermarks: vec![],
2017 include_column_options: vec![],
2018 };
2019
2020 let mut source_handler_args = handler_args.clone();
2021 let source_with = with_common;
2022 source_handler_args.with_options =
2023 WithOptions::new(source_with, Default::default(), connection_ref);
2024
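    // Initialize the JVM eagerly so that a JVM failure surfaces before any catalog objects
    // (table, sink, source) are created below.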
2025 let _ = Jvm::get_or_init()?;
2028
2029 let catalog_writer = session.catalog_writer()?;
2030 let has_connector = source.is_some();
2032 catalog_writer
2033 .create_table(
2034 source,
2035 table.to_prost(),
2036 graph,
2037 job_type,
2038 if_not_exists,
2039 HashSet::default(),
2040 )
2041 .await?;
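    // Create the internal Iceberg sink and source on top of the table. If either step fails,
    // drop the table that was just created before propagating the error, so no dangling table
    // is left behind.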
2042 let res = create_sink::handle_create_sink(sink_handler_args, create_sink_stmt, true).await;
2043 if res.is_err() {
2044 handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
2046 res?;
2047 }
2048 let res = create_source::handle_create_source(source_handler_args, create_source_stmt).await;
2049 if res.is_err() {
2050 handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
2052 res?;
2053 }
2054
2055 if has_connector {
2056 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
2057 handler_args,
2058 PbThrottleTarget::TableWithSource,
2059 table_name,
2060 -1,
2061 )
2062 .await?;
2063 }
2064
2065 Ok(())
2066}
2067
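/// Checks that the `WITH` options, `FORMAT ... ENCODE` clause and `INCLUDE` options of a
/// `CREATE TABLE` statement are consistent, returning the (possibly absent) format/encode
/// options:
/// - tables created from a CDC source are exempt from these checks;
/// - `INCLUDE ...` columns are only allowed together with a source connector;
/// - a source connector requires a `FORMAT ... ENCODE` clause.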
2068pub fn check_create_table_with_source(
2069 with_options: &WithOptions,
2070 format_encode: Option<FormatEncodeOptions>,
2071 include_column_options: &IncludeOption,
2072 cdc_table_info: &Option<CdcTableInfo>,
2073) -> Result<Option<FormatEncodeOptions>> {
2074 if cdc_table_info.is_some() {
2076 return Ok(format_encode);
2077 }
2078 let defined_source = with_options.is_source_connector();
2079
2080 if !include_column_options.is_empty() && !defined_source {
2081 return Err(ErrorCode::InvalidInputSyntax(
2082 "INCLUDE should be used with a connector".to_owned(),
2083 )
2084 .into());
2085 }
2086 if defined_source {
2087 format_encode.as_ref().ok_or_else(|| {
2088 ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2089 })?;
2090 }
2091 Ok(format_encode)
2092}
2093
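/// Checks that `partition_columns` is a (possibly empty) prefix of `primary_key_columns`.
/// For example, partitioning by `[a, b]` is valid for a primary key `[a, b, c]`, while
/// `[b]` or `[a, c]` is not.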
2094fn ensure_partition_columns_are_prefix_of_primary_key(
2095 partition_columns: &[String],
2096 primary_key_columns: &[String],
2097) -> std::result::Result<(), String> {
2098 if partition_columns.len() > primary_key_columns.len() {
2099         return Err("There cannot be more partition columns than primary key columns.".to_owned());
2100 }
2101
2102 for (i, partition_col) in partition_columns.iter().enumerate() {
2103 if primary_key_columns.get(i) != Some(partition_col) {
2104 return Err(format!(
2105                 "Partition column '{}' does not match the primary key column at the same position.",
2106 partition_col
2107 ));
2108 }
2109 }
2110
2111 Ok(())
2112}
2113
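/// Re-plans an existing table from a (possibly rewritten) `CREATE TABLE` statement, e.g. when
/// a table is replaced for a schema change, returning the new stream fragment graph, table
/// catalog, optional associated source catalog, and the table job type. Changing the primary
/// key is rejected.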
2114#[allow(clippy::too_many_arguments)]
2115pub async fn generate_stream_graph_for_replace_table(
2116 _session: &Arc<SessionImpl>,
2117 table_name: ObjectName,
2118 original_catalog: &Arc<TableCatalog>,
2119 handler_args: HandlerArgs,
2120 statement: Statement,
2121 col_id_gen: ColumnIdGenerator,
2122 sql_column_strategy: SqlColumnStrategy,
2123) -> Result<(
2124 StreamFragmentGraph,
2125 TableCatalog,
2126 Option<SourceCatalog>,
2127 TableJobType,
2128)> {
2129 let Statement::CreateTable {
2130 columns,
2131 constraints,
2132 source_watermarks,
2133 append_only,
2134 on_conflict,
2135 with_version_columns,
2136 wildcard_idx,
2137 cdc_table_info,
2138 format_encode,
2139 include_column_options,
2140 engine,
2141 with_options,
2142 ..
2143 } = statement
2144 else {
2145 panic!("unexpected statement type: {:?}", statement);
2146 };
2147
2148 let format_encode = format_encode
2149 .clone()
2150 .map(|format_encode| format_encode.into_v2_with_warning());
2151
2152 let engine = match engine {
2153 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2154 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2155 };
2156
2157 let is_drop_connector =
2158 original_catalog.associated_source_id().is_some() && format_encode.is_none();
2159 if is_drop_connector {
2160 debug_assert!(
2161 source_watermarks.is_empty()
2162 && include_column_options.is_empty()
2163 && with_options
2164 .iter()
2165 .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2166 );
2167 }
2168
2169 let props = CreateTableProps {
2170 definition: handler_args.normalized_sql.clone(),
2171 append_only,
2172 on_conflict: on_conflict.into(),
2173 with_version_columns: with_version_columns
2174 .iter()
2175 .map(|col| col.real_value())
2176 .collect(),
2177 webhook_info: original_catalog.webhook_info.clone(),
2178 engine,
2179 };
2180
2181 let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2182 (Some(format_encode), None) => (
2183 gen_create_table_plan_with_source(
2184 handler_args,
2185 ExplainOptions::default(),
2186 table_name,
2187 columns,
2188 wildcard_idx,
2189 constraints,
2190 format_encode,
2191 source_watermarks,
2192 col_id_gen,
2193 include_column_options,
2194 props,
2195 sql_column_strategy,
2196 )
2197 .await?,
2198 TableJobType::General,
2199 ),
2200 (None, None) => {
2201 let context = OptimizerContext::from_handler_args(handler_args);
2202 let (plan, table) = gen_create_table_plan(
2203 context,
2204 table_name,
2205 columns,
2206 constraints,
2207 col_id_gen,
2208 source_watermarks,
2209 props,
2210 true,
2211 )?;
2212 ((plan, None, table), TableJobType::General)
2213 }
2214 (None, Some(cdc_table)) => {
2215 let session = &handler_args.session;
2216 let (source, resolved_table_name, database_id, schema_id) =
2217 get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2218
2219 let cdc_with_options = derive_with_options_for_cdc_table(
2220 &source.with_properties,
2221 cdc_table.external_table_name.clone(),
2222 )?;
2223
2224 let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2225
2226 let context: OptimizerContextRef =
2227 OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2228 let (plan, table) = gen_create_table_plan_for_cdc_table(
2229 context,
2230 source,
2231 cdc_table.external_table_name.clone(),
2232 columns,
2233 column_catalogs,
2234 pk_names,
2235 cdc_with_options,
2236 col_id_gen,
2237 on_conflict,
2238 with_version_columns
2239 .iter()
2240 .map(|col| col.real_value())
2241 .collect(),
2242 include_column_options,
2243 table_name,
2244 resolved_table_name,
2245 database_id,
2246 schema_id,
2247 original_catalog.id(),
2248 engine,
2249 )?;
2250
2251 ((plan, None, table), TableJobType::SharedCdcSource)
2252 }
2253 (Some(_), Some(_)) => {
2254 return Err(ErrorCode::NotSupported(
2255                 "Data format and encoding format do not apply to a table created from a CDC source"
2256 .into(),
2257 "Remove the FORMAT and ENCODE specification".into(),
2258 )
2259 .into());
2260 }
2261 };
2262
2263 if table.pk_column_ids() != original_catalog.pk_column_ids() {
2264 Err(ErrorCode::InvalidInputSyntax(
2265             "altering the primary key of a table is not supported".to_owned(),
2266 ))?
2267 }
2268
2269 let graph = build_graph(plan, Some(GraphJobType::Table))?;
2270
2271 table.id = original_catalog.id();
2273 if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2274 table.associated_source_id = Some(source_id);
2275
2276 let source = source.as_mut().unwrap();
2277 source.id = source_id.table_id;
2278 source.associated_table_id = Some(table.id());
2279 }
2280
2281 Ok((graph, table, source, job_type))
2282}
2283
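/// Resolves the new table's schema-qualified name and looks up the shared CDC source referenced
/// by `cdc_table`, returning the source catalog, the resolved table name, and the database and
/// schema IDs to create the table under.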
2284fn get_source_and_resolved_table_name(
2285 session: &Arc<SessionImpl>,
2286 cdc_table: CdcTableInfo,
2287 table_name: ObjectName,
2288) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
2289 let db_name = &session.database();
2290 let (schema_name, resolved_table_name) =
2291 Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2292 let (database_id, schema_id) =
2293 session.get_database_and_schema_id_for_create(schema_name.clone())?;
2294
2295     let (source_schema_name, source_name) =
2296 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2297
2298 let source = {
2299 let catalog_reader = session.env().catalog_reader().read_guard();
2300         let schema_name = source_schema_name.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2301 let (source, _) = catalog_reader.get_source_by_name(
2302 db_name,
2303 SchemaPath::Name(schema_name.as_str()),
2304 source_name.as_str(),
2305 )?;
2306 source.clone()
2307 };
2308
2309 Ok((source, resolved_table_name, database_id, schema_id))
2310}
2311
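/// Binds the webhook source definition of a table: the table must consist of exactly one JSONB
/// column, the optional secret reference is resolved into a `PbSecretRef`, and the signature
/// expression must be a `SECURE_COMPARE(...)` call bound against that column.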
2312fn bind_webhook_info(
2314 session: &Arc<SessionImpl>,
2315 columns_defs: &[ColumnDef],
2316 webhook_info: WebhookSourceInfo,
2317) -> Result<PbWebhookSourceInfo> {
2318     if columns_defs.len() != 1
2319         || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
2320     {
2321 return Err(ErrorCode::InvalidInputSyntax(
2322             "A table with a webhook source must have exactly one JSONB column".to_owned(),
2323 )
2324 .into());
2325 }
2326
2327 let WebhookSourceInfo {
2328 secret_ref,
2329 signature_expr,
2330 wait_for_persistence,
2331 is_batched,
2332 } = webhook_info;
2333
2334 let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2336 let db_name = &session.database();
2337 let (schema_name, secret_name) =
2338 Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2339 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2340 (
2341 Some(PbSecretRef {
2342 secret_id: secret_catalog.id.secret_id(),
2343 ref_as: match secret_ref.ref_as {
2344 SecretRefAsType::Text => PbRefAsType::Text,
2345 SecretRefAsType::File => PbRefAsType::File,
2346 }
2347 .into(),
2348 }),
2349 Some(secret_name),
2350 )
2351 } else {
2352 (None, None)
2353 };
2354
2355 let secure_compare_context = SecureCompareContext {
2356 column_name: columns_defs[0].name.real_value(),
2357 secret_name,
2358 };
2359 let mut binder = Binder::new_for_ddl_with_secure_compare(session, secure_compare_context);
2360 let expr = binder.bind_expr(&signature_expr)?;
2361
2362 if expr.as_function_call().is_none()
2364 || expr.as_function_call().unwrap().func_type()
2365 != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2366 {
2367 return Err(ErrorCode::InvalidInputSyntax(
2368 "The signature verification function must be SECURE_COMPARE()".to_owned(),
2369 )
2370 .into());
2371 }
2372
2373 let pb_webhook_info = PbWebhookSourceInfo {
2374 secret_ref: pb_secret_ref,
2375 signature_expr: Some(expr.to_expr_proto()),
2376 wait_for_persistence,
2377 is_batched,
2378 };
2379
2380 Ok(pb_webhook_info)
2381}
2382
2383#[cfg(test)]
2384mod tests {
2385 use risingwave_common::catalog::{
2386 DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2387 };
2388 use risingwave_common::types::{DataType, StructType};
2389
2390 use super::*;
2391 use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2392
2393 #[tokio::test]
2394 async fn test_create_table_handler() {
2395 let sql =
2396 "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2397 let frontend = LocalFrontend::new(Default::default()).await;
2398 frontend.run_sql(sql).await.unwrap();
2399
2400 let session = frontend.session_ref();
2401 let catalog_reader = session.env().catalog_reader().read_guard();
2402 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2403
2404 let (table, _) = catalog_reader
2406 .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2407 .unwrap();
2408 assert_eq!(table.name(), "t");
2409
2410 let columns = table
2411 .columns
2412 .iter()
2413 .map(|col| (col.name(), col.data_type().clone()))
2414 .collect::<HashMap<&str, DataType>>();
2415
2416 let expected_columns = maplit::hashmap! {
2417 ROW_ID_COLUMN_NAME => DataType::Serial,
2418 "v1" => DataType::Int16,
2419 "v2" => StructType::new(
2420 vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
2421 )
2422 .with_ids([3, 4, 5].map(ColumnId::new))
2423 .into(),
2424 RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
2425 };
2426
2427 assert_eq!(columns, expected_columns, "{columns:#?}");
2428 }
2429
2430 #[test]
2431 fn test_bind_primary_key() {
2432 for (sql, expected) in [
2435 ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
2436 ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
2437 ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
2438 (
2439 "create table t (v1 int primary key, v2 int primary key)",
2440 Err("multiple primary keys are not allowed"),
2441 ),
2442 (
2443 "create table t (v1 int primary key primary key, v2 int)",
2444 Err("multiple primary keys are not allowed"),
2445 ),
2446 (
2447 "create table t (v1 int, v2 int, primary key (v1))",
2448 Ok(&[1]),
2449 ),
2450 (
2451 "create table t (v1 int, primary key (v2), v2 int)",
2452 Ok(&[2]),
2453 ),
2454 (
2455 "create table t (primary key (v2, v1), v1 int, v2 int)",
2456 Ok(&[2, 1]),
2457 ),
2458 (
2459 "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
2460 Err("multiple primary keys are not allowed"),
2461 ),
2462 (
2463 "create table t (v1 int primary key, primary key (v1), v2 int)",
2464 Err("multiple primary keys are not allowed"),
2465 ),
2466 (
2467 "create table t (v1 int, primary key (V3), v2 int)",
2468 Err("column \"v3\" named in key does not exist"),
2469 ),
2470 ] {
2471 let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
2472 let risingwave_sqlparser::ast::Statement::CreateTable {
2473 columns: column_defs,
2474 constraints,
2475 ..
2476 } = ast.remove(0)
2477 else {
2478 panic!("test case should be create table")
2479 };
2480 let actual: Result<_> = (|| {
2481 let mut columns = bind_sql_columns(&column_defs, false)?;
2482 let mut col_id_gen = ColumnIdGenerator::new_initial();
2483 for c in &mut columns {
2484 col_id_gen.generate(c)?;
2485 }
2486
2487 let pk_names =
2488 bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
2489 let (_, pk_column_ids, _) =
2490 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
2491 Ok(pk_column_ids)
2492 })();
2493 match (expected, actual) {
2494 (Ok(expected), Ok(actual)) => assert_eq!(
2495 expected.iter().copied().map(ColumnId::new).collect_vec(),
2496 actual,
2497 "sql: {sql}"
2498 ),
2499 (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
2500 (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
2501 (Err(expected), Err(actual)) => assert!(
2502 actual.to_string().contains(expected),
2503 "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
2504 ),
2505 }
2506 }
2507 }
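
    /// A minimal sanity check, added here as an illustrative example (not part of the original
    /// test suite), for the prefix rule used when forwarding `partition_by` to the Iceberg sink.
    #[test]
    fn test_partition_columns_prefix_check() {
        let pk = vec!["a".to_owned(), "b".to_owned(), "c".to_owned()];
        let is_prefix = |cols: &[&str]| {
            let cols: Vec<String> = cols.iter().map(|c| (*c).to_owned()).collect();
            ensure_partition_columns_are_prefix_of_primary_key(&cols, &pk).is_ok()
        };

        // Any prefix of the primary key (including the empty one) is accepted.
        assert!(is_prefix(&[]));
        assert!(is_prefix(&["a"]));
        assert!(is_prefix(&["a", "b"]));

        // Wrong order, gaps, or more columns than the primary key are rejected.
        assert!(!is_prefix(&["b"]));
        assert!(!is_prefix(&["a", "c"]));
        assert!(!is_prefix(&["a", "b", "c", "d"]));
    }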
2508
2509 #[tokio::test]
2510 async fn test_duplicate_props_options() {
2511 let proto_file = create_proto_file(PROTO_FILE_DATA);
2512 let sql = format!(
2513 r#"CREATE TABLE t
2514 WITH (
2515 connector = 'kinesis',
2516 aws.region='user_test_topic',
2517 endpoint='172.10.1.1:9090,172.10.1.2:9090',
2518 aws.credentials.access_key_id = 'your_access_key_1',
2519 aws.credentials.secret_access_key = 'your_secret_key_1'
2520 )
2521 FORMAT PLAIN ENCODE PROTOBUF (
2522 message = '.test.TestRecord',
2523 aws.credentials.access_key_id = 'your_access_key_2',
2524 aws.credentials.secret_access_key = 'your_secret_key_2',
2525 schema.location = 'file://{}',
2526 )"#,
2527 proto_file.path().to_str().unwrap()
2528 );
2529 let frontend = LocalFrontend::new(Default::default()).await;
2530 frontend.run_sql(sql).await.unwrap();
2531
2532 let session = frontend.session_ref();
2533 let catalog_reader = session.env().catalog_reader().read_guard();
2534 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2535
2536 let (source, _) = catalog_reader
2538 .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2539 .unwrap();
2540 assert_eq!(source.name, "t");
2541
2542 assert_eq!(
2544 source
2545 .info
2546 .format_encode_options
2547 .get("aws.credentials.access_key_id")
2548 .unwrap(),
2549 "your_access_key_2"
2550 );
2551 assert_eq!(
2552 source
2553 .info
2554 .format_encode_options
2555 .get("aws.credentials.secret_access_key")
2556 .unwrap(),
2557 "your_secret_key_2"
2558 );
2559
2560 assert_eq!(
2562 source
2563 .with_properties
2564 .get("aws.credentials.access_key_id")
2565 .unwrap(),
2566 "your_access_key_1"
2567 );
2568 assert_eq!(
2569 source
2570 .with_properties
2571 .get("aws.credentials.secret_access_key")
2572 .unwrap(),
2573 "your_secret_key_1"
2574 );
2575
2576 assert!(!source.with_properties.contains_key("schema.location"));
2578 }
2579}