Skip to main content

risingwave_frontend/handler/
create_table.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use percent_encoding::percent_decode_str;
25use pgwire::pg_response::{PgResponse, StatementType};
26use prost::Message as _;
27use risingwave_common::catalog::{
28    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29    ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
30    TableId,
31};
32use risingwave_common::config::MetaBackend;
33use risingwave_common::global_jvm::Jvm;
34use risingwave_common::session_config::sink_decouple::SinkDecouple;
35use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
36use risingwave_common::util::value_encoding::DatumToProtoExt;
37use risingwave_common::{bail, bail_not_implemented};
38use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
39use risingwave_connector::source::cdc::external::{
40    DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
41    SCHEMA_NAME_KEY, TABLE_NAME_KEY,
42};
43use risingwave_connector::source::cdc::{
44    build_cdc_table_id, normalize_simple_postgres_quoted_table_name,
45};
46use risingwave_connector::{
47    AUTO_SCHEMA_CHANGE_KEY, WithOptionsSecResolved, WithPropertiesExt, source,
48};
49use risingwave_pb::catalog::connection::Info as ConnectionInfo;
50use risingwave_pb::catalog::connection_params::ConnectionType;
51use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
52use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
53use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
54use risingwave_pb::plan_common::{
55    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
56};
57use risingwave_pb::secret::PbSecretRef;
58use risingwave_pb::secret::secret_ref::PbRefAsType;
59use risingwave_pb::stream_plan::StreamFragmentGraph;
60use risingwave_sqlparser::ast::{
61    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
62    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
63    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
64    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
65};
66use risingwave_sqlparser::parser::IncludeOption;
67use thiserror_ext::AsReport;
68
69use super::RwPgResponse;
70use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
71use crate::binder::{Clause, SecureCompareContext, WEBHOOK_PAYLOAD_FIELD_NAME, bind_data_type};
72use crate::catalog::root_catalog::SchemaPath;
73use crate::catalog::source_catalog::SourceCatalog;
74use crate::catalog::table_catalog::TableVersion;
75use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
76use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
77use crate::expr::{Expr, ExprImpl, ExprRewriter};
78use crate::handler::HandlerArgs;
79use crate::handler::create_source::{
80    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
81    bind_source_watermark, handle_addition_columns,
82};
83use crate::handler::util::{
84    LongRunningNotificationAction, SourceSchemaCompatExt, execute_with_long_running_notification,
85};
86use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
87use crate::optimizer::plan_node::{
88    LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
89    ensure_sync_log_store_fragment_root,
90};
91use crate::optimizer::property::{Order, RequiredDist};
92use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
93use crate::session::SessionImpl;
94use crate::session::current::notice_to_user;
95use crate::stream_fragmenter::{GraphJobType, build_graph};
96use crate::utils::OverwriteOptions;
97use crate::{Binder, Explain, TableCatalog, WithOptions};
98
99mod col_id_gen;
100pub use col_id_gen::*;
101use risingwave_connector::sink::SinkParam;
102use risingwave_connector::sink::iceberg::{
103    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
104    COMPACTION_SMALL_FILES_THRESHOLD_MB, COMPACTION_TARGET_FILE_SIZE_MB,
105    COMPACTION_TRIGGER_SNAPSHOT_COUNT, COMPACTION_TYPE, COMPACTION_WRITE_PARQUET_COMPRESSION,
106    COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES, COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS,
107    CompactionType, ENABLE_COMPACTION, ENABLE_PK_INDEX, ENABLE_SNAPSHOT_EXPIRATION, FORMAT_VERSION,
108    ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink,
109    IcebergWriteMode, ORDER_KEY, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
110    SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA, SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
111    SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE, parse_partition_by_exprs,
112    validate_order_key_columns,
113};
114use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
115
116use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};
117
118fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
119    for option_def in &c.options {
120        match option_def.option {
121            ColumnOption::GeneratedColumns(_) => {}
122            ColumnOption::DefaultValue(_) => {}
123            ColumnOption::DefaultValueInternal { .. } => {}
124            ColumnOption::Unique { is_primary: true } => {}
125            ColumnOption::Null => {}
126            ColumnOption::NotNull => {}
127            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
128        }
129    }
130    Ok(())
131}
132
133/// Binds the column schemas declared in CREATE statement into `ColumnDesc`.
134/// If a column is marked as `primary key`, its `ColumnId` is also returned.
135/// This primary key is not combined with table constraints yet.
136pub fn bind_sql_columns(
137    column_defs: &[ColumnDef],
138    is_for_drop_table_connector: bool,
139) -> Result<Vec<ColumnCatalog>> {
140    let mut columns = Vec::with_capacity(column_defs.len());
141
142    for column in column_defs {
143        ensure_column_options_supported(column)?;
144        // Destruct to make sure all fields are properly handled rather than ignored.
145        // Do NOT use `..` to ignore fields you do not want to deal with.
146        // Reject them with a clear NotImplemented error.
147        let ColumnDef {
148            name,
149            data_type,
150            collation,
151            options,
152            ..
153        } = column;
154
155        let data_type = data_type
156            .clone()
157            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
158        if let Some(collation) = collation {
159            // PostgreSQL will limit the datatypes that collate can work on.
160            // https://www.postgresql.org/docs/16/collation.html#COLLATION-CONCEPTS
161            //   > The built-in collatable data types are `text`, `varchar`, and `char`.
162            //
163            // But we don't support real collation, we simply ignore it here.
164            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
165                bail_not_implemented!(
166                    "Collate collation other than `C` or `POSIX` is not implemented"
167                );
168            }
169
170            match data_type {
171                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
172                _ => {
173                    return Err(ErrorCode::NotSupported(
174                        format!("{} is not a collatable data type", data_type),
175                        "The only built-in collatable data types are `varchar`, please check your type".into(),
176                    ).into());
177                }
178            }
179        }
180
181        if !is_for_drop_table_connector {
182            // additional column name may have prefix _rw
183            // When converting dropping the connector from table, the additional columns are converted to normal columns and keep the original name.
184            // Under this case, we loosen the check for _rw prefix.
185            check_column_name_not_reserved(&name.real_value())?;
186        }
187
188        let nullable: bool = !options
189            .iter()
190            .any(|def| matches!(def.option, ColumnOption::NotNull));
191
192        columns.push(ColumnCatalog {
193            column_desc: ColumnDesc {
194                data_type: bind_data_type(&data_type)?,
195                column_id: ColumnId::placeholder(),
196                name: name.real_value(),
197                generated_or_default_column: None,
198                description: None,
199                additional_column: AdditionalColumn { column_type: None },
200                version: ColumnDescVersion::LATEST,
201                system_column: None,
202                nullable,
203            },
204            is_hidden: false,
205        });
206    }
207
208    Ok(columns)
209}
210
211fn check_generated_column_constraints(
212    column_name: &String,
213    column_id: ColumnId,
214    expr: &ExprImpl,
215    column_catalogs: &[ColumnCatalog],
216    generated_column_names: &[String],
217    pk_column_ids: &[ColumnId],
218) -> Result<()> {
219    let input_refs = expr.collect_input_refs(column_catalogs.len());
220    for idx in input_refs.ones() {
221        let referred_generated_column = &column_catalogs[idx].column_desc.name;
222        if generated_column_names
223            .iter()
224            .any(|c| c == referred_generated_column)
225        {
226            return Err(ErrorCode::BindError(format!(
227                "Generated can not reference another generated column. \
228                But here generated column \"{}\" referenced another generated column \"{}\"",
229                column_name, referred_generated_column
230            ))
231            .into());
232        }
233    }
234
235    if pk_column_ids.contains(&column_id) && expr.is_impure() {
236        return Err(ErrorCode::BindError(format!(
237            "Generated columns with impure expressions should not be part of the primary key. \
238            Here column \"{}\" is defined as part of the primary key.",
239            column_name
240        ))
241        .into());
242    }
243
244    Ok(())
245}
246
247/// Binds constraints that can be only specified in column definitions,
248/// currently generated columns and default columns.
249pub fn bind_sql_column_constraints(
250    session: &SessionImpl,
251    table_name: String,
252    column_catalogs: &mut [ColumnCatalog],
253    columns: &[ColumnDef],
254    pk_column_ids: &[ColumnId],
255) -> Result<()> {
256    let generated_column_names = {
257        let mut names = vec![];
258        for column in columns {
259            for option_def in &column.options {
260                if let ColumnOption::GeneratedColumns(_) = option_def.option {
261                    names.push(column.name.real_value());
262                    break;
263                }
264            }
265        }
266        names
267    };
268
269    let mut binder = Binder::new_for_ddl(session);
270    binder.bind_columns_to_context(table_name, column_catalogs)?;
271
272    for column in columns {
273        let Some(idx) = column_catalogs
274            .iter()
275            .position(|c| c.name() == column.name.real_value())
276        else {
277            // It's possible that we don't follow the user defined columns in SQL but take the
278            // ones resolved from the source, thus missing some columns. Simply ignore them.
279            continue;
280        };
281
282        for option_def in &column.options {
283            match &option_def.option {
284                ColumnOption::GeneratedColumns(expr) => {
285                    binder.set_clause(Some(Clause::GeneratedColumn));
286
287                    let expr_impl = binder.bind_expr(expr).with_context(|| {
288                        format!(
289                            "fail to bind expression in generated column \"{}\"",
290                            column.name.real_value()
291                        )
292                    })?;
293
294                    check_generated_column_constraints(
295                        &column.name.real_value(),
296                        column_catalogs[idx].column_id(),
297                        &expr_impl,
298                        column_catalogs,
299                        &generated_column_names,
300                        pk_column_ids,
301                    )?;
302
303                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
304                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
305                            expr: Some(expr_impl.to_expr_proto()),
306                        }),
307                    );
308                    binder.set_clause(None);
309                }
310                ColumnOption::DefaultValue(expr) => {
311                    let expr_impl = binder
312                        .bind_expr(expr)?
313                        .cast_assign(column_catalogs[idx].data_type())?;
314
315                    // Rewrite expressions to evaluate a snapshot value, used for missing values in the case of
316                    // schema change.
317                    //
318                    // TODO: Currently we don't support impure expressions other than `now()` (like `random()`),
319                    // so the rewritten expression should almost always be pure and we directly call `fold_const`
320                    // here. Actually we do not require purity of the expression here since we're only to get a
321                    // snapshot value.
322                    let rewritten_expr_impl = session
323                        .pinned_snapshot()
324                        .inline_now_proc_time()
325                        .rewrite_expr(expr_impl.clone());
326
327                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
328                        let snapshot_value = snapshot_value?;
329
330                        column_catalogs[idx].column_desc.generated_or_default_column =
331                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
332                                snapshot_value: Some(snapshot_value.to_protobuf()),
333                                expr: Some(expr_impl.to_expr_proto()),
334                                // persist the original expression
335                            }));
336                    } else {
337                        return Err(ErrorCode::BindError(format!(
338                            "Default expression used in column `{}` cannot be evaluated. \
339                            Use generated columns instead if you mean to reference other columns.",
340                            column.name
341                        ))
342                        .into());
343                    }
344                }
345                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
346                    // When a `DEFAULT INTERNAL` is used internally for schema change, the persisted value
347                    // should already be set during purifcation. So if we encounter an empty value here, it
348                    // means the user has specified it explicitly in the SQL statement, typically by
349                    // directly copying the result of `SHOW CREATE TABLE` and executing it.
350                    if persisted.is_empty() {
351                        bail_bind_error!(
352                            "DEFAULT INTERNAL is only used for internal purposes, \
353                             please specify a concrete default value"
354                        );
355                    }
356
357                    let desc = DefaultColumnDesc::decode(&**persisted)
358                        .expect("failed to decode persisted `DefaultColumnDesc`");
359
360                    column_catalogs[idx].column_desc.generated_or_default_column =
361                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
362                }
363                _ => {}
364            }
365        }
366    }
367    Ok(())
368}
369
370/// Currently we only support Primary key table constraint, so just return pk names if it exists
371pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
372    let mut pk_column_names = vec![];
373
374    for constraint in table_constraints {
375        match constraint {
376            TableConstraint::Unique {
377                name: _,
378                columns,
379                is_primary: true,
380            } => {
381                if !pk_column_names.is_empty() {
382                    return Err(multiple_pk_definition_err());
383                }
384                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
385            }
386            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
387        }
388    }
389    Ok(pk_column_names)
390}
391
392pub fn bind_sql_pk_names(
393    columns_defs: &[ColumnDef],
394    pk_names_from_table_constraints: Vec<String>,
395) -> Result<Vec<String>> {
396    let mut pk_column_names = pk_names_from_table_constraints;
397
398    for column in columns_defs {
399        for option_def in &column.options {
400            if let ColumnOption::Unique { is_primary: true } = option_def.option {
401                if !pk_column_names.is_empty() {
402                    return Err(multiple_pk_definition_err());
403                }
404                pk_column_names.push(column.name.real_value());
405            };
406        }
407    }
408
409    Ok(pk_column_names)
410}
411
412fn multiple_pk_definition_err() -> RwError {
413    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
414}
415
416/// Binds primary keys defined in SQL.
417///
418/// It returns the columns together with `pk_column_ids`, and an optional row id column index if
419/// added.
420pub fn bind_pk_and_row_id_on_relation(
421    mut columns: Vec<ColumnCatalog>,
422    pk_names: Vec<String>,
423    must_need_pk: bool,
424) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
425    for c in &columns {
426        assert!(c.column_id() != ColumnId::placeholder());
427    }
428
429    // Mapping from column name to column id.
430    let name_to_id = columns
431        .iter()
432        .map(|c| (c.name(), c.column_id()))
433        .collect::<HashMap<_, _>>();
434
435    let mut pk_column_ids: Vec<_> = pk_names
436        .iter()
437        .map(|name| {
438            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
439                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
440            })
441        })
442        .try_collect()?;
443
444    // Add `_row_id` column if `pk_column_ids` is empty and must_need_pk
445    let need_row_id = pk_column_ids.is_empty() && must_need_pk;
446
447    let row_id_index = need_row_id.then(|| {
448        let column = ColumnCatalog::row_id_column();
449        let index = columns.len();
450        pk_column_ids = vec![column.column_id()];
451        columns.push(column);
452        index
453    });
454
455    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
456        Err(ErrorCode::InvalidInputSyntax(format!(
457            "column \"{col}\" specified more than once"
458        )))?;
459    }
460
461    Ok((columns, pk_column_ids, row_id_index))
462}
463
464/// `gen_create_table_plan_with_source` generates the plan for creating a table with an external
465/// stream source.
466#[allow(clippy::too_many_arguments)]
467pub(crate) async fn gen_create_table_plan_with_source(
468    mut handler_args: HandlerArgs,
469    explain_options: ExplainOptions,
470    table_name: ObjectName,
471    column_defs: Vec<ColumnDef>,
472    wildcard_idx: Option<usize>,
473    constraints: Vec<TableConstraint>,
474    format_encode: FormatEncodeOptions,
475    source_watermarks: Vec<SourceWatermark>,
476    mut col_id_gen: ColumnIdGenerator,
477    include_column_options: IncludeOption,
478    props: CreateTableProps,
479    sql_column_strategy: SqlColumnStrategy,
480) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
481    if props.append_only
482        && format_encode.format != Format::Plain
483        && format_encode.format != Format::Native
484    {
485        return Err(ErrorCode::BindError(format!(
486            "Append only table does not support format {}.",
487            format_encode.format
488        ))
489        .into());
490    }
491
492    let session = &handler_args.session;
493    let (with_properties, refresh_mode) =
494        bind_connector_props(&handler_args, &format_encode, false)?;
495    if with_properties.is_shareable_cdc_connector() {
496        generated_columns_check_for_cdc_table(&column_defs)?;
497        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
498    } else if column_defs.iter().any(|col| {
499        col.options
500            .iter()
501            .any(|def| matches!(def.option, ColumnOption::NotNull))
502    }) {
503        // if non-cdc source
504        notice_to_user(
505            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
506        );
507    }
508
509    let db_name: &str = &session.database();
510    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
511
512    // TODO: omit this step if `sql_column_strategy` is `Follow`.
513    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
514        session,
515        &format_encode,
516        Either::Left(&with_properties),
517        CreateSourceType::Table,
518    )
519    .await?;
520
521    let overwrite_options = OverwriteOptions::new(&mut handler_args);
522    let rate_limit = overwrite_options.source_rate_limit;
523    let source = bind_create_source_or_table_with_connector(
524        handler_args.clone(),
525        table_name,
526        format_encode,
527        with_properties,
528        &column_defs,
529        constraints,
530        wildcard_idx,
531        source_watermarks,
532        columns_from_resolve_source,
533        source_info,
534        include_column_options,
535        &mut col_id_gen,
536        CreateSourceType::Table,
537        rate_limit,
538        sql_column_strategy,
539        refresh_mode,
540    )
541    .await?;
542
543    let context = OptimizerContext::new(handler_args, explain_options);
544
545    let (plan, table) = gen_table_plan_with_source(
546        context.into(),
547        schema_name,
548        source.clone(),
549        col_id_gen.into_version(),
550        props,
551    )?;
552
553    Ok((plan, Some(source), table))
554}
555
556/// `gen_create_table_plan` generates the plan for creating a table without an external stream
557/// source.
558#[allow(clippy::too_many_arguments)]
559pub(crate) fn gen_create_table_plan(
560    context: OptimizerContext,
561    table_name: ObjectName,
562    column_defs: Vec<ColumnDef>,
563    constraints: Vec<TableConstraint>,
564    mut col_id_gen: ColumnIdGenerator,
565    source_watermarks: Vec<SourceWatermark>,
566    props: CreateTableProps,
567    is_for_replace_plan: bool,
568) -> Result<(PlanRef, TableCatalog)> {
569    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
570    for c in &mut columns {
571        col_id_gen.generate(c)?;
572    }
573
574    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
575    if !secret_refs.is_empty() || !connection_refs.is_empty() {
576        return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
577    }
578
579    gen_create_table_plan_without_source(
580        context,
581        table_name,
582        columns,
583        column_defs,
584        constraints,
585        source_watermarks,
586        col_id_gen.into_version(),
587        props,
588    )
589}
590
591#[allow(clippy::too_many_arguments)]
592pub(crate) fn gen_create_table_plan_without_source(
593    context: OptimizerContext,
594    table_name: ObjectName,
595    columns: Vec<ColumnCatalog>,
596    column_defs: Vec<ColumnDef>,
597    constraints: Vec<TableConstraint>,
598    source_watermarks: Vec<SourceWatermark>,
599    version: TableVersion,
600    props: CreateTableProps,
601) -> Result<(PlanRef, TableCatalog)> {
602    // XXX: Why not bind outside?
603    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
604    let (mut columns, pk_column_ids, row_id_index) =
605        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
606
607    let watermark_descs = bind_source_watermark(
608        context.session_ctx(),
609        table_name.real_value(),
610        source_watermarks,
611        &columns,
612    )?;
613
614    bind_sql_column_constraints(
615        context.session_ctx(),
616        table_name.real_value(),
617        &mut columns,
618        &column_defs,
619        &pk_column_ids,
620    )?;
621    let session = context.session_ctx().clone();
622
623    let db_name = &session.database();
624    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
625
626    let info = CreateTableInfo {
627        columns,
628        pk_column_ids,
629        row_id_index,
630        watermark_descs,
631        source_catalog: None,
632        version,
633    };
634
635    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
636}
637
638fn gen_table_plan_with_source(
639    context: OptimizerContextRef,
640    schema_name: Option<String>,
641    source_catalog: SourceCatalog,
642    version: TableVersion,
643    props: CreateTableProps,
644) -> Result<(PlanRef, TableCatalog)> {
645    let table_name = source_catalog.name.clone();
646
647    let info = CreateTableInfo {
648        columns: source_catalog.columns.clone(),
649        pk_column_ids: source_catalog.pk_col_ids.clone(),
650        row_id_index: source_catalog.row_id_index,
651        watermark_descs: source_catalog.watermark_descs.clone(),
652        source_catalog: Some(source_catalog),
653        version,
654    };
655
656    gen_table_plan_inner(context, schema_name, table_name, info, props)
657}
658
/// On-conflict behavior either from user input or existing table catalog.
///
/// Turned into a concrete [`ConflictBehavior`] by [`EitherOnConflict::to_behavior`].
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    /// The optional `ON CONFLICT` clause taken from the AST; still has to be resolved.
    Ast(Option<OnConflict>),
    /// An already resolved behavior, which `to_behavior` returns unchanged.
    Resolved(ConflictBehavior),
}
665
666impl From<Option<OnConflict>> for EitherOnConflict {
667    fn from(v: Option<OnConflict>) -> Self {
668        Self::Ast(v)
669    }
670}
671
672impl From<ConflictBehavior> for EitherOnConflict {
673    fn from(v: ConflictBehavior) -> Self {
674        Self::Resolved(v)
675    }
676}
677
678impl EitherOnConflict {
679    /// Resolves the conflict behavior based on the given information.
680    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
681        let conflict_behavior = match self {
682            EitherOnConflict::Ast(on_conflict) => {
683                if append_only {
684                    if row_id_as_pk {
685                        // Primary key will be generated, no conflict check needed.
686                        ConflictBehavior::NoCheck
687                    } else {
688                        // User defined PK on append-only table, enforce `DO NOTHING`.
689                        if let Some(on_conflict) = on_conflict
690                            && on_conflict != OnConflict::Nothing
691                        {
692                            return Err(ErrorCode::InvalidInputSyntax(
693                                "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
694                                     the ON CONFLICT behavior must be DO NOTHING."
695                                    .to_owned(),
696                            )
697                            .into());
698                        }
699                        ConflictBehavior::IgnoreConflict
700                    }
701                } else {
702                    // Default to `UPDATE FULL` for non-append-only tables.
703                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
704                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
705                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
706                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
707                    }
708                }
709            }
710            EitherOnConflict::Resolved(b) => b,
711        };
712
713        Ok(conflict_behavior)
714    }
715}
716
/// Arguments of the functions that generate a table plan, part 1.
///
/// Compared to [`CreateTableProps`], this struct contains fields that need some work of binding
/// or resolving based on the user input.
pub struct CreateTableInfo {
    /// Bound column catalogs of the table, including an appended row id column if any.
    pub columns: Vec<ColumnCatalog>,
    /// Ids of the columns that form the primary key.
    pub pk_column_ids: Vec<ColumnId>,
    /// Index into `columns` of the row id column, if the table has one.
    pub row_id_index: Option<usize>,
    /// Watermark descriptors bound for the table.
    pub watermark_descs: Vec<WatermarkDesc>,
    /// The source catalog when the table is created with an external source, otherwise `None`.
    pub source_catalog: Option<SourceCatalog>,
    /// Table version, obtained from the column id generator in the visible call sites.
    pub version: TableVersion,
}
729
/// Arguments of the functions that generate a table plan, part 2.
///
/// Compared to [`CreateTableInfo`], this struct contains fields that can be (relatively) simply
/// obtained from the input or the context.
pub struct CreateTableProps {
    /// NOTE(review): presumably the SQL definition text of the table — confirm against callers.
    pub definition: String,
    /// Whether the table is append-only.
    pub append_only: bool,
    /// On-conflict behavior, either still as AST or already resolved.
    pub on_conflict: EitherOnConflict,
    /// NOTE(review): presumably names of the columns used as version columns when resolving
    /// conflicts — not used in the visible part of this file, confirm.
    pub with_version_columns: Vec<String>,
    /// Webhook source info, if any.
    pub webhook_info: Option<PbWebhookSourceInfo>,
    /// The engine of the table.
    pub engine: Engine,
}
742
743#[allow(clippy::too_many_arguments)]
744fn gen_table_plan_inner(
745    context: OptimizerContextRef,
746    schema_name: Option<String>,
747    table_name: String,
748    info: CreateTableInfo,
749    props: CreateTableProps,
750) -> Result<(PlanRef, TableCatalog)> {
751    let CreateTableInfo {
752        ref columns,
753        row_id_index,
754        ref watermark_descs,
755        ref source_catalog,
756        ..
757    } = info;
758    let CreateTableProps { append_only, .. } = props;
759
760    let (database_id, schema_id) = context
761        .session_ctx()
762        .get_database_and_schema_id_for_create(schema_name)?;
763
764    let session = context.session_ctx().clone();
765    let retention_seconds = context.with_options().retention_seconds();
766
767    let source_node: LogicalPlanRef = LogicalSource::new(
768        source_catalog.clone().map(Rc::new),
769        columns.clone(),
770        row_id_index,
771        SourceNodeKind::CreateTable,
772        context.clone(),
773        None,
774    )?
775    .into();
776
777    let required_cols = FixedBitSet::with_capacity(columns.len());
778    let plan_root = PlanRoot::new_with_logical_plan(
779        source_node,
780        RequiredDist::Any,
781        Order::any(),
782        required_cols,
783        vec![],
784    );
785
786    let has_non_ttl_watermark = watermark_descs.iter().any(|d| !d.with_ttl);
787
788    if !append_only && has_non_ttl_watermark {
789        return Err(ErrorCode::NotSupported(
790            "Defining watermarks on table requires the table to be append only.".to_owned(),
791            "Use the key words `APPEND ONLY`".to_owned(),
792        )
793        .into());
794    }
795
796    if !append_only && retention_seconds.is_some() {
797        if session
798            .config()
799            .unsafe_enable_storage_retention_for_non_append_only_tables()
800        {
801            tracing::warn!(
802                "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
803                table_name
804            );
805            const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
806                                  This may lead to stream inconsistency and unrecoverable \
807                                  node failure if there is any row INSERT/UPDATE/DELETE operation \
808                                  corresponding to the TTLed primary keys";
809            session.notice_to_user(NOTICE);
810        } else {
811            return Err(ErrorCode::NotSupported(
812                "Defining retention seconds on table requires the table to be append only."
813                    .to_owned(),
814                "Use the key words `APPEND ONLY`".to_owned(),
815            )
816            .into());
817        }
818    }
819
820    let materialize =
821        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;
822
823    let mut table = materialize.table().clone();
824    table.owner = session.user_id();
825
826    Ok((
827        ensure_sync_log_store_fragment_root(materialize.into()),
828        table,
829    ))
830}
831
832/// Generate stream plan for cdc table based on shared source.
833/// In replace workflow, the `table_id` is the id of the table to be replaced
834/// in create table workflow, the `table_id` is a placeholder will be filled in the Meta
835#[allow(clippy::too_many_arguments)]
836pub(crate) fn gen_create_table_plan_for_cdc_table(
837    context: OptimizerContextRef,
838    source: Arc<SourceCatalog>,
839    external_table_name: String,
840    column_defs: Vec<ColumnDef>,
841    source_watermarks: Vec<SourceWatermark>,
842    mut columns: Vec<ColumnCatalog>,
843    pk_names: Vec<String>,
844    cdc_with_options: WithOptionsSecResolved,
845    mut col_id_gen: ColumnIdGenerator,
846    on_conflict: Option<OnConflict>,
847    with_version_columns: Vec<String>,
848    include_column_options: IncludeOption,
849    table_name: ObjectName,
850    resolved_table_name: String, // table name without schema prefix
851    database_id: DatabaseId,
852    schema_id: SchemaId,
853    table_id: TableId,
854    engine: Engine,
855) -> Result<(PlanRef, TableCatalog)> {
856    let session = context.session_ctx().clone();
857
858    // append additional columns to the end
859    handle_addition_columns(
860        None,
861        &cdc_with_options,
862        include_column_options,
863        &mut columns,
864        true,
865    )?;
866
867    for c in &mut columns {
868        col_id_gen.generate(c)?;
869    }
870
871    let (mut columns, pk_column_ids, _row_id_index) =
872        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
873
874    let watermark_descs = bind_source_watermark(
875        context.session_ctx(),
876        table_name.real_value(),
877        source_watermarks,
878        &columns,
879    )?;
880
881    // NOTES: In auto schema change, default value is not provided in column definition.
882    bind_sql_column_constraints(
883        context.session_ctx(),
884        table_name.real_value(),
885        &mut columns,
886        &column_defs,
887        &pk_column_ids,
888    )?;
889
890    let definition = context.normalized_sql().to_owned();
891
892    let pk_column_indices = {
893        let mut id_to_idx = HashMap::new();
894        columns.iter().enumerate().for_each(|(idx, c)| {
895            id_to_idx.insert(c.column_id(), idx);
896        });
897        // pk column id must exist in table columns.
898        pk_column_ids
899            .iter()
900            .map(|c| id_to_idx.get(c).copied().unwrap())
901            .collect_vec()
902    };
903    let table_pk = pk_column_indices
904        .iter()
905        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
906        .collect();
907
908    let (options, secret_refs) = cdc_with_options.into_parts();
909
910    let non_generated_column_descs = columns
911        .iter()
912        .filter(|&c| !c.is_generated())
913        .map(|c| c.column_desc.clone())
914        .collect_vec();
915    let non_generated_column_num = non_generated_column_descs.len();
916    let cdc_table_type = ExternalCdcTableType::from_properties(&options);
917    let cdc_table_desc = CdcTableDesc {
918        table_id,
919        source_id: source.id, // id of cdc source streaming job
920        external_table_name: external_table_name.clone(),
921        pk: table_pk,
922        columns: non_generated_column_descs,
923        stream_key: pk_column_indices,
924        connect_properties: options,
925        secret_refs,
926    };
927
928    tracing::debug!(?cdc_table_desc, "create cdc table");
929    let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;
930
931    let logical_scan = LogicalCdcScan::create(
932        external_table_name.clone(),
933        Rc::new(cdc_table_desc),
934        context.clone(),
935        options,
936    );
937
938    let scan_node: LogicalPlanRef = logical_scan.into();
939    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
940    let plan_root = PlanRoot::new_with_logical_plan(
941        scan_node,
942        RequiredDist::Any,
943        Order::any(),
944        required_cols,
945        vec![],
946    );
947
948    let cdc_table_id_external_table_name = if let ExternalCdcTableType::Postgres = cdc_table_type
949        && let Some(normalized_table_name) =
950            normalize_simple_postgres_quoted_table_name(&external_table_name)
951    {
952        normalized_table_name
953    } else {
954        external_table_name
955    };
956    let cdc_table_id = build_cdc_table_id(source.id, &cdc_table_id_external_table_name);
957    let materialize = plan_root.gen_table_plan(
958        context,
959        resolved_table_name,
960        database_id,
961        schema_id,
962        CreateTableInfo {
963            columns,
964            pk_column_ids,
965            row_id_index: None,
966            watermark_descs,
967            source_catalog: Some((*source).clone()),
968            version: col_id_gen.into_version(),
969        },
970        CreateTableProps {
971            definition,
972            append_only: false,
973            on_conflict: on_conflict.into(),
974            with_version_columns,
975            webhook_info: None,
976            engine,
977        },
978    )?;
979
980    let mut table = materialize.table().clone();
981    table.owner = session.user_id();
982    table.cdc_table_id = Some(cdc_table_id);
983    table.cdc_table_type = Some(cdc_table_type);
984    Ok((
985        ensure_sync_log_store_fragment_root(materialize.into()),
986        table,
987    ))
988}
989
990/// Derive connector properties and normalize `external_table_name` for CDC tables.
991///
992/// Returns (`connector_properties`, `normalized_external_table_name`) where:
993/// - For SQL Server: Normalizes 'db.schema.table' (3 parts) to 'schema.table' (2 parts),
994///   because users can optionally include database name for verification, but it needs to be
995///   stripped to match the format returned by Debezium's `extract_table_name()`.
996/// - For MySQL/Postgres: Returns the original `external_table_name` unchanged.
997fn derive_with_options_for_cdc_table(
998    source_with_properties: &WithOptionsSecResolved,
999    external_table_name: String,
1000) -> Result<(WithOptionsSecResolved, String)> {
1001    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
1002    // we should remove the prefix from `full_table_name`
1003    let source_database_name: &str = source_with_properties
1004        .get("database.name")
1005        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
1006        .as_str();
1007    let mut with_options = source_with_properties.clone();
1008    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
1009        match connector.as_str() {
1010            MYSQL_CDC_CONNECTOR => {
1011                // MySQL doesn't allow '.' in database name and table name, so we can split the
1012                // external table name by '.' to get the table name
1013                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
1014                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
1015                })?;
1016                // We allow multiple database names in the source definition
1017                if !source_database_name
1018                    .split(',')
1019                    .map(|s| s.trim())
1020                    .any(|name| name == db_name)
1021                {
1022                    return Err(anyhow!(
1023                        "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
1024                        db_name,
1025                        source_database_name
1026                    ).into());
1027                }
1028                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
1029                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1030                // Return original external_table_name unchanged for MySQL
1031                return Ok((with_options, external_table_name));
1032            }
1033            POSTGRES_CDC_CONNECTOR => {
1034                let (schema_name, table_name) =
1035                    parse_postgres_cdc_external_table_name(&external_table_name)?;
1036
1037                // insert 'schema.name' into connect properties
1038                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name);
1039                with_options.insert(TABLE_NAME_KEY.into(), table_name);
1040                // Return original external_table_name unchanged for Postgres
1041                return Ok((with_options, external_table_name));
1042            }
1043            SQL_SERVER_CDC_CONNECTOR => {
1044                // SQL Server external table name must be in one of two formats:
1045                // 1. 'schemaName.tableName' (2 parts) - database is already specified in source
1046                // 2. 'databaseName.schemaName.tableName' (3 parts) - for explicit verification
1047                //
1048                // We do NOT allow single table name (e.g., 't') because:
1049                // - Unlike database name (already in source), schema name is NOT pre-specified
1050                // - User must explicitly provide schema (even if it's 'dbo')
1051                let parts: Vec<&str> = external_table_name.split('.').collect();
1052                let (schema_name, table_name) = match parts.len() {
1053                    3 => {
1054                        // Format: database.schema.table
1055                        // Verify that the database name matches the one in source definition
1056                        let db_name = parts[0];
1057                        let schema_name = parts[1];
1058                        let table_name = parts[2];
1059
1060                        if db_name != source_database_name {
1061                            return Err(anyhow!(
1062                                "The database name '{}' in FROM clause does not match the database name '{}' specified in source definition. \
1063                                 You can either use 'schema.table' format (recommended) or ensure the database name matches.",
1064                                db_name,
1065                                source_database_name
1066                            ).into());
1067                        }
1068                        (schema_name, table_name)
1069                    }
1070                    2 => {
1071                        // Format: schema.table (recommended)
1072                        // Database name is taken from source definition
1073                        let schema_name = parts[0];
1074                        let table_name = parts[1];
1075                        (schema_name, table_name)
1076                    }
1077                    1 => {
1078                        // Format: table only
1079                        // Reject with clear error message
1080                        return Err(anyhow!(
1081                            "Invalid table name format '{}'. For SQL Server CDC, you must specify the schema name. \
1082                             Use 'schema.table' format (e.g., 'dbo.{}') or 'database.schema.table' format (e.g., '{}.dbo.{}').",
1083                            external_table_name,
1084                            external_table_name,
1085                            source_database_name,
1086                            external_table_name
1087                        ).into());
1088                    }
1089                    _ => {
1090                        // Invalid format (4+ parts or empty)
1091                        return Err(anyhow!(
1092                            "Invalid table name format '{}'. Expected 'schema.table' or 'database.schema.table'.",
1093                            external_table_name
1094                        ).into());
1095                    }
1096                };
1097
1098                // Insert schema and table names into connector properties
1099                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1100                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1101
1102                // Normalize external_table_name to 'schema.table' format
1103                // This ensures consistency with extract_table_name() in message.rs
1104                let normalized_external_table_name = format!("{}.{}", schema_name, table_name);
1105                return Ok((with_options, normalized_external_table_name));
1106            }
1107            _ => {
1108                return Err(RwError::from(anyhow!(
1109                    "connector {} is not supported for cdc table",
1110                    connector
1111                )));
1112            }
1113        };
1114    }
1115    unreachable!("All valid CDC connectors should have returned by now")
1116}
1117
1118/// Parse the schema/table name from the CDC `TABLE` clause.
1119///
1120/// Column names do not need the same parsing here: wildcard schema derivation reads
1121/// them from PostgreSQL catalogs after the exact table has been identified.
1122fn parse_postgres_cdc_external_table_name(external_table_name: &str) -> Result<(String, String)> {
1123    let mut parts = vec![];
1124    let mut current = String::new();
1125    let mut chars = external_table_name.chars().peekable();
1126    let mut in_quote = false;
1127    let mut just_closed_quote = false;
1128
1129    while let Some(ch) = chars.next() {
1130        if in_quote {
1131            if ch == '"' {
1132                if chars.peek() == Some(&'"') {
1133                    current.push('"');
1134                    chars.next();
1135                } else {
1136                    in_quote = false;
1137                    just_closed_quote = true;
1138                }
1139            } else {
1140                current.push(ch);
1141            }
1142        } else {
1143            match ch {
1144                '.' => {
1145                    if current.is_empty() {
1146                        return Err(anyhow!(
1147                            "Invalid Postgres CDC table name '{}'. Expected 'schema.table'.",
1148                            external_table_name
1149                        )
1150                        .into());
1151                    }
1152                    parts.push(std::mem::take(&mut current));
1153                    just_closed_quote = false;
1154                }
1155                '"' if current.is_empty() => {
1156                    in_quote = true;
1157                }
1158                '"' => {
1159                    return Err(anyhow!(
1160                        "Invalid Postgres CDC table name '{}'. Expected 'schema.table'.",
1161                        external_table_name
1162                    )
1163                    .into());
1164                }
1165                _ if just_closed_quote => {
1166                    return Err(anyhow!(
1167                        "Invalid Postgres CDC table name '{}'. Expected 'schema.table'.",
1168                        external_table_name
1169                    )
1170                    .into());
1171                }
1172                _ => current.push(ch),
1173            }
1174        }
1175    }
1176
1177    if in_quote || current.is_empty() {
1178        return Err(anyhow!(
1179            "Invalid Postgres CDC table name '{}'. Expected 'schema.table'.",
1180            external_table_name
1181        )
1182        .into());
1183    }
1184    parts.push(current);
1185
1186    if let [schema_name, table_name] = parts.as_slice() {
1187        Ok((schema_name.clone(), table_name.clone()))
1188    } else {
1189        Err(
1190            anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'")
1191                .into(),
1192        )
1193    }
1194}
1195
1196#[allow(clippy::too_many_arguments)]
1197pub(super) async fn handle_create_table_plan(
1198    handler_args: HandlerArgs,
1199    explain_options: ExplainOptions,
1200    format_encode: Option<FormatEncodeOptions>,
1201    cdc_table_info: Option<CdcTableInfo>,
1202    table_name: &ObjectName,
1203    column_defs: Vec<ColumnDef>,
1204    wildcard_idx: Option<usize>,
1205    constraints: Vec<TableConstraint>,
1206    source_watermarks: Vec<SourceWatermark>,
1207    append_only: bool,
1208    on_conflict: Option<OnConflict>,
1209    with_version_columns: Vec<String>,
1210    include_column_options: IncludeOption,
1211    webhook_info: Option<WebhookSourceInfo>,
1212    engine: Engine,
1213) -> Result<(
1214    PlanRef,
1215    Option<SourceCatalog>,
1216    TableCatalog,
1217    TableJobType,
1218    Option<SourceId>,
1219)> {
1220    let col_id_gen = ColumnIdGenerator::new_initial();
1221    let format_encode = check_create_table_with_source(
1222        &handler_args.with_options,
1223        format_encode,
1224        &include_column_options,
1225        &cdc_table_info,
1226    )?;
1227    let webhook_info = webhook_info
1228        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1229        .transpose()?;
1230
1231    let props = CreateTableProps {
1232        definition: handler_args.normalized_sql.clone(),
1233        append_only,
1234        on_conflict: on_conflict.into(),
1235        with_version_columns: with_version_columns.clone(),
1236        webhook_info,
1237        engine,
1238    };
1239
1240    let ((plan, source, table), job_type, shared_shource_id) = match (
1241        format_encode,
1242        cdc_table_info.as_ref(),
1243    ) {
1244        (Some(format_encode), None) => (
1245            gen_create_table_plan_with_source(
1246                handler_args,
1247                explain_options,
1248                table_name.clone(),
1249                column_defs,
1250                wildcard_idx,
1251                constraints,
1252                format_encode,
1253                source_watermarks,
1254                col_id_gen,
1255                include_column_options,
1256                props,
1257                SqlColumnStrategy::FollowChecked,
1258            )
1259            .await?,
1260            TableJobType::General,
1261            None,
1262        ),
1263        (None, None) => {
1264            let context = OptimizerContext::new(handler_args, explain_options);
1265            let (plan, table) = gen_create_table_plan(
1266                context,
1267                table_name.clone(),
1268                column_defs,
1269                constraints,
1270                col_id_gen,
1271                source_watermarks,
1272                props,
1273                false,
1274            )?;
1275
1276            ((plan, None, table), TableJobType::General, None)
1277        }
1278
1279        (None, Some(cdc_table)) => {
1280            sanity_check_for_table_on_cdc_source(
1281                append_only,
1282                &column_defs,
1283                &wildcard_idx,
1284                &constraints,
1285                &source_watermarks,
1286            )?;
1287
1288            generated_columns_check_for_cdc_table(&column_defs)?;
1289            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1290
1291            let session = &handler_args.session;
1292            let db_name = &session.database();
1293            let user_name = &session.user_name();
1294            let search_path = session.config().search_path();
1295            let (schema_name, resolved_table_name) =
1296                Binder::resolve_schema_qualified_name(db_name, table_name)?;
1297            let (database_id, schema_id) =
1298                session.get_database_and_schema_id_for_create(schema_name.clone())?;
1299
1300            // cdc table cannot be append-only
1301            let (source_schema, source_name) =
1302                Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
1303
1304            let source = {
1305                let catalog_reader = session.env().catalog_reader().read_guard();
1306                let schema_path =
1307                    SchemaPath::new(source_schema.as_deref(), &search_path, user_name);
1308
1309                let (source, _) = catalog_reader.get_source_by_name(
1310                    db_name,
1311                    schema_path,
1312                    source_name.as_str(),
1313                )?;
1314                source.clone()
1315            };
1316            let (cdc_with_options, normalized_external_table_name) =
1317                derive_with_options_for_cdc_table(
1318                    &source.with_properties,
1319                    cdc_table.external_table_name.clone(),
1320                )?;
1321
1322            let (columns, pk_names) = match wildcard_idx {
1323                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1324                None => {
1325                    for column_def in &column_defs {
1326                        for option_def in &column_def.options {
1327                            if let ColumnOption::DefaultValue(_)
1328                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
1329                            {
1330                                return Err(ErrorCode::NotSupported(
1331                                            "Default value for columns defined on the table created from a CDC source".into(),
1332                                            "Remove the default value expression in the column definitions".into(),
1333                                        )
1334                                            .into());
1335                            }
1336                        }
1337                    }
1338
1339                    let (columns, pk_names) =
1340                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
1341                    // read default value definition from external db
1342                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
1343                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1344                        .context("failed to extract external table config")?;
1345
1346                    (columns, pk_names)
1347                }
1348            };
1349
1350            let context: OptimizerContextRef =
1351                OptimizerContext::new(handler_args, explain_options).into();
1352            let shared_source_id = source.id;
1353            let (plan, table) = gen_create_table_plan_for_cdc_table(
1354                context,
1355                source,
1356                normalized_external_table_name,
1357                column_defs,
1358                source_watermarks,
1359                columns,
1360                pk_names,
1361                cdc_with_options,
1362                col_id_gen,
1363                on_conflict,
1364                with_version_columns,
1365                include_column_options,
1366                table_name.clone(),
1367                resolved_table_name,
1368                database_id,
1369                schema_id,
1370                TableId::placeholder(),
1371                engine,
1372            )?;
1373
1374            (
1375                (plan, None, table),
1376                TableJobType::SharedCdcSource,
1377                Some(shared_source_id),
1378            )
1379        }
1380        (Some(_), Some(_)) => {
1381            return Err(ErrorCode::NotSupported(
1382                "Data format and encoding format doesn't apply to table created from a CDC source"
1383                    .into(),
1384                "Remove the FORMAT and ENCODE specification".into(),
1385            )
1386            .into());
1387        }
1388    };
1389    Ok((plan, source, table, job_type, shared_shource_id))
1390}
1391
1392// For both table from cdc source and table with cdc connector
1393fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1394    let mut found_generated_column = false;
1395    for column in columns {
1396        let mut is_generated = false;
1397
1398        for option_def in &column.options {
1399            if let ColumnOption::GeneratedColumns(_) = option_def.option {
1400                is_generated = true;
1401                break;
1402            }
1403        }
1404
1405        if is_generated {
1406            found_generated_column = true;
1407        } else if found_generated_column {
1408            return Err(ErrorCode::NotSupported(
1409                "Non-generated column found after a generated column.".into(),
1410                "Ensure that all generated columns appear at the end of the cdc table definition."
1411                    .into(),
1412            )
1413            .into());
1414        }
1415    }
1416    Ok(())
1417}
1418
1419// For both table from cdc source and table with cdc connector
1420fn not_null_check_for_cdc_table(
1421    wildcard_idx: &Option<usize>,
1422    column_defs: &Vec<ColumnDef>,
1423) -> Result<()> {
1424    if !wildcard_idx.is_some()
1425        && column_defs.iter().any(|col| {
1426            col.options
1427                .iter()
1428                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1429        })
1430    {
1431        return Err(ErrorCode::NotSupported(
1432            "CDC table with NOT NULL constraint is not supported".to_owned(),
1433            "Please remove the NOT NULL constraint for columns".to_owned(),
1434        )
1435        .into());
1436    }
1437    Ok(())
1438}
1439
1440// Only for table from cdc source
1441fn sanity_check_for_table_on_cdc_source(
1442    append_only: bool,
1443    column_defs: &Vec<ColumnDef>,
1444    wildcard_idx: &Option<usize>,
1445    constraints: &Vec<TableConstraint>,
1446    source_watermarks: &Vec<SourceWatermark>,
1447) -> Result<()> {
1448    // wildcard cannot be used with column definitions
1449    if wildcard_idx.is_some() && !column_defs.is_empty() {
1450        return Err(ErrorCode::NotSupported(
1451            "wildcard(*) and column definitions cannot be used together".to_owned(),
1452            "Remove the wildcard or column definitions".to_owned(),
1453        )
1454        .into());
1455    }
1456
1457    // cdc table must have primary key constraint or primary key column
1458    if !wildcard_idx.is_some()
1459        && !constraints.iter().any(|c| {
1460            matches!(
1461                c,
1462                TableConstraint::Unique {
1463                    is_primary: true,
1464                    ..
1465                }
1466            )
1467        })
1468        && !column_defs.iter().any(|col| {
1469            col.options
1470                .iter()
1471                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1472        })
1473    {
1474        return Err(ErrorCode::NotSupported(
1475            "CDC table without primary key constraint is not supported".to_owned(),
1476            "Please define a primary key".to_owned(),
1477        )
1478        .into());
1479    }
1480
1481    if append_only {
1482        return Err(ErrorCode::NotSupported(
1483            "append only modifier on the table created from a CDC source".into(),
1484            "Remove the APPEND ONLY clause".into(),
1485        )
1486        .into());
1487    }
1488
1489    if !source_watermarks.is_empty()
1490        && source_watermarks
1491            .iter()
1492            .any(|watermark| !watermark.with_ttl)
1493    {
1494        return Err(ErrorCode::NotSupported(
1495            "non-TTL watermark defined on the table created from a CDC source".into(),
1496            "Use `WATERMARK ... WITH TTL` instead.".into(),
1497        )
1498        .into());
1499    }
1500
1501    Ok(())
1502}
1503
1504/// Derive schema for cdc table when create a new Table or alter an existing Table
1505async fn bind_cdc_table_schema_externally(
1506    cdc_with_options: WithOptionsSecResolved,
1507) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1508    // read cdc table schema from external db or parsing the schema from SQL definitions
1509    let (options, secret_refs) = cdc_with_options.into_parts();
1510    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1511        .context("failed to extract external table config")?;
1512
1513    let table = ExternalTableImpl::connect(config)
1514        .await
1515        .context("failed to auto derive table schema")?;
1516
1517    Ok((
1518        table
1519            .column_descs()
1520            .iter()
1521            .cloned()
1522            .map(|column_desc| ColumnCatalog {
1523                column_desc,
1524                is_hidden: false,
1525            })
1526            .collect(),
1527        table.pk_names().clone(),
1528    ))
1529}
1530
1531/// Derive schema for cdc table when create a new Table or alter an existing Table
1532fn bind_cdc_table_schema(
1533    column_defs: &Vec<ColumnDef>,
1534    constraints: &Vec<TableConstraint>,
1535    is_for_replace_plan: bool,
1536) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1537    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1538
1539    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1540    Ok((columns, pk_names))
1541}
1542
1543#[allow(clippy::too_many_arguments)]
1544pub async fn handle_create_table(
1545    handler_args: HandlerArgs,
1546    table_name: ObjectName,
1547    column_defs: Vec<ColumnDef>,
1548    wildcard_idx: Option<usize>,
1549    constraints: Vec<TableConstraint>,
1550    if_not_exists: bool,
1551    format_encode: Option<FormatEncodeOptions>,
1552    source_watermarks: Vec<SourceWatermark>,
1553    append_only: bool,
1554    on_conflict: Option<OnConflict>,
1555    with_version_columns: Vec<String>,
1556    cdc_table_info: Option<CdcTableInfo>,
1557    include_column_options: IncludeOption,
1558    webhook_info: Option<WebhookSourceInfo>,
1559    ast_engine: risingwave_sqlparser::ast::Engine,
1560) -> Result<RwPgResponse> {
1561    let session = handler_args.session.clone();
1562
1563    if append_only {
1564        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1565    }
1566
1567    session.check_cluster_limits().await?;
1568
1569    let engine = match ast_engine {
1570        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1571        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1572    };
1573
1574    if let Either::Right(resp) = session.check_relation_name_duplicated(
1575        table_name.clone(),
1576        StatementType::CREATE_TABLE,
1577        if_not_exists,
1578    )? {
1579        return Ok(resp);
1580    }
1581
1582    let (graph, source, hummock_table, job_type, shared_source_id) = {
1583        let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
1584            handler_args.clone(),
1585            ExplainOptions::default(),
1586            format_encode,
1587            cdc_table_info,
1588            &table_name,
1589            column_defs.clone(),
1590            wildcard_idx,
1591            constraints.clone(),
1592            source_watermarks,
1593            append_only,
1594            on_conflict,
1595            with_version_columns,
1596            include_column_options,
1597            webhook_info,
1598            engine,
1599        )
1600        .await?;
1601        tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1602
1603        let graph = build_graph(plan, Some(GraphJobType::Table))?;
1604
1605        (graph, source, table, job_type, shared_source_id)
1606    };
1607
1608    tracing::trace!(
1609        "name={}, graph=\n{}",
1610        table_name,
1611        serde_json::to_string_pretty(&graph).unwrap()
1612    );
1613
1614    let dependencies = shared_source_id
1615        .map(|id| HashSet::from([id.as_object_id()]))
1616        .unwrap_or_default();
1617
1618    // Handle engine
1619    match engine {
1620        Engine::Hummock => {
1621            let catalog_writer = session.catalog_writer()?;
1622            let action = match job_type {
1623                TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
1624                _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
1625            };
1626            execute_with_long_running_notification(
1627                catalog_writer.create_table(
1628                    source.map(|s| s.to_prost()),
1629                    hummock_table.to_prost(),
1630                    graph,
1631                    job_type,
1632                    if_not_exists,
1633                    dependencies,
1634                ),
1635                &session,
1636                "CREATE TABLE",
1637                action,
1638            )
1639            .await?;
1640        }
1641        Engine::Iceberg => {
1642            let hummock_table_name = hummock_table.name.clone();
1643            session.create_staging_table(hummock_table.clone());
1644            let res = Box::pin(create_iceberg_engine_table(
1645                session.clone(),
1646                handler_args,
1647                source.map(|s| s.to_prost()),
1648                hummock_table,
1649                graph,
1650                table_name,
1651                job_type,
1652                if_not_exists,
1653            ))
1654            .await;
1655            session.drop_staging_table(&hummock_table_name);
1656            res?
1657        }
1658    }
1659
1660    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1661}
1662
1663/// Iceberg table engine is composed of hummock table, iceberg sink and iceberg source.
1664///
1665/// 1. fetch iceberg engine options from the meta node. Or use iceberg engine connection provided by users.
1666/// 2. create a hummock table
1667/// 3. create an iceberg sink
1668/// 4. create an iceberg source
1669///
1670/// See <https://github.com/risingwavelabs/risingwave/issues/21586> for an architecture diagram.
1671#[allow(clippy::too_many_arguments)]
1672pub async fn create_iceberg_engine_table(
1673    session: Arc<SessionImpl>,
1674    handler_args: HandlerArgs,
1675    mut source: Option<PbSource>,
1676    table: TableCatalog,
1677    graph: StreamFragmentGraph,
1678    table_name: ObjectName,
1679    job_type: PbTableJobType,
1680    if_not_exists: bool,
1681) -> Result<()> {
1682    let rw_db_name = session
1683        .env()
1684        .catalog_reader()
1685        .read_guard()
1686        .get_database_by_id(table.database_id)?
1687        .name()
1688        .to_owned();
1689    let rw_schema_name = session
1690        .env()
1691        .catalog_reader()
1692        .read_guard()
1693        .get_schema_by_id(table.database_id, table.schema_id)?
1694        .name()
1695        .clone();
1696    let iceberg_catalog_name = rw_db_name.clone();
1697    let iceberg_database_name = rw_schema_name.clone();
1698    let iceberg_table_name = table_name.0.last().unwrap().real_value();
1699
1700    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1701    let sink_decouple = session.config().sink_decouple();
1702    if matches!(sink_decouple, SinkDecouple::Disable) {
1703        bail!(
1704            "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1705        );
1706    }
1707
1708    let mut connection_ref = BTreeMap::new();
1709    let with_common = if iceberg_engine_connection.is_empty() {
1710        bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1711    } else {
1712        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1713        assert_eq!(parts.len(), 2);
1714        let connection_catalog =
1715            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1716        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1717            if params.connection_type == ConnectionType::Iceberg as i32 {
1718                // With iceberg engine connection:
1719                connection_ref.insert(
1720                    "connection".to_owned(),
1721                    ConnectionRefValue {
1722                        connection_name: ObjectName::from(vec![
1723                            Ident::from(parts[0]),
1724                            Ident::from(parts[1]),
1725                        ]),
1726                    },
1727                );
1728
1729                let mut with_common = BTreeMap::new();
1730                with_common.insert("connector".to_owned(), "iceberg".to_owned());
1731                with_common.insert("database.name".to_owned(), iceberg_database_name);
1732                with_common.insert("table.name".to_owned(), iceberg_table_name);
1733
1734                let hosted_catalog = params
1735                    .properties
1736                    .get("hosted_catalog")
1737                    .map(|s| s.eq_ignore_ascii_case("true"))
1738                    .unwrap_or(false);
1739                if hosted_catalog {
1740                    let meta_client = session.env().meta_client();
1741                    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1742
1743                    let meta_store_endpoint =
1744                        url::Url::parse(&meta_store_endpoint).map_err(|_| {
1745                            ErrorCode::InternalError(
1746                                "failed to parse the meta store endpoint".to_owned(),
1747                            )
1748                        })?;
1749                    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1750                    let meta_store_user = meta_store_endpoint.username().to_owned();
1751                    let meta_store_password = match meta_store_endpoint.password() {
1752                        Some(password) => percent_decode_str(password)
1753                            .decode_utf8()
1754                            .map_err(|_| {
1755                                ErrorCode::InternalError(
1756                                    "failed to parse password from meta store endpoint".to_owned(),
1757                                )
1758                            })?
1759                            .into_owned(),
1760                        None => "".to_owned(),
1761                    };
1762                    let meta_store_host = meta_store_endpoint
1763                        .host_str()
1764                        .ok_or_else(|| {
1765                            ErrorCode::InternalError(
1766                                "failed to parse host from meta store endpoint".to_owned(),
1767                            )
1768                        })?
1769                        .to_owned();
1770                    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1771                        ErrorCode::InternalError(
1772                            "failed to parse port from meta store endpoint".to_owned(),
1773                        )
1774                    })?;
1775                    let meta_store_database = meta_store_endpoint
1776                        .path()
1777                        .trim_start_matches('/')
1778                        .to_owned();
1779
1780                    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1781                        bail!("failed to parse meta backend: {}", meta_store_backend);
1782                    };
1783
1784                    let catalog_uri = match meta_backend {
1785                        MetaBackend::Postgres => {
1786                            format!(
1787                                "jdbc:postgresql://{}:{}/{}",
1788                                meta_store_host, meta_store_port, meta_store_database
1789                            )
1790                        }
1791                        MetaBackend::Mysql => {
1792                            format!(
1793                                "jdbc:mysql://{}:{}/{}",
1794                                meta_store_host, meta_store_port, meta_store_database
1795                            )
1796                        }
1797                        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1798                            bail!(
1799                                "Unsupported meta backend for iceberg engine table: {}",
1800                                meta_store_backend
1801                            );
1802                        }
1803                    };
1804
1805                    with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1806                    with_common.insert("catalog.uri".to_owned(), catalog_uri);
1807                    with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
1808                    with_common.insert("catalog.jdbc.password".to_owned(), meta_store_password);
1809                    with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
1810                }
1811
1812                with_common
1813            } else {
1814                return Err(RwError::from(ErrorCode::InvalidParameterValue(
1815                    "Only iceberg connection could be used in iceberg engine".to_owned(),
1816                )));
1817            }
1818        } else {
1819            return Err(RwError::from(ErrorCode::InvalidParameterValue(
1820                "Private Link Service has been deprecated. Please create a new connection instead."
1821                    .to_owned(),
1822            )));
1823        }
1824    };
1825
1826    // Iceberg sinks require a primary key, if none is provided, we will use the _row_id column
1827    // Fetch primary key from columns
1828    let mut pks = table
1829        .pk_column_names()
1830        .iter()
1831        .map(|c| c.to_string())
1832        .collect::<Vec<String>>();
1833
1834    // For the table without primary key. We will use `_row_id` as primary key.
1835    if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1836        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1837    }
1838
1839    let sink_from = CreateSink::From(table_name.clone());
1840
1841    let mut sink_name = table_name.clone();
1842    *sink_name.0.last_mut().unwrap() = Ident::from(
1843        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1844    );
1845    let create_sink_stmt = CreateSinkStatement {
1846        if_not_exists: false,
1847        sink_name,
1848        with_properties: WithProperties(vec![]),
1849        sink_from,
1850        columns: vec![],
1851        emit_mode: None,
1852        sink_schema: None,
1853        into_table_name: None,
1854    };
1855
1856    let mut sink_handler_args = handler_args.clone();
1857
1858    let mut sink_with = with_common.clone();
1859
1860    let enable_pk_index = handler_args
1861        .with_options
1862        .get(ENABLE_PK_INDEX)
1863        .is_some_and(|val| val.eq_ignore_ascii_case("true"));
1864
1865    // Iceberg with pk index does not support auto schema change yet.
1866    if !enable_pk_index {
1867        sink_with.insert(AUTO_SCHEMA_CHANGE_KEY.to_owned(), "true".to_owned());
1868    }
1869
1870    if table.append_only {
1871        sink_with.insert("type".to_owned(), "append-only".to_owned());
1872    } else {
1873        sink_with.insert("type".to_owned(), "upsert".to_owned());
1874        if !enable_pk_index {
1875            sink_with.insert("primary_key".to_owned(), pks.join(","));
1876        }
1877    }
1878    // sink_with.insert(SINK_SNAPSHOT_OPTION.to_owned(), "false".to_owned());
1879    //
1880    // Note: in theory, we don't need to backfill from the table to the sink,
1881    // but we don't have atomic DDL now https://github.com/risingwavelabs/risingwave/issues/21863
1882    // so it may have potential data loss problem on the first barrier.
1883    //
1884    // For non-append-only table, we can always solve it by the initial sink with backfill, since
1885    // data will be present in hummock table.
1886    //
1887    // For append-only table, we need to be more careful.
1888    //
1889    // The possible cases for a table:
1890    // - For table without connector: it doesn't matter, since there's no data before the table is created
1891    // - For table with connector: we worked around it by setting SOURCE_RATE_LIMIT to 0
1892    //   + If we support blocking DDL for table with connector, we need to be careful.
1893    // - For table with an upstream job: Specifically, CDC table from shared CDC source.
1894    //   + Data may come from both upstream connector, and CDC table backfill, so we need to pause both of them.
1895    //   + For now we don't support APPEND ONLY CDC table, so it's safe.
1896    let commit_checkpoint_interval = handler_args
1897        .with_options
1898        .get(COMMIT_CHECKPOINT_INTERVAL)
1899        .map(|v| v.to_owned())
1900        .unwrap_or_else(|| "60".to_owned());
1901    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1902        ErrorCode::InvalidInputSyntax(format!(
1903            "commit_checkpoint_interval must be greater than 0: {}",
1904            commit_checkpoint_interval
1905        ))
1906    })?;
1907
1908    if commit_checkpoint_interval == 0 {
1909        bail!("commit_checkpoint_interval must be greater than 0");
1910    }
1911
1912    // remove commit_checkpoint_interval from source options, otherwise it will be considered as an unknown field.
1913    source
1914        .as_mut()
1915        .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1916
1917    let sink_decouple = session.config().sink_decouple();
1918    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1919        bail!(
1920            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1921        )
1922    }
1923
1924    sink_with.insert(
1925        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1926        commit_checkpoint_interval.to_string(),
1927    );
1928    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1929
1930    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1931
1932    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1933        match enable_compaction.to_lowercase().as_str() {
1934            "true" => {
1935                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1936            }
1937            "false" => {
1938                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1939            }
1940            _ => {
1941                return Err(ErrorCode::InvalidInputSyntax(format!(
1942                    "enable_compaction must be true or false: {}",
1943                    enable_compaction
1944                ))
1945                .into());
1946            }
1947        }
1948
1949        // remove enable_compaction from source options, otherwise it will be considered as an unknown field.
1950        source
1951            .as_mut()
1952            .map(|x| x.with_properties.remove("enable_compaction"));
1953    } else {
1954        sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1955    }
1956
1957    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1958        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1959            ErrorCode::InvalidInputSyntax(format!(
1960                "compaction_interval_sec must be greater than 0: {}",
1961                commit_checkpoint_interval
1962            ))
1963        })?;
1964        if compaction_interval_sec == 0 {
1965            bail!("compaction_interval_sec must be greater than 0");
1966        }
1967        sink_with.insert(
1968            "compaction_interval_sec".to_owned(),
1969            compaction_interval_sec.to_string(),
1970        );
1971        // remove compaction_interval_sec from source options, otherwise it will be considered as an unknown field.
1972        source
1973            .as_mut()
1974            .map(|x| x.with_properties.remove("compaction_interval_sec"));
1975    }
1976
1977    let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1978        handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1979    {
1980        // remove enable_snapshot_expiration from source options, otherwise it will be considered as an unknown field.
1981        source
1982            .as_mut()
1983            .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1984        match enable_snapshot_expiration.to_lowercase().as_str() {
1985            "true" => {
1986                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1987                true
1988            }
1989            "false" => {
1990                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1991                false
1992            }
1993            _ => {
1994                return Err(ErrorCode::InvalidInputSyntax(format!(
1995                    "enable_snapshot_expiration must be true or false: {}",
1996                    enable_snapshot_expiration
1997                ))
1998                .into());
1999            }
2000        }
2001    } else {
2002        sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
2003        true
2004    };
2005
2006    if has_enabled_snapshot_expiration {
2007        // configuration for snapshot expiration
2008        if let Some(snapshot_expiration_retain_last) = handler_args
2009            .with_options
2010            .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
2011        {
2012            sink_with.insert(
2013                SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
2014                snapshot_expiration_retain_last.to_owned(),
2015            );
2016            // remove snapshot_expiration_retain_last from source options, otherwise it will be considered as an unknown field.
2017            source
2018                .as_mut()
2019                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
2020        }
2021
2022        if let Some(snapshot_expiration_max_age) = handler_args
2023            .with_options
2024            .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
2025        {
2026            sink_with.insert(
2027                SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
2028                snapshot_expiration_max_age.to_owned(),
2029            );
2030            // remove snapshot_expiration_max_age from source options, otherwise it will be considered as an unknown field.
2031            source
2032                .as_mut()
2033                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
2034        }
2035
2036        if let Some(snapshot_expiration_clear_expired_files) = handler_args
2037            .with_options
2038            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
2039        {
2040            sink_with.insert(
2041                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
2042                snapshot_expiration_clear_expired_files.to_owned(),
2043            );
2044            // remove snapshot_expiration_clear_expired_files from source options, otherwise it will be considered as an unknown field.
2045            source.as_mut().map(|x| {
2046                x.with_properties
2047                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
2048            });
2049        }
2050
2051        if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
2052            .with_options
2053            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
2054        {
2055            sink_with.insert(
2056                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
2057                snapshot_expiration_clear_expired_meta_data.to_owned(),
2058            );
2059            // remove snapshot_expiration_clear_expired_meta_data from source options, otherwise it will be considered as an unknown field.
2060            source.as_mut().map(|x| {
2061                x.with_properties
2062                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
2063            });
2064        }
2065    }
2066
2067    if let Some(format_version) = handler_args.with_options.get(FORMAT_VERSION) {
2068        let format_version = format_version.parse::<u8>().map_err(|_| {
2069            ErrorCode::InvalidInputSyntax(format!(
2070                "format_version must be 1, 2 or 3: {}",
2071                format_version
2072            ))
2073        })?;
2074        if format_version != 1 && format_version != 2 && format_version != 3 {
2075            bail!("format_version must be 1, 2 or 3");
2076        }
2077        sink_with.insert(FORMAT_VERSION.to_owned(), format_version.to_string());
2078
2079        // remove format_version from source options, otherwise it will be considered as an unknown field.
2080        source
2081            .as_mut()
2082            .map(|x| x.with_properties.remove(FORMAT_VERSION));
2083    }
2084
2085    if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
2086        let write_mode = IcebergWriteMode::try_from(write_mode.as_str()).map_err(|_| {
2087            ErrorCode::InvalidInputSyntax(format!(
2088                "invalid write_mode: {}, must be one of: {}, {}",
2089                write_mode, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
2090            ))
2091        })?;
2092
2093        match write_mode {
2094            IcebergWriteMode::MergeOnRead => {
2095                sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
2096            }
2097
2098            IcebergWriteMode::CopyOnWrite => {
2099                if table.append_only {
2100                    return Err(ErrorCode::NotSupported(
2101                        "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
2102                        "Please use MERGE ON READ instead".to_owned(),
2103                    )
2104                    .into());
2105                }
2106
2107                sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
2108            }
2109        }
2110
2111        // remove write_mode from source options, otherwise it will be considered as an unknown field.
2112        source
2113            .as_mut()
2114            .map(|x| x.with_properties.remove("write_mode"));
2115    } else {
2116        sink_with.insert(
2117            WRITE_MODE.to_owned(),
2118            ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2119        );
2120    }
2121
2122    if let Some(enable_pk_index) = handler_args.with_options.get(ENABLE_PK_INDEX) {
2123        match enable_pk_index.to_lowercase().as_str() {
2124            "true" => {
2125                sink_with.insert(ENABLE_PK_INDEX.to_owned(), "true".to_owned());
2126            }
2127            "false" => {
2128                sink_with.insert(ENABLE_PK_INDEX.to_owned(), "false".to_owned());
2129            }
2130            _ => {
2131                return Err(ErrorCode::InvalidInputSyntax(format!(
2132                    "enable_pk_index must be true or false: {}",
2133                    enable_pk_index
2134                ))
2135                .into());
2136            }
2137        }
2138
2139        // remove enable_pk_index from source options, otherwise it will be considered as an unknown field.
2140        source
2141            .as_mut()
2142            .map(|x| x.with_properties.remove(ENABLE_PK_INDEX));
2143    }
2144
2145    if let Some(max_snapshots_num_before_compaction) =
2146        handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
2147    {
2148        let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
2149            .parse::<u32>()
2150            .map_err(|_| {
2151                ErrorCode::InvalidInputSyntax(format!(
2152                    "{} must be greater than 0: {}",
2153                    COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
2154                ))
2155            })?;
2156
2157        if max_snapshots_num_before_compaction == 0 {
2158            bail!(format!(
2159                "{} must be greater than 0",
2160                COMPACTION_MAX_SNAPSHOTS_NUM
2161            ));
2162        }
2163
2164        sink_with.insert(
2165            COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
2166            max_snapshots_num_before_compaction.to_string(),
2167        );
2168
2169        // remove from source options, otherwise it will be considered as an unknown field.
2170        source
2171            .as_mut()
2172            .map(|x| x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM));
2173    }
2174
2175    if let Some(small_files_threshold_mb) = handler_args
2176        .with_options
2177        .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2178    {
2179        let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
2180            ErrorCode::InvalidInputSyntax(format!(
2181                "{} must be greater than 0: {}",
2182                COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
2183            ))
2184        })?;
2185        if small_files_threshold_mb == 0 {
2186            bail!(format!(
2187                "{} must be a greater than 0",
2188                COMPACTION_SMALL_FILES_THRESHOLD_MB
2189            ));
2190        }
2191        sink_with.insert(
2192            COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
2193            small_files_threshold_mb.to_string(),
2194        );
2195
2196        // remove from source options, otherwise it will be considered as an unknown field.
2197        source.as_mut().map(|x| {
2198            x.with_properties
2199                .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2200        });
2201    }
2202
2203    if let Some(delete_files_count_threshold) = handler_args
2204        .with_options
2205        .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2206    {
2207        let delete_files_count_threshold =
2208            delete_files_count_threshold.parse::<usize>().map_err(|_| {
2209                ErrorCode::InvalidInputSyntax(format!(
2210                    "{} must be greater than 0: {}",
2211                    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
2212                ))
2213            })?;
2214        if delete_files_count_threshold == 0 {
2215            bail!(format!(
2216                "{} must be greater than 0",
2217                COMPACTION_DELETE_FILES_COUNT_THRESHOLD
2218            ));
2219        }
2220        sink_with.insert(
2221            COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
2222            delete_files_count_threshold.to_string(),
2223        );
2224
2225        // remove from source options, otherwise it will be considered as an unknown field.
2226        source.as_mut().map(|x| {
2227            x.with_properties
2228                .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2229        });
2230    }
2231
2232    if let Some(trigger_snapshot_count) = handler_args
2233        .with_options
2234        .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
2235    {
2236        let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
2237            ErrorCode::InvalidInputSyntax(format!(
2238                "{} must be greater than 0: {}",
2239                COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
2240            ))
2241        })?;
2242        if trigger_snapshot_count == 0 {
2243            bail!(format!(
2244                "{} must be greater than 0",
2245                COMPACTION_TRIGGER_SNAPSHOT_COUNT
2246            ));
2247        }
2248        sink_with.insert(
2249            COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
2250            trigger_snapshot_count.to_string(),
2251        );
2252
2253        // remove from source options, otherwise it will be considered as an unknown field.
2254        source
2255            .as_mut()
2256            .map(|x| x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT));
2257    }
2258
2259    if let Some(target_file_size_mb) = handler_args
2260        .with_options
2261        .get(COMPACTION_TARGET_FILE_SIZE_MB)
2262    {
2263        let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
2264            ErrorCode::InvalidInputSyntax(format!(
2265                "{} must be greater than 0: {}",
2266                COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
2267            ))
2268        })?;
2269        if target_file_size_mb == 0 {
2270            bail!(format!(
2271                "{} must be greater than 0",
2272                COMPACTION_TARGET_FILE_SIZE_MB
2273            ));
2274        }
2275        sink_with.insert(
2276            COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
2277            target_file_size_mb.to_string(),
2278        );
2279        // remove from source options, otherwise it will be considered as an unknown field.
2280        source
2281            .as_mut()
2282            .map(|x| x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB));
2283    }
2284
2285    if let Some(compaction_type) = handler_args.with_options.get(COMPACTION_TYPE) {
2286        let compaction_type = CompactionType::try_from(compaction_type.as_str()).map_err(|_| {
2287            ErrorCode::InvalidInputSyntax(format!(
2288                "invalid compaction_type: {}, must be one of {:?}",
2289                compaction_type,
2290                [
2291                    CompactionType::Full,
2292                    CompactionType::SmallFiles,
2293                    CompactionType::FilesWithDelete
2294                ]
2295            ))
2296        })?;
2297
2298        sink_with.insert(
2299            COMPACTION_TYPE.to_owned(),
2300            compaction_type.as_str().to_owned(),
2301        );
2302
2303        // remove from source options, otherwise it will be considered as an unknown field.
2304        source
2305            .as_mut()
2306            .map(|x| x.with_properties.remove(COMPACTION_TYPE));
2307    }
2308
2309    if let Some(write_parquet_compression) = handler_args
2310        .with_options
2311        .get(COMPACTION_WRITE_PARQUET_COMPRESSION)
2312    {
2313        sink_with.insert(
2314            COMPACTION_WRITE_PARQUET_COMPRESSION.to_owned(),
2315            write_parquet_compression.to_owned(),
2316        );
2317        // remove from source options, otherwise it will be considered as an unknown field.
2318        source.as_mut().map(|x| {
2319            x.with_properties
2320                .remove(COMPACTION_WRITE_PARQUET_COMPRESSION)
2321        });
2322    }
2323
2324    if let Some(write_parquet_max_row_group_rows) = handler_args
2325        .with_options
2326        .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2327    {
2328        let write_parquet_max_row_group_rows = write_parquet_max_row_group_rows
2329            .parse::<usize>()
2330            .map_err(|_| {
2331                ErrorCode::InvalidInputSyntax(format!(
2332                    "{} must be a positive integer: {}",
2333                    COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS, write_parquet_max_row_group_rows
2334                ))
2335            })?;
2336        if write_parquet_max_row_group_rows == 0 {
2337            bail!(format!(
2338                "{} must be greater than 0",
2339                COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS
2340            ));
2341        }
2342        sink_with.insert(
2343            COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS.to_owned(),
2344            write_parquet_max_row_group_rows.to_string(),
2345        );
2346        // remove from source options, otherwise it will be considered as an unknown field.
2347        source.as_mut().map(|x| {
2348            x.with_properties
2349                .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2350        });
2351    }
2352
2353    if let Some(write_parquet_max_row_group_bytes) = handler_args
2354        .with_options
2355        .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
2356    {
2357        let write_parquet_max_row_group_bytes = write_parquet_max_row_group_bytes
2358            .parse::<usize>()
2359            .map_err(|_| {
2360                ErrorCode::InvalidInputSyntax(format!(
2361                    "{} must be a positive integer: {}",
2362                    COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES, write_parquet_max_row_group_bytes
2363                ))
2364            })?;
2365        if write_parquet_max_row_group_bytes == 0 {
2366            bail!(format!(
2367                "{} must be greater than 0",
2368                COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES
2369            ));
2370        }
2371        sink_with.insert(
2372            COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES.to_owned(),
2373            write_parquet_max_row_group_bytes.to_string(),
2374        );
2375        source.as_mut().map(|x| {
2376            x.with_properties
2377                .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
2378        });
2379    }
2380
2381    let partition_by = handler_args
2382        .with_options
2383        .get("partition_by")
2384        .map(|v| v.to_owned());
2385
2386    if let Some(partition_by) = &partition_by {
2387        let mut partition_columns = vec![];
2388        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
2389            table
2390                .columns()
2391                .iter()
2392                .find(|col| col.name().eq_ignore_ascii_case(&column))
2393                .ok_or_else(|| {
2394                    ErrorCode::InvalidInputSyntax(format!(
2395                        "Partition source column does not exist in schema: {}",
2396                        column
2397                    ))
2398                })?;
2399
2400            partition_columns.push(column);
2401        }
2402
2403        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2404            |_| {
2405                ErrorCode::InvalidInputSyntax(
2406                    "The partition columns should be the prefix of the primary key".to_owned(),
2407                )
2408            },
2409        )?;
2410
2411        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2412
2413        // remove partition_by from source options, otherwise it will be considered as an unknown field.
2414        source
2415            .as_mut()
2416            .map(|x| x.with_properties.remove("partition_by"));
2417    }
2418
2419    let order_key = handler_args
2420        .with_options
2421        .get(ORDER_KEY)
2422        .map(|v| v.to_owned());
2423    if let Some(order_key) = &order_key {
2424        validate_order_key_columns(order_key, table.columns().iter().map(|col| col.name()))
2425            .map_err(|err| ErrorCode::InvalidInputSyntax(err.to_report_string()))?;
2426
2427        sink_with.insert(ORDER_KEY.to_owned(), order_key.to_owned());
2428
2429        source.as_mut().map(|x| x.with_properties.remove(ORDER_KEY));
2430    }
2431
2432    sink_handler_args.with_options =
2433        WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2434    let SinkPlanContext {
2435        sink_plan,
2436        sink_catalog,
2437        ..
2438    } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2439    let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2440
2441    let mut source_name = table_name.clone();
2442    *source_name.0.last_mut().unwrap() = Ident::from(
2443        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2444    );
2445    let create_source_stmt = CreateSourceStatement {
2446        temporary: false,
2447        if_not_exists: false,
2448        columns: vec![],
2449        source_name,
2450        wildcard_idx: Some(0),
2451        constraints: vec![],
2452        with_properties: WithProperties(vec![]),
2453        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2454        source_watermarks: vec![],
2455        include_column_options: vec![],
2456    };
2457
2458    let mut source_handler_args = handler_args.clone();
2459    let source_with = with_common;
2460    source_handler_args.with_options =
2461        WithOptions::new(source_with, Default::default(), connection_ref);
2462
2463    let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2464    let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2465    let (with_properties, refresh_mode) =
2466        bind_connector_props(&source_handler_args, &format_encode, true)?;
2467
2468    // Create iceberg sink table, used for iceberg source column binding. See `bind_columns_from_source_for_non_cdc` for more details.
2469    // TODO: We can derive the columns directly from table definition in the future, so that we don't need to pre-create the table catalog.
2470    let (iceberg_catalog, table_identifier) = {
2471        let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2472        let iceberg_sink = IcebergSink::try_from(sink_param)?;
2473        iceberg_sink.create_table_if_not_exists().await?;
2474
2475        let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2476        let table_identifier = iceberg_sink.config.full_table_name()?;
2477        (iceberg_catalog, table_identifier)
2478    };
2479
2480    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2481    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2482        &session,
2483        &format_encode,
2484        Either::Left(&with_properties),
2485        create_source_type,
2486    )
2487    .await?;
2488    let mut col_id_gen = ColumnIdGenerator::new_initial();
2489
2490    let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2491        source_handler_args,
2492        create_source_stmt.source_name,
2493        format_encode,
2494        with_properties,
2495        &create_source_stmt.columns,
2496        create_source_stmt.constraints,
2497        create_source_stmt.wildcard_idx,
2498        create_source_stmt.source_watermarks,
2499        columns_from_resolve_source,
2500        source_info,
2501        create_source_stmt.include_column_options,
2502        &mut col_id_gen,
2503        create_source_type,
2504        overwrite_options.source_rate_limit,
2505        SqlColumnStrategy::FollowChecked,
2506        refresh_mode,
2507    )
2508    .await?;
2509
2510    // before we create the table, ensure the JVM is initialized as we use jdbc catalog right now.
2511    // If JVM isn't initialized successfully, current not atomic ddl will result in a partially created iceberg engine table.
2512    let _ = Jvm::get_or_init()?;
2513
2514    let catalog_writer = session.catalog_writer()?;
2515    let action = match job_type {
2516        TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
2517        _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
2518    };
2519    let res = execute_with_long_running_notification(
2520        catalog_writer.create_iceberg_table(
2521            PbTableJobInfo {
2522                source,
2523                table: Some(table.to_prost()),
2524                fragment_graph: Some(graph),
2525                job_type: job_type as _,
2526            },
2527            PbSinkJobInfo {
2528                sink: Some(sink_catalog.to_proto()),
2529                fragment_graph: Some(sink_graph),
2530            },
2531            iceberg_source_catalog.to_prost(),
2532            if_not_exists,
2533        ),
2534        &session,
2535        "CREATE TABLE",
2536        action,
2537    )
2538    .await;
2539
2540    if res.is_err() {
2541        let _ = iceberg_catalog
2542            .drop_table(&table_identifier)
2543            .await
2544            .inspect_err(|err| {
2545                tracing::error!(
2546                    "failed to drop iceberg table {} after create iceberg engine table failed: {}",
2547                    table_identifier,
2548                    err.as_report()
2549                );
2550            });
2551        res?
2552    }
2553
2554    Ok(())
2555}
2556
2557pub fn check_create_table_with_source(
2558    with_options: &WithOptions,
2559    format_encode: Option<FormatEncodeOptions>,
2560    include_column_options: &IncludeOption,
2561    cdc_table_info: &Option<CdcTableInfo>,
2562) -> Result<Option<FormatEncodeOptions>> {
2563    // skip check for cdc table
2564    if cdc_table_info.is_some() {
2565        return Ok(format_encode);
2566    }
2567    let defined_source = with_options.is_source_connector();
2568
2569    if !include_column_options.is_empty() && !defined_source {
2570        return Err(ErrorCode::InvalidInputSyntax(
2571            "INCLUDE should be used with a connector".to_owned(),
2572        )
2573        .into());
2574    }
2575    if defined_source {
2576        format_encode.as_ref().ok_or_else(|| {
2577            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2578        })?;
2579    }
2580    Ok(format_encode)
2581}
2582
/// Verifies that `partition_columns` is a leading prefix of `primary_key_columns`.
///
/// Column names are compared exactly (case-sensitively) and position by position.
///
/// # Errors
///
/// Returns a human-readable message when there are more partition columns than primary key
/// columns, or naming the first partition column that differs from the primary key column at
/// the same position.
fn ensure_partition_columns_are_prefix_of_primary_key(
    partition_columns: &[String],
    primary_key_columns: &[String],
) -> std::result::Result<(), String> {
    if partition_columns.len() > primary_key_columns.len() {
        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
    }

    // The length check above guarantees `zip` visits every partition column.
    let first_mismatch = partition_columns
        .iter()
        .zip(primary_key_columns)
        .find(|(partition_col, pk_col)| partition_col != pk_col);

    match first_mismatch {
        Some((partition_col, _)) => Err(format!(
            "Partition column '{}' is not a prefix of the primary key.",
            partition_col
        )),
        None => Ok(()),
    }
}
2602
2603#[allow(clippy::too_many_arguments)]
2604pub async fn generate_stream_graph_for_replace_table(
2605    _session: &Arc<SessionImpl>,
2606    table_name: ObjectName,
2607    original_catalog: &Arc<TableCatalog>,
2608    handler_args: HandlerArgs,
2609    statement: Statement,
2610    col_id_gen: ColumnIdGenerator,
2611    sql_column_strategy: SqlColumnStrategy,
2612) -> Result<(
2613    StreamFragmentGraph,
2614    TableCatalog,
2615    Option<SourceCatalog>,
2616    TableJobType,
2617)> {
2618    let Statement::CreateTable {
2619        columns,
2620        constraints,
2621        source_watermarks,
2622        append_only,
2623        on_conflict,
2624        with_version_columns,
2625        wildcard_idx,
2626        cdc_table_info,
2627        format_encode,
2628        include_column_options,
2629        engine,
2630        with_options,
2631        ..
2632    } = statement
2633    else {
2634        panic!("unexpected statement type: {:?}", statement);
2635    };
2636
2637    let format_encode = format_encode
2638        .clone()
2639        .map(|format_encode| format_encode.into_v2_with_warning());
2640
2641    let engine = match engine {
2642        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2643        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2644    };
2645
2646    let is_drop_connector =
2647        original_catalog.associated_source_id().is_some() && format_encode.is_none();
2648    if is_drop_connector {
2649        debug_assert!(
2650            source_watermarks.is_empty()
2651                && include_column_options.is_empty()
2652                && with_options
2653                    .iter()
2654                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2655        );
2656    }
2657
2658    let props = CreateTableProps {
2659        definition: handler_args.normalized_sql.clone(),
2660        append_only,
2661        on_conflict: on_conflict.into(),
2662        with_version_columns: with_version_columns
2663            .iter()
2664            .map(|col| col.real_value())
2665            .collect(),
2666        webhook_info: original_catalog.webhook_info.clone(),
2667        engine,
2668    };
2669
2670    let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2671        (Some(format_encode), None) => (
2672            gen_create_table_plan_with_source(
2673                handler_args,
2674                ExplainOptions::default(),
2675                table_name,
2676                columns,
2677                wildcard_idx,
2678                constraints,
2679                format_encode,
2680                source_watermarks,
2681                col_id_gen,
2682                include_column_options,
2683                props,
2684                sql_column_strategy,
2685            )
2686            .await?,
2687            TableJobType::General,
2688        ),
2689        (None, None) => {
2690            let context = OptimizerContext::from_handler_args(handler_args);
2691            let (plan, table) = gen_create_table_plan(
2692                context,
2693                table_name,
2694                columns,
2695                constraints,
2696                col_id_gen,
2697                source_watermarks,
2698                props,
2699                true,
2700            )?;
2701            ((plan, None, table), TableJobType::General)
2702        }
2703        (None, Some(cdc_table)) => {
2704            sanity_check_for_table_on_cdc_source(
2705                append_only,
2706                &columns,
2707                &wildcard_idx,
2708                &constraints,
2709                &source_watermarks,
2710            )?;
2711
2712            let session = &handler_args.session;
2713            let (source, resolved_table_name) =
2714                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2715
2716            let (cdc_with_options, normalized_external_table_name) =
2717                derive_with_options_for_cdc_table(
2718                    &source.with_properties,
2719                    cdc_table.external_table_name.clone(),
2720                )?;
2721
2722            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2723
2724            let context: OptimizerContextRef =
2725                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2726            let (plan, table) = gen_create_table_plan_for_cdc_table(
2727                context,
2728                source,
2729                normalized_external_table_name,
2730                columns,
2731                source_watermarks,
2732                column_catalogs,
2733                pk_names,
2734                cdc_with_options,
2735                col_id_gen,
2736                on_conflict,
2737                with_version_columns
2738                    .iter()
2739                    .map(|col| col.real_value())
2740                    .collect(),
2741                include_column_options,
2742                table_name,
2743                resolved_table_name,
2744                original_catalog.database_id,
2745                original_catalog.schema_id,
2746                original_catalog.id(),
2747                engine,
2748            )?;
2749
2750            ((plan, None, table), TableJobType::SharedCdcSource)
2751        }
2752        (Some(_), Some(_)) => {
2753            return Err(ErrorCode::NotSupported(
2754                "Data format and encoding format doesn't apply to table created from a CDC source"
2755                    .into(),
2756                "Remove the FORMAT and ENCODE specification".into(),
2757            )
2758            .into());
2759        }
2760    };
2761
2762    if table.pk_column_ids() != original_catalog.pk_column_ids() {
2763        Err(ErrorCode::InvalidInputSyntax(
2764            "alter primary key of table is not supported".to_owned(),
2765        ))?
2766    }
2767
2768    let graph = build_graph(plan, Some(GraphJobType::Table))?;
2769
2770    // Fill the original table ID.
2771    table.id = original_catalog.id();
2772    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2773        table.associated_source_id = Some(source_id);
2774
2775        let source = source.as_mut().unwrap();
2776        source.id = source_id;
2777        source.associated_table_id = Some(table.id());
2778    }
2779
2780    Ok((graph, table, source, job_type))
2781}
2782
2783fn get_source_and_resolved_table_name(
2784    session: &Arc<SessionImpl>,
2785    cdc_table: CdcTableInfo,
2786    table_name: ObjectName,
2787) -> Result<(Arc<SourceCatalog>, String)> {
2788    let db_name = &session.database();
2789    let (_, resolved_table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2790
2791    let (source_schema, source_name) =
2792        Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2793
2794    let source = {
2795        let catalog_reader = session.env().catalog_reader().read_guard();
2796        let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2797        let (source, _) = catalog_reader.get_source_by_name(
2798            db_name,
2799            SchemaPath::Name(schema_name.as_str()),
2800            source_name.as_str(),
2801        )?;
2802        source.clone()
2803    };
2804
2805    Ok((source, resolved_table_name))
2806}
2807
2808// validate the webhook_info and also bind the webhook_info to protobuf
2809fn bind_webhook_info(
2810    session: &Arc<SessionImpl>,
2811    column_defs: &[ColumnDef],
2812    webhook_info: WebhookSourceInfo,
2813) -> Result<PbWebhookSourceInfo> {
2814    let WebhookSourceInfo {
2815        secret_ref,
2816        signature_expr,
2817        wait_for_persistence,
2818        is_batched,
2819    } = webhook_info;
2820
2821    for column in column_defs {
2822        for option_def in &column.options {
2823            match option_def.option {
2824                ColumnOption::Null => {}
2825                ColumnOption::GeneratedColumns(_) => {
2826                    return Err(ErrorCode::InvalidInputSyntax(
2827                        "generated columns are not supported for webhook tables".to_owned(),
2828                    )
2829                    .into());
2830                }
2831                ColumnOption::DefaultValue(_) | ColumnOption::DefaultValueInternal { .. } => {
2832                    return Err(ErrorCode::InvalidInputSyntax(
2833                        "default values are not supported for webhook tables".to_owned(),
2834                    )
2835                    .into());
2836                }
2837                ColumnOption::NotNull
2838                | ColumnOption::Unique { .. }
2839                | ColumnOption::ForeignKey { .. }
2840                | ColumnOption::Check(_)
2841                | ColumnOption::DialectSpecific(_) => {
2842                    return Err(ErrorCode::InvalidInputSyntax(
2843                        "only NULL column option is supported for webhook tables".to_owned(),
2844                    )
2845                    .into());
2846                }
2847            }
2848        }
2849    }
2850
2851    // validate secret_ref
2852    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2853        let db_name = &session.database();
2854        let (schema_name, secret_name) =
2855            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2856        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2857        (
2858            Some(PbSecretRef {
2859                secret_id: secret_catalog.id,
2860                ref_as: match secret_ref.ref_as {
2861                    SecretRefAsType::Text => PbRefAsType::Text,
2862                    SecretRefAsType::File => PbRefAsType::File,
2863                }
2864                .into(),
2865            }),
2866            Some(secret_name),
2867        )
2868    } else {
2869        (None, None)
2870    };
2871
2872    let signature_expr = if let Some(signature_expr) = signature_expr {
2873        let payload_name = if column_defs.len() == 1
2874            && column_defs[0].data_type.as_ref() == Some(&AstDataType::Jsonb)
2875        {
2876            column_defs[0].name.real_value()
2877        } else {
2878            WEBHOOK_PAYLOAD_FIELD_NAME.to_owned()
2879        };
2880        let secure_compare_context = SecureCompareContext {
2881            payload_name,
2882            secret_name,
2883        };
2884        let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
2885        let expr = binder.bind_expr(&signature_expr)?;
2886
2887        // validate expr, ensuring it is SECURE_COMPARE()
2888        if expr.as_function_call().is_none()
2889            || expr.as_function_call().unwrap().func_type()
2890                != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2891        {
2892            return Err(ErrorCode::InvalidInputSyntax(
2893                "The signature verification function must be SECURE_COMPARE()".to_owned(),
2894            )
2895            .into());
2896        }
2897
2898        Some(expr.to_expr_proto())
2899    } else {
2900        session.notice_to_user(
2901            "VALIDATE clause is strongly recommended for safety or production usages",
2902        );
2903        None
2904    };
2905
2906    let pb_webhook_info = PbWebhookSourceInfo {
2907        secret_ref: pb_secret_ref,
2908        signature_expr,
2909        wait_for_persistence,
2910        is_batched,
2911    };
2912
2913    Ok(pb_webhook_info)
2914}
2915
2916#[cfg(test)]
2917mod tests {
2918    use risingwave_common::catalog::{
2919        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2920    };
2921    use risingwave_common::types::{DataType, StructType};
2922
2923    use super::*;
2924    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2925
2926    #[tokio::test]
2927    async fn test_create_table_handler() {
2928        let sql =
2929            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2930        let frontend = LocalFrontend::new(Default::default()).await;
2931        frontend.run_sql(sql).await.unwrap();
2932
2933        let session = frontend.session_ref();
2934        let catalog_reader = session.env().catalog_reader().read_guard();
2935        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2936
2937        // Check table exists.
2938        let (table, _) = catalog_reader
2939            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2940            .unwrap();
2941        assert_eq!(table.name(), "t");
2942
2943        let columns = table
2944            .columns
2945            .iter()
2946            .map(|col| (col.name(), col.data_type().clone()))
2947            .collect::<HashMap<&str, DataType>>();
2948
2949        let expected_columns = maplit::hashmap! {
2950            ROW_ID_COLUMN_NAME => DataType::Serial,
2951            "v1" => DataType::Int16,
2952            "v2" => StructType::new(
2953                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
2954            )
2955            .with_ids([3, 4, 5].map(ColumnId::new))
2956            .into(),
2957            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
2958        };
2959
2960        assert_eq!(columns, expected_columns, "{columns:#?}");
2961    }
2962
2963    #[tokio::test]
2964    async fn test_create_webhook_table_with_arbitrary_columns() {
2965        let frontend = LocalFrontend::new(Default::default()).await;
2966        frontend
2967            .run_sql("create schema ingest_schema;")
2968            .await
2969            .unwrap();
2970        frontend
2971            .run_sql(
2972                r#"
2973                create table ingest_schema.orders (
2974                    id int,
2975                    customer_name varchar,
2976                    amount double precision,
2977                    primary key (id)
2978                ) with (
2979                    connector = 'webhook'
2980                ) validate as secure_compare(
2981                    headers->>'x-rw-signature',
2982                    'sha256=' || encode(hmac('webhook-secret', payload, 'sha256'), 'hex')
2983                );
2984                "#,
2985            )
2986            .await
2987            .unwrap();
2988
2989        let session = frontend.session_ref();
2990        let catalog_reader = session.env().catalog_reader().read_guard();
2991        let (table, _) = catalog_reader
2992            .get_created_table_by_name(
2993                DEFAULT_DATABASE_NAME,
2994                SchemaPath::Name("ingest_schema"),
2995                "orders",
2996            )
2997            .unwrap();
2998
2999        assert!(table.webhook_info.is_some());
3000        assert_eq!(
3001            table
3002                .columns
3003                .iter()
3004                .filter(|column| column.can_dml())
3005                .count(),
3006            3
3007        );
3008    }
3009
3010    #[tokio::test]
3011    async fn test_create_webhook_table_uses_single_jsonb_column_name_in_validate() {
3012        let frontend = LocalFrontend::new(Default::default()).await;
3013        frontend
3014            .run_sql(
3015                r#"
3016                create table webhook_single_column (
3017                    body jsonb
3018                ) with (
3019                    connector = 'webhook'
3020                ) validate as secure_compare(
3021                    headers->>'x-rw-signature',
3022                    'sha256=' || encode(hmac('webhook-secret', body, 'sha256'), 'hex')
3023                );
3024                "#,
3025            )
3026            .await
3027            .unwrap();
3028    }
3029
3030    #[tokio::test]
3031    async fn test_create_webhook_table_with_generated_columns() {
3032        let frontend = LocalFrontend::new(Default::default()).await;
3033        let err = frontend
3034            .run_sql(
3035                r#"
3036                create table webhook_generated_columns (
3037                    id int,
3038                    amount double precision,
3039                    amount_with_fee double precision as amount + 1.0
3040                ) with (
3041                    connector = 'webhook'
3042                );
3043                "#,
3044            )
3045            .await
3046            .unwrap_err();
3047
3048        assert!(
3049            err.to_string()
3050                .contains("generated columns are not supported for webhook tables"),
3051            "{err:?}"
3052        );
3053    }
3054
3055    #[tokio::test]
3056    async fn test_create_webhook_table_with_default_value() {
3057        let frontend = LocalFrontend::new(Default::default()).await;
3058        let err = frontend
3059            .run_sql(
3060                r#"
3061                create table webhook_default_value (
3062                    id int default 42,
3063                    amount double precision
3064                ) with (
3065                    connector = 'webhook'
3066                );
3067                "#,
3068            )
3069            .await
3070            .unwrap_err();
3071
3072        assert!(
3073            err.to_string()
3074                .contains("default values are not supported for webhook tables"),
3075            "{err:?}"
3076        );
3077    }
3078
3079    #[tokio::test]
3080    async fn test_create_webhook_table_with_not_null_option() {
3081        let frontend = LocalFrontend::new(Default::default()).await;
3082        let err = frontend
3083            .run_sql(
3084                r#"
3085                create table webhook_not_null (
3086                    id int not null,
3087                    amount double precision
3088                ) with (
3089                    connector = 'webhook'
3090                );
3091                "#,
3092            )
3093            .await
3094            .unwrap_err();
3095
3096        assert!(
3097            err.to_string()
3098                .contains("only NULL column option is supported for webhook tables"),
3099            "{err:?}"
3100        );
3101    }
3102
3103    #[test]
3104    fn test_parse_postgres_cdc_external_table_name() {
3105        for (input, expected) in [
3106            ("public.Note", ("public", "Note")),
3107            ("public.\"Note\"", ("public", "Note")),
3108            (
3109                "\"Mixed.Schema\".\"Note.Table\"",
3110                ("Mixed.Schema", "Note.Table"),
3111            ),
3112            ("public.\"Note\"\"Archive\"", ("public", "Note\"Archive")),
3113        ] {
3114            assert_eq!(
3115                parse_postgres_cdc_external_table_name(input).unwrap(),
3116                (expected.0.to_owned(), expected.1.to_owned()),
3117                "input: {input}"
3118            );
3119        }
3120
3121        for input in [
3122            "Note",
3123            "public.",
3124            ".Note",
3125            "public.\"Note",
3126            "public.\"Note\"Archive",
3127            "public.Note.Archive",
3128        ] {
3129            assert!(
3130                parse_postgres_cdc_external_table_name(input).is_err(),
3131                "input should be rejected: {input}"
3132            );
3133        }
3134    }
3135
3136    #[test]
3137    fn test_bind_primary_key() {
3138        // Note: Column ID 0 is reserved for row ID column.
3139
3140        for (sql, expected) in [
3141            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
3142            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
3143            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
3144            (
3145                "create table t (v1 int primary key, v2 int primary key)",
3146                Err("multiple primary keys are not allowed"),
3147            ),
3148            (
3149                "create table t (v1 int primary key primary key, v2 int)",
3150                Err("multiple primary keys are not allowed"),
3151            ),
3152            (
3153                "create table t (v1 int, v2 int, primary key (v1))",
3154                Ok(&[1]),
3155            ),
3156            (
3157                "create table t (v1 int, primary key (v2), v2 int)",
3158                Ok(&[2]),
3159            ),
3160            (
3161                "create table t (primary key (v2, v1), v1 int, v2 int)",
3162                Ok(&[2, 1]),
3163            ),
3164            (
3165                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
3166                Err("multiple primary keys are not allowed"),
3167            ),
3168            (
3169                "create table t (v1 int primary key, primary key (v1), v2 int)",
3170                Err("multiple primary keys are not allowed"),
3171            ),
3172            (
3173                "create table t (v1 int, primary key (V3), v2 int)",
3174                Err("column \"v3\" named in key does not exist"),
3175            ),
3176        ] {
3177            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
3178            let risingwave_sqlparser::ast::Statement::CreateTable {
3179                columns: column_defs,
3180                constraints,
3181                ..
3182            } = ast.remove(0)
3183            else {
3184                panic!("test case should be create table")
3185            };
3186            let actual: Result<_> = (|| {
3187                let mut columns = bind_sql_columns(&column_defs, false)?;
3188                let mut col_id_gen = ColumnIdGenerator::new_initial();
3189                for c in &mut columns {
3190                    col_id_gen.generate(c)?;
3191                }
3192
3193                let pk_names =
3194                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
3195                let (_, pk_column_ids, _) =
3196                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
3197                Ok(pk_column_ids)
3198            })();
3199            match (expected, actual) {
3200                (Ok(expected), Ok(actual)) => assert_eq!(
3201                    expected.iter().copied().map(ColumnId::new).collect_vec(),
3202                    actual,
3203                    "sql: {sql}"
3204                ),
3205                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
3206                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
3207                (Err(expected), Err(actual)) => assert!(
3208                    actual.to_string().contains(expected),
3209                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
3210                ),
3211            }
3212        }
3213    }
3214
3215    #[tokio::test]
3216    async fn test_duplicate_props_options() {
3217        let proto_file = create_proto_file(PROTO_FILE_DATA);
3218        let sql = format!(
3219            r#"CREATE TABLE t
3220    WITH (
3221        connector = 'kinesis',
3222        aws.region='user_test_topic',
3223        endpoint='172.10.1.1:9090,172.10.1.2:9090',
3224        aws.credentials.access_key_id = 'your_access_key_1',
3225        aws.credentials.secret_access_key = 'your_secret_key_1'
3226    )
3227    FORMAT PLAIN ENCODE PROTOBUF (
3228        message = '.test.TestRecord',
3229        aws.credentials.access_key_id = 'your_access_key_2',
3230        aws.credentials.secret_access_key = 'your_secret_key_2',
3231        schema.location = 'file://{}',
3232    )"#,
3233            proto_file.path().to_str().unwrap()
3234        );
3235        let frontend = LocalFrontend::new(Default::default()).await;
3236        frontend.run_sql(sql).await.unwrap();
3237
3238        let session = frontend.session_ref();
3239        let catalog_reader = session.env().catalog_reader().read_guard();
3240        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
3241
3242        // Check source exists.
3243        let (source, _) = catalog_reader
3244            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
3245            .unwrap();
3246        assert_eq!(source.name, "t");
3247
3248        // AwsAuth params exist in options.
3249        assert_eq!(
3250            source
3251                .info
3252                .format_encode_options
3253                .get("aws.credentials.access_key_id")
3254                .unwrap(),
3255            "your_access_key_2"
3256        );
3257        assert_eq!(
3258            source
3259                .info
3260                .format_encode_options
3261                .get("aws.credentials.secret_access_key")
3262                .unwrap(),
3263            "your_secret_key_2"
3264        );
3265
3266        // AwsAuth params exist in props.
3267        assert_eq!(
3268            source
3269                .with_properties
3270                .get("aws.credentials.access_key_id")
3271                .unwrap(),
3272            "your_access_key_1"
3273        );
3274        assert_eq!(
3275            source
3276                .with_properties
3277                .get("aws.credentials.secret_access_key")
3278                .unwrap(),
3279            "your_secret_key_1"
3280        );
3281
3282        // Options are not merged into props.
3283        assert!(!source.with_properties.contains_key("schema.location"));
3284    }
3285}