1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use percent_encoding::percent_decode_str;
25use pgwire::pg_response::{PgResponse, StatementType};
26use prost::Message as _;
27use risingwave_common::catalog::{
28 CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29 ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
30 TableId,
31};
32use risingwave_common::config::MetaBackend;
33use risingwave_common::global_jvm::Jvm;
34use risingwave_common::session_config::sink_decouple::SinkDecouple;
35use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
36use risingwave_common::util::value_encoding::DatumToProtoExt;
37use risingwave_common::{bail, bail_not_implemented};
38use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
39use risingwave_connector::source::cdc::build_cdc_table_id;
40use risingwave_connector::source::cdc::external::{
41 DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
42 SCHEMA_NAME_KEY, TABLE_NAME_KEY,
43};
44use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
45use risingwave_pb::catalog::connection::Info as ConnectionInfo;
46use risingwave_pb::catalog::connection_params::ConnectionType;
47use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
48use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
49use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
50use risingwave_pb::plan_common::{
51 AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
52};
53use risingwave_pb::secret::PbSecretRef;
54use risingwave_pb::secret::secret_ref::PbRefAsType;
55use risingwave_pb::stream_plan::StreamFragmentGraph;
56use risingwave_sqlparser::ast::{
57 CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
58 CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
59 FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
60 Statement, TableConstraint, WebhookSourceInfo, WithProperties,
61};
62use risingwave_sqlparser::parser::{IncludeOption, Parser};
63use thiserror_ext::AsReport;
64
65use super::RwPgResponse;
66use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
67use crate::binder::{Clause, SecureCompareContext, bind_data_type};
68use crate::catalog::root_catalog::SchemaPath;
69use crate::catalog::source_catalog::SourceCatalog;
70use crate::catalog::table_catalog::TableVersion;
71use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
72use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
73use crate::expr::{Expr, ExprImpl, ExprRewriter};
74use crate::handler::HandlerArgs;
75use crate::handler::create_source::{
76 UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
77 bind_source_watermark, handle_addition_columns,
78};
79use crate::handler::util::{
80 LongRunningNotificationAction, SourceSchemaCompatExt, execute_with_long_running_notification,
81};
82use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
83use crate::optimizer::plan_node::{
84 LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
85};
86use crate::optimizer::property::{Order, RequiredDist};
87use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
88use crate::session::SessionImpl;
89use crate::session::current::notice_to_user;
90use crate::stream_fragmenter::{GraphJobType, build_graph};
91use crate::utils::OverwriteOptions;
92use crate::{Binder, Explain, TableCatalog, WithOptions};
93
94mod col_id_gen;
95pub use col_id_gen::*;
96use risingwave_connector::sink::SinkParam;
97use risingwave_connector::sink::iceberg::{
98 COMPACTION_DELETE_FILES_COUNT_THRESHOLD, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
99 COMPACTION_SMALL_FILES_THRESHOLD_MB, COMPACTION_TARGET_FILE_SIZE_MB,
100 COMPACTION_TRIGGER_SNAPSHOT_COUNT, COMPACTION_TYPE, COMPACTION_WRITE_PARQUET_COMPRESSION,
101 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS, CompactionType, ENABLE_COMPACTION,
102 ENABLE_SNAPSHOT_EXPIRATION, FORMAT_VERSION, ICEBERG_WRITE_MODE_COPY_ON_WRITE,
103 ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink, IcebergWriteMode,
104 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
105 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
106 parse_partition_by_exprs,
107};
108use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
109
110use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};
111
112fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
113 for option_def in &c.options {
114 match option_def.option {
115 ColumnOption::GeneratedColumns(_) => {}
116 ColumnOption::DefaultValue(_) => {}
117 ColumnOption::DefaultValueInternal { .. } => {}
118 ColumnOption::Unique { is_primary: true } => {}
119 ColumnOption::Null => {}
120 ColumnOption::NotNull => {}
121 _ => bail_not_implemented!("column constraints \"{}\"", option_def),
122 }
123 }
124 Ok(())
125}
126
127pub fn bind_sql_columns(
131 column_defs: &[ColumnDef],
132 is_for_drop_table_connector: bool,
133) -> Result<Vec<ColumnCatalog>> {
134 let mut columns = Vec::with_capacity(column_defs.len());
135
136 for column in column_defs {
137 ensure_column_options_supported(column)?;
138 let ColumnDef {
142 name,
143 data_type,
144 collation,
145 options,
146 ..
147 } = column;
148
149 let data_type = data_type
150 .clone()
151 .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
152 if let Some(collation) = collation {
153 if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
159 bail_not_implemented!(
160 "Collate collation other than `C` or `POSIX` is not implemented"
161 );
162 }
163
164 match data_type {
165 AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
166 _ => {
167 return Err(ErrorCode::NotSupported(
168 format!("{} is not a collatable data type", data_type),
169 "The only built-in collatable data types are `varchar`, please check your type".into(),
170 ).into());
171 }
172 }
173 }
174
175 if !is_for_drop_table_connector {
176 check_column_name_not_reserved(&name.real_value())?;
180 }
181
182 let nullable: bool = !options
183 .iter()
184 .any(|def| matches!(def.option, ColumnOption::NotNull));
185
186 columns.push(ColumnCatalog {
187 column_desc: ColumnDesc {
188 data_type: bind_data_type(&data_type)?,
189 column_id: ColumnId::placeholder(),
190 name: name.real_value(),
191 generated_or_default_column: None,
192 description: None,
193 additional_column: AdditionalColumn { column_type: None },
194 version: ColumnDescVersion::LATEST,
195 system_column: None,
196 nullable,
197 },
198 is_hidden: false,
199 });
200 }
201
202 Ok(columns)
203}
204
205fn check_generated_column_constraints(
206 column_name: &String,
207 column_id: ColumnId,
208 expr: &ExprImpl,
209 column_catalogs: &[ColumnCatalog],
210 generated_column_names: &[String],
211 pk_column_ids: &[ColumnId],
212) -> Result<()> {
213 let input_refs = expr.collect_input_refs(column_catalogs.len());
214 for idx in input_refs.ones() {
215 let referred_generated_column = &column_catalogs[idx].column_desc.name;
216 if generated_column_names
217 .iter()
218 .any(|c| c == referred_generated_column)
219 {
220 return Err(ErrorCode::BindError(format!(
221 "Generated can not reference another generated column. \
222 But here generated column \"{}\" referenced another generated column \"{}\"",
223 column_name, referred_generated_column
224 ))
225 .into());
226 }
227 }
228
229 if pk_column_ids.contains(&column_id) && expr.is_impure() {
230 return Err(ErrorCode::BindError(format!(
231 "Generated columns with impure expressions should not be part of the primary key. \
232 Here column \"{}\" is defined as part of the primary key.",
233 column_name
234 ))
235 .into());
236 }
237
238 Ok(())
239}
240
/// Bind per-column constraints (generated columns and default values) of a
/// `CREATE TABLE` statement, writing the results into `column_catalogs`.
///
/// Constraint expressions are bound against the table's own columns, so they
/// may reference other columns. `pk_column_ids` is used to reject impure
/// generated columns that participate in the primary key.
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: &[ColumnDef],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    // Names of all columns declared with a GENERATED clause; used below to
    // forbid one generated column referencing another.
    let generated_column_names = {
        let mut names = vec![];
        for column in columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    // Bring the table's columns into scope so constraint expressions can
    // reference them by name.
    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name, column_catalogs)?;

    for column in columns {
        // Find the catalog entry for this column def. A def without a match is
        // skipped — presumably it was filtered out earlier in binding;
        // NOTE(review): confirm against callers.
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            continue;
        };

        for option_def in &column.options {
            match &option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    // Setting the clause restricts what the expression may do
                    // while binding; it is reset after binding completes.
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "fail to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    // Cast the default expression to the column's type.
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type())?;

                    // Inline `now()`/proctime against the pinned snapshot so
                    // the default can be folded to a constant value.
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        // Persist both the folded constant and the original
                        // (un-rewritten) expression.
                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                expr: Some(expr_impl.to_expr_proto()),
                            }));
                    } else {
                        // Non-constant defaults are not allowed.
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                            Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // An empty persisted payload means the user wrote
                    // `DEFAULT INTERNAL` directly, which is not allowed.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                            please specify a concrete default value"
                        );
                    }

                    // Decode failure indicates a corrupted catalog — a bug,
                    // hence `expect`.
                    let desc = DefaultColumnDesc::decode(&**persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                _ => {}
            }
        }
    }
    Ok(())
}
363
364pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
366 let mut pk_column_names = vec![];
367
368 for constraint in table_constraints {
369 match constraint {
370 TableConstraint::Unique {
371 name: _,
372 columns,
373 is_primary: true,
374 } => {
375 if !pk_column_names.is_empty() {
376 return Err(multiple_pk_definition_err());
377 }
378 pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
379 }
380 _ => bail_not_implemented!("table constraint \"{}\"", constraint),
381 }
382 }
383 Ok(pk_column_names)
384}
385
386pub fn bind_sql_pk_names(
387 columns_defs: &[ColumnDef],
388 pk_names_from_table_constraints: Vec<String>,
389) -> Result<Vec<String>> {
390 let mut pk_column_names = pk_names_from_table_constraints;
391
392 for column in columns_defs {
393 for option_def in &column.options {
394 if let ColumnOption::Unique { is_primary: true } = option_def.option {
395 if !pk_column_names.is_empty() {
396 return Err(multiple_pk_definition_err());
397 }
398 pk_column_names.push(column.name.real_value());
399 };
400 }
401 }
402
403 Ok(pk_column_names)
404}
405
406fn multiple_pk_definition_err() -> RwError {
407 ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
408}
409
410pub fn bind_pk_and_row_id_on_relation(
415 mut columns: Vec<ColumnCatalog>,
416 pk_names: Vec<String>,
417 must_need_pk: bool,
418) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
419 for c in &columns {
420 assert!(c.column_id() != ColumnId::placeholder());
421 }
422
423 let name_to_id = columns
425 .iter()
426 .map(|c| (c.name(), c.column_id()))
427 .collect::<HashMap<_, _>>();
428
429 let mut pk_column_ids: Vec<_> = pk_names
430 .iter()
431 .map(|name| {
432 name_to_id.get(name.as_str()).copied().ok_or_else(|| {
433 ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
434 })
435 })
436 .try_collect()?;
437
438 let need_row_id = pk_column_ids.is_empty() && must_need_pk;
440
441 let row_id_index = need_row_id.then(|| {
442 let column = ColumnCatalog::row_id_column();
443 let index = columns.len();
444 pk_column_ids = vec![column.column_id()];
445 columns.push(column);
446 index
447 });
448
449 if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
450 Err(ErrorCode::InvalidInputSyntax(format!(
451 "column \"{col}\" specified more than once"
452 )))?;
453 }
454
455 Ok((columns, pk_column_ids, row_id_index))
456}
457
/// Generate the streaming plan for a `CREATE TABLE` with a connector, i.e. a
/// table that continuously ingests from an external source described by
/// `FORMAT ... ENCODE ...` and the `WITH` options.
///
/// Returns the table plan, the bound source catalog, and the table catalog.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
    // Append-only tables only accept insert-only formats (PLAIN / NATIVE).
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append only table does not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let (with_properties, refresh_mode) =
        bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        // CDC tables have extra restrictions on generated / NOT NULL columns.
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        // For non-CDC connectors, NOT NULL is enforced by silently dropping
        // violating rows — warn the user up front.
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    // Derive columns and source info from the FORMAT/ENCODE definition
    // (e.g. from an external schema), when the encoding provides them.
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    // Extract (and consume) the per-source rate limit from the options.
    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
        refresh_mode,
    )
    .await?;

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source.clone(),
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(source), table))
}
549
550#[allow(clippy::too_many_arguments)]
553pub(crate) fn gen_create_table_plan(
554 context: OptimizerContext,
555 table_name: ObjectName,
556 column_defs: Vec<ColumnDef>,
557 constraints: Vec<TableConstraint>,
558 mut col_id_gen: ColumnIdGenerator,
559 source_watermarks: Vec<SourceWatermark>,
560 props: CreateTableProps,
561 is_for_replace_plan: bool,
562) -> Result<(PlanRef, TableCatalog)> {
563 let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
564 for c in &mut columns {
565 col_id_gen.generate(c)?;
566 }
567
568 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
569 if !secret_refs.is_empty() || !connection_refs.is_empty() {
570 return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
571 }
572
573 gen_create_table_plan_without_source(
574 context,
575 table_name,
576 columns,
577 column_defs,
578 constraints,
579 source_watermarks,
580 col_id_gen.into_version(),
581 props,
582 )
583}
584
/// Generate the plan for a table without a connector source, given already
/// bound column catalogs (with ids assigned).
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    // Collect PK names from both table-level constraints and column options,
    // then resolve them to ids (appending a hidden row-id column if needed).
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    // Bind generated/default column expressions into the catalogs.
    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}
631
632fn gen_table_plan_with_source(
633 context: OptimizerContextRef,
634 schema_name: Option<String>,
635 source_catalog: SourceCatalog,
636 version: TableVersion,
637 props: CreateTableProps,
638) -> Result<(PlanRef, TableCatalog)> {
639 let table_name = source_catalog.name.clone();
640
641 let info = CreateTableInfo {
642 columns: source_catalog.columns.clone(),
643 pk_column_ids: source_catalog.pk_col_ids.clone(),
644 row_id_index: source_catalog.row_id_index,
645 watermark_descs: source_catalog.watermark_descs.clone(),
646 source_catalog: Some(source_catalog),
647 version,
648 };
649
650 gen_table_plan_inner(context, schema_name, table_name, info, props)
651}
652
/// The `ON CONFLICT` behavior of a table: either still in AST form (as written
/// by the user, possibly absent) or already resolved to a concrete
/// [`ConflictBehavior`].
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    /// Unresolved clause from the statement; `None` when not specified.
    Ast(Option<OnConflict>),
    /// A behavior that has already been decided.
    Resolved(ConflictBehavior),
}
659
660impl From<Option<OnConflict>> for EitherOnConflict {
661 fn from(v: Option<OnConflict>) -> Self {
662 Self::Ast(v)
663 }
664}
665
666impl From<ConflictBehavior> for EitherOnConflict {
667 fn from(v: ConflictBehavior) -> Self {
668 Self::Resolved(v)
669 }
670}
671
672impl EitherOnConflict {
673 pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
675 let conflict_behavior = match self {
676 EitherOnConflict::Ast(on_conflict) => {
677 if append_only {
678 if row_id_as_pk {
679 ConflictBehavior::NoCheck
681 } else {
682 if let Some(on_conflict) = on_conflict
684 && on_conflict != OnConflict::Nothing
685 {
686 return Err(ErrorCode::InvalidInputSyntax(
687 "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
688 the ON CONFLICT behavior must be DO NOTHING."
689 .to_owned(),
690 )
691 .into());
692 }
693 ConflictBehavior::IgnoreConflict
694 }
695 } else {
696 match on_conflict.unwrap_or(OnConflict::UpdateFull) {
698 OnConflict::UpdateFull => ConflictBehavior::Overwrite,
699 OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
700 OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
701 }
702 }
703 }
704 EitherOnConflict::Resolved(b) => b,
705 };
706
707 Ok(conflict_behavior)
708 }
709}
710
/// Column-level information, bound from the statement, used to generate a
/// `CREATE TABLE` plan.
pub struct CreateTableInfo {
    /// All columns of the table, including appended hidden ones (e.g. the
    /// row-id column).
    pub columns: Vec<ColumnCatalog>,
    /// Ids of the columns forming the primary key.
    pub pk_column_ids: Vec<ColumnId>,
    /// Index into `columns` of the hidden row-id column, when the table has no
    /// user-declared primary key.
    pub row_id_index: Option<usize>,
    /// Bound watermark definitions.
    pub watermark_descs: Vec<WatermarkDesc>,
    /// The associated connector source, for tables created with a connector.
    pub source_catalog: Option<SourceCatalog>,
    /// Schema version info produced by the column-id generator.
    pub version: TableVersion,
}
723
/// Table-level properties used to generate a `CREATE TABLE` plan.
pub struct CreateTableProps {
    /// The normalized SQL text of the `CREATE TABLE` statement.
    pub definition: String,
    /// Whether the table is declared `APPEND ONLY`.
    pub append_only: bool,
    /// The `ON CONFLICT` behavior, possibly still in unresolved AST form.
    pub on_conflict: EitherOnConflict,
    /// Version column names — presumably used to pick the winning row on
    /// conflict; NOTE(review): confirm semantics at the consumer.
    pub with_version_columns: Vec<String>,
    /// Webhook metadata, set only for webhook-backed tables.
    pub webhook_info: Option<PbWebhookSourceInfo>,
    /// The storage engine of the table.
    pub engine: Engine,
}
736
/// Core table-plan generation shared by the with-source and without-source
/// paths: builds a `LogicalSource` root, validates append-only-dependent
/// options, and materializes the plan into a table catalog.
#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    // The logical plan root is a source node over the table's columns.
    let source_node: LogicalPlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    // No specific output columns are required from the root.
    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    // Watermarks without TTL are only allowed on append-only tables.
    let has_non_ttl_watermark = watermark_descs.iter().any(|d| !d.with_ttl);

    if !append_only && has_non_ttl_watermark {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on table requires the table to be append only.".to_owned(),
            "Use the key words `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    // Retention on a non-append-only table is unsafe and only allowed behind
    // an explicit session config escape hatch (with a warning + notice).
    if !append_only && retention_seconds.is_some() {
        if session
            .config()
            .unsafe_enable_storage_retention_for_non_append_only_tables()
        {
            tracing::warn!(
                "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
                table_name
            );
            const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
                This may lead to stream inconsistency and unrecoverable \
                node failure if there is any row INSERT/UPDATE/DELETE operation \
                corresponding to the TTLed primary keys";
            session.notice_to_user(NOTICE);
        } else {
            return Err(ErrorCode::NotSupported(
                "Defining retention seconds on table requires the table to be append only."
                    .to_owned(),
                "Use the key words `APPEND ONLY`".to_owned(),
            )
            .into());
        }
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    // The creating session's user becomes the table owner.
    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    Ok((materialize.into(), table))
}
822
/// Generate the plan for a CDC table created on top of a shared CDC source
/// (`CREATE TABLE ... FROM <source> TABLE '<external_table_name>'`).
///
/// Builds a `LogicalCdcScan` over the external table description and
/// materializes it into a table catalog tagged with the CDC table id/type.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String,
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, TableCatalog)> {
    let session = context.session_ctx().clone();

    // Append the additional columns requested via INCLUDE options.
    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    // CDC tables must have a primary key (hence `must_need_pk = true`); the
    // row-id index is not used here.
    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    // Map PK column ids back to their positional indices.
    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        // Panic-free by construction: every PK id comes from `columns`.
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    // Only non-generated columns exist physically on the upstream table.
    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| !c.is_generated())
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();
    let cdc_table_type = ExternalCdcTableType::from_properties(&options);
    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id,
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");
    let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: LogicalPlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs: vec![],
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_columns,
            webhook_info: None,
            engine,
        },
    )?;

    // Tag the table with CDC identity so it can be tracked against upstream.
    let mut table = materialize.table().clone();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.cdc_table_type = Some(cdc_table_type);
    Ok((materialize.into(), table))
}
961
962fn derive_with_options_for_cdc_table(
970 source_with_properties: &WithOptionsSecResolved,
971 external_table_name: String,
972) -> Result<(WithOptionsSecResolved, String)> {
973 use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
974 let source_database_name: &str = source_with_properties
976 .get("database.name")
977 .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
978 .as_str();
979 let mut with_options = source_with_properties.clone();
980 if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
981 match connector.as_str() {
982 MYSQL_CDC_CONNECTOR => {
983 let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
986 anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
987 })?;
988 if !source_database_name
990 .split(',')
991 .map(|s| s.trim())
992 .any(|name| name == db_name)
993 {
994 return Err(anyhow!(
995 "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
996 db_name,
997 source_database_name
998 ).into());
999 }
1000 with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
1001 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1002 return Ok((with_options, external_table_name));
1004 }
1005 POSTGRES_CDC_CONNECTOR => {
1006 let (schema_name, table_name) = external_table_name
1007 .split_once('.')
1008 .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;
1009
1010 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1012 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1013 return Ok((with_options, external_table_name));
1015 }
1016 SQL_SERVER_CDC_CONNECTOR => {
1017 let parts: Vec<&str> = external_table_name.split('.').collect();
1025 let (schema_name, table_name) = match parts.len() {
1026 3 => {
1027 let db_name = parts[0];
1030 let schema_name = parts[1];
1031 let table_name = parts[2];
1032
1033 if db_name != source_database_name {
1034 return Err(anyhow!(
1035 "The database name '{}' in FROM clause does not match the database name '{}' specified in source definition. \
1036 You can either use 'schema.table' format (recommended) or ensure the database name matches.",
1037 db_name,
1038 source_database_name
1039 ).into());
1040 }
1041 (schema_name, table_name)
1042 }
1043 2 => {
1044 let schema_name = parts[0];
1047 let table_name = parts[1];
1048 (schema_name, table_name)
1049 }
1050 1 => {
1051 return Err(anyhow!(
1054 "Invalid table name format '{}'. For SQL Server CDC, you must specify the schema name. \
1055 Use 'schema.table' format (e.g., 'dbo.{}') or 'database.schema.table' format (e.g., '{}.dbo.{}').",
1056 external_table_name,
1057 external_table_name,
1058 source_database_name,
1059 external_table_name
1060 ).into());
1061 }
1062 _ => {
1063 return Err(anyhow!(
1065 "Invalid table name format '{}'. Expected 'schema.table' or 'database.schema.table'.",
1066 external_table_name
1067 ).into());
1068 }
1069 };
1070
1071 with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1073 with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1074
1075 let normalized_external_table_name = format!("{}.{}", schema_name, table_name);
1078 return Ok((with_options, normalized_external_table_name));
1079 }
1080 _ => {
1081 return Err(RwError::from(anyhow!(
1082 "connector {} is not supported for cdc table",
1083 connector
1084 )));
1085 }
1086 };
1087 }
1088 unreachable!("All valid CDC connectors should have returned by now")
1089}
1090
1091#[allow(clippy::too_many_arguments)]
1092pub(super) async fn handle_create_table_plan(
1093 handler_args: HandlerArgs,
1094 explain_options: ExplainOptions,
1095 format_encode: Option<FormatEncodeOptions>,
1096 cdc_table_info: Option<CdcTableInfo>,
1097 table_name: &ObjectName,
1098 column_defs: Vec<ColumnDef>,
1099 wildcard_idx: Option<usize>,
1100 constraints: Vec<TableConstraint>,
1101 source_watermarks: Vec<SourceWatermark>,
1102 append_only: bool,
1103 on_conflict: Option<OnConflict>,
1104 with_version_columns: Vec<String>,
1105 include_column_options: IncludeOption,
1106 webhook_info: Option<WebhookSourceInfo>,
1107 engine: Engine,
1108) -> Result<(
1109 PlanRef,
1110 Option<SourceCatalog>,
1111 TableCatalog,
1112 TableJobType,
1113 Option<SourceId>,
1114)> {
1115 let col_id_gen = ColumnIdGenerator::new_initial();
1116 let format_encode = check_create_table_with_source(
1117 &handler_args.with_options,
1118 format_encode,
1119 &include_column_options,
1120 &cdc_table_info,
1121 )?;
1122 let webhook_info = webhook_info
1123 .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1124 .transpose()?;
1125
1126 let props = CreateTableProps {
1127 definition: handler_args.normalized_sql.clone(),
1128 append_only,
1129 on_conflict: on_conflict.into(),
1130 with_version_columns: with_version_columns.clone(),
1131 webhook_info,
1132 engine,
1133 };
1134
1135 let ((plan, source, table), job_type, shared_shource_id) = match (
1136 format_encode,
1137 cdc_table_info.as_ref(),
1138 ) {
1139 (Some(format_encode), None) => (
1140 gen_create_table_plan_with_source(
1141 handler_args,
1142 explain_options,
1143 table_name.clone(),
1144 column_defs,
1145 wildcard_idx,
1146 constraints,
1147 format_encode,
1148 source_watermarks,
1149 col_id_gen,
1150 include_column_options,
1151 props,
1152 SqlColumnStrategy::FollowChecked,
1153 )
1154 .await?,
1155 TableJobType::General,
1156 None,
1157 ),
1158 (None, None) => {
1159 let context = OptimizerContext::new(handler_args, explain_options);
1160 let (plan, table) = gen_create_table_plan(
1161 context,
1162 table_name.clone(),
1163 column_defs,
1164 constraints,
1165 col_id_gen,
1166 source_watermarks,
1167 props,
1168 false,
1169 )?;
1170
1171 ((plan, None, table), TableJobType::General, None)
1172 }
1173
1174 (None, Some(cdc_table)) => {
1175 sanity_check_for_table_on_cdc_source(
1176 append_only,
1177 &column_defs,
1178 &wildcard_idx,
1179 &constraints,
1180 &source_watermarks,
1181 )?;
1182
1183 generated_columns_check_for_cdc_table(&column_defs)?;
1184 not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1185
1186 let session = &handler_args.session;
1187 let db_name = &session.database();
1188 let user_name = &session.user_name();
1189 let search_path = session.config().search_path();
1190 let (schema_name, resolved_table_name) =
1191 Binder::resolve_schema_qualified_name(db_name, table_name)?;
1192 let (database_id, schema_id) =
1193 session.get_database_and_schema_id_for_create(schema_name.clone())?;
1194
1195 let (source_schema, source_name) =
1197 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
1198
1199 let source = {
1200 let catalog_reader = session.env().catalog_reader().read_guard();
1201 let schema_path =
1202 SchemaPath::new(source_schema.as_deref(), &search_path, user_name);
1203
1204 let (source, _) = catalog_reader.get_source_by_name(
1205 db_name,
1206 schema_path,
1207 source_name.as_str(),
1208 )?;
1209 source.clone()
1210 };
1211 let (cdc_with_options, normalized_external_table_name) =
1212 derive_with_options_for_cdc_table(
1213 &source.with_properties,
1214 cdc_table.external_table_name.clone(),
1215 )?;
1216
1217 let (columns, pk_names) = match wildcard_idx {
1218 Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1219 None => {
1220 for column_def in &column_defs {
1221 for option_def in &column_def.options {
1222 if let ColumnOption::DefaultValue(_)
1223 | ColumnOption::DefaultValueInternal { .. } = option_def.option
1224 {
1225 return Err(ErrorCode::NotSupported(
1226 "Default value for columns defined on the table created from a CDC source".into(),
1227 "Remove the default value expression in the column definitions".into(),
1228 )
1229 .into());
1230 }
1231 }
1232 }
1233
1234 let (columns, pk_names) =
1235 bind_cdc_table_schema(&column_defs, &constraints, false)?;
1236 let (options, secret_refs) = cdc_with_options.clone().into_parts();
1238 let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1239 .context("failed to extract external table config")?;
1240
1241 (columns, pk_names)
1242 }
1243 };
1244
1245 let context: OptimizerContextRef =
1246 OptimizerContext::new(handler_args, explain_options).into();
1247 let shared_source_id = source.id;
1248 let (plan, table) = gen_create_table_plan_for_cdc_table(
1249 context,
1250 source,
1251 normalized_external_table_name,
1252 column_defs,
1253 columns,
1254 pk_names,
1255 cdc_with_options,
1256 col_id_gen,
1257 on_conflict,
1258 with_version_columns,
1259 include_column_options,
1260 table_name.clone(),
1261 resolved_table_name,
1262 database_id,
1263 schema_id,
1264 TableId::placeholder(),
1265 engine,
1266 )?;
1267
1268 (
1269 (plan, None, table),
1270 TableJobType::SharedCdcSource,
1271 Some(shared_source_id),
1272 )
1273 }
1274 (Some(_), Some(_)) => {
1275 return Err(ErrorCode::NotSupported(
1276 "Data format and encoding format doesn't apply to table created from a CDC source"
1277 .into(),
1278 "Remove the FORMAT and ENCODE specification".into(),
1279 )
1280 .into());
1281 }
1282 };
1283 Ok((plan, source, table, job_type, shared_shource_id))
1284}
1285
1286fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1288 let mut found_generated_column = false;
1289 for column in columns {
1290 let mut is_generated = false;
1291
1292 for option_def in &column.options {
1293 if let ColumnOption::GeneratedColumns(_) = option_def.option {
1294 is_generated = true;
1295 break;
1296 }
1297 }
1298
1299 if is_generated {
1300 found_generated_column = true;
1301 } else if found_generated_column {
1302 return Err(ErrorCode::NotSupported(
1303 "Non-generated column found after a generated column.".into(),
1304 "Ensure that all generated columns appear at the end of the cdc table definition."
1305 .into(),
1306 )
1307 .into());
1308 }
1309 }
1310 Ok(())
1311}
1312
1313fn not_null_check_for_cdc_table(
1315 wildcard_idx: &Option<usize>,
1316 column_defs: &Vec<ColumnDef>,
1317) -> Result<()> {
1318 if !wildcard_idx.is_some()
1319 && column_defs.iter().any(|col| {
1320 col.options
1321 .iter()
1322 .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1323 })
1324 {
1325 return Err(ErrorCode::NotSupported(
1326 "CDC table with NOT NULL constraint is not supported".to_owned(),
1327 "Please remove the NOT NULL constraint for columns".to_owned(),
1328 )
1329 .into());
1330 }
1331 Ok(())
1332}
1333
1334fn sanity_check_for_table_on_cdc_source(
1336 append_only: bool,
1337 column_defs: &Vec<ColumnDef>,
1338 wildcard_idx: &Option<usize>,
1339 constraints: &Vec<TableConstraint>,
1340 source_watermarks: &Vec<SourceWatermark>,
1341) -> Result<()> {
1342 if wildcard_idx.is_some() && !column_defs.is_empty() {
1344 return Err(ErrorCode::NotSupported(
1345 "wildcard(*) and column definitions cannot be used together".to_owned(),
1346 "Remove the wildcard or column definitions".to_owned(),
1347 )
1348 .into());
1349 }
1350
1351 if !wildcard_idx.is_some()
1353 && !constraints.iter().any(|c| {
1354 matches!(
1355 c,
1356 TableConstraint::Unique {
1357 is_primary: true,
1358 ..
1359 }
1360 )
1361 })
1362 && !column_defs.iter().any(|col| {
1363 col.options
1364 .iter()
1365 .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1366 })
1367 {
1368 return Err(ErrorCode::NotSupported(
1369 "CDC table without primary key constraint is not supported".to_owned(),
1370 "Please define a primary key".to_owned(),
1371 )
1372 .into());
1373 }
1374
1375 if append_only {
1376 return Err(ErrorCode::NotSupported(
1377 "append only modifier on the table created from a CDC source".into(),
1378 "Remove the APPEND ONLY clause".into(),
1379 )
1380 .into());
1381 }
1382
1383 if !source_watermarks.is_empty() {
1384 return Err(ErrorCode::NotSupported(
1385 "watermark defined on the table created from a CDC source".into(),
1386 "Remove the Watermark definitions".into(),
1387 )
1388 .into());
1389 }
1390
1391 Ok(())
1392}
1393
1394async fn bind_cdc_table_schema_externally(
1396 cdc_with_options: WithOptionsSecResolved,
1397) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1398 let (options, secret_refs) = cdc_with_options.into_parts();
1400 let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1401 .context("failed to extract external table config")?;
1402
1403 let table = ExternalTableImpl::connect(config)
1404 .await
1405 .context("failed to auto derive table schema")?;
1406
1407 Ok((
1408 table
1409 .column_descs()
1410 .iter()
1411 .cloned()
1412 .map(|column_desc| ColumnCatalog {
1413 column_desc,
1414 is_hidden: false,
1415 })
1416 .collect(),
1417 table.pk_names().clone(),
1418 ))
1419}
1420
1421fn bind_cdc_table_schema(
1423 column_defs: &Vec<ColumnDef>,
1424 constraints: &Vec<TableConstraint>,
1425 is_for_replace_plan: bool,
1426) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1427 let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1428
1429 let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1430 Ok((columns, pk_names))
1431}
1432
/// Handles a `CREATE TABLE` statement end-to-end: plans the table, builds the
/// stream fragment graph, and submits the new catalog objects.
///
/// For `ENGINE = iceberg`, the hummock table is registered as a session
/// staging table while the iceberg-specific objects are created, and the
/// staging entry is removed afterwards whether or not creation succeeded.
#[allow(clippy::too_many_arguments)]
pub async fn handle_create_table(
    handler_args: HandlerArgs,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    if_not_exists: bool,
    format_encode: Option<FormatEncodeOptions>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    cdc_table_info: Option<CdcTableInfo>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    ast_engine: risingwave_sqlparser::ast::Engine,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    if append_only {
        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
    }

    session.check_cluster_limits().await?;

    // Map the parser-level engine enum to the catalog-level one.
    let engine = match ast_engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    // Short-circuit with a notice response if the name already exists and
    // `IF NOT EXISTS` was specified.
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        table_name.clone(),
        StatementType::CREATE_TABLE,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    // Plan the table and lower the plan into a stream fragment graph.
    let (graph, source, hummock_table, job_type, shared_source_id) = {
        let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
            handler_args.clone(),
            ExplainOptions::default(),
            format_encode,
            cdc_table_info,
            &table_name,
            column_defs.clone(),
            wildcard_idx,
            constraints.clone(),
            source_watermarks,
            append_only,
            on_conflict,
            with_version_columns,
            include_column_options,
            webhook_info,
            engine,
        )
        .await?;
        tracing::trace!("table_plan: {:?}", plan.explain_to_string());

        let graph = build_graph(plan, Some(GraphJobType::Table))?;

        (graph, source, table, job_type, shared_source_id)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        table_name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

    // A table created from a shared CDC source depends on that source.
    let dependencies = shared_source_id
        .map(|id| HashSet::from([id.as_object_id()]))
        .unwrap_or_default();

    match engine {
        Engine::Hummock => {
            let catalog_writer = session.catalog_writer()?;
            // Choose the long-running notification hint based on the job type.
            let action = match job_type {
                TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
                _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
            };
            execute_with_long_running_notification(
                catalog_writer.create_table(
                    source.map(|s| s.to_prost()),
                    hummock_table.to_prost(),
                    graph,
                    job_type,
                    if_not_exists,
                    dependencies,
                ),
                &session,
                "CREATE TABLE",
                action,
            )
            .await?;
        }
        Engine::Iceberg => {
            let hummock_table_name = hummock_table.name.clone();
            // Stage the table for the duration of the iceberg creation, and
            // always drop the staging entry before propagating any error.
            session.create_staging_table(hummock_table.clone());
            let res = create_iceberg_engine_table(
                session.clone(),
                handler_args,
                source.map(|s| s.to_prost()),
                hummock_table,
                graph,
                table_name,
                job_type,
                if_not_exists,
            )
            .await;
            session.drop_staging_table(&hummock_table_name);
            res?
        }
    }

    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
}
1552
1553#[allow(clippy::too_many_arguments)]
1562pub async fn create_iceberg_engine_table(
1563 session: Arc<SessionImpl>,
1564 handler_args: HandlerArgs,
1565 mut source: Option<PbSource>,
1566 table: TableCatalog,
1567 graph: StreamFragmentGraph,
1568 table_name: ObjectName,
1569 job_type: PbTableJobType,
1570 if_not_exists: bool,
1571) -> Result<()> {
1572 let rw_db_name = session
1573 .env()
1574 .catalog_reader()
1575 .read_guard()
1576 .get_database_by_id(table.database_id)?
1577 .name()
1578 .to_owned();
1579 let rw_schema_name = session
1580 .env()
1581 .catalog_reader()
1582 .read_guard()
1583 .get_schema_by_id(table.database_id, table.schema_id)?
1584 .name()
1585 .clone();
1586 let iceberg_catalog_name = rw_db_name.clone();
1587 let iceberg_database_name = rw_schema_name.clone();
1588 let iceberg_table_name = table_name.0.last().unwrap().real_value();
1589
1590 let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1591 let sink_decouple = session.config().sink_decouple();
1592 if matches!(sink_decouple, SinkDecouple::Disable) {
1593 bail!(
1594 "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1595 );
1596 }
1597
1598 let mut connection_ref = BTreeMap::new();
1599 let with_common = if iceberg_engine_connection.is_empty() {
1600 bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1601 } else {
1602 let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1603 assert_eq!(parts.len(), 2);
1604 let connection_catalog =
1605 session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1606 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1607 if params.connection_type == ConnectionType::Iceberg as i32 {
1608 connection_ref.insert(
1610 "connection".to_owned(),
1611 ConnectionRefValue {
1612 connection_name: ObjectName::from(vec![
1613 Ident::from(parts[0]),
1614 Ident::from(parts[1]),
1615 ]),
1616 },
1617 );
1618
1619 let mut with_common = BTreeMap::new();
1620 with_common.insert("connector".to_owned(), "iceberg".to_owned());
1621 with_common.insert("database.name".to_owned(), iceberg_database_name);
1622 with_common.insert("table.name".to_owned(), iceberg_table_name);
1623
1624 let hosted_catalog = params
1625 .properties
1626 .get("hosted_catalog")
1627 .map(|s| s.eq_ignore_ascii_case("true"))
1628 .unwrap_or(false);
1629 if hosted_catalog {
1630 let meta_client = session.env().meta_client();
1631 let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1632
1633 let meta_store_endpoint =
1634 url::Url::parse(&meta_store_endpoint).map_err(|_| {
1635 ErrorCode::InternalError(
1636 "failed to parse the meta store endpoint".to_owned(),
1637 )
1638 })?;
1639 let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1640 let meta_store_user = meta_store_endpoint.username().to_owned();
1641 let meta_store_password = match meta_store_endpoint.password() {
1642 Some(password) => percent_decode_str(password)
1643 .decode_utf8()
1644 .map_err(|_| {
1645 ErrorCode::InternalError(
1646 "failed to parse password from meta store endpoint".to_owned(),
1647 )
1648 })?
1649 .into_owned(),
1650 None => "".to_owned(),
1651 };
1652 let meta_store_host = meta_store_endpoint
1653 .host_str()
1654 .ok_or_else(|| {
1655 ErrorCode::InternalError(
1656 "failed to parse host from meta store endpoint".to_owned(),
1657 )
1658 })?
1659 .to_owned();
1660 let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1661 ErrorCode::InternalError(
1662 "failed to parse port from meta store endpoint".to_owned(),
1663 )
1664 })?;
1665 let meta_store_database = meta_store_endpoint
1666 .path()
1667 .trim_start_matches('/')
1668 .to_owned();
1669
1670 let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1671 bail!("failed to parse meta backend: {}", meta_store_backend);
1672 };
1673
1674 let catalog_uri = match meta_backend {
1675 MetaBackend::Postgres => {
1676 format!(
1677 "jdbc:postgresql://{}:{}/{}",
1678 meta_store_host, meta_store_port, meta_store_database
1679 )
1680 }
1681 MetaBackend::Mysql => {
1682 format!(
1683 "jdbc:mysql://{}:{}/{}",
1684 meta_store_host, meta_store_port, meta_store_database
1685 )
1686 }
1687 MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1688 bail!(
1689 "Unsupported meta backend for iceberg engine table: {}",
1690 meta_store_backend
1691 );
1692 }
1693 };
1694
1695 with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1696 with_common.insert("catalog.uri".to_owned(), catalog_uri);
1697 with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
1698 with_common.insert("catalog.jdbc.password".to_owned(), meta_store_password);
1699 with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
1700 }
1701
1702 with_common
1703 } else {
1704 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1705 "Only iceberg connection could be used in iceberg engine".to_owned(),
1706 )));
1707 }
1708 } else {
1709 return Err(RwError::from(ErrorCode::InvalidParameterValue(
1710 "Private Link Service has been deprecated. Please create a new connection instead."
1711 .to_owned(),
1712 )));
1713 }
1714 };
1715
1716 let mut pks = table
1719 .pk_column_names()
1720 .iter()
1721 .map(|c| c.to_string())
1722 .collect::<Vec<String>>();
1723
1724 let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1726 pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1727 let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1728 "select {} as {}, * from {}",
1729 ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1730 ))
1731 .context("unable to parse query")?
1732 .try_into()
1733 .unwrap();
1734
1735 let Statement::Query(query) = &stmt else {
1736 panic!("unexpected statement: {:?}", stmt);
1737 };
1738 CreateSink::AsQuery(query.clone())
1739 } else {
1740 CreateSink::From(table_name.clone())
1741 };
1742
1743 let mut sink_name = table_name.clone();
1744 *sink_name.0.last_mut().unwrap() = Ident::from(
1745 (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1746 );
1747 let create_sink_stmt = CreateSinkStatement {
1748 if_not_exists: false,
1749 sink_name,
1750 with_properties: WithProperties(vec![]),
1751 sink_from,
1752 columns: vec![],
1753 emit_mode: None,
1754 sink_schema: None,
1755 into_table_name: None,
1756 };
1757
1758 let mut sink_handler_args = handler_args.clone();
1759
1760 let mut sink_with = with_common.clone();
1761
1762 if table.append_only {
1763 sink_with.insert("type".to_owned(), "append-only".to_owned());
1764 } else {
1765 sink_with.insert("primary_key".to_owned(), pks.join(","));
1766 sink_with.insert("type".to_owned(), "upsert".to_owned());
1767 }
1768 let commit_checkpoint_interval = handler_args
1787 .with_options
1788 .get(COMMIT_CHECKPOINT_INTERVAL)
1789 .map(|v| v.to_owned())
1790 .unwrap_or_else(|| "60".to_owned());
1791 let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1792 ErrorCode::InvalidInputSyntax(format!(
1793 "commit_checkpoint_interval must be greater than 0: {}",
1794 commit_checkpoint_interval
1795 ))
1796 })?;
1797
1798 if commit_checkpoint_interval == 0 {
1799 bail!("commit_checkpoint_interval must be greater than 0");
1800 }
1801
1802 source
1804 .as_mut()
1805 .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1806
1807 let sink_decouple = session.config().sink_decouple();
1808 if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1809 bail!(
1810 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1811 )
1812 }
1813
1814 sink_with.insert(
1815 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1816 commit_checkpoint_interval.to_string(),
1817 );
1818 sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1819
1820 sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1821
1822 if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1823 match enable_compaction.to_lowercase().as_str() {
1824 "true" => {
1825 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1826 }
1827 "false" => {
1828 sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1829 }
1830 _ => {
1831 return Err(ErrorCode::InvalidInputSyntax(format!(
1832 "enable_compaction must be true or false: {}",
1833 enable_compaction
1834 ))
1835 .into());
1836 }
1837 }
1838
1839 source
1841 .as_mut()
1842 .map(|x| x.with_properties.remove("enable_compaction"));
1843 } else {
1844 sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1845 }
1846
1847 if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1848 let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1849 ErrorCode::InvalidInputSyntax(format!(
1850 "compaction_interval_sec must be greater than 0: {}",
1851 commit_checkpoint_interval
1852 ))
1853 })?;
1854 if compaction_interval_sec == 0 {
1855 bail!("compaction_interval_sec must be greater than 0");
1856 }
1857 sink_with.insert(
1858 "compaction_interval_sec".to_owned(),
1859 compaction_interval_sec.to_string(),
1860 );
1861 source
1863 .as_mut()
1864 .map(|x| x.with_properties.remove("compaction_interval_sec"));
1865 }
1866
1867 let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1868 handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1869 {
1870 source
1872 .as_mut()
1873 .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1874 match enable_snapshot_expiration.to_lowercase().as_str() {
1875 "true" => {
1876 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1877 true
1878 }
1879 "false" => {
1880 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1881 false
1882 }
1883 _ => {
1884 return Err(ErrorCode::InvalidInputSyntax(format!(
1885 "enable_snapshot_expiration must be true or false: {}",
1886 enable_snapshot_expiration
1887 ))
1888 .into());
1889 }
1890 }
1891 } else {
1892 sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1893 true
1894 };
1895
1896 if has_enabled_snapshot_expiration {
1897 if let Some(snapshot_expiration_retain_last) = handler_args
1899 .with_options
1900 .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
1901 {
1902 sink_with.insert(
1903 SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
1904 snapshot_expiration_retain_last.to_owned(),
1905 );
1906 source
1908 .as_mut()
1909 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
1910 }
1911
1912 if let Some(snapshot_expiration_max_age) = handler_args
1913 .with_options
1914 .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
1915 {
1916 sink_with.insert(
1917 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
1918 snapshot_expiration_max_age.to_owned(),
1919 );
1920 source
1922 .as_mut()
1923 .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
1924 }
1925
1926 if let Some(snapshot_expiration_clear_expired_files) = handler_args
1927 .with_options
1928 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1929 {
1930 sink_with.insert(
1931 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
1932 snapshot_expiration_clear_expired_files.to_owned(),
1933 );
1934 source.as_mut().map(|x| {
1936 x.with_properties
1937 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1938 });
1939 }
1940
1941 if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
1942 .with_options
1943 .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1944 {
1945 sink_with.insert(
1946 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
1947 snapshot_expiration_clear_expired_meta_data.to_owned(),
1948 );
1949 source.as_mut().map(|x| {
1951 x.with_properties
1952 .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1953 });
1954 }
1955 }
1956
1957 if let Some(format_version) = handler_args.with_options.get(FORMAT_VERSION) {
1958 let format_version = format_version.parse::<u8>().map_err(|_| {
1959 ErrorCode::InvalidInputSyntax(format!(
1960 "format_version must be 1, 2 or 3: {}",
1961 format_version
1962 ))
1963 })?;
1964 if format_version != 1 && format_version != 2 && format_version != 3 {
1965 bail!("format_version must be 1, 2 or 3");
1966 }
1967 sink_with.insert(FORMAT_VERSION.to_owned(), format_version.to_string());
1968
1969 source
1971 .as_mut()
1972 .map(|x| x.with_properties.remove(FORMAT_VERSION));
1973 }
1974
1975 if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1976 let write_mode = IcebergWriteMode::try_from(write_mode.as_str()).map_err(|_| {
1977 ErrorCode::InvalidInputSyntax(format!(
1978 "invalid write_mode: {}, must be one of: {}, {}",
1979 write_mode, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
1980 ))
1981 })?;
1982
1983 match write_mode {
1984 IcebergWriteMode::MergeOnRead => {
1985 sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
1986 }
1987
1988 IcebergWriteMode::CopyOnWrite => {
1989 if table.append_only {
1990 return Err(ErrorCode::NotSupported(
1991 "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
1992 "Please use MERGE ON READ instead".to_owned(),
1993 )
1994 .into());
1995 }
1996
1997 sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
1998 }
1999 }
2000
2001 source
2003 .as_mut()
2004 .map(|x| x.with_properties.remove("write_mode"));
2005 } else {
2006 sink_with.insert(
2007 WRITE_MODE.to_owned(),
2008 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2009 );
2010 }
2011
2012 if let Some(max_snapshots_num_before_compaction) =
2013 handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
2014 {
2015 let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
2016 .parse::<u32>()
2017 .map_err(|_| {
2018 ErrorCode::InvalidInputSyntax(format!(
2019 "{} must be greater than 0: {}",
2020 COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
2021 ))
2022 })?;
2023
2024 if max_snapshots_num_before_compaction == 0 {
2025 bail!(format!(
2026 "{} must be greater than 0",
2027 COMPACTION_MAX_SNAPSHOTS_NUM
2028 ));
2029 }
2030
2031 sink_with.insert(
2032 COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
2033 max_snapshots_num_before_compaction.to_string(),
2034 );
2035
2036 source
2038 .as_mut()
2039 .map(|x| x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM));
2040 }
2041
2042 if let Some(small_files_threshold_mb) = handler_args
2043 .with_options
2044 .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2045 {
2046 let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
2047 ErrorCode::InvalidInputSyntax(format!(
2048 "{} must be greater than 0: {}",
2049 COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
2050 ))
2051 })?;
2052 if small_files_threshold_mb == 0 {
2053 bail!(format!(
2054 "{} must be a greater than 0",
2055 COMPACTION_SMALL_FILES_THRESHOLD_MB
2056 ));
2057 }
2058 sink_with.insert(
2059 COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
2060 small_files_threshold_mb.to_string(),
2061 );
2062
2063 source.as_mut().map(|x| {
2065 x.with_properties
2066 .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2067 });
2068 }
2069
2070 if let Some(delete_files_count_threshold) = handler_args
2071 .with_options
2072 .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2073 {
2074 let delete_files_count_threshold =
2075 delete_files_count_threshold.parse::<usize>().map_err(|_| {
2076 ErrorCode::InvalidInputSyntax(format!(
2077 "{} must be greater than 0: {}",
2078 COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
2079 ))
2080 })?;
2081 if delete_files_count_threshold == 0 {
2082 bail!(format!(
2083 "{} must be greater than 0",
2084 COMPACTION_DELETE_FILES_COUNT_THRESHOLD
2085 ));
2086 }
2087 sink_with.insert(
2088 COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
2089 delete_files_count_threshold.to_string(),
2090 );
2091
2092 source.as_mut().map(|x| {
2094 x.with_properties
2095 .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2096 });
2097 }
2098
2099 if let Some(trigger_snapshot_count) = handler_args
2100 .with_options
2101 .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
2102 {
2103 let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
2104 ErrorCode::InvalidInputSyntax(format!(
2105 "{} must be greater than 0: {}",
2106 COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
2107 ))
2108 })?;
2109 if trigger_snapshot_count == 0 {
2110 bail!(format!(
2111 "{} must be greater than 0",
2112 COMPACTION_TRIGGER_SNAPSHOT_COUNT
2113 ));
2114 }
2115 sink_with.insert(
2116 COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
2117 trigger_snapshot_count.to_string(),
2118 );
2119
2120 source
2122 .as_mut()
2123 .map(|x| x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT));
2124 }
2125
2126 if let Some(target_file_size_mb) = handler_args
2127 .with_options
2128 .get(COMPACTION_TARGET_FILE_SIZE_MB)
2129 {
2130 let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
2131 ErrorCode::InvalidInputSyntax(format!(
2132 "{} must be greater than 0: {}",
2133 COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
2134 ))
2135 })?;
2136 if target_file_size_mb == 0 {
2137 bail!(format!(
2138 "{} must be greater than 0",
2139 COMPACTION_TARGET_FILE_SIZE_MB
2140 ));
2141 }
2142 sink_with.insert(
2143 COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
2144 target_file_size_mb.to_string(),
2145 );
2146 source
2148 .as_mut()
2149 .map(|x| x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB));
2150 }
2151
2152 if let Some(compaction_type) = handler_args.with_options.get(COMPACTION_TYPE) {
2153 let compaction_type = CompactionType::try_from(compaction_type.as_str()).map_err(|_| {
2154 ErrorCode::InvalidInputSyntax(format!(
2155 "invalid compaction_type: {}, must be one of {:?}",
2156 compaction_type,
2157 &[
2158 CompactionType::Full,
2159 CompactionType::SmallFiles,
2160 CompactionType::FilesWithDelete
2161 ]
2162 ))
2163 })?;
2164
2165 sink_with.insert(
2166 COMPACTION_TYPE.to_owned(),
2167 compaction_type.as_str().to_owned(),
2168 );
2169
2170 source
2172 .as_mut()
2173 .map(|x| x.with_properties.remove(COMPACTION_TYPE));
2174 }
2175
2176 if let Some(write_parquet_compression) = handler_args
2177 .with_options
2178 .get(COMPACTION_WRITE_PARQUET_COMPRESSION)
2179 {
2180 sink_with.insert(
2181 COMPACTION_WRITE_PARQUET_COMPRESSION.to_owned(),
2182 write_parquet_compression.to_owned(),
2183 );
2184 source.as_mut().map(|x| {
2186 x.with_properties
2187 .remove(COMPACTION_WRITE_PARQUET_COMPRESSION)
2188 });
2189 }
2190
2191 if let Some(write_parquet_max_row_group_rows) = handler_args
2192 .with_options
2193 .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2194 {
2195 let write_parquet_max_row_group_rows = write_parquet_max_row_group_rows
2196 .parse::<usize>()
2197 .map_err(|_| {
2198 ErrorCode::InvalidInputSyntax(format!(
2199 "{} must be a positive integer: {}",
2200 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS, write_parquet_max_row_group_rows
2201 ))
2202 })?;
2203 if write_parquet_max_row_group_rows == 0 {
2204 bail!(format!(
2205 "{} must be greater than 0",
2206 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS
2207 ));
2208 }
2209 sink_with.insert(
2210 COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS.to_owned(),
2211 write_parquet_max_row_group_rows.to_string(),
2212 );
2213 source.as_mut().map(|x| {
2215 x.with_properties
2216 .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2217 });
2218 }
2219
2220 let partition_by = handler_args
2221 .with_options
2222 .get("partition_by")
2223 .map(|v| v.to_owned());
2224
2225 if let Some(partition_by) = &partition_by {
2226 let mut partition_columns = vec![];
2227 for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
2228 table
2229 .columns()
2230 .iter()
2231 .find(|col| col.name().eq_ignore_ascii_case(&column))
2232 .ok_or_else(|| {
2233 ErrorCode::InvalidInputSyntax(format!(
2234 "Partition source column does not exist in schema: {}",
2235 column
2236 ))
2237 })?;
2238
2239 partition_columns.push(column);
2240 }
2241
2242 ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2243 |_| {
2244 ErrorCode::InvalidInputSyntax(
2245 "The partition columns should be the prefix of the primary key".to_owned(),
2246 )
2247 },
2248 )?;
2249
2250 sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2251
2252 source
2254 .as_mut()
2255 .map(|x| x.with_properties.remove("partition_by"));
2256 }
2257
2258 sink_handler_args.with_options =
2259 WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2260 let SinkPlanContext {
2261 sink_plan,
2262 sink_catalog,
2263 ..
2264 } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2265 let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2266
2267 let mut source_name = table_name.clone();
2268 *source_name.0.last_mut().unwrap() = Ident::from(
2269 (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2270 );
2271 let create_source_stmt = CreateSourceStatement {
2272 temporary: false,
2273 if_not_exists: false,
2274 columns: vec![],
2275 source_name,
2276 wildcard_idx: Some(0),
2277 constraints: vec![],
2278 with_properties: WithProperties(vec![]),
2279 format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2280 source_watermarks: vec![],
2281 include_column_options: vec![],
2282 };
2283
2284 let mut source_handler_args = handler_args.clone();
2285 let source_with = with_common;
2286 source_handler_args.with_options =
2287 WithOptions::new(source_with, Default::default(), connection_ref);
2288
2289 let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2290 let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2291 let (with_properties, refresh_mode) =
2292 bind_connector_props(&source_handler_args, &format_encode, true)?;
2293
2294 let (iceberg_catalog, table_identifier) = {
2297 let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2298 let iceberg_sink = IcebergSink::try_from(sink_param)?;
2299 iceberg_sink.create_table_if_not_exists().await?;
2300
2301 let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2302 let table_identifier = iceberg_sink.config.full_table_name()?;
2303 (iceberg_catalog, table_identifier)
2304 };
2305
2306 let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2307 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2308 &session,
2309 &format_encode,
2310 Either::Left(&with_properties),
2311 create_source_type,
2312 )
2313 .await?;
2314 let mut col_id_gen = ColumnIdGenerator::new_initial();
2315
2316 let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2317 source_handler_args,
2318 create_source_stmt.source_name,
2319 format_encode,
2320 with_properties,
2321 &create_source_stmt.columns,
2322 create_source_stmt.constraints,
2323 create_source_stmt.wildcard_idx,
2324 create_source_stmt.source_watermarks,
2325 columns_from_resolve_source,
2326 source_info,
2327 create_source_stmt.include_column_options,
2328 &mut col_id_gen,
2329 create_source_type,
2330 overwrite_options.source_rate_limit,
2331 SqlColumnStrategy::FollowChecked,
2332 refresh_mode,
2333 )
2334 .await?;
2335
2336 let _ = Jvm::get_or_init()?;
2339
2340 let catalog_writer = session.catalog_writer()?;
2341 let action = match job_type {
2342 TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
2343 _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
2344 };
2345 let res = execute_with_long_running_notification(
2346 catalog_writer.create_iceberg_table(
2347 PbTableJobInfo {
2348 source,
2349 table: Some(table.to_prost()),
2350 fragment_graph: Some(graph),
2351 job_type: job_type as _,
2352 },
2353 PbSinkJobInfo {
2354 sink: Some(sink_catalog.to_proto()),
2355 fragment_graph: Some(sink_graph),
2356 },
2357 iceberg_source_catalog.to_prost(),
2358 if_not_exists,
2359 ),
2360 &session,
2361 "CREATE TABLE",
2362 action,
2363 )
2364 .await;
2365
2366 if res.is_err() {
2367 let _ = iceberg_catalog
2368 .drop_table(&table_identifier)
2369 .await
2370 .inspect_err(|err| {
2371 tracing::error!(
2372 "failed to drop iceberg table {} after create iceberg engine table failed: {}",
2373 table_identifier,
2374 err.as_report()
2375 );
2376 });
2377 res?
2378 }
2379
2380 Ok(())
2381}
2382
2383pub fn check_create_table_with_source(
2384 with_options: &WithOptions,
2385 format_encode: Option<FormatEncodeOptions>,
2386 include_column_options: &IncludeOption,
2387 cdc_table_info: &Option<CdcTableInfo>,
2388) -> Result<Option<FormatEncodeOptions>> {
2389 if cdc_table_info.is_some() {
2391 return Ok(format_encode);
2392 }
2393 let defined_source = with_options.is_source_connector();
2394
2395 if !include_column_options.is_empty() && !defined_source {
2396 return Err(ErrorCode::InvalidInputSyntax(
2397 "INCLUDE should be used with a connector".to_owned(),
2398 )
2399 .into());
2400 }
2401 if defined_source {
2402 format_encode.as_ref().ok_or_else(|| {
2403 ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2404 })?;
2405 }
2406 Ok(format_encode)
2407}
2408
/// Check that `partition_columns` form a (possibly empty) prefix of
/// `primary_key_columns`, returning a human-readable message on violation.
fn ensure_partition_columns_are_prefix_of_primary_key(
    partition_columns: &[String],
    primary_key_columns: &[String],
) -> std::result::Result<(), String> {
    // More partition columns than pk columns can never be a prefix.
    if partition_columns.len() > primary_key_columns.len() {
        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
    }

    // Pairwise-compare positions; the first mismatch breaks the prefix property.
    match partition_columns
        .iter()
        .zip(primary_key_columns.iter())
        .find(|(part, pk)| part != pk)
    {
        Some((mismatch, _)) => Err(format!(
            "Partition column '{}' is not a prefix of the primary key.",
            mismatch
        )),
        None => Ok(()),
    }
}
2428
/// Build a replacement stream graph for a "replace table" DDL (e.g. `ALTER TABLE`
/// rewritten as a fresh `CREATE TABLE` statement).
///
/// Re-plans the table from `statement`, then stamps the new catalogs with the
/// ids of `original_catalog` so the meta service replaces the existing job in
/// place. Returns the fragment graph, the new table catalog, the updated
/// associated source catalog (if any), and the table job type.
///
/// # Panics
/// Panics if `statement` is not `Statement::CreateTable` — callers are expected
/// to pass the rewritten CREATE TABLE AST.
#[allow(clippy::too_many_arguments)]
pub async fn generate_stream_graph_for_replace_table(
    _session: &Arc<SessionImpl>,
    table_name: ObjectName,
    original_catalog: &Arc<TableCatalog>,
    handler_args: HandlerArgs,
    statement: Statement,
    col_id_gen: ColumnIdGenerator,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(
    StreamFragmentGraph,
    TableCatalog,
    Option<SourceCatalog>,
    TableJobType,
)> {
    let Statement::CreateTable {
        columns,
        constraints,
        source_watermarks,
        append_only,
        on_conflict,
        with_version_columns,
        wildcard_idx,
        cdc_table_info,
        format_encode,
        include_column_options,
        engine,
        with_options,
        ..
    } = statement
    else {
        panic!("unexpected statement type: {:?}", statement);
    };

    // Normalize the legacy format/encode AST into the v2 representation.
    let format_encode = format_encode
        .clone()
        .map(|format_encode| format_encode.into_v2_with_warning());

    // Map the parser-level engine enum to the catalog-level one.
    let engine = match engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    // The original table had an associated source but the new definition has no
    // FORMAT/ENCODE clause: this replace drops the connector.
    let is_drop_connector =
        original_catalog.associated_source_id().is_some() && format_encode.is_none();
    if is_drop_connector {
        // Sanity check: dropping the connector must not leave any
        // connector-only clauses behind in the new definition.
        debug_assert!(
            source_watermarks.is_empty()
                && include_column_options.is_empty()
                && with_options
                    .iter()
                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
        );
    }

    // Webhook info is not re-derivable from the statement, so carry it over
    // from the original catalog.
    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_columns: with_version_columns
            .iter()
            .map(|col| col.real_value())
            .collect(),
        webhook_info: original_catalog.webhook_info.clone(),
        engine,
    };

    // Dispatch on (FORMAT/ENCODE present?, CDC table?) to pick the planner.
    let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        // Connector-backed table: plan with an associated source.
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                ExplainOptions::default(),
                table_name,
                columns,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                sql_column_strategy,
            )
            .await?,
            TableJobType::General,
        ),
        // Plain table without a connector.
        (None, None) => {
            let context = OptimizerContext::from_handler_args(handler_args);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name,
                columns,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                true,
            )?;
            ((plan, None, table), TableJobType::General)
        }
        // Table backfilled from a shared CDC source.
        (None, Some(cdc_table)) => {
            let session = &handler_args.session;
            let (source, resolved_table_name) =
                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;

            let (cdc_with_options, normalized_external_table_name) =
                derive_with_options_for_cdc_table(
                    &source.with_properties,
                    cdc_table.external_table_name.clone(),
                )?;

            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                normalized_external_table_name,
                columns,
                column_catalogs,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_columns
                    .iter()
                    .map(|col| col.real_value())
                    .collect(),
                include_column_options,
                table_name,
                resolved_table_name,
                original_catalog.database_id,
                original_catalog.schema_id,
                original_catalog.id(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        // FORMAT/ENCODE together with a CDC source is contradictory.
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format doesn't apply to table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };

    // Replacing a table must not change its primary key.
    if table.pk_column_ids() != original_catalog.pk_column_ids() {
        Err(ErrorCode::InvalidInputSyntax(
            "alter primary key of table is not supported".to_owned(),
        ))?
    }

    let graph = build_graph(plan, Some(GraphJobType::Table))?;

    // Reuse the original ids so this replaces the existing job in place.
    table.id = original_catalog.id();
    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
        table.associated_source_id = Some(source_id);

        // The connector-path planner always produces a source here.
        let source = source.as_mut().unwrap();
        source.id = source_id;
        source.associated_table_id = Some(table.id());
    }

    Ok((graph, table, source, job_type))
}
2599
2600fn get_source_and_resolved_table_name(
2601 session: &Arc<SessionImpl>,
2602 cdc_table: CdcTableInfo,
2603 table_name: ObjectName,
2604) -> Result<(Arc<SourceCatalog>, String)> {
2605 let db_name = &session.database();
2606 let (_, resolved_table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2607
2608 let (source_schema, source_name) =
2609 Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2610
2611 let source = {
2612 let catalog_reader = session.env().catalog_reader().read_guard();
2613 let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2614 let (source, _) = catalog_reader.get_source_by_name(
2615 db_name,
2616 SchemaPath::Name(schema_name.as_str()),
2617 source_name.as_str(),
2618 )?;
2619 source.clone()
2620 };
2621
2622 Ok((source, resolved_table_name))
2623}
2624
2625fn bind_webhook_info(
2627 session: &Arc<SessionImpl>,
2628 columns_defs: &[ColumnDef],
2629 webhook_info: WebhookSourceInfo,
2630) -> Result<PbWebhookSourceInfo> {
2631 if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
2633 {
2634 return Err(ErrorCode::InvalidInputSyntax(
2635 "Table with webhook source should have exactly one JSONB column".to_owned(),
2636 )
2637 .into());
2638 }
2639
2640 let WebhookSourceInfo {
2641 secret_ref,
2642 signature_expr,
2643 wait_for_persistence,
2644 is_batched,
2645 } = webhook_info;
2646
2647 let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2649 let db_name = &session.database();
2650 let (schema_name, secret_name) =
2651 Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2652 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2653 (
2654 Some(PbSecretRef {
2655 secret_id: secret_catalog.id,
2656 ref_as: match secret_ref.ref_as {
2657 SecretRefAsType::Text => PbRefAsType::Text,
2658 SecretRefAsType::File => PbRefAsType::File,
2659 }
2660 .into(),
2661 }),
2662 Some(secret_name),
2663 )
2664 } else {
2665 (None, None)
2666 };
2667
2668 let signature_expr = if let Some(signature_expr) = signature_expr {
2669 let secure_compare_context = SecureCompareContext {
2670 column_name: columns_defs[0].name.real_value(),
2671 secret_name,
2672 };
2673 let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
2674 let expr = binder.bind_expr(&signature_expr)?;
2675
2676 if expr.as_function_call().is_none()
2678 || expr.as_function_call().unwrap().func_type()
2679 != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2680 {
2681 return Err(ErrorCode::InvalidInputSyntax(
2682 "The signature verification function must be SECURE_COMPARE()".to_owned(),
2683 )
2684 .into());
2685 }
2686
2687 Some(expr.to_expr_proto())
2688 } else {
2689 session.notice_to_user(
2690 "VALIDATE clause is strongly recommended for safety or production usages",
2691 );
2692 None
2693 };
2694
2695 let pb_webhook_info = PbWebhookSourceInfo {
2696 secret_ref: pb_secret_ref,
2697 signature_expr,
2698 wait_for_persistence,
2699 is_batched,
2700 };
2701
2702 Ok(pb_webhook_info)
2703}
2704
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use super::*;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// End-to-end check that `CREATE TABLE` registers the table in the catalog
    /// with the declared columns plus the implicit row-id / rw_timestamp ones.
    #[tokio::test]
    async fn test_create_table_handler() {
        let sql =
            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(table.name(), "t");

        // Collect (name, type) pairs so the comparison is order-insensitive.
        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        // Hidden row-id and rw_timestamp columns are materialized alongside the
        // user-declared ones; struct fields get their own column ids (3..=5).
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "v1" => DataType::Int16,
            "v2" => StructType::new(
                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
            )
            .with_ids([3, 4, 5].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };

        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// Table-driven test for primary-key binding. `Ok` holds the expected bound
    /// pk column ids (column ids start at 1; a leading 0 means the implicit
    /// row-id column was added); `Err` holds an expected message substring.
    #[test]
    fn test_bind_primary_key() {
        for (sql, expected) in [
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ] {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be create table")
            };
            // Run the same binding pipeline the handler uses: bind columns,
            // assign ids, then bind the pk (adding a row-id column if needed).
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }

    /// Options that appear both in WITH and in the FORMAT ... ENCODE clause must
    /// be kept separately: encode-level values go to `format_encode_options`,
    /// WITH-level values to `with_properties`, without leaking across.
    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // Encode-level credentials win inside format_encode_options.
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // WITH-level credentials stay in with_properties.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        // Encode-only options must not leak into with_properties.
        assert!(!source.with_properties.contains_key("schema.location"));
    }
}