1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use percent_encoding::percent_decode_str;
25use pgwire::pg_response::{PgResponse, StatementType};
26use prost::Message as _;
27use risingwave_common::catalog::{
28 CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29 ObjectId, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, TableId,
30};
31use risingwave_common::config::MetaBackend;
32use risingwave_common::global_jvm::JVM;
33use risingwave_common::session_config::sink_decouple::SinkDecouple;
34use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
35use risingwave_common::util::value_encoding::DatumToProtoExt;
36use risingwave_common::{bail, bail_not_implemented};
37use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_connector::source::cdc::external::{
40 CdcTableType, DATABASE_NAME_KEY, ExternalTableConfig, ExternalTableImpl, SCHEMA_NAME_KEY,
41 TABLE_NAME_KEY,
42};
43use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
44use risingwave_pb::catalog::connection::Info as ConnectionInfo;
45use risingwave_pb::catalog::connection_params::ConnectionType;
46use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
47use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
48use risingwave_pb::meta::PbThrottleTarget;
49use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
50use risingwave_pb::plan_common::{
51 AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
52};
53use risingwave_pb::secret::PbSecretRef;
54use risingwave_pb::secret::secret_ref::PbRefAsType;
55use risingwave_pb::stream_plan::StreamFragmentGraph;
56use risingwave_sqlparser::ast::{
57 CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
58 CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
59 FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
60 Statement, TableConstraint, WebhookSourceInfo, WithProperties,
61};
62use risingwave_sqlparser::parser::{IncludeOption, Parser};
63
64use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
65use super::{RwPgResponse, alter_streaming_rate_limit, create_sink, create_source};
66use crate::binder::{Clause, SecureCompareContext, bind_data_type};
67use crate::catalog::root_catalog::SchemaPath;
68use crate::catalog::source_catalog::SourceCatalog;
69use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableVersion};
70use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
71use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
72use crate::expr::{Expr, ExprImpl, ExprRewriter};
73use crate::handler::HandlerArgs;
74use crate::handler::create_source::{
75 UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
76 bind_source_watermark, handle_addition_columns,
77};
78use crate::handler::util::SourceSchemaCompatExt;
79use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
80use crate::optimizer::plan_node::{
81 LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
82};
83use crate::optimizer::property::{Order, RequiredDist};
84use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
85use crate::session::SessionImpl;
86use crate::session::current::notice_to_user;
87use crate::stream_fragmenter::{GraphJobType, build_graph};
88use crate::utils::OverwriteOptions;
89use crate::{Binder, Explain, TableCatalog, WithOptions};
90
91mod col_id_gen;
92pub use col_id_gen::*;
93use risingwave_connector::sink::iceberg::{
94 COMPACTION_INTERVAL_SEC, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
95 ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ, WRITE_MODE,
96 parse_partition_by_exprs,
97};
98
99use crate::handler::drop_table::handle_drop_table;
100
101fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
102 for option_def in &c.options {
103 match option_def.option {
104 ColumnOption::GeneratedColumns(_) => {}
105 ColumnOption::DefaultValue(_) => {}
106 ColumnOption::DefaultValueInternal { .. } => {}
107 ColumnOption::Unique { is_primary: true } => {}
108 ColumnOption::Null => {}
109 ColumnOption::NotNull => {}
110 _ => bail_not_implemented!("column constraints \"{}\"", option_def),
111 }
112 }
113 Ok(())
114}
115
116pub fn bind_sql_columns(
120 column_defs: &[ColumnDef],
121 is_for_drop_table_connector: bool,
122) -> Result<Vec<ColumnCatalog>> {
123 let mut columns = Vec::with_capacity(column_defs.len());
124
125 for column in column_defs {
126 ensure_column_options_supported(column)?;
127 let ColumnDef {
131 name,
132 data_type,
133 collation,
134 options,
135 ..
136 } = column;
137
138 let data_type = data_type
139 .clone()
140 .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
141 if let Some(collation) = collation {
142 if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
148 bail_not_implemented!(
149 "Collate collation other than `C` or `POSIX` is not implemented"
150 );
151 }
152
153 match data_type {
154 AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
155 _ => {
156 return Err(ErrorCode::NotSupported(
157 format!("{} is not a collatable data type", data_type),
158 "The only built-in collatable data types are `varchar`, please check your type".into(),
159 ).into());
160 }
161 }
162 }
163
164 if !is_for_drop_table_connector {
165 check_column_name_not_reserved(&name.real_value())?;
169 }
170
171 let nullable: bool = !options
172 .iter()
173 .any(|def| matches!(def.option, ColumnOption::NotNull));
174
175 columns.push(ColumnCatalog {
176 column_desc: ColumnDesc {
177 data_type: bind_data_type(&data_type)?,
178 column_id: ColumnId::placeholder(),
179 name: name.real_value(),
180 generated_or_default_column: None,
181 description: None,
182 additional_column: AdditionalColumn { column_type: None },
183 version: ColumnDescVersion::LATEST,
184 system_column: None,
185 nullable,
186 },
187 is_hidden: false,
188 });
189 }
190
191 Ok(columns)
192}
193
194fn check_generated_column_constraints(
195 column_name: &String,
196 column_id: ColumnId,
197 expr: &ExprImpl,
198 column_catalogs: &[ColumnCatalog],
199 generated_column_names: &[String],
200 pk_column_ids: &[ColumnId],
201) -> Result<()> {
202 let input_refs = expr.collect_input_refs(column_catalogs.len());
203 for idx in input_refs.ones() {
204 let referred_generated_column = &column_catalogs[idx].column_desc.name;
205 if generated_column_names
206 .iter()
207 .any(|c| c == referred_generated_column)
208 {
209 return Err(ErrorCode::BindError(format!(
210 "Generated can not reference another generated column. \
211 But here generated column \"{}\" referenced another generated column \"{}\"",
212 column_name, referred_generated_column
213 ))
214 .into());
215 }
216 }
217
218 if pk_column_ids.contains(&column_id) && expr.is_impure() {
219 return Err(ErrorCode::BindError(format!(
220 "Generated columns with impure expressions should not be part of the primary key. \
221 Here column \"{}\" is defined as part of the primary key.",
222 column_name
223 ))
224 .into());
225 }
226
227 Ok(())
228}
229
230pub fn bind_sql_column_constraints(
233 session: &SessionImpl,
234 table_name: String,
235 column_catalogs: &mut [ColumnCatalog],
236 columns: &[ColumnDef],
237 pk_column_ids: &[ColumnId],
238) -> Result<()> {
239 let generated_column_names = {
240 let mut names = vec![];
241 for column in columns {
242 for option_def in &column.options {
243 if let ColumnOption::GeneratedColumns(_) = option_def.option {
244 names.push(column.name.real_value());
245 break;
246 }
247 }
248 }
249 names
250 };
251
252 let mut binder = Binder::new_for_ddl(session);
253 binder.bind_columns_to_context(table_name.clone(), column_catalogs)?;
254
255 for column in columns {
256 let Some(idx) = column_catalogs
257 .iter()
258 .position(|c| c.name() == column.name.real_value())
259 else {
260 continue;
263 };
264
265 for option_def in &column.options {
266 match &option_def.option {
267 ColumnOption::GeneratedColumns(expr) => {
268 binder.set_clause(Some(Clause::GeneratedColumn));
269
270 let expr_impl = binder.bind_expr(expr).with_context(|| {
271 format!(
272 "fail to bind expression in generated column \"{}\"",
273 column.name.real_value()
274 )
275 })?;
276
277 check_generated_column_constraints(
278 &column.name.real_value(),
279 column_catalogs[idx].column_id(),
280 &expr_impl,
281 column_catalogs,
282 &generated_column_names,
283 pk_column_ids,
284 )?;
285
286 column_catalogs[idx].column_desc.generated_or_default_column = Some(
287 GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
288 expr: Some(expr_impl.to_expr_proto()),
289 }),
290 );
291 binder.set_clause(None);
292 }
293 ColumnOption::DefaultValue(expr) => {
294 let expr_impl = binder
295 .bind_expr(expr)?
296 .cast_assign(column_catalogs[idx].data_type())?;
297
298 let rewritten_expr_impl = session
306 .pinned_snapshot()
307 .inline_now_proc_time()
308 .rewrite_expr(expr_impl.clone());
309
310 if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
311 let snapshot_value = snapshot_value?;
312
313 column_catalogs[idx].column_desc.generated_or_default_column =
314 Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
315 snapshot_value: Some(snapshot_value.to_protobuf()),
316 expr: Some(expr_impl.to_expr_proto()),
317 }));
319 } else {
320 return Err(ErrorCode::BindError(format!(
321 "Default expression used in column `{}` cannot be evaluated. \
322 Use generated columns instead if you mean to reference other columns.",
323 column.name
324 ))
325 .into());
326 }
327 }
328 ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
329 if persisted.is_empty() {
334 bail_bind_error!(
335 "DEFAULT INTERNAL is only used for internal purposes, \
336 please specify a concrete default value"
337 );
338 }
339
340 let desc = DefaultColumnDesc::decode(&**persisted)
341 .expect("failed to decode persisted `DefaultColumnDesc`");
342
343 column_catalogs[idx].column_desc.generated_or_default_column =
344 Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
345 }
346 _ => {}
347 }
348 }
349 }
350 Ok(())
351}
352
353pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
355 let mut pk_column_names = vec![];
356
357 for constraint in table_constraints {
358 match constraint {
359 TableConstraint::Unique {
360 name: _,
361 columns,
362 is_primary: true,
363 } => {
364 if !pk_column_names.is_empty() {
365 return Err(multiple_pk_definition_err());
366 }
367 pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
368 }
369 _ => bail_not_implemented!("table constraint \"{}\"", constraint),
370 }
371 }
372 Ok(pk_column_names)
373}
374
375pub fn bind_sql_pk_names(
376 columns_defs: &[ColumnDef],
377 pk_names_from_table_constraints: Vec<String>,
378) -> Result<Vec<String>> {
379 let mut pk_column_names = pk_names_from_table_constraints;
380
381 for column in columns_defs {
382 for option_def in &column.options {
383 if let ColumnOption::Unique { is_primary: true } = option_def.option {
384 if !pk_column_names.is_empty() {
385 return Err(multiple_pk_definition_err());
386 }
387 pk_column_names.push(column.name.real_value());
388 };
389 }
390 }
391
392 Ok(pk_column_names)
393}
394
395fn multiple_pk_definition_err() -> RwError {
396 ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
397}
398
399pub fn bind_pk_and_row_id_on_relation(
404 mut columns: Vec<ColumnCatalog>,
405 pk_names: Vec<String>,
406 must_need_pk: bool,
407) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
408 for c in &columns {
409 assert!(c.column_id() != ColumnId::placeholder());
410 }
411
412 let name_to_id = columns
414 .iter()
415 .map(|c| (c.name(), c.column_id()))
416 .collect::<HashMap<_, _>>();
417
418 let mut pk_column_ids: Vec<_> = pk_names
419 .iter()
420 .map(|name| {
421 name_to_id.get(name.as_str()).copied().ok_or_else(|| {
422 ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
423 })
424 })
425 .try_collect()?;
426
427 let need_row_id = pk_column_ids.is_empty() && must_need_pk;
429
430 let row_id_index = need_row_id.then(|| {
431 let column = ColumnCatalog::row_id_column();
432 let index = columns.len();
433 pk_column_ids = vec![column.column_id()];
434 columns.push(column);
435 index
436 });
437
438 if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
439 Err(ErrorCode::InvalidInputSyntax(format!(
440 "column \"{col}\" specified more than once"
441 )))?;
442 }
443
444 Ok((columns, pk_column_ids, row_id_index))
445}
446
447#[allow(clippy::too_many_arguments)]
450pub(crate) async fn gen_create_table_plan_with_source(
451 mut handler_args: HandlerArgs,
452 explain_options: ExplainOptions,
453 table_name: ObjectName,
454 column_defs: Vec<ColumnDef>,
455 wildcard_idx: Option<usize>,
456 constraints: Vec<TableConstraint>,
457 format_encode: FormatEncodeOptions,
458 source_watermarks: Vec<SourceWatermark>,
459 mut col_id_gen: ColumnIdGenerator,
460 include_column_options: IncludeOption,
461 props: CreateTableProps,
462 sql_column_strategy: SqlColumnStrategy,
463) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
464 if props.append_only
465 && format_encode.format != Format::Plain
466 && format_encode.format != Format::Native
467 {
468 return Err(ErrorCode::BindError(format!(
469 "Append only table does not support format {}.",
470 format_encode.format
471 ))
472 .into());
473 }
474
475 let session = &handler_args.session;
476 let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
477 if with_properties.is_shareable_cdc_connector() {
478 generated_columns_check_for_cdc_table(&column_defs)?;
479 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
480 } else if column_defs.iter().any(|col| {
481 col.options
482 .iter()
483 .any(|def| matches!(def.option, ColumnOption::NotNull))
484 }) {
485 notice_to_user(
487 "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
488 );
489 }
490
491 let db_name: &str = &session.database();
492 let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
493
494 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
496 session,
497 &format_encode,
498 Either::Left(&with_properties),
499 CreateSourceType::Table,
500 )
501 .await?;
502
503 let overwrite_options = OverwriteOptions::new(&mut handler_args);
504 let rate_limit = overwrite_options.source_rate_limit;
505 let source = bind_create_source_or_table_with_connector(
506 handler_args.clone(),
507 table_name,
508 format_encode,
509 with_properties,
510 &column_defs,
511 constraints,
512 wildcard_idx,
513 source_watermarks,
514 columns_from_resolve_source,
515 source_info,
516 include_column_options,
517 &mut col_id_gen,
518 CreateSourceType::Table,
519 rate_limit,
520 sql_column_strategy,
521 )
522 .await?;
523
524 let context = OptimizerContext::new(handler_args, explain_options);
525
526 let (plan, table) = gen_table_plan_with_source(
527 context.into(),
528 schema_name,
529 source.clone(),
530 col_id_gen.into_version(),
531 props,
532 )?;
533
534 Ok((plan, Some(source), table))
535}
536
537#[allow(clippy::too_many_arguments)]
540pub(crate) fn gen_create_table_plan(
541 context: OptimizerContext,
542 table_name: ObjectName,
543 column_defs: Vec<ColumnDef>,
544 constraints: Vec<TableConstraint>,
545 mut col_id_gen: ColumnIdGenerator,
546 source_watermarks: Vec<SourceWatermark>,
547 props: CreateTableProps,
548 is_for_replace_plan: bool,
549) -> Result<(PlanRef, TableCatalog)> {
550 let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
551 for c in &mut columns {
552 col_id_gen.generate(c)?;
553 }
554
555 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
556 if !secret_refs.is_empty() || !connection_refs.is_empty() {
557 return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
558 }
559
560 gen_create_table_plan_without_source(
561 context,
562 table_name,
563 columns,
564 column_defs,
565 constraints,
566 source_watermarks,
567 col_id_gen.into_version(),
568 props,
569 )
570}
571
572#[allow(clippy::too_many_arguments)]
573pub(crate) fn gen_create_table_plan_without_source(
574 context: OptimizerContext,
575 table_name: ObjectName,
576 columns: Vec<ColumnCatalog>,
577 column_defs: Vec<ColumnDef>,
578 constraints: Vec<TableConstraint>,
579 source_watermarks: Vec<SourceWatermark>,
580 version: TableVersion,
581 props: CreateTableProps,
582) -> Result<(PlanRef, TableCatalog)> {
583 let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
585 let (mut columns, pk_column_ids, row_id_index) =
586 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
587
588 let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
589 context.session_ctx(),
590 table_name.real_value(),
591 source_watermarks,
592 &columns,
593 )?;
594
595 bind_sql_column_constraints(
596 context.session_ctx(),
597 table_name.real_value(),
598 &mut columns,
599 &column_defs,
600 &pk_column_ids,
601 )?;
602 let session = context.session_ctx().clone();
603
604 let db_name = &session.database();
605 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
606
607 let info = CreateTableInfo {
608 columns,
609 pk_column_ids,
610 row_id_index,
611 watermark_descs,
612 source_catalog: None,
613 version,
614 };
615
616 gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
617}
618
619fn gen_table_plan_with_source(
620 context: OptimizerContextRef,
621 schema_name: Option<String>,
622 source_catalog: SourceCatalog,
623 version: TableVersion,
624 props: CreateTableProps,
625) -> Result<(PlanRef, TableCatalog)> {
626 let table_name = source_catalog.name.clone();
627
628 let info = CreateTableInfo {
629 columns: source_catalog.columns.clone(),
630 pk_column_ids: source_catalog.pk_col_ids.clone(),
631 row_id_index: source_catalog.row_id_index,
632 watermark_descs: source_catalog.watermark_descs.clone(),
633 source_catalog: Some(source_catalog),
634 version,
635 };
636
637 gen_table_plan_inner(context, schema_name, table_name, info, props)
638}
639
640#[derive(Clone, Copy)]
642pub enum EitherOnConflict {
643 Ast(Option<OnConflict>),
644 Resolved(ConflictBehavior),
645}
646
647impl From<Option<OnConflict>> for EitherOnConflict {
648 fn from(v: Option<OnConflict>) -> Self {
649 Self::Ast(v)
650 }
651}
652
653impl From<ConflictBehavior> for EitherOnConflict {
654 fn from(v: ConflictBehavior) -> Self {
655 Self::Resolved(v)
656 }
657}
658
659impl EitherOnConflict {
660 pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
662 let conflict_behavior = match self {
663 EitherOnConflict::Ast(on_conflict) => {
664 if append_only {
665 if row_id_as_pk {
666 ConflictBehavior::NoCheck
668 } else {
669 if let Some(on_conflict) = on_conflict
671 && on_conflict != OnConflict::Nothing
672 {
673 return Err(ErrorCode::InvalidInputSyntax(
674 "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
675 the ON CONFLICT behavior must be DO NOTHING."
676 .to_owned(),
677 )
678 .into());
679 }
680 ConflictBehavior::IgnoreConflict
681 }
682 } else {
683 match on_conflict.unwrap_or(OnConflict::UpdateFull) {
685 OnConflict::UpdateFull => ConflictBehavior::Overwrite,
686 OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
687 OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
688 }
689 }
690 }
691 EitherOnConflict::Resolved(b) => b,
692 };
693
694 Ok(conflict_behavior)
695 }
696}
697
698pub struct CreateTableInfo {
703 pub columns: Vec<ColumnCatalog>,
704 pub pk_column_ids: Vec<ColumnId>,
705 pub row_id_index: Option<usize>,
706 pub watermark_descs: Vec<WatermarkDesc>,
707 pub source_catalog: Option<SourceCatalog>,
708 pub version: TableVersion,
709}
710
711pub struct CreateTableProps {
716 pub definition: String,
717 pub append_only: bool,
718 pub on_conflict: EitherOnConflict,
719 pub with_version_column: Option<String>,
720 pub webhook_info: Option<PbWebhookSourceInfo>,
721 pub engine: Engine,
722}
723
724#[allow(clippy::too_many_arguments)]
725fn gen_table_plan_inner(
726 context: OptimizerContextRef,
727 schema_name: Option<String>,
728 table_name: String,
729 info: CreateTableInfo,
730 props: CreateTableProps,
731) -> Result<(PlanRef, TableCatalog)> {
732 let CreateTableInfo {
733 ref columns,
734 row_id_index,
735 ref watermark_descs,
736 ref source_catalog,
737 ..
738 } = info;
739 let CreateTableProps { append_only, .. } = props;
740
741 let (database_id, schema_id) = context
742 .session_ctx()
743 .get_database_and_schema_id_for_create(schema_name)?;
744
745 let session = context.session_ctx().clone();
746 let retention_seconds = context.with_options().retention_seconds();
747
748 let source_node: LogicalPlanRef = LogicalSource::new(
749 source_catalog.clone().map(Rc::new),
750 columns.clone(),
751 row_id_index,
752 SourceNodeKind::CreateTable,
753 context.clone(),
754 None,
755 )?
756 .into();
757
758 let required_cols = FixedBitSet::with_capacity(columns.len());
759 let plan_root = PlanRoot::new_with_logical_plan(
760 source_node,
761 RequiredDist::Any,
762 Order::any(),
763 required_cols,
764 vec![],
765 );
766
767 if !append_only && !watermark_descs.is_empty() {
768 return Err(ErrorCode::NotSupported(
769 "Defining watermarks on table requires the table to be append only.".to_owned(),
770 "Use the key words `APPEND ONLY`".to_owned(),
771 )
772 .into());
773 }
774
775 if !append_only && retention_seconds.is_some() {
776 return Err(ErrorCode::NotSupported(
777 "Defining retention seconds on table requires the table to be append only.".to_owned(),
778 "Use the key words `APPEND ONLY`".to_owned(),
779 )
780 .into());
781 }
782
783 let materialize =
784 plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;
785
786 let mut table = materialize.table().clone();
787 table.owner = session.user_id();
788
789 Ok((materialize.into(), table))
790}
791
792#[allow(clippy::too_many_arguments)]
796pub(crate) fn gen_create_table_plan_for_cdc_table(
797 context: OptimizerContextRef,
798 source: Arc<SourceCatalog>,
799 external_table_name: String,
800 column_defs: Vec<ColumnDef>,
801 mut columns: Vec<ColumnCatalog>,
802 pk_names: Vec<String>,
803 cdc_with_options: WithOptionsSecResolved,
804 mut col_id_gen: ColumnIdGenerator,
805 on_conflict: Option<OnConflict>,
806 with_version_column: Option<String>,
807 include_column_options: IncludeOption,
808 table_name: ObjectName,
809 resolved_table_name: String, database_id: DatabaseId,
811 schema_id: SchemaId,
812 table_id: TableId,
813 engine: Engine,
814) -> Result<(PlanRef, TableCatalog)> {
815 let session = context.session_ctx().clone();
816
817 handle_addition_columns(
819 None,
820 &cdc_with_options,
821 include_column_options,
822 &mut columns,
823 true,
824 )?;
825
826 for c in &mut columns {
827 col_id_gen.generate(c)?;
828 }
829
830 let (mut columns, pk_column_ids, _row_id_index) =
831 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
832
833 bind_sql_column_constraints(
835 context.session_ctx(),
836 table_name.real_value(),
837 &mut columns,
838 &column_defs,
839 &pk_column_ids,
840 )?;
841
842 let definition = context.normalized_sql().to_owned();
843
844 let pk_column_indices = {
845 let mut id_to_idx = HashMap::new();
846 columns.iter().enumerate().for_each(|(idx, c)| {
847 id_to_idx.insert(c.column_id(), idx);
848 });
849 pk_column_ids
851 .iter()
852 .map(|c| id_to_idx.get(c).copied().unwrap())
853 .collect_vec()
854 };
855 let table_pk = pk_column_indices
856 .iter()
857 .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
858 .collect();
859
860 let (options, secret_refs) = cdc_with_options.into_parts();
861
862 let non_generated_column_descs = columns
863 .iter()
864 .filter(|&c| (!c.is_generated()))
865 .map(|c| c.column_desc.clone())
866 .collect_vec();
867 let non_generated_column_num = non_generated_column_descs.len();
868 let cdc_table_type = CdcTableType::from_properties(&options);
869 let cdc_table_desc = CdcTableDesc {
870 table_id,
871 source_id: source.id.into(), external_table_name: external_table_name.clone(),
873 pk: table_pk,
874 columns: non_generated_column_descs,
875 stream_key: pk_column_indices,
876 connect_properties: options,
877 secret_refs,
878 };
879
880 tracing::debug!(?cdc_table_desc, "create cdc table");
881 let options = build_cdc_scan_options_with_options(context.with_options(), cdc_table_type)?;
882
883 let logical_scan = LogicalCdcScan::create(
884 external_table_name.clone(),
885 Rc::new(cdc_table_desc),
886 context.clone(),
887 options,
888 );
889
890 let scan_node: LogicalPlanRef = logical_scan.into();
891 let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
892 let plan_root = PlanRoot::new_with_logical_plan(
893 scan_node,
894 RequiredDist::Any,
895 Order::any(),
896 required_cols,
897 vec![],
898 );
899
900 let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
901 let materialize = plan_root.gen_table_plan(
902 context,
903 resolved_table_name,
904 database_id,
905 schema_id,
906 CreateTableInfo {
907 columns,
908 pk_column_ids,
909 row_id_index: None,
910 watermark_descs: vec![],
911 source_catalog: Some((*source).clone()),
912 version: col_id_gen.into_version(),
913 },
914 CreateTableProps {
915 definition,
916 append_only: false,
917 on_conflict: on_conflict.into(),
918 with_version_column,
919 webhook_info: None,
920 engine,
921 },
922 )?;
923
924 let mut table = materialize.table().clone();
925 table.owner = session.user_id();
926 table.cdc_table_id = Some(cdc_table_id);
927
928 Ok((materialize.into(), table))
929}
930
931fn derive_with_options_for_cdc_table(
932 source_with_properties: &WithOptionsSecResolved,
933 external_table_name: String,
934) -> Result<WithOptionsSecResolved> {
935 use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
936 let source_database_name: &str = source_with_properties
938 .get("database.name")
939 .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
940 .as_str();
941 let mut with_options = source_with_properties.clone();
942 if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
943 match connector.as_str() {
944 MYSQL_CDC_CONNECTOR => {
945 let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
948 anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
949 })?;
950 if !source_database_name
952 .split(',')
953 .map(|s| s.trim())
954 .any(|name| name == db_name)
955 {
956 return Err(anyhow!(
957 "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
958 db_name,
959 source_database_name
960 ).into());
961 }
962 with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
963 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
964 }
965 POSTGRES_CDC_CONNECTOR => {
966 let (schema_name, table_name) = external_table_name
967 .split_once('.')
968 .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;
969
970 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
972 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
973 }
974 SQL_SERVER_CDC_CONNECTOR => {
975 let parts: Vec<&str> = external_table_name.split('.').collect();
981 let (_, schema_name, table_name) = match parts.len() {
982 3 => {
983 let db_name = parts[0];
985 let schema_name = parts[1];
986 let table_name = parts[2];
987
988 if db_name != source_database_name {
990 return Err(anyhow!(
991 "The database name `{}` in the FROM clause is not the same as the database name `{}` in source definition",
992 db_name,
993 source_database_name
994 ).into());
995 }
996 (db_name, schema_name, table_name)
997 }
998 2 => {
999 let schema_name = parts[0];
1001 let table_name = parts[1];
1002 (source_database_name, schema_name, table_name)
1003 }
1004 1 => {
1005 let table_name = parts[0];
1007 (source_database_name, "dbo", table_name)
1008 }
1009 _ => {
1010 return Err(anyhow!(
1011 "The upstream table name must be in one of these formats: 'database.schema.table', 'schema.table', or 'table'"
1012 ).into());
1013 }
1014 };
1015
1016 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1018 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1019 }
1020 _ => {
1021 return Err(RwError::from(anyhow!(
1022 "connector {} is not supported for cdc table",
1023 connector
1024 )));
1025 }
1026 };
1027 }
1028 Ok(with_options)
1029}
1030
1031#[allow(clippy::too_many_arguments)]
1032pub(super) async fn handle_create_table_plan(
1033 handler_args: HandlerArgs,
1034 explain_options: ExplainOptions,
1035 format_encode: Option<FormatEncodeOptions>,
1036 cdc_table_info: Option<CdcTableInfo>,
1037 table_name: &ObjectName,
1038 column_defs: Vec<ColumnDef>,
1039 wildcard_idx: Option<usize>,
1040 constraints: Vec<TableConstraint>,
1041 source_watermarks: Vec<SourceWatermark>,
1042 append_only: bool,
1043 on_conflict: Option<OnConflict>,
1044 with_version_column: Option<String>,
1045 include_column_options: IncludeOption,
1046 webhook_info: Option<WebhookSourceInfo>,
1047 engine: Engine,
1048) -> Result<(
1049 PlanRef,
1050 Option<SourceCatalog>,
1051 TableCatalog,
1052 TableJobType,
1053 Option<SourceId>,
1054)> {
1055 let col_id_gen = ColumnIdGenerator::new_initial();
1056 let format_encode = check_create_table_with_source(
1057 &handler_args.with_options,
1058 format_encode,
1059 &include_column_options,
1060 &cdc_table_info,
1061 )?;
1062 let webhook_info = webhook_info
1063 .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1064 .transpose()?;
1065
1066 let props = CreateTableProps {
1067 definition: handler_args.normalized_sql.clone(),
1068 append_only,
1069 on_conflict: on_conflict.into(),
1070 with_version_column: with_version_column.clone(),
1071 webhook_info,
1072 engine,
1073 };
1074
1075 let ((plan, source, table), job_type, shared_shource_id) = match (
1076 format_encode,
1077 cdc_table_info.as_ref(),
1078 ) {
1079 (Some(format_encode), None) => (
1080 gen_create_table_plan_with_source(
1081 handler_args,
1082 explain_options,
1083 table_name.clone(),
1084 column_defs,
1085 wildcard_idx,
1086 constraints,
1087 format_encode,
1088 source_watermarks,
1089 col_id_gen,
1090 include_column_options,
1091 props,
1092 SqlColumnStrategy::FollowChecked,
1093 )
1094 .await?,
1095 TableJobType::General,
1096 None,
1097 ),
1098 (None, None) => {
1099 let context = OptimizerContext::new(handler_args, explain_options);
1100 let (plan, table) = gen_create_table_plan(
1101 context,
1102 table_name.clone(),
1103 column_defs,
1104 constraints,
1105 col_id_gen,
1106 source_watermarks,
1107 props,
1108 false,
1109 )?;
1110
1111 ((plan, None, table), TableJobType::General, None)
1112 }
1113
1114 (None, Some(cdc_table)) => {
1115 sanity_check_for_table_on_cdc_source(
1116 append_only,
1117 &column_defs,
1118 &wildcard_idx,
1119 &constraints,
1120 &source_watermarks,
1121 )?;
1122
1123 generated_columns_check_for_cdc_table(&column_defs)?;
1124 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1125
1126 let session = &handler_args.session;
1127 let db_name = &session.database();
1128 let user_name = &session.user_name();
1129 let search_path = session.config().search_path();
1130 let (schema_name, resolved_table_name) =
1131 Binder::resolve_schema_qualified_name(db_name, table_name)?;
1132 let (database_id, schema_id) =
1133 session.get_database_and_schema_id_for_create(schema_name.clone())?;
1134
1135 let (format_encode, source_name) =
1137 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
1138
1139 let source = {
1140 let catalog_reader = session.env().catalog_reader().read_guard();
1141 let schema_path =
1142 SchemaPath::new(format_encode.as_deref(), &search_path, user_name);
1143
1144 let (source, _) = catalog_reader.get_source_by_name(
1145 db_name,
1146 schema_path,
1147 source_name.as_str(),
1148 )?;
1149 source.clone()
1150 };
1151 let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
1152 &source.with_properties,
1153 cdc_table.external_table_name.clone(),
1154 )?;
1155
1156 let (columns, pk_names) = match wildcard_idx {
1157 Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1158 None => {
1159 for column_def in &column_defs {
1160 for option_def in &column_def.options {
1161 if let ColumnOption::DefaultValue(_)
1162 | ColumnOption::DefaultValueInternal { .. } = option_def.option
1163 {
1164 return Err(ErrorCode::NotSupported(
1165 "Default value for columns defined on the table created from a CDC source".into(),
1166 "Remove the default value expression in the column definitions".into(),
1167 )
1168 .into());
1169 }
1170 }
1171 }
1172
1173 let (columns, pk_names) =
1174 bind_cdc_table_schema(&column_defs, &constraints, false)?;
1175 let (options, secret_refs) = cdc_with_options.clone().into_parts();
1177 let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1178 .context("failed to extract external table config")?;
1179
1180 (columns, pk_names)
1182 }
1183 };
1184
1185 let context: OptimizerContextRef =
1186 OptimizerContext::new(handler_args, explain_options).into();
1187 let shared_source_id = source.id;
1188 let (plan, table) = gen_create_table_plan_for_cdc_table(
1189 context,
1190 source,
1191 cdc_table.external_table_name.clone(),
1192 column_defs,
1193 columns,
1194 pk_names,
1195 cdc_with_options,
1196 col_id_gen,
1197 on_conflict,
1198 with_version_column,
1199 include_column_options,
1200 table_name.clone(),
1201 resolved_table_name,
1202 database_id,
1203 schema_id,
1204 TableId::placeholder(),
1205 engine,
1206 )?;
1207
1208 (
1209 (plan, None, table),
1210 TableJobType::SharedCdcSource,
1211 Some(shared_source_id),
1212 )
1213 }
1214 (Some(_), Some(_)) => {
1215 return Err(ErrorCode::NotSupported(
1216 "Data format and encoding format doesn't apply to table created from a CDC source"
1217 .into(),
1218 "Remove the FORMAT and ENCODE specification".into(),
1219 )
1220 .into());
1221 }
1222 };
1223 Ok((plan, source, table, job_type, shared_shource_id))
1224}
1225
1226fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1228 let mut found_generated_column = false;
1229 for column in columns {
1230 let mut is_generated = false;
1231
1232 for option_def in &column.options {
1233 if let ColumnOption::GeneratedColumns(_) = option_def.option {
1234 is_generated = true;
1235 break;
1236 }
1237 }
1238
1239 if is_generated {
1240 found_generated_column = true;
1241 } else if found_generated_column {
1242 return Err(ErrorCode::NotSupported(
1243 "Non-generated column found after a generated column.".into(),
1244 "Ensure that all generated columns appear at the end of the cdc table definition."
1245 .into(),
1246 )
1247 .into());
1248 }
1249 }
1250 Ok(())
1251}
1252
1253fn not_null_check_for_cdc_table(
1255 wildcard_idx: &Option<usize>,
1256 column_defs: &Vec<ColumnDef>,
1257) -> Result<()> {
1258 if !wildcard_idx.is_some()
1259 && column_defs.iter().any(|col| {
1260 col.options
1261 .iter()
1262 .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1263 })
1264 {
1265 return Err(ErrorCode::NotSupported(
1266 "CDC table with NOT NULL constraint is not supported".to_owned(),
1267 "Please remove the NOT NULL constraint for columns".to_owned(),
1268 )
1269 .into());
1270 }
1271 Ok(())
1272}
1273
1274fn sanity_check_for_table_on_cdc_source(
1276 append_only: bool,
1277 column_defs: &Vec<ColumnDef>,
1278 wildcard_idx: &Option<usize>,
1279 constraints: &Vec<TableConstraint>,
1280 source_watermarks: &Vec<SourceWatermark>,
1281) -> Result<()> {
1282 if wildcard_idx.is_some() && !column_defs.is_empty() {
1284 return Err(ErrorCode::NotSupported(
1285 "wildcard(*) and column definitions cannot be used together".to_owned(),
1286 "Remove the wildcard or column definitions".to_owned(),
1287 )
1288 .into());
1289 }
1290
1291 if !wildcard_idx.is_some()
1293 && !constraints.iter().any(|c| {
1294 matches!(
1295 c,
1296 TableConstraint::Unique {
1297 is_primary: true,
1298 ..
1299 }
1300 )
1301 })
1302 && !column_defs.iter().any(|col| {
1303 col.options
1304 .iter()
1305 .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1306 })
1307 {
1308 return Err(ErrorCode::NotSupported(
1309 "CDC table without primary key constraint is not supported".to_owned(),
1310 "Please define a primary key".to_owned(),
1311 )
1312 .into());
1313 }
1314
1315 if append_only {
1316 return Err(ErrorCode::NotSupported(
1317 "append only modifier on the table created from a CDC source".into(),
1318 "Remove the APPEND ONLY clause".into(),
1319 )
1320 .into());
1321 }
1322
1323 if !source_watermarks.is_empty() {
1324 return Err(ErrorCode::NotSupported(
1325 "watermark defined on the table created from a CDC source".into(),
1326 "Remove the Watermark definitions".into(),
1327 )
1328 .into());
1329 }
1330
1331 Ok(())
1332}
1333
1334async fn bind_cdc_table_schema_externally(
1336 cdc_with_options: WithOptionsSecResolved,
1337) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1338 let (options, secret_refs) = cdc_with_options.into_parts();
1340 let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1341 .context("failed to extract external table config")?;
1342
1343 let table = ExternalTableImpl::connect(config)
1344 .await
1345 .context("failed to auto derive table schema")?;
1346 Ok((
1347 table
1348 .column_descs()
1349 .iter()
1350 .cloned()
1351 .map(|column_desc| ColumnCatalog {
1352 column_desc,
1353 is_hidden: false,
1354 })
1355 .collect(),
1356 table.pk_names().clone(),
1357 ))
1358}
1359
1360fn bind_cdc_table_schema(
1362 column_defs: &Vec<ColumnDef>,
1363 constraints: &Vec<TableConstraint>,
1364 is_for_replace_plan: bool,
1365) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1366 let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1367
1368 let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1369 Ok((columns, pk_names))
1370}
1371
1372#[allow(clippy::too_many_arguments)]
1373pub async fn handle_create_table(
1374 mut handler_args: HandlerArgs,
1375 table_name: ObjectName,
1376 column_defs: Vec<ColumnDef>,
1377 wildcard_idx: Option<usize>,
1378 constraints: Vec<TableConstraint>,
1379 if_not_exists: bool,
1380 format_encode: Option<FormatEncodeOptions>,
1381 source_watermarks: Vec<SourceWatermark>,
1382 append_only: bool,
1383 on_conflict: Option<OnConflict>,
1384 with_version_column: Option<String>,
1385 cdc_table_info: Option<CdcTableInfo>,
1386 include_column_options: IncludeOption,
1387 webhook_info: Option<WebhookSourceInfo>,
1388 ast_engine: risingwave_sqlparser::ast::Engine,
1389) -> Result<RwPgResponse> {
1390 let session = handler_args.session.clone();
1391
1392 if append_only {
1393 session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1394 }
1395
1396 session.check_cluster_limits().await?;
1397
1398 let engine = match ast_engine {
1399 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1400 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1401 };
1402 if engine == Engine::Iceberg && handler_args.with_options.get_connector().is_some() {
1403 if let Some(_rate_limit) = handler_args.with_options.insert(
1406 OverwriteOptions::SOURCE_RATE_LIMIT_KEY.to_owned(),
1407 "0".to_owned(),
1408 ) {
1409 return Err(ErrorCode::NotSupported(
1411 "source_rate_limit for iceberg table engine during table creation".to_owned(),
1412 "Please remove source_rate_limit from WITH options.".to_owned(),
1413 )
1414 .into());
1415 }
1416 }
1417
1418 if let Either::Right(resp) = session.check_relation_name_duplicated(
1419 table_name.clone(),
1420 StatementType::CREATE_TABLE,
1421 if_not_exists,
1422 )? {
1423 return Ok(resp);
1424 }
1425
1426 let (graph, source, hummock_table, job_type, shared_source_id) = {
1427 let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
1428 handler_args.clone(),
1429 ExplainOptions::default(),
1430 format_encode,
1431 cdc_table_info,
1432 &table_name,
1433 column_defs.clone(),
1434 wildcard_idx,
1435 constraints.clone(),
1436 source_watermarks,
1437 append_only,
1438 on_conflict,
1439 with_version_column,
1440 include_column_options,
1441 webhook_info,
1442 engine,
1443 )
1444 .await?;
1445 tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1446
1447 let graph = build_graph(plan, Some(GraphJobType::Table))?;
1448
1449 (graph, source, table, job_type, shared_source_id)
1450 };
1451
1452 tracing::trace!(
1453 "name={}, graph=\n{}",
1454 table_name,
1455 serde_json::to_string_pretty(&graph).unwrap()
1456 );
1457
1458 let dependencies = shared_source_id
1459 .map(|id| HashSet::from([id as ObjectId]))
1460 .unwrap_or_default();
1461
1462 match engine {
1464 Engine::Hummock => {
1465 let catalog_writer = session.catalog_writer()?;
1466 catalog_writer
1467 .create_table(
1468 source.map(|s| s.to_prost()),
1469 hummock_table.to_prost(),
1470 graph,
1471 job_type,
1472 if_not_exists,
1473 dependencies,
1474 )
1475 .await?;
1476 }
1477 Engine::Iceberg => {
1478 create_iceberg_engine_table(
1479 session,
1480 handler_args,
1481 source.map(|s| s.to_prost()),
1482 hummock_table,
1483 graph,
1484 table_name,
1485 job_type,
1486 if_not_exists,
1487 )
1488 .await?;
1489 }
1490 }
1491
1492 Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1493}
1494
1495#[allow(clippy::too_many_arguments)]
1504pub async fn create_iceberg_engine_table(
1505 session: Arc<SessionImpl>,
1506 handler_args: HandlerArgs,
1507 mut source: Option<PbSource>,
1508 table: TableCatalog,
1509 graph: StreamFragmentGraph,
1510 table_name: ObjectName,
1511 job_type: PbTableJobType,
1512 if_not_exists: bool,
1513) -> Result<()> {
1514 let meta_client = session.env().meta_client();
1515 let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1516
1517 let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
1518 ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
1519 })?;
1520 let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1521 let meta_store_user = meta_store_endpoint.username().to_owned();
1522 let meta_store_password = match meta_store_endpoint.password() {
1523 Some(password) => percent_decode_str(password)
1524 .decode_utf8()
1525 .map_err(|_| {
1526 ErrorCode::InternalError(
1527 "failed to parse password from meta store endpoint".to_owned(),
1528 )
1529 })?
1530 .into_owned(),
1531 None => "".to_owned(),
1532 };
1533 let meta_store_host = meta_store_endpoint
1534 .host_str()
1535 .ok_or_else(|| {
1536 ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
1537 })?
1538 .to_owned();
1539 let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1540 ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
1541 })?;
1542 let meta_store_database = meta_store_endpoint
1543 .path()
1544 .trim_start_matches('/')
1545 .to_owned();
1546
1547 let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1548 bail!("failed to parse meta backend: {}", meta_store_backend);
1549 };
1550
1551 let catalog_uri = match meta_backend {
1552 MetaBackend::Postgres => {
1553 format!(
1554 "jdbc:postgresql://{}:{}/{}",
1555 meta_store_host.clone(),
1556 meta_store_port.clone(),
1557 meta_store_database.clone()
1558 )
1559 }
1560 MetaBackend::Mysql => {
1561 format!(
1562 "jdbc:mysql://{}:{}/{}",
1563 meta_store_host.clone(),
1564 meta_store_port.clone(),
1565 meta_store_database.clone()
1566 )
1567 }
1568 MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1569 bail!(
1570 "Unsupported meta backend for iceberg engine table: {}",
1571 meta_store_backend
1572 );
1573 }
1574 };
1575
1576 let rw_db_name = session
1577 .env()
1578 .catalog_reader()
1579 .read_guard()
1580 .get_database_by_id(&table.database_id)?
1581 .name()
1582 .to_owned();
1583 let rw_schema_name = session
1584 .env()
1585 .catalog_reader()
1586 .read_guard()
1587 .get_schema_by_id(&table.database_id, &table.schema_id)?
1588 .name()
1589 .clone();
1590 let iceberg_catalog_name = rw_db_name.clone();
1591 let iceberg_database_name = rw_schema_name.clone();
1592 let iceberg_table_name = table_name.0.last().unwrap().real_value();
1593
1594 let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1595 let sink_decouple = session.config().sink_decouple();
1596 if matches!(sink_decouple, SinkDecouple::Disable) {
1597 bail!(
1598 "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1599 );
1600 }
1601
1602 let mut connection_ref = BTreeMap::new();
1603 let with_common = if iceberg_engine_connection.is_empty() {
1604 bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1605 } else {
1606 let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1607 assert_eq!(parts.len(), 2);
1608 let connection_catalog =
1609 session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1610 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1611 if params.connection_type == ConnectionType::Iceberg as i32 {
1612 connection_ref.insert(
1614 "connection".to_owned(),
1615 ConnectionRefValue {
1616 connection_name: ObjectName::from(vec![
1617 Ident::from(parts[0]),
1618 Ident::from(parts[1]),
1619 ]),
1620 },
1621 );
1622
1623 let mut with_common = BTreeMap::new();
1624 with_common.insert("connector".to_owned(), "iceberg".to_owned());
1625 with_common.insert("database.name".to_owned(), iceberg_database_name.to_owned());
1626 with_common.insert("table.name".to_owned(), iceberg_table_name.to_owned());
1627
1628 if let Some(s) = params.properties.get("hosted_catalog")
1629 && s.eq_ignore_ascii_case("true")
1630 {
1631 with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1632 with_common.insert("catalog.uri".to_owned(), catalog_uri.to_owned());
1633 with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user.to_owned());
1634 with_common.insert(
1635 "catalog.jdbc.password".to_owned(),
1636 meta_store_password.clone(),
1637 );
1638 with_common.insert("catalog.name".to_owned(), iceberg_catalog_name.to_owned());
1639 }
1640
1641 with_common
1642 } else {
1643 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1644 "Only iceberg connection could be used in iceberg engine".to_owned(),
1645 )));
1646 }
1647 } else {
1648 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1649 "Private Link Service has been deprecated. Please create a new connection instead."
1650 .to_owned(),
1651 )));
1652 }
1653 };
1654
1655 let mut pks = table
1658 .pk_column_names()
1659 .iter()
1660 .map(|c| c.to_string())
1661 .collect::<Vec<String>>();
1662
1663 let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1665 pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1666 let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1667 "select {} as {}, * from {}",
1668 ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1669 ))
1670 .context("unable to parse query")?
1671 .try_into()
1672 .unwrap();
1673
1674 let Statement::Query(query) = &stmt else {
1675 panic!("unexpected statement: {:?}", stmt);
1676 };
1677 CreateSink::AsQuery(query.clone())
1678 } else {
1679 CreateSink::From(table_name.clone())
1680 };
1681
1682 let mut sink_name = table_name.clone();
1683 *sink_name.0.last_mut().unwrap() = Ident::from(
1684 (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1685 );
1686 let create_sink_stmt = CreateSinkStatement {
1687 if_not_exists: false,
1688 sink_name,
1689 with_properties: WithProperties(vec![]),
1690 sink_from,
1691 columns: vec![],
1692 emit_mode: None,
1693 sink_schema: None,
1694 into_table_name: None,
1695 };
1696
1697 let mut sink_handler_args = handler_args.clone();
1698
1699 let mut sink_with = with_common.clone();
1700
1701 if table.append_only {
1702 sink_with.insert("type".to_owned(), "append-only".to_owned());
1703 } else {
1704 sink_with.insert("primary_key".to_owned(), pks.join(","));
1705 sink_with.insert("type".to_owned(), "upsert".to_owned());
1706 }
1707 let commit_checkpoint_interval = handler_args
1726 .with_options
1727 .get(COMMIT_CHECKPOINT_INTERVAL)
1728 .map(|v| v.to_owned())
1729 .unwrap_or_else(|| "60".to_owned());
1730 let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1731 ErrorCode::InvalidInputSyntax(format!(
1732 "commit_checkpoint_interval must be a positive integer: {}",
1733 commit_checkpoint_interval
1734 ))
1735 })?;
1736
1737 if commit_checkpoint_interval == 0 {
1738 bail!("commit_checkpoint_interval must be a positive integer: 0");
1739 }
1740
1741 source
1743 .as_mut()
1744 .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1745
1746 let sink_decouple = session.config().sink_decouple();
1747 if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1748 bail!(
1749 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1750 )
1751 }
1752
1753 sink_with.insert(
1754 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1755 commit_checkpoint_interval.to_string(),
1756 );
1757 sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1758
1759 sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1760
1761 if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1762 match enable_compaction.to_lowercase().as_str() {
1763 "true" => {
1764 risingwave_common::license::Feature::IcebergCompaction.check_available()?;
1765
1766 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1767 }
1768 "false" => {
1769 sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1770 }
1771 _ => {
1772 return Err(ErrorCode::InvalidInputSyntax(format!(
1773 "enable_compaction must be true or false: {}",
1774 enable_compaction
1775 ))
1776 .into());
1777 }
1778 }
1779
1780 source
1782 .as_mut()
1783 .map(|x| x.with_properties.remove("enable_compaction"));
1784 } else {
1785 sink_with.insert(
1786 ENABLE_COMPACTION.to_owned(),
1787 risingwave_common::license::Feature::IcebergCompaction
1788 .check_available()
1789 .is_ok()
1790 .to_string(),
1791 );
1792 }
1793
1794 if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1795 let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1796 ErrorCode::InvalidInputSyntax(format!(
1797 "compaction_interval_sec must be a positive integer: {}",
1798 commit_checkpoint_interval
1799 ))
1800 })?;
1801 if compaction_interval_sec == 0 {
1802 bail!("compaction_interval_sec must be a positive integer: 0");
1803 }
1804 sink_with.insert(
1805 "compaction_interval_sec".to_owned(),
1806 compaction_interval_sec.to_string(),
1807 );
1808 source
1810 .as_mut()
1811 .map(|x| x.with_properties.remove("compaction_interval_sec"));
1812 }
1813
1814 if let Some(enable_snapshot_expiration) =
1815 handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1816 {
1817 match enable_snapshot_expiration.to_lowercase().as_str() {
1818 "true" => {
1819 risingwave_common::license::Feature::IcebergCompaction.check_available()?;
1820 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1821 }
1822 "false" => {
1823 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1824 }
1825 _ => {
1826 return Err(ErrorCode::InvalidInputSyntax(format!(
1827 "enable_snapshot_expiration must be true or false: {}",
1828 enable_snapshot_expiration
1829 ))
1830 .into());
1831 }
1832 }
1833
1834 source
1836 .as_mut()
1837 .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1838 } else {
1839 sink_with.insert(
1840 ENABLE_SNAPSHOT_EXPIRATION.to_owned(),
1841 risingwave_common::license::Feature::IcebergCompaction
1842 .check_available()
1843 .is_ok()
1844 .to_string(),
1845 );
1846 }
1847
1848 if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1849 match write_mode.to_lowercase().as_str() {
1850 ICEBERG_WRITE_MODE_MERGE_ON_READ => {
1851 sink_with.insert(
1852 WRITE_MODE.to_owned(),
1853 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1854 );
1855 }
1856
1857 ICEBERG_WRITE_MODE_COPY_ON_WRITE => {
1858 risingwave_common::license::Feature::IcebergCompaction.check_available()?;
1859
1860 if table.append_only {
1861 return Err(ErrorCode::NotSupported(
1862 "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
1863 "Please use MERGE ON READ instead".to_owned(),
1864 )
1865 .into());
1866 }
1867
1868 sink_with.insert(
1869 WRITE_MODE.to_owned(),
1870 ICEBERG_WRITE_MODE_COPY_ON_WRITE.to_owned(),
1871 );
1872 }
1873
1874 _ => {
1875 return Err(ErrorCode::InvalidInputSyntax(format!(
1876 "write_mode must be one of: {}, {}",
1877 ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
1878 ))
1879 .into());
1880 }
1881 }
1882
1883 source
1885 .as_mut()
1886 .map(|x| x.with_properties.remove("write_mode"));
1887 } else {
1888 sink_with.insert(
1889 WRITE_MODE.to_owned(),
1890 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1891 );
1892 }
1893
1894 let partition_by = handler_args
1895 .with_options
1896 .get("partition_by")
1897 .map(|v| v.to_owned());
1898
1899 if let Some(partition_by) = &partition_by {
1900 let mut partition_columns = vec![];
1901 for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
1902 table
1903 .columns()
1904 .iter()
1905 .find(|col| col.name().eq_ignore_ascii_case(&column))
1906 .ok_or_else(|| {
1907 ErrorCode::InvalidInputSyntax(format!(
1908 "Partition source column does not exist in schema: {}",
1909 column
1910 ))
1911 })?;
1912
1913 partition_columns.push(column.to_owned());
1914 }
1915
1916 ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
1917 |_| {
1918 ErrorCode::InvalidInputSyntax(
1919 "The partition columns should be the prefix of the primary key".to_owned(),
1920 )
1921 },
1922 )?;
1923
1924 sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
1925
1926 source
1928 .as_mut()
1929 .map(|x| x.with_properties.remove("partition_by"));
1930 }
1931
1932 sink_handler_args.with_options =
1933 WithOptions::new(sink_with, Default::default(), connection_ref.clone());
1934
1935 let mut source_name = table_name.clone();
1936 *source_name.0.last_mut().unwrap() = Ident::from(
1937 (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
1938 );
1939 let create_source_stmt = CreateSourceStatement {
1940 temporary: false,
1941 if_not_exists: false,
1942 columns: vec![],
1943 source_name,
1944 wildcard_idx: Some(0),
1945 constraints: vec![],
1946 with_properties: WithProperties(vec![]),
1947 format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
1948 source_watermarks: vec![],
1949 include_column_options: vec![],
1950 };
1951
1952 let mut source_handler_args = handler_args.clone();
1953 let source_with = with_common;
1954 source_handler_args.with_options =
1955 WithOptions::new(source_with, Default::default(), connection_ref);
1956
1957 let _ = JVM.get_or_init()?;
1960
1961 let catalog_writer = session.catalog_writer()?;
1962 let has_connector = source.is_some();
1964 catalog_writer
1965 .create_table(
1966 source,
1967 table.to_prost(),
1968 graph,
1969 job_type,
1970 if_not_exists,
1971 HashSet::default(),
1972 )
1973 .await?;
1974 let res = create_sink::handle_create_sink(sink_handler_args, create_sink_stmt, true).await;
1975 if res.is_err() {
1976 handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
1978 res?;
1979 }
1980 let res = create_source::handle_create_source(source_handler_args, create_source_stmt).await;
1981 if res.is_err() {
1982 handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
1984 res?;
1985 }
1986
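    // For connector-backed tables, reset the source rate limit (-1 removes the limit) once the
    // sink and source have been created.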
1987 if has_connector {
1988 alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1989 handler_args,
1990 PbThrottleTarget::TableWithSource,
1991 table_name,
1992 -1,
1993 )
1994 .await?;
1995 }
1996
1997 Ok(())
1998}
1999
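/// Check that the `CREATE TABLE` statement uses connector-related clauses consistently and
/// return the format/encode options to use.
///
/// CDC tables are returned as-is. Otherwise, `INCLUDE ...` options are only allowed when a
/// connector is configured, and a connector-backed table must carry a `FORMAT ... ENCODE ...`
/// clause. For example (illustrative SQL, not taken from this file):
/// `CREATE TABLE t (v INT) WITH (connector = 'kafka', ...) FORMAT PLAIN ENCODE JSON`
/// is accepted, while the same statement without the FORMAT clause is rejected.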
2000pub fn check_create_table_with_source(
2001 with_options: &WithOptions,
2002 format_encode: Option<FormatEncodeOptions>,
2003 include_column_options: &IncludeOption,
2004 cdc_table_info: &Option<CdcTableInfo>,
2005) -> Result<Option<FormatEncodeOptions>> {
2006 if cdc_table_info.is_some() {
2008 return Ok(format_encode);
2009 }
2010 let defined_source = with_options.is_source_connector();
2011
2012 if !include_column_options.is_empty() && !defined_source {
2013 return Err(ErrorCode::InvalidInputSyntax(
2014 "INCLUDE should be used with a connector".to_owned(),
2015 )
2016 .into());
2017 }
2018 if defined_source {
2019 format_encode.as_ref().ok_or_else(|| {
2020 ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2021 })?;
2022 }
2023 Ok(format_encode)
2024}
2025
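/// Verify that `partition_columns` is a prefix of `primary_key_columns`.
/// For example, partitioning by `(c1, c2)` is valid for a primary key `(c1, c2, c3)`,
/// while partitioning by `(c2)` or `(c1, c3)` is not.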
2026fn ensure_partition_columns_are_prefix_of_primary_key(
2027 partition_columns: &[String],
2028 primary_key_columns: &[String],
2029) -> std::result::Result<(), String> {
2030 if partition_columns.len() > primary_key_columns.len() {
2031 return Err("Partition columns cannot be longer than primary key columns.".to_owned());
2032 }
2033
2034 for (i, partition_col) in partition_columns.iter().enumerate() {
2035 if primary_key_columns.get(i) != Some(partition_col) {
2036 return Err(format!(
2037 "Partition columns must form a prefix of the primary key, but '{}' does not match the key column at its position.",
2038 partition_col
2039 ));
2040 }
2041 }
2042
2043 Ok(())
2044}
2045
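/// Plan the replacement job for an existing table: re-bind the new `CREATE TABLE` statement,
/// build the stream fragment graph, and return it together with the new table catalog, the
/// optional connector source catalog, and the job type. Changing the primary key is rejected,
/// and the original table id (and associated source id, if kept) is carried over.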
2046#[allow(clippy::too_many_arguments)]
2047pub async fn generate_stream_graph_for_replace_table(
2048 _session: &Arc<SessionImpl>,
2049 table_name: ObjectName,
2050 original_catalog: &Arc<TableCatalog>,
2051 handler_args: HandlerArgs,
2052 statement: Statement,
2053 col_id_gen: ColumnIdGenerator,
2054 sql_column_strategy: SqlColumnStrategy,
2055) -> Result<(
2056 StreamFragmentGraph,
2057 TableCatalog,
2058 Option<SourceCatalog>,
2059 TableJobType,
2060)> {
2061 let Statement::CreateTable {
2062 columns,
2063 constraints,
2064 source_watermarks,
2065 append_only,
2066 on_conflict,
2067 with_version_column,
2068 wildcard_idx,
2069 cdc_table_info,
2070 format_encode,
2071 include_column_options,
2072 engine,
2073 with_options,
2074 ..
2075 } = statement
2076 else {
2077 panic!("unexpected statement type: {:?}", statement);
2078 };
2079
2080 let format_encode = format_encode
2081 .clone()
2082 .map(|format_encode| format_encode.into_v2_with_warning());
2083
2084 let engine = match engine {
2085 risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2086 risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2087 };
2088
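    // The connector is being dropped when the original table had an associated source but the
    // new definition carries no FORMAT/ENCODE clause.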
2089 let is_drop_connector =
2090 original_catalog.associated_source_id().is_some() && format_encode.is_none();
2091 if is_drop_connector {
2092 debug_assert!(
2093 source_watermarks.is_empty()
2094 && include_column_options.is_empty()
2095 && with_options
2096 .iter()
2097 .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2098 );
2099 }
2100
2101 let props = CreateTableProps {
2102 definition: handler_args.normalized_sql.clone(),
2103 append_only,
2104 on_conflict: on_conflict.into(),
2105 with_version_column: with_version_column.as_ref().map(|x| x.real_value()),
2106 webhook_info: original_catalog.webhook_info.clone(),
2107 engine,
2108 };
2109
2110 let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2111 (Some(format_encode), None) => (
2112 gen_create_table_plan_with_source(
2113 handler_args,
2114 ExplainOptions::default(),
2115 table_name,
2116 columns,
2117 wildcard_idx,
2118 constraints,
2119 format_encode,
2120 source_watermarks,
2121 col_id_gen,
2122 include_column_options,
2123 props,
2124 sql_column_strategy,
2125 )
2126 .await?,
2127 TableJobType::General,
2128 ),
2129 (None, None) => {
2130 let context = OptimizerContext::from_handler_args(handler_args);
2131 let (plan, table) = gen_create_table_plan(
2132 context,
2133 table_name,
2134 columns,
2135 constraints,
2136 col_id_gen,
2137 source_watermarks,
2138 props,
2139 true,
2140 )?;
2141 ((plan, None, table), TableJobType::General)
2142 }
2143 (None, Some(cdc_table)) => {
2144 let session = &handler_args.session;
2145 let (source, resolved_table_name, database_id, schema_id) =
2146 get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2147
2148 let cdc_with_options = derive_with_options_for_cdc_table(
2149 &source.with_properties,
2150 cdc_table.external_table_name.clone(),
2151 )?;
2152
2153 let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2154
2155 let context: OptimizerContextRef =
2156 OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2157 let (plan, table) = gen_create_table_plan_for_cdc_table(
2158 context,
2159 source,
2160 cdc_table.external_table_name.clone(),
2161 columns,
2162 column_catalogs,
2163 pk_names,
2164 cdc_with_options,
2165 col_id_gen,
2166 on_conflict,
2167 with_version_column.map(|x| x.real_value()),
2168 include_column_options,
2169 table_name,
2170 resolved_table_name,
2171 database_id,
2172 schema_id,
2173 original_catalog.id(),
2174 engine,
2175 )?;
2176
2177 ((plan, None, table), TableJobType::SharedCdcSource)
2178 }
2179 (Some(_), Some(_)) => {
2180 return Err(ErrorCode::NotSupported(
2181 "The FORMAT and ENCODE clauses do not apply to a table created from a CDC source"
2182 .into(),
2183 "Remove the FORMAT and ENCODE specification".into(),
2184 )
2185 .into());
2186 }
2187 };
2188
2189 if table.pk_column_ids() != original_catalog.pk_column_ids() {
2190 Err(ErrorCode::InvalidInputSyntax(
2191 "altering the primary key of a table is not supported".to_owned(),
2192 ))?
2193 }
2194
2195 let graph = build_graph(plan, Some(GraphJobType::Table))?;
2196
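    // Carry over the original table id, and (unless the connector is being dropped) keep the
    // association between the table and its source in both directions.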
2197 table.id = original_catalog.id();
2199 if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2200 table.associated_source_id = Some(source_id);
2201
2202 let source = source.as_mut().unwrap();
2203 source.id = source_id.table_id;
2204 source.associated_table_id = Some(table.id());
2205 }
2206
2207 Ok((graph, table, source, job_type))
2208}
2209
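/// Resolve the table name being created and look up the shared CDC source it refers to,
/// returning the source catalog, the resolved table name, and the database/schema ids to
/// create the table in.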
2210fn get_source_and_resolved_table_name(
2211 session: &Arc<SessionImpl>,
2212 cdc_table: CdcTableInfo,
2213 table_name: ObjectName,
2214) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
2215 let db_name = &session.database();
2216 let (schema_name, resolved_table_name) =
2217 Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2218 let (database_id, schema_id) =
2219 session.get_database_and_schema_id_for_create(schema_name.clone())?;
2220
2221 let (source_schema_name, source_name) =
2222 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2223
2224 let source = {
2225 let catalog_reader = session.env().catalog_reader().read_guard();
2226 let schema_name = source_schema_name.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2227 let (source, _) = catalog_reader.get_source_by_name(
2228 db_name,
2229 SchemaPath::Name(schema_name.as_str()),
2230 source_name.as_str(),
2231 )?;
2232 source.clone()
2233 };
2234
2235 Ok((source, resolved_table_name, database_id, schema_id))
2236}
2237
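/// Bind the webhook source options of a table. The table must have exactly one JSONB column;
/// the optional secret reference is resolved to a `PbSecretRef`, and the signature expression
/// is bound and must be a `SECURE_COMPARE(...)` call.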
2238fn bind_webhook_info(
2240 session: &Arc<SessionImpl>,
2241 columns_defs: &[ColumnDef],
2242 webhook_info: WebhookSourceInfo,
2243) -> Result<PbWebhookSourceInfo> {
2244 if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref() != Some(&AstDataType::Jsonb)
2246 {
2247 return Err(ErrorCode::InvalidInputSyntax(
2248 "A table with a webhook source must have exactly one JSONB column".to_owned(),
2249 )
2250 .into());
2251 }
2252
2253 let WebhookSourceInfo {
2254 secret_ref,
2255 signature_expr,
2256 wait_for_persistence,
2257 is_batched,
2258 } = webhook_info;
2259
2260 let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2262 let db_name = &session.database();
2263 let (schema_name, secret_name) =
2264 Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2265 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2266 (
2267 Some(PbSecretRef {
2268 secret_id: secret_catalog.id.secret_id(),
2269 ref_as: match secret_ref.ref_as {
2270 SecretRefAsType::Text => PbRefAsType::Text,
2271 SecretRefAsType::File => PbRefAsType::File,
2272 }
2273 .into(),
2274 }),
2275 Some(secret_name),
2276 )
2277 } else {
2278 (None, None)
2279 };
2280
2281 let secure_compare_context = SecureCompareContext {
2282 column_name: columns_defs[0].name.real_value(),
2283 secret_name,
2284 };
2285 let mut binder = Binder::new_for_ddl_with_secure_compare(session, secure_compare_context);
2286 let expr = binder.bind_expr(&signature_expr)?;
2287
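    // The bound signature expression must be a single top-level SECURE_COMPARE(...) call.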
2288 if expr.as_function_call().is_none()
2290 || expr.as_function_call().unwrap().func_type()
2291 != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2292 {
2293 return Err(ErrorCode::InvalidInputSyntax(
2294 "The signature verification function must be SECURE_COMPARE()".to_owned(),
2295 )
2296 .into());
2297 }
2298
2299 let pb_webhook_info = PbWebhookSourceInfo {
2300 secret_ref: pb_secret_ref,
2301 signature_expr: Some(expr.to_expr_proto()),
2302 wait_for_persistence,
2303 is_batched,
2304 };
2305
2306 Ok(pb_webhook_info)
2307}
2308
2309#[cfg(test)]
2310mod tests {
2311 use risingwave_common::catalog::{
2312 DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2313 };
2314 use risingwave_common::types::{DataType, StructType};
2315
2316 use super::*;
2317 use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2318
2319 #[tokio::test]
2320 async fn test_create_table_handler() {
2321 let sql =
2322 "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2323 let frontend = LocalFrontend::new(Default::default()).await;
2324 frontend.run_sql(sql).await.unwrap();
2325
2326 let session = frontend.session_ref();
2327 let catalog_reader = session.env().catalog_reader().read_guard();
2328 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2329
2330 let (table, _) = catalog_reader
2332 .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2333 .unwrap();
2334 assert_eq!(table.name(), "t");
2335
2336 let columns = table
2337 .columns
2338 .iter()
2339 .map(|col| (col.name(), col.data_type().clone()))
2340 .collect::<HashMap<&str, DataType>>();
2341
2342 let expected_columns = maplit::hashmap! {
2343 ROW_ID_COLUMN_NAME => DataType::Serial,
2344 "v1" => DataType::Int16,
2345 "v2" => StructType::new(
2346 vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
2347 )
2348 .with_ids([3, 4, 5].map(ColumnId::new))
2349 .into(),
2350 RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
2351 };
2352
2353 assert_eq!(columns, expected_columns, "{columns:#?}");
2354 }
2355
2356 #[test]
2357 fn test_bind_primary_key() {
2358 for (sql, expected) in [
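            // Expected values are primary key column ids: user columns are numbered from 1,
            // and the hidden row id column (used when no primary key is declared) has id 0.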
2361 ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
2362 ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
2363 ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
2364 (
2365 "create table t (v1 int primary key, v2 int primary key)",
2366 Err("multiple primary keys are not allowed"),
2367 ),
2368 (
2369 "create table t (v1 int primary key primary key, v2 int)",
2370 Err("multiple primary keys are not allowed"),
2371 ),
2372 (
2373 "create table t (v1 int, v2 int, primary key (v1))",
2374 Ok(&[1]),
2375 ),
2376 (
2377 "create table t (v1 int, primary key (v2), v2 int)",
2378 Ok(&[2]),
2379 ),
2380 (
2381 "create table t (primary key (v2, v1), v1 int, v2 int)",
2382 Ok(&[2, 1]),
2383 ),
2384 (
2385 "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
2386 Err("multiple primary keys are not allowed"),
2387 ),
2388 (
2389 "create table t (v1 int primary key, primary key (v1), v2 int)",
2390 Err("multiple primary keys are not allowed"),
2391 ),
2392 (
2393 "create table t (v1 int, primary key (V3), v2 int)",
2394 Err("column \"v3\" named in key does not exist"),
2395 ),
2396 ] {
2397 let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
2398 let risingwave_sqlparser::ast::Statement::CreateTable {
2399 columns: column_defs,
2400 constraints,
2401 ..
2402 } = ast.remove(0)
2403 else {
2404 panic!("test case should be create table")
2405 };
2406 let actual: Result<_> = (|| {
2407 let mut columns = bind_sql_columns(&column_defs, false)?;
2408 let mut col_id_gen = ColumnIdGenerator::new_initial();
2409 for c in &mut columns {
2410 col_id_gen.generate(c)?;
2411 }
2412
2413 let pk_names =
2414 bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
2415 let (_, pk_column_ids, _) =
2416 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
2417 Ok(pk_column_ids)
2418 })();
2419 match (expected, actual) {
2420 (Ok(expected), Ok(actual)) => assert_eq!(
2421 expected.iter().copied().map(ColumnId::new).collect_vec(),
2422 actual,
2423 "sql: {sql}"
2424 ),
2425 (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
2426 (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
2427 (Err(expected), Err(actual)) => assert!(
2428 actual.to_string().contains(expected),
2429 "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
2430 ),
2431 }
2432 }
2433 }
2434
2435 #[tokio::test]
2436 async fn test_duplicate_props_options() {
2437 let proto_file = create_proto_file(PROTO_FILE_DATA);
2438 let sql = format!(
2439 r#"CREATE TABLE t
2440 WITH (
2441 connector = 'kinesis',
2442 aws.region='user_test_topic',
2443 endpoint='172.10.1.1:9090,172.10.1.2:9090',
2444 aws.credentials.access_key_id = 'your_access_key_1',
2445 aws.credentials.secret_access_key = 'your_secret_key_1'
2446 )
2447 FORMAT PLAIN ENCODE PROTOBUF (
2448 message = '.test.TestRecord',
2449 aws.credentials.access_key_id = 'your_access_key_2',
2450 aws.credentials.secret_access_key = 'your_secret_key_2',
2451 schema.location = 'file://{}',
2452 )"#,
2453 proto_file.path().to_str().unwrap()
2454 );
2455 let frontend = LocalFrontend::new(Default::default()).await;
2456 frontend.run_sql(sql).await.unwrap();
2457
2458 let session = frontend.session_ref();
2459 let catalog_reader = session.env().catalog_reader().read_guard();
2460 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2461
2462 let (source, _) = catalog_reader
2464 .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2465 .unwrap();
2466 assert_eq!(source.name, "t");
2467
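        // Options duplicated between the WITH clause and the FORMAT ... ENCODE clause are kept
        // separately: the ENCODE-level values end up in `format_encode_options`, the WITH-level
        // values in `with_properties`, and `schema.location` is not leaked into the latter.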
2468 assert_eq!(
2470 source
2471 .info
2472 .format_encode_options
2473 .get("aws.credentials.access_key_id")
2474 .unwrap(),
2475 "your_access_key_2"
2476 );
2477 assert_eq!(
2478 source
2479 .info
2480 .format_encode_options
2481 .get("aws.credentials.secret_access_key")
2482 .unwrap(),
2483 "your_secret_key_2"
2484 );
2485
2486 assert_eq!(
2488 source
2489 .with_properties
2490 .get("aws.credentials.access_key_id")
2491 .unwrap(),
2492 "your_access_key_1"
2493 );
2494 assert_eq!(
2495 source
2496 .with_properties
2497 .get("aws.credentials.secret_access_key")
2498 .unwrap(),
2499 "your_secret_key_1"
2500 );
2501
2502 assert!(!source.with_properties.contains_key("schema.location"));
2504 }
2505}