risingwave_frontend/handler/
create_table.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use percent_encoding::percent_decode_str;
25use pgwire::pg_response::{PgResponse, StatementType};
26use prost::Message as _;
27use risingwave_common::catalog::{
28    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29    ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
30    TableId,
31};
32use risingwave_common::config::MetaBackend;
33use risingwave_common::global_jvm::Jvm;
34use risingwave_common::session_config::sink_decouple::SinkDecouple;
35use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
36use risingwave_common::util::value_encoding::DatumToProtoExt;
37use risingwave_common::{bail, bail_not_implemented};
38use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
39use risingwave_connector::source::cdc::build_cdc_table_id;
40use risingwave_connector::source::cdc::external::{
41    DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
42    SCHEMA_NAME_KEY, TABLE_NAME_KEY,
43};
44use risingwave_connector::{
45    AUTO_SCHEMA_CHANGE_KEY, WithOptionsSecResolved, WithPropertiesExt, source,
46};
47use risingwave_pb::catalog::connection::Info as ConnectionInfo;
48use risingwave_pb::catalog::connection_params::ConnectionType;
49use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
50use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
51use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
52use risingwave_pb::plan_common::{
53    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
54};
55use risingwave_pb::secret::PbSecretRef;
56use risingwave_pb::secret::secret_ref::PbRefAsType;
57use risingwave_pb::stream_plan::StreamFragmentGraph;
58use risingwave_sqlparser::ast::{
59    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
60    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
61    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
62    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
63};
64use risingwave_sqlparser::parser::IncludeOption;
65use thiserror_ext::AsReport;
66
67use super::RwPgResponse;
68use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
69use crate::binder::{Clause, SecureCompareContext, WEBHOOK_PAYLOAD_FIELD_NAME, bind_data_type};
70use crate::catalog::root_catalog::SchemaPath;
71use crate::catalog::source_catalog::SourceCatalog;
72use crate::catalog::table_catalog::TableVersion;
73use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
74use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
75use crate::expr::{Expr, ExprImpl, ExprRewriter};
76use crate::handler::HandlerArgs;
77use crate::handler::create_source::{
78    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
79    bind_source_watermark, handle_addition_columns,
80};
81use crate::handler::util::{
82    LongRunningNotificationAction, SourceSchemaCompatExt, execute_with_long_running_notification,
83};
84use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
85use crate::optimizer::plan_node::{
86    LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
87};
88use crate::optimizer::property::{Order, RequiredDist};
89use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
90use crate::session::SessionImpl;
91use crate::session::current::notice_to_user;
92use crate::stream_fragmenter::{GraphJobType, build_graph};
93use crate::utils::OverwriteOptions;
94use crate::{Binder, Explain, TableCatalog, WithOptions};
95
96mod col_id_gen;
97pub use col_id_gen::*;
98use risingwave_connector::sink::SinkParam;
99use risingwave_connector::sink::iceberg::{
100    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
101    COMPACTION_SMALL_FILES_THRESHOLD_MB, COMPACTION_TARGET_FILE_SIZE_MB,
102    COMPACTION_TRIGGER_SNAPSHOT_COUNT, COMPACTION_TYPE, COMPACTION_WRITE_PARQUET_COMPRESSION,
103    COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES, COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS,
104    CompactionType, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION, FORMAT_VERSION,
105    ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink,
106    IcebergWriteMode, ORDER_KEY, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
107    SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA, SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
108    SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE, parse_partition_by_exprs,
109    validate_order_key_columns,
110};
111use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
112
113use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};
114
115fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
116    for option_def in &c.options {
117        match option_def.option {
118            ColumnOption::GeneratedColumns(_) => {}
119            ColumnOption::DefaultValue(_) => {}
120            ColumnOption::DefaultValueInternal { .. } => {}
121            ColumnOption::Unique { is_primary: true } => {}
122            ColumnOption::Null => {}
123            ColumnOption::NotNull => {}
124            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
125        }
126    }
127    Ok(())
128}
129
/// Binds the column schemas declared in CREATE statement into `ColumnDesc`.
/// If a column is marked as `primary key`, its `ColumnId` is also returned.
/// This primary key is not combined with table constraints yet.
///
/// When `is_for_drop_table_connector` is true, the check for reserved (`_rw`-prefixed)
/// column names is skipped, because additional connector columns keep their original
/// names when converted to normal columns.
pub fn bind_sql_columns(
    column_defs: &[ColumnDef],
    is_for_drop_table_connector: bool,
) -> Result<Vec<ColumnCatalog>> {
    let mut columns = Vec::with_capacity(column_defs.len());

    for column in column_defs {
        // Bail out early on any column option we cannot handle at all.
        ensure_column_options_supported(column)?;
        // Destruct to make sure all fields are properly handled rather than ignored.
        // Do NOT use `..` to ignore fields you do not want to deal with.
        // Reject them with a clear NotImplemented error.
        // NOTE(review): the pattern below does use `..`, which contradicts the rule
        // above — confirm whether the remaining fields are intentionally ignored.
        let ColumnDef {
            name,
            data_type,
            collation,
            options,
            ..
        } = column;

        // A column must carry an explicit data type in CREATE TABLE.
        let data_type = data_type
            .clone()
            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
        if let Some(collation) = collation {
            // PostgreSQL will limit the datatypes that collate can work on.
            // https://www.postgresql.org/docs/16/collation.html#COLLATION-CONCEPTS
            //   > The built-in collatable data types are `text`, `varchar`, and `char`.
            //
            // But we don't support real collation, we simply ignore it here.
            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
                bail_not_implemented!(
                    "Collate collation other than `C` or `POSIX` is not implemented"
                );
            }

            // Even for the ignored `C`/`POSIX` collations, only collatable types are accepted.
            match data_type {
                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
                _ => {
                    return Err(ErrorCode::NotSupported(
                        format!("{} is not a collatable data type", data_type),
                        "The only built-in collatable data types are `varchar`, please check your type".into(),
                    ).into());
                }
            }
        }

        if !is_for_drop_table_connector {
            // additional column name may have prefix _rw
            // When converting dropping the connector from table, the additional columns are converted to normal columns and keep the original name.
            // Under this case, we loosen the check for _rw prefix.
            check_column_name_not_reserved(&name.real_value())?;
        }

        // Nullable unless an explicit NOT NULL option is present.
        let nullable: bool = !options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull));

        columns.push(ColumnCatalog {
            column_desc: ColumnDesc {
                data_type: bind_data_type(&data_type)?,
                // Real ids are assigned later by the column id generator.
                column_id: ColumnId::placeholder(),
                name: name.real_value(),
                // Generated/default expressions are bound separately in
                // `bind_sql_column_constraints`.
                generated_or_default_column: None,
                description: None,
                additional_column: AdditionalColumn { column_type: None },
                version: ColumnDescVersion::LATEST,
                system_column: None,
                nullable,
            },
            is_hidden: false,
        });
    }

    Ok(columns)
}
207
208fn check_generated_column_constraints(
209    column_name: &String,
210    column_id: ColumnId,
211    expr: &ExprImpl,
212    column_catalogs: &[ColumnCatalog],
213    generated_column_names: &[String],
214    pk_column_ids: &[ColumnId],
215) -> Result<()> {
216    let input_refs = expr.collect_input_refs(column_catalogs.len());
217    for idx in input_refs.ones() {
218        let referred_generated_column = &column_catalogs[idx].column_desc.name;
219        if generated_column_names
220            .iter()
221            .any(|c| c == referred_generated_column)
222        {
223            return Err(ErrorCode::BindError(format!(
224                "Generated can not reference another generated column. \
225                But here generated column \"{}\" referenced another generated column \"{}\"",
226                column_name, referred_generated_column
227            ))
228            .into());
229        }
230    }
231
232    if pk_column_ids.contains(&column_id) && expr.is_impure() {
233        return Err(ErrorCode::BindError(format!(
234            "Generated columns with impure expressions should not be part of the primary key. \
235            Here column \"{}\" is defined as part of the primary key.",
236            column_name
237        ))
238        .into());
239    }
240
241    Ok(())
242}
243
/// Binds constraints that can be only specified in column definitions,
/// currently generated columns and default columns.
///
/// The bound expressions are written back into `column_catalogs` as
/// `generated_or_default_column` on the matching column descriptors.
/// `pk_column_ids` is needed to reject impure generated columns in the pk.
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: &[ColumnDef],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    // Names of all generated columns, collected up front so each generated
    // expression can be checked against references to other generated columns.
    let generated_column_names = {
        let mut names = vec![];
        for column in columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    // Bind expressions in a context made of the table's own columns.
    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name, column_catalogs)?;

    for column in columns {
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            // It's possible that we don't follow the user defined columns in SQL but take the
            // ones resolved from the source, thus missing some columns. Simply ignore them.
            continue;
        };

        for option_def in &column.options {
            match &option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    // The clause restricts which constructs are legal inside a
                    // generated column expression; reset to `None` afterwards.
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "fail to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    // The default expression must be castable to the column's type.
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type())?;

                    // Rewrite expressions to evaluate a snapshot value, used for missing values in the case of
                    // schema change.
                    //
                    // TODO: Currently we don't support impure expressions other than `now()` (like `random()`),
                    // so the rewritten expression should almost always be pure and we directly call `fold_const`
                    // here. Actually we do not require purity of the expression here since we're only to get a
                    // snapshot value.
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                expr: Some(expr_impl.to_expr_proto()),
                                // persist the original expression
                            }));
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                            Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // When a `DEFAULT INTERNAL` is used internally for schema change, the persisted value
                    // should already be set during purification. So if we encounter an empty value here, it
                    // means the user has specified it explicitly in the SQL statement, typically by
                    // directly copying the result of `SHOW CREATE TABLE` and executing it.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                             please specify a concrete default value"
                        );
                    }

                    let desc = DefaultColumnDesc::decode(&**persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                _ => {}
            }
        }
    }
    Ok(())
}
366
367/// Currently we only support Primary key table constraint, so just return pk names if it exists
368pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
369    let mut pk_column_names = vec![];
370
371    for constraint in table_constraints {
372        match constraint {
373            TableConstraint::Unique {
374                name: _,
375                columns,
376                is_primary: true,
377            } => {
378                if !pk_column_names.is_empty() {
379                    return Err(multiple_pk_definition_err());
380                }
381                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
382            }
383            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
384        }
385    }
386    Ok(pk_column_names)
387}
388
389pub fn bind_sql_pk_names(
390    columns_defs: &[ColumnDef],
391    pk_names_from_table_constraints: Vec<String>,
392) -> Result<Vec<String>> {
393    let mut pk_column_names = pk_names_from_table_constraints;
394
395    for column in columns_defs {
396        for option_def in &column.options {
397            if let ColumnOption::Unique { is_primary: true } = option_def.option {
398                if !pk_column_names.is_empty() {
399                    return Err(multiple_pk_definition_err());
400                }
401                pk_column_names.push(column.name.real_value());
402            };
403        }
404    }
405
406    Ok(pk_column_names)
407}
408
409fn multiple_pk_definition_err() -> RwError {
410    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
411}
412
/// Binds primary keys defined in SQL.
///
/// It returns the columns together with `pk_column_ids`, and an optional row id column index if
/// added.
///
/// When `must_need_pk` is true and no pk is declared, a hidden row id column is
/// appended and becomes the sole primary key.
pub fn bind_pk_and_row_id_on_relation(
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    must_need_pk: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
    // All columns are expected to have real ids assigned by this point.
    for c in &columns {
        assert!(c.column_id() != ColumnId::placeholder());
    }

    // Mapping from column name to column id.
    let name_to_id = columns
        .iter()
        .map(|c| (c.name(), c.column_id()))
        .collect::<HashMap<_, _>>();

    // Resolve each declared pk name to its id; fail on unknown names.
    let mut pk_column_ids: Vec<_> = pk_names
        .iter()
        .map(|name| {
            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
            })
        })
        .try_collect()?;

    // Add `_row_id` column if `pk_column_ids` is empty and must_need_pk
    let need_row_id = pk_column_ids.is_empty() && must_need_pk;

    let row_id_index = need_row_id.then(|| {
        let column = ColumnCatalog::row_id_column();
        let index = columns.len();
        // The appended row id column replaces the (empty) pk set.
        pk_column_ids = vec![column.column_id()];
        columns.push(column);
        index
    });

    // Reject duplicate column names, including a clash with the appended row id column.
    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
        Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{col}\" specified more than once"
        )))?;
    }

    Ok((columns, pk_column_ids, row_id_index))
}
460
/// `gen_create_table_plan_with_source` generates the plan for creating a table with an external
/// stream source.
///
/// Returns the stream plan, the bound source catalog, and the table catalog.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
    // Append-only tables only accept PLAIN/NATIVE formats.
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append only table does not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let (with_properties, refresh_mode) =
        bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        // CDC tables have extra restrictions on generated and NOT NULL columns.
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        // if non-cdc source
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    // Resolve columns from the external format/schema registry, if any.
    // TODO: omit this step if `sql_column_strategy` is `Follow`.
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    // Bind everything into the source catalog backing this table.
    let source = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
        refresh_mode,
    )
    .await?;

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source.clone(),
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(source), table))
}
552
553/// `gen_create_table_plan` generates the plan for creating a table without an external stream
554/// source.
555#[allow(clippy::too_many_arguments)]
556pub(crate) fn gen_create_table_plan(
557    context: OptimizerContext,
558    table_name: ObjectName,
559    column_defs: Vec<ColumnDef>,
560    constraints: Vec<TableConstraint>,
561    mut col_id_gen: ColumnIdGenerator,
562    source_watermarks: Vec<SourceWatermark>,
563    props: CreateTableProps,
564    is_for_replace_plan: bool,
565) -> Result<(PlanRef, TableCatalog)> {
566    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
567    for c in &mut columns {
568        col_id_gen.generate(c)?;
569    }
570
571    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
572    if !secret_refs.is_empty() || !connection_refs.is_empty() {
573        return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
574    }
575
576    gen_create_table_plan_without_source(
577        context,
578        table_name,
579        columns,
580        column_defs,
581        constraints,
582        source_watermarks,
583        col_id_gen.into_version(),
584        props,
585    )
586}
587
/// Generates the table plan from pre-bound columns: resolves the primary key,
/// watermarks and column constraints, then builds the materialize plan.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    // XXX: Why not bind outside?
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    // Bind watermark definitions against the final column list (incl. `_row_id`).
    let watermark_descs = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    // Attach generated/default column expressions to the column catalogs.
    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        // No connector is attached to this table.
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}
634
635fn gen_table_plan_with_source(
636    context: OptimizerContextRef,
637    schema_name: Option<String>,
638    source_catalog: SourceCatalog,
639    version: TableVersion,
640    props: CreateTableProps,
641) -> Result<(PlanRef, TableCatalog)> {
642    let table_name = source_catalog.name.clone();
643
644    let info = CreateTableInfo {
645        columns: source_catalog.columns.clone(),
646        pk_column_ids: source_catalog.pk_col_ids.clone(),
647        row_id_index: source_catalog.row_id_index,
648        watermark_descs: source_catalog.watermark_descs.clone(),
649        source_catalog: Some(source_catalog),
650        version,
651    };
652
653    gen_table_plan_inner(context, schema_name, table_name, info, props)
654}
655
/// On-conflict behavior either from user input or existing table catalog.
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    /// Raw `ON CONFLICT` clause from the SQL statement (`None` if unspecified).
    Ast(Option<OnConflict>),
    /// Behavior already resolved, e.g. taken from an existing table catalog.
    Resolved(ConflictBehavior),
}
662
impl From<Option<OnConflict>> for EitherOnConflict {
    /// Wraps a (possibly absent) user-written `ON CONFLICT` clause.
    fn from(v: Option<OnConflict>) -> Self {
        Self::Ast(v)
    }
}

impl From<ConflictBehavior> for EitherOnConflict {
    /// Wraps an already-resolved conflict behavior.
    fn from(v: ConflictBehavior) -> Self {
        Self::Resolved(v)
    }
}
674
675impl EitherOnConflict {
676    /// Resolves the conflict behavior based on the given information.
677    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
678        let conflict_behavior = match self {
679            EitherOnConflict::Ast(on_conflict) => {
680                if append_only {
681                    if row_id_as_pk {
682                        // Primary key will be generated, no conflict check needed.
683                        ConflictBehavior::NoCheck
684                    } else {
685                        // User defined PK on append-only table, enforce `DO NOTHING`.
686                        if let Some(on_conflict) = on_conflict
687                            && on_conflict != OnConflict::Nothing
688                        {
689                            return Err(ErrorCode::InvalidInputSyntax(
690                                "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
691                                     the ON CONFLICT behavior must be DO NOTHING."
692                                    .to_owned(),
693                            )
694                            .into());
695                        }
696                        ConflictBehavior::IgnoreConflict
697                    }
698                } else {
699                    // Default to `UPDATE FULL` for non-append-only tables.
700                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
701                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
702                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
703                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
704                    }
705                }
706            }
707            EitherOnConflict::Resolved(b) => b,
708        };
709
710        Ok(conflict_behavior)
711    }
712}
713
/// Arguments of the functions that generate a table plan, part 1.
///
/// Compared to [`CreateTableProps`], this struct contains fields that need some work of binding
/// or resolving based on the user input.
pub struct CreateTableInfo {
    /// Full column catalogs, including any hidden row id column.
    pub columns: Vec<ColumnCatalog>,
    /// Ids of the primary-key columns (may be the generated row id column).
    pub pk_column_ids: Vec<ColumnId>,
    /// Index of the hidden row id column in `columns`, if one was added.
    pub row_id_index: Option<usize>,
    /// Bound watermark definitions.
    pub watermark_descs: Vec<WatermarkDesc>,
    /// The associated source catalog when the table has a connector.
    pub source_catalog: Option<SourceCatalog>,
    /// Table version, carried along for schema-change tracking.
    pub version: TableVersion,
}
726
/// Arguments of the functions that generate a table plan, part 2.
///
/// Compared to [`CreateTableInfo`], this struct contains fields that can be (relatively) simply
/// obtained from the input or the context.
pub struct CreateTableProps {
    /// The original SQL definition of the table.
    pub definition: String,
    /// Whether the table is declared `APPEND ONLY`.
    pub append_only: bool,
    /// `ON CONFLICT` behavior, either from the statement or already resolved.
    pub on_conflict: EitherOnConflict,
    /// Version column names, if specified.
    pub with_version_columns: Vec<String>,
    /// Webhook source configuration, for webhook-backed tables.
    pub webhook_info: Option<PbWebhookSourceInfo>,
    /// Storage engine the table is created with.
    pub engine: Engine,
}
739
/// Common tail of table-plan generation: builds the logical source node,
/// validates watermark / retention settings, and produces the materialize plan
/// together with the resulting table catalog.
#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    // Borrow the fields needed here; `info` itself is moved into
    // `gen_table_plan` at the end.
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    // The plan starts from a logical source node over the table's columns.
    let source_node: LogicalPlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    // No column pruning or ordering requirements at the plan root.
    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let has_non_ttl_watermark = watermark_descs.iter().any(|d| !d.with_ttl);

    // Non-TTL watermarks are only meaningful on append-only tables.
    if !append_only && has_non_ttl_watermark {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on table requires the table to be append only.".to_owned(),
            "Use the key words `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    // Retention on a non-append-only table is unsafe; allow it only behind an
    // explicit session config, with a loud warning.
    if !append_only && retention_seconds.is_some() {
        if session
            .config()
            .unsafe_enable_storage_retention_for_non_append_only_tables()
        {
            tracing::warn!(
                "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
                table_name
            );
            const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
                                  This may lead to stream inconsistency and unrecoverable \
                                  node failure if there is any row INSERT/UPDATE/DELETE operation \
                                  corresponding to the TTLed primary keys";
            session.notice_to_user(NOTICE);
        } else {
            return Err(ErrorCode::NotSupported(
                "Defining retention seconds on table requires the table to be append only."
                    .to_owned(),
                "Use the key words `APPEND ONLY`".to_owned(),
            )
            .into());
        }
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().clone();
    // The created table is owned by the current session user.
    table.owner = session.user_id();

    Ok((materialize.into(), table))
}
825
/// Generate stream plan for cdc table based on shared source.
///
/// In the replace workflow, `table_id` is the id of the table to be replaced;
/// in the create table workflow, `table_id` is a placeholder that will be filled in by the Meta.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    source_watermarks: Vec<SourceWatermark>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String, // table name without schema prefix
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, TableCatalog)> {
    let session = context.session_ctx().clone();

    // append additional columns to the end
    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    // Assign a column id to every column (including the appended ones).
    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    // NOTES: In auto schema change, default value is not provided in column definition.
    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    // Map each pk column id back to its positional index in `columns`.
    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        // pk column id must exist in table columns.
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    // Generated columns are computed on the RisingWave side and do not exist in the
    // external table, so exclude them from the external table description.
    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| !c.is_generated())
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();
    let cdc_table_type = ExternalCdcTableType::from_properties(&options);
    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id, // id of cdc source streaming job
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");
    let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: LogicalPlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs,
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false, // CDC tables are never append-only
            on_conflict: on_conflict.into(),
            with_version_columns,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.cdc_table_type = Some(cdc_table_type);
    Ok((materialize.into(), table))
}
972
973/// Derive connector properties and normalize `external_table_name` for CDC tables.
974///
975/// Returns (`connector_properties`, `normalized_external_table_name`) where:
976/// - For SQL Server: Normalizes 'db.schema.table' (3 parts) to 'schema.table' (2 parts),
977///   because users can optionally include database name for verification, but it needs to be
978///   stripped to match the format returned by Debezium's `extract_table_name()`.
979/// - For MySQL/Postgres: Returns the original `external_table_name` unchanged.
980fn derive_with_options_for_cdc_table(
981    source_with_properties: &WithOptionsSecResolved,
982    external_table_name: String,
983) -> Result<(WithOptionsSecResolved, String)> {
984    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
985    // we should remove the prefix from `full_table_name`
986    let source_database_name: &str = source_with_properties
987        .get("database.name")
988        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
989        .as_str();
990    let mut with_options = source_with_properties.clone();
991    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
992        match connector.as_str() {
993            MYSQL_CDC_CONNECTOR => {
994                // MySQL doesn't allow '.' in database name and table name, so we can split the
995                // external table name by '.' to get the table name
996                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
997                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
998                })?;
999                // We allow multiple database names in the source definition
1000                if !source_database_name
1001                    .split(',')
1002                    .map(|s| s.trim())
1003                    .any(|name| name == db_name)
1004                {
1005                    return Err(anyhow!(
1006                        "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
1007                        db_name,
1008                        source_database_name
1009                    ).into());
1010                }
1011                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
1012                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1013                // Return original external_table_name unchanged for MySQL
1014                return Ok((with_options, external_table_name));
1015            }
1016            POSTGRES_CDC_CONNECTOR => {
1017                let (schema_name, table_name) = external_table_name
1018                    .split_once('.')
1019                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;
1020
1021                // insert 'schema.name' into connect properties
1022                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1023                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1024                // Return original external_table_name unchanged for Postgres
1025                return Ok((with_options, external_table_name));
1026            }
1027            SQL_SERVER_CDC_CONNECTOR => {
1028                // SQL Server external table name must be in one of two formats:
1029                // 1. 'schemaName.tableName' (2 parts) - database is already specified in source
1030                // 2. 'databaseName.schemaName.tableName' (3 parts) - for explicit verification
1031                //
1032                // We do NOT allow single table name (e.g., 't') because:
1033                // - Unlike database name (already in source), schema name is NOT pre-specified
1034                // - User must explicitly provide schema (even if it's 'dbo')
1035                let parts: Vec<&str> = external_table_name.split('.').collect();
1036                let (schema_name, table_name) = match parts.len() {
1037                    3 => {
1038                        // Format: database.schema.table
1039                        // Verify that the database name matches the one in source definition
1040                        let db_name = parts[0];
1041                        let schema_name = parts[1];
1042                        let table_name = parts[2];
1043
1044                        if db_name != source_database_name {
1045                            return Err(anyhow!(
1046                                "The database name '{}' in FROM clause does not match the database name '{}' specified in source definition. \
1047                                 You can either use 'schema.table' format (recommended) or ensure the database name matches.",
1048                                db_name,
1049                                source_database_name
1050                            ).into());
1051                        }
1052                        (schema_name, table_name)
1053                    }
1054                    2 => {
1055                        // Format: schema.table (recommended)
1056                        // Database name is taken from source definition
1057                        let schema_name = parts[0];
1058                        let table_name = parts[1];
1059                        (schema_name, table_name)
1060                    }
1061                    1 => {
1062                        // Format: table only
1063                        // Reject with clear error message
1064                        return Err(anyhow!(
1065                            "Invalid table name format '{}'. For SQL Server CDC, you must specify the schema name. \
1066                             Use 'schema.table' format (e.g., 'dbo.{}') or 'database.schema.table' format (e.g., '{}.dbo.{}').",
1067                            external_table_name,
1068                            external_table_name,
1069                            source_database_name,
1070                            external_table_name
1071                        ).into());
1072                    }
1073                    _ => {
1074                        // Invalid format (4+ parts or empty)
1075                        return Err(anyhow!(
1076                            "Invalid table name format '{}'. Expected 'schema.table' or 'database.schema.table'.",
1077                            external_table_name
1078                        ).into());
1079                    }
1080                };
1081
1082                // Insert schema and table names into connector properties
1083                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1084                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1085
1086                // Normalize external_table_name to 'schema.table' format
1087                // This ensures consistency with extract_table_name() in message.rs
1088                let normalized_external_table_name = format!("{}.{}", schema_name, table_name);
1089                return Ok((with_options, normalized_external_table_name));
1090            }
1091            _ => {
1092                return Err(RwError::from(anyhow!(
1093                    "connector {} is not supported for cdc table",
1094                    connector
1095                )));
1096            }
1097        };
1098    }
1099    unreachable!("All valid CDC connectors should have returned by now")
1100}
1101
1102#[allow(clippy::too_many_arguments)]
1103pub(super) async fn handle_create_table_plan(
1104    handler_args: HandlerArgs,
1105    explain_options: ExplainOptions,
1106    format_encode: Option<FormatEncodeOptions>,
1107    cdc_table_info: Option<CdcTableInfo>,
1108    table_name: &ObjectName,
1109    column_defs: Vec<ColumnDef>,
1110    wildcard_idx: Option<usize>,
1111    constraints: Vec<TableConstraint>,
1112    source_watermarks: Vec<SourceWatermark>,
1113    append_only: bool,
1114    on_conflict: Option<OnConflict>,
1115    with_version_columns: Vec<String>,
1116    include_column_options: IncludeOption,
1117    webhook_info: Option<WebhookSourceInfo>,
1118    engine: Engine,
1119) -> Result<(
1120    PlanRef,
1121    Option<SourceCatalog>,
1122    TableCatalog,
1123    TableJobType,
1124    Option<SourceId>,
1125)> {
1126    let col_id_gen = ColumnIdGenerator::new_initial();
1127    let format_encode = check_create_table_with_source(
1128        &handler_args.with_options,
1129        format_encode,
1130        &include_column_options,
1131        &cdc_table_info,
1132    )?;
1133    let webhook_info = webhook_info
1134        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1135        .transpose()?;
1136
1137    let props = CreateTableProps {
1138        definition: handler_args.normalized_sql.clone(),
1139        append_only,
1140        on_conflict: on_conflict.into(),
1141        with_version_columns: with_version_columns.clone(),
1142        webhook_info,
1143        engine,
1144    };
1145
1146    let ((plan, source, table), job_type, shared_shource_id) = match (
1147        format_encode,
1148        cdc_table_info.as_ref(),
1149    ) {
1150        (Some(format_encode), None) => (
1151            gen_create_table_plan_with_source(
1152                handler_args,
1153                explain_options,
1154                table_name.clone(),
1155                column_defs,
1156                wildcard_idx,
1157                constraints,
1158                format_encode,
1159                source_watermarks,
1160                col_id_gen,
1161                include_column_options,
1162                props,
1163                SqlColumnStrategy::FollowChecked,
1164            )
1165            .await?,
1166            TableJobType::General,
1167            None,
1168        ),
1169        (None, None) => {
1170            let context = OptimizerContext::new(handler_args, explain_options);
1171            let (plan, table) = gen_create_table_plan(
1172                context,
1173                table_name.clone(),
1174                column_defs,
1175                constraints,
1176                col_id_gen,
1177                source_watermarks,
1178                props,
1179                false,
1180            )?;
1181
1182            ((plan, None, table), TableJobType::General, None)
1183        }
1184
1185        (None, Some(cdc_table)) => {
1186            sanity_check_for_table_on_cdc_source(
1187                append_only,
1188                &column_defs,
1189                &wildcard_idx,
1190                &constraints,
1191                &source_watermarks,
1192            )?;
1193
1194            generated_columns_check_for_cdc_table(&column_defs)?;
1195            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1196
1197            let session = &handler_args.session;
1198            let db_name = &session.database();
1199            let user_name = &session.user_name();
1200            let search_path = session.config().search_path();
1201            let (schema_name, resolved_table_name) =
1202                Binder::resolve_schema_qualified_name(db_name, table_name)?;
1203            let (database_id, schema_id) =
1204                session.get_database_and_schema_id_for_create(schema_name.clone())?;
1205
1206            // cdc table cannot be append-only
1207            let (source_schema, source_name) =
1208                Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
1209
1210            let source = {
1211                let catalog_reader = session.env().catalog_reader().read_guard();
1212                let schema_path =
1213                    SchemaPath::new(source_schema.as_deref(), &search_path, user_name);
1214
1215                let (source, _) = catalog_reader.get_source_by_name(
1216                    db_name,
1217                    schema_path,
1218                    source_name.as_str(),
1219                )?;
1220                source.clone()
1221            };
1222            let (cdc_with_options, normalized_external_table_name) =
1223                derive_with_options_for_cdc_table(
1224                    &source.with_properties,
1225                    cdc_table.external_table_name.clone(),
1226                )?;
1227
1228            let (columns, pk_names) = match wildcard_idx {
1229                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1230                None => {
1231                    for column_def in &column_defs {
1232                        for option_def in &column_def.options {
1233                            if let ColumnOption::DefaultValue(_)
1234                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
1235                            {
1236                                return Err(ErrorCode::NotSupported(
1237                                            "Default value for columns defined on the table created from a CDC source".into(),
1238                                            "Remove the default value expression in the column definitions".into(),
1239                                        )
1240                                            .into());
1241                            }
1242                        }
1243                    }
1244
1245                    let (columns, pk_names) =
1246                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
1247                    // read default value definition from external db
1248                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
1249                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1250                        .context("failed to extract external table config")?;
1251
1252                    (columns, pk_names)
1253                }
1254            };
1255
1256            let context: OptimizerContextRef =
1257                OptimizerContext::new(handler_args, explain_options).into();
1258            let shared_source_id = source.id;
1259            let (plan, table) = gen_create_table_plan_for_cdc_table(
1260                context,
1261                source,
1262                normalized_external_table_name,
1263                column_defs,
1264                source_watermarks,
1265                columns,
1266                pk_names,
1267                cdc_with_options,
1268                col_id_gen,
1269                on_conflict,
1270                with_version_columns,
1271                include_column_options,
1272                table_name.clone(),
1273                resolved_table_name,
1274                database_id,
1275                schema_id,
1276                TableId::placeholder(),
1277                engine,
1278            )?;
1279
1280            (
1281                (plan, None, table),
1282                TableJobType::SharedCdcSource,
1283                Some(shared_source_id),
1284            )
1285        }
1286        (Some(_), Some(_)) => {
1287            return Err(ErrorCode::NotSupported(
1288                "Data format and encoding format doesn't apply to table created from a CDC source"
1289                    .into(),
1290                "Remove the FORMAT and ENCODE specification".into(),
1291            )
1292            .into());
1293        }
1294    };
1295    Ok((plan, source, table, job_type, shared_shource_id))
1296}
1297
1298// For both table from cdc source and table with cdc connector
1299fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1300    let mut found_generated_column = false;
1301    for column in columns {
1302        let mut is_generated = false;
1303
1304        for option_def in &column.options {
1305            if let ColumnOption::GeneratedColumns(_) = option_def.option {
1306                is_generated = true;
1307                break;
1308            }
1309        }
1310
1311        if is_generated {
1312            found_generated_column = true;
1313        } else if found_generated_column {
1314            return Err(ErrorCode::NotSupported(
1315                "Non-generated column found after a generated column.".into(),
1316                "Ensure that all generated columns appear at the end of the cdc table definition."
1317                    .into(),
1318            )
1319            .into());
1320        }
1321    }
1322    Ok(())
1323}
1324
1325// For both table from cdc source and table with cdc connector
1326fn not_null_check_for_cdc_table(
1327    wildcard_idx: &Option<usize>,
1328    column_defs: &Vec<ColumnDef>,
1329) -> Result<()> {
1330    if !wildcard_idx.is_some()
1331        && column_defs.iter().any(|col| {
1332            col.options
1333                .iter()
1334                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1335        })
1336    {
1337        return Err(ErrorCode::NotSupported(
1338            "CDC table with NOT NULL constraint is not supported".to_owned(),
1339            "Please remove the NOT NULL constraint for columns".to_owned(),
1340        )
1341        .into());
1342    }
1343    Ok(())
1344}
1345
1346// Only for table from cdc source
1347fn sanity_check_for_table_on_cdc_source(
1348    append_only: bool,
1349    column_defs: &Vec<ColumnDef>,
1350    wildcard_idx: &Option<usize>,
1351    constraints: &Vec<TableConstraint>,
1352    source_watermarks: &Vec<SourceWatermark>,
1353) -> Result<()> {
1354    // wildcard cannot be used with column definitions
1355    if wildcard_idx.is_some() && !column_defs.is_empty() {
1356        return Err(ErrorCode::NotSupported(
1357            "wildcard(*) and column definitions cannot be used together".to_owned(),
1358            "Remove the wildcard or column definitions".to_owned(),
1359        )
1360        .into());
1361    }
1362
1363    // cdc table must have primary key constraint or primary key column
1364    if !wildcard_idx.is_some()
1365        && !constraints.iter().any(|c| {
1366            matches!(
1367                c,
1368                TableConstraint::Unique {
1369                    is_primary: true,
1370                    ..
1371                }
1372            )
1373        })
1374        && !column_defs.iter().any(|col| {
1375            col.options
1376                .iter()
1377                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1378        })
1379    {
1380        return Err(ErrorCode::NotSupported(
1381            "CDC table without primary key constraint is not supported".to_owned(),
1382            "Please define a primary key".to_owned(),
1383        )
1384        .into());
1385    }
1386
1387    if append_only {
1388        return Err(ErrorCode::NotSupported(
1389            "append only modifier on the table created from a CDC source".into(),
1390            "Remove the APPEND ONLY clause".into(),
1391        )
1392        .into());
1393    }
1394
1395    if !source_watermarks.is_empty()
1396        && source_watermarks
1397            .iter()
1398            .any(|watermark| !watermark.with_ttl)
1399    {
1400        return Err(ErrorCode::NotSupported(
1401            "non-TTL watermark defined on the table created from a CDC source".into(),
1402            "Use `WATERMARK ... WITH TTL` instead.".into(),
1403        )
1404        .into());
1405    }
1406
1407    Ok(())
1408}
1409
1410/// Derive schema for cdc table when create a new Table or alter an existing Table
1411async fn bind_cdc_table_schema_externally(
1412    cdc_with_options: WithOptionsSecResolved,
1413) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1414    // read cdc table schema from external db or parsing the schema from SQL definitions
1415    let (options, secret_refs) = cdc_with_options.into_parts();
1416    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1417        .context("failed to extract external table config")?;
1418
1419    let table = ExternalTableImpl::connect(config)
1420        .await
1421        .context("failed to auto derive table schema")?;
1422
1423    Ok((
1424        table
1425            .column_descs()
1426            .iter()
1427            .cloned()
1428            .map(|column_desc| ColumnCatalog {
1429                column_desc,
1430                is_hidden: false,
1431            })
1432            .collect(),
1433        table.pk_names().clone(),
1434    ))
1435}
1436
1437/// Derive schema for cdc table when create a new Table or alter an existing Table
1438fn bind_cdc_table_schema(
1439    column_defs: &Vec<ColumnDef>,
1440    constraints: &Vec<TableConstraint>,
1441    is_for_replace_plan: bool,
1442) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1443    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1444
1445    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1446    Ok((columns, pk_names))
1447}
1448
/// Handle a `CREATE TABLE` statement end to end: generate the plan, build the stream
/// graph, and submit the creation job to the meta node according to the table engine.
#[allow(clippy::too_many_arguments)]
pub async fn handle_create_table(
    handler_args: HandlerArgs,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    if_not_exists: bool,
    format_encode: Option<FormatEncodeOptions>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    cdc_table_info: Option<CdcTableInfo>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    ast_engine: risingwave_sqlparser::ast::Engine,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    if append_only {
        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
    }

    session.check_cluster_limits().await?;

    // Map the parser-level engine to the catalog-level engine.
    let engine = match ast_engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    // Short-circuit (possibly with a notice) if the relation name already exists.
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        table_name.clone(),
        StatementType::CREATE_TABLE,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (graph, source, hummock_table, job_type, shared_source_id) = {
        let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
            handler_args.clone(),
            ExplainOptions::default(),
            format_encode,
            cdc_table_info,
            &table_name,
            column_defs.clone(),
            wildcard_idx,
            constraints.clone(),
            source_watermarks,
            append_only,
            on_conflict,
            with_version_columns,
            include_column_options,
            webhook_info,
            engine,
        )
        .await?;
        tracing::trace!("table_plan: {:?}", plan.explain_to_string());

        let graph = build_graph(plan, Some(GraphJobType::Table))?;

        (graph, source, table, job_type, shared_source_id)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        table_name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

    // A table on a shared CDC source depends on that source object.
    let dependencies = shared_source_id
        .map(|id| HashSet::from([id.as_object_id()]))
        .unwrap_or_default();

    // Handle engine
    match engine {
        Engine::Hummock => {
            let catalog_writer = session.catalog_writer()?;
            // Pick the long-running notification action based on the job type.
            let action = match job_type {
                TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
                _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
            };
            execute_with_long_running_notification(
                catalog_writer.create_table(
                    source.map(|s| s.to_prost()),
                    hummock_table.to_prost(),
                    graph,
                    job_type,
                    if_not_exists,
                    dependencies,
                ),
                &session,
                "CREATE TABLE",
                action,
            )
            .await?;
        }
        Engine::Iceberg => {
            let hummock_table_name = hummock_table.name.clone();
            // Stage the table for the duration of the multi-step iceberg creation, and
            // always drop the staging entry afterwards — even if creation failed.
            session.create_staging_table(hummock_table.clone());
            let res = Box::pin(create_iceberg_engine_table(
                session.clone(),
                handler_args,
                source.map(|s| s.to_prost()),
                hummock_table,
                graph,
                table_name,
                job_type,
                if_not_exists,
            ))
            .await;
            session.drop_staging_table(&hummock_table_name);
            res?
        }
    }

    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
}
1568
/// An Iceberg engine table is composed of a hummock table, an iceberg sink and an iceberg source.
///
/// 1. Fetch iceberg engine options from the meta node, or use the iceberg engine connection provided by the user.
/// 2. Create a hummock table.
/// 3. Create an iceberg sink.
/// 4. Create an iceberg source.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/21586> for an architecture diagram.
1577#[allow(clippy::too_many_arguments)]
1578pub async fn create_iceberg_engine_table(
1579    session: Arc<SessionImpl>,
1580    handler_args: HandlerArgs,
1581    mut source: Option<PbSource>,
1582    table: TableCatalog,
1583    graph: StreamFragmentGraph,
1584    table_name: ObjectName,
1585    job_type: PbTableJobType,
1586    if_not_exists: bool,
1587) -> Result<()> {
1588    let rw_db_name = session
1589        .env()
1590        .catalog_reader()
1591        .read_guard()
1592        .get_database_by_id(table.database_id)?
1593        .name()
1594        .to_owned();
1595    let rw_schema_name = session
1596        .env()
1597        .catalog_reader()
1598        .read_guard()
1599        .get_schema_by_id(table.database_id, table.schema_id)?
1600        .name()
1601        .clone();
1602    let iceberg_catalog_name = rw_db_name.clone();
1603    let iceberg_database_name = rw_schema_name.clone();
1604    let iceberg_table_name = table_name.0.last().unwrap().real_value();
1605
1606    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1607    let sink_decouple = session.config().sink_decouple();
1608    if matches!(sink_decouple, SinkDecouple::Disable) {
1609        bail!(
1610            "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1611        );
1612    }
1613
1614    let mut connection_ref = BTreeMap::new();
1615    let with_common = if iceberg_engine_connection.is_empty() {
1616        bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1617    } else {
1618        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1619        assert_eq!(parts.len(), 2);
1620        let connection_catalog =
1621            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1622        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1623            if params.connection_type == ConnectionType::Iceberg as i32 {
1624                // With iceberg engine connection:
1625                connection_ref.insert(
1626                    "connection".to_owned(),
1627                    ConnectionRefValue {
1628                        connection_name: ObjectName::from(vec![
1629                            Ident::from(parts[0]),
1630                            Ident::from(parts[1]),
1631                        ]),
1632                    },
1633                );
1634
1635                let mut with_common = BTreeMap::new();
1636                with_common.insert("connector".to_owned(), "iceberg".to_owned());
1637                with_common.insert("database.name".to_owned(), iceberg_database_name);
1638                with_common.insert("table.name".to_owned(), iceberg_table_name);
1639
1640                let hosted_catalog = params
1641                    .properties
1642                    .get("hosted_catalog")
1643                    .map(|s| s.eq_ignore_ascii_case("true"))
1644                    .unwrap_or(false);
1645                if hosted_catalog {
1646                    let meta_client = session.env().meta_client();
1647                    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1648
1649                    let meta_store_endpoint =
1650                        url::Url::parse(&meta_store_endpoint).map_err(|_| {
1651                            ErrorCode::InternalError(
1652                                "failed to parse the meta store endpoint".to_owned(),
1653                            )
1654                        })?;
1655                    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1656                    let meta_store_user = meta_store_endpoint.username().to_owned();
1657                    let meta_store_password = match meta_store_endpoint.password() {
1658                        Some(password) => percent_decode_str(password)
1659                            .decode_utf8()
1660                            .map_err(|_| {
1661                                ErrorCode::InternalError(
1662                                    "failed to parse password from meta store endpoint".to_owned(),
1663                                )
1664                            })?
1665                            .into_owned(),
1666                        None => "".to_owned(),
1667                    };
1668                    let meta_store_host = meta_store_endpoint
1669                        .host_str()
1670                        .ok_or_else(|| {
1671                            ErrorCode::InternalError(
1672                                "failed to parse host from meta store endpoint".to_owned(),
1673                            )
1674                        })?
1675                        .to_owned();
1676                    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1677                        ErrorCode::InternalError(
1678                            "failed to parse port from meta store endpoint".to_owned(),
1679                        )
1680                    })?;
1681                    let meta_store_database = meta_store_endpoint
1682                        .path()
1683                        .trim_start_matches('/')
1684                        .to_owned();
1685
1686                    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1687                        bail!("failed to parse meta backend: {}", meta_store_backend);
1688                    };
1689
1690                    let catalog_uri = match meta_backend {
1691                        MetaBackend::Postgres => {
1692                            format!(
1693                                "jdbc:postgresql://{}:{}/{}",
1694                                meta_store_host, meta_store_port, meta_store_database
1695                            )
1696                        }
1697                        MetaBackend::Mysql => {
1698                            format!(
1699                                "jdbc:mysql://{}:{}/{}",
1700                                meta_store_host, meta_store_port, meta_store_database
1701                            )
1702                        }
1703                        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1704                            bail!(
1705                                "Unsupported meta backend for iceberg engine table: {}",
1706                                meta_store_backend
1707                            );
1708                        }
1709                    };
1710
1711                    with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1712                    with_common.insert("catalog.uri".to_owned(), catalog_uri);
1713                    with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
1714                    with_common.insert("catalog.jdbc.password".to_owned(), meta_store_password);
1715                    with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
1716                }
1717
1718                with_common
1719            } else {
1720                return Err(RwError::from(ErrorCode::InvalidParameterValue(
1721                    "Only iceberg connection could be used in iceberg engine".to_owned(),
1722                )));
1723            }
1724        } else {
1725            return Err(RwError::from(ErrorCode::InvalidParameterValue(
1726                "Private Link Service has been deprecated. Please create a new connection instead."
1727                    .to_owned(),
1728            )));
1729        }
1730    };
1731
    // Iceberg sinks require a primary key; if none is provided, we will use the _row_id column.
1733    // Fetch primary key from columns
1734    let mut pks = table
1735        .pk_column_names()
1736        .iter()
1737        .map(|c| c.to_string())
1738        .collect::<Vec<String>>();
1739
1740    // For the table without primary key. We will use `_row_id` as primary key.
1741    if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1742        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1743    }
1744
1745    let sink_from = CreateSink::From(table_name.clone());
1746
1747    let mut sink_name = table_name.clone();
1748    *sink_name.0.last_mut().unwrap() = Ident::from(
1749        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1750    );
1751    let create_sink_stmt = CreateSinkStatement {
1752        if_not_exists: false,
1753        sink_name,
1754        with_properties: WithProperties(vec![]),
1755        sink_from,
1756        columns: vec![],
1757        emit_mode: None,
1758        sink_schema: None,
1759        into_table_name: None,
1760    };
1761
1762    let mut sink_handler_args = handler_args.clone();
1763
1764    let mut sink_with = with_common.clone();
1765
1766    // TODO: Iceberg with pk index doesn't support auto schema change
1767    if !handler_args
1768        .with_options
1769        .get(ENABLE_COMPACTION)
1770        .is_some_and(|val| val.eq_ignore_ascii_case("true"))
1771    {
1772        sink_with.insert(AUTO_SCHEMA_CHANGE_KEY.to_owned(), "true".to_owned());
1773    }
1774
1775    if table.append_only {
1776        sink_with.insert("type".to_owned(), "append-only".to_owned());
1777    } else {
1778        sink_with.insert("primary_key".to_owned(), pks.join(","));
1779        sink_with.insert("type".to_owned(), "upsert".to_owned());
1780    }
1781    // sink_with.insert(SINK_SNAPSHOT_OPTION.to_owned(), "false".to_owned());
1782    //
1783    // Note: in theory, we don't need to backfill from the table to the sink,
1784    // but we don't have atomic DDL now https://github.com/risingwavelabs/risingwave/issues/21863
1785    // so it may have potential data loss problem on the first barrier.
1786    //
1787    // For non-append-only table, we can always solve it by the initial sink with backfill, since
1788    // data will be present in hummock table.
1789    //
1790    // For append-only table, we need to be more careful.
1791    //
1792    // The possible cases for a table:
1793    // - For table without connector: it doesn't matter, since there's no data before the table is created
    // - For table with connector: we worked around it by setting SOURCE_RATE_LIMIT to 0
1795    //   + If we support blocking DDL for table with connector, we need to be careful.
1796    // - For table with an upstream job: Specifically, CDC table from shared CDC source.
1797    //   + Data may come from both upstream connector, and CDC table backfill, so we need to pause both of them.
1798    //   + For now we don't support APPEND ONLY CDC table, so it's safe.
1799    let commit_checkpoint_interval = handler_args
1800        .with_options
1801        .get(COMMIT_CHECKPOINT_INTERVAL)
1802        .map(|v| v.to_owned())
1803        .unwrap_or_else(|| "60".to_owned());
1804    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1805        ErrorCode::InvalidInputSyntax(format!(
1806            "commit_checkpoint_interval must be greater than 0: {}",
1807            commit_checkpoint_interval
1808        ))
1809    })?;
1810
1811    if commit_checkpoint_interval == 0 {
1812        bail!("commit_checkpoint_interval must be greater than 0");
1813    }
1814
1815    // remove commit_checkpoint_interval from source options, otherwise it will be considered as an unknown field.
1816    source
1817        .as_mut()
1818        .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1819
1820    let sink_decouple = session.config().sink_decouple();
1821    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1822        bail!(
1823            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1824        )
1825    }
1826
1827    sink_with.insert(
1828        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1829        commit_checkpoint_interval.to_string(),
1830    );
1831    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1832
1833    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1834
1835    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1836        match enable_compaction.to_lowercase().as_str() {
1837            "true" => {
1838                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1839            }
1840            "false" => {
1841                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1842            }
1843            _ => {
1844                return Err(ErrorCode::InvalidInputSyntax(format!(
1845                    "enable_compaction must be true or false: {}",
1846                    enable_compaction
1847                ))
1848                .into());
1849            }
1850        }
1851
1852        // remove enable_compaction from source options, otherwise it will be considered as an unknown field.
1853        source
1854            .as_mut()
1855            .map(|x| x.with_properties.remove("enable_compaction"));
1856    } else {
1857        sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1858    }
1859
1860    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1861        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1862            ErrorCode::InvalidInputSyntax(format!(
1863                "compaction_interval_sec must be greater than 0: {}",
1864                commit_checkpoint_interval
1865            ))
1866        })?;
1867        if compaction_interval_sec == 0 {
1868            bail!("compaction_interval_sec must be greater than 0");
1869        }
1870        sink_with.insert(
1871            "compaction_interval_sec".to_owned(),
1872            compaction_interval_sec.to_string(),
1873        );
1874        // remove compaction_interval_sec from source options, otherwise it will be considered as an unknown field.
1875        source
1876            .as_mut()
1877            .map(|x| x.with_properties.remove("compaction_interval_sec"));
1878    }
1879
1880    let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1881        handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1882    {
1883        // remove enable_snapshot_expiration from source options, otherwise it will be considered as an unknown field.
1884        source
1885            .as_mut()
1886            .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1887        match enable_snapshot_expiration.to_lowercase().as_str() {
1888            "true" => {
1889                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1890                true
1891            }
1892            "false" => {
1893                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1894                false
1895            }
1896            _ => {
1897                return Err(ErrorCode::InvalidInputSyntax(format!(
1898                    "enable_snapshot_expiration must be true or false: {}",
1899                    enable_snapshot_expiration
1900                ))
1901                .into());
1902            }
1903        }
1904    } else {
1905        sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1906        true
1907    };
1908
1909    if has_enabled_snapshot_expiration {
1910        // configuration for snapshot expiration
1911        if let Some(snapshot_expiration_retain_last) = handler_args
1912            .with_options
1913            .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
1914        {
1915            sink_with.insert(
1916                SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
1917                snapshot_expiration_retain_last.to_owned(),
1918            );
1919            // remove snapshot_expiration_retain_last from source options, otherwise it will be considered as an unknown field.
1920            source
1921                .as_mut()
1922                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
1923        }
1924
1925        if let Some(snapshot_expiration_max_age) = handler_args
1926            .with_options
1927            .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
1928        {
1929            sink_with.insert(
1930                SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
1931                snapshot_expiration_max_age.to_owned(),
1932            );
1933            // remove snapshot_expiration_max_age from source options, otherwise it will be considered as an unknown field.
1934            source
1935                .as_mut()
1936                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
1937        }
1938
1939        if let Some(snapshot_expiration_clear_expired_files) = handler_args
1940            .with_options
1941            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1942        {
1943            sink_with.insert(
1944                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
1945                snapshot_expiration_clear_expired_files.to_owned(),
1946            );
1947            // remove snapshot_expiration_clear_expired_files from source options, otherwise it will be considered as an unknown field.
1948            source.as_mut().map(|x| {
1949                x.with_properties
1950                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1951            });
1952        }
1953
1954        if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
1955            .with_options
1956            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1957        {
1958            sink_with.insert(
1959                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
1960                snapshot_expiration_clear_expired_meta_data.to_owned(),
1961            );
1962            // remove snapshot_expiration_clear_expired_meta_data from source options, otherwise it will be considered as an unknown field.
1963            source.as_mut().map(|x| {
1964                x.with_properties
1965                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1966            });
1967        }
1968    }
1969
1970    if let Some(format_version) = handler_args.with_options.get(FORMAT_VERSION) {
1971        let format_version = format_version.parse::<u8>().map_err(|_| {
1972            ErrorCode::InvalidInputSyntax(format!(
1973                "format_version must be 1, 2 or 3: {}",
1974                format_version
1975            ))
1976        })?;
1977        if format_version != 1 && format_version != 2 && format_version != 3 {
1978            bail!("format_version must be 1, 2 or 3");
1979        }
1980        sink_with.insert(FORMAT_VERSION.to_owned(), format_version.to_string());
1981
1982        // remove format_version from source options, otherwise it will be considered as an unknown field.
1983        source
1984            .as_mut()
1985            .map(|x| x.with_properties.remove(FORMAT_VERSION));
1986    }
1987
1988    if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1989        let write_mode = IcebergWriteMode::try_from(write_mode.as_str()).map_err(|_| {
1990            ErrorCode::InvalidInputSyntax(format!(
1991                "invalid write_mode: {}, must be one of: {}, {}",
1992                write_mode, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
1993            ))
1994        })?;
1995
1996        match write_mode {
1997            IcebergWriteMode::MergeOnRead => {
1998                sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
1999            }
2000
2001            IcebergWriteMode::CopyOnWrite => {
2002                if table.append_only {
2003                    return Err(ErrorCode::NotSupported(
2004                        "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
2005                        "Please use MERGE ON READ instead".to_owned(),
2006                    )
2007                    .into());
2008                }
2009
2010                sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
2011            }
2012        }
2013
2014        // remove write_mode from source options, otherwise it will be considered as an unknown field.
2015        source
2016            .as_mut()
2017            .map(|x| x.with_properties.remove("write_mode"));
2018    } else {
2019        sink_with.insert(
2020            WRITE_MODE.to_owned(),
2021            ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2022        );
2023    }
2024
2025    if let Some(max_snapshots_num_before_compaction) =
2026        handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
2027    {
2028        let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
2029            .parse::<u32>()
2030            .map_err(|_| {
2031                ErrorCode::InvalidInputSyntax(format!(
2032                    "{} must be greater than 0: {}",
2033                    COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
2034                ))
2035            })?;
2036
2037        if max_snapshots_num_before_compaction == 0 {
2038            bail!(format!(
2039                "{} must be greater than 0",
2040                COMPACTION_MAX_SNAPSHOTS_NUM
2041            ));
2042        }
2043
2044        sink_with.insert(
2045            COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
2046            max_snapshots_num_before_compaction.to_string(),
2047        );
2048
2049        // remove from source options, otherwise it will be considered as an unknown field.
2050        source
2051            .as_mut()
2052            .map(|x| x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM));
2053    }
2054
2055    if let Some(small_files_threshold_mb) = handler_args
2056        .with_options
2057        .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2058    {
2059        let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
2060            ErrorCode::InvalidInputSyntax(format!(
2061                "{} must be greater than 0: {}",
2062                COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
2063            ))
2064        })?;
2065        if small_files_threshold_mb == 0 {
2066            bail!(format!(
2067                "{} must be a greater than 0",
2068                COMPACTION_SMALL_FILES_THRESHOLD_MB
2069            ));
2070        }
2071        sink_with.insert(
2072            COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
2073            small_files_threshold_mb.to_string(),
2074        );
2075
2076        // remove from source options, otherwise it will be considered as an unknown field.
2077        source.as_mut().map(|x| {
2078            x.with_properties
2079                .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2080        });
2081    }
2082
2083    if let Some(delete_files_count_threshold) = handler_args
2084        .with_options
2085        .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2086    {
2087        let delete_files_count_threshold =
2088            delete_files_count_threshold.parse::<usize>().map_err(|_| {
2089                ErrorCode::InvalidInputSyntax(format!(
2090                    "{} must be greater than 0: {}",
2091                    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
2092                ))
2093            })?;
2094        if delete_files_count_threshold == 0 {
2095            bail!(format!(
2096                "{} must be greater than 0",
2097                COMPACTION_DELETE_FILES_COUNT_THRESHOLD
2098            ));
2099        }
2100        sink_with.insert(
2101            COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
2102            delete_files_count_threshold.to_string(),
2103        );
2104
2105        // remove from source options, otherwise it will be considered as an unknown field.
2106        source.as_mut().map(|x| {
2107            x.with_properties
2108                .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2109        });
2110    }
2111
2112    if let Some(trigger_snapshot_count) = handler_args
2113        .with_options
2114        .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
2115    {
2116        let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
2117            ErrorCode::InvalidInputSyntax(format!(
2118                "{} must be greater than 0: {}",
2119                COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
2120            ))
2121        })?;
2122        if trigger_snapshot_count == 0 {
2123            bail!(format!(
2124                "{} must be greater than 0",
2125                COMPACTION_TRIGGER_SNAPSHOT_COUNT
2126            ));
2127        }
2128        sink_with.insert(
2129            COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
2130            trigger_snapshot_count.to_string(),
2131        );
2132
2133        // remove from source options, otherwise it will be considered as an unknown field.
2134        source
2135            .as_mut()
2136            .map(|x| x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT));
2137    }
2138
2139    if let Some(target_file_size_mb) = handler_args
2140        .with_options
2141        .get(COMPACTION_TARGET_FILE_SIZE_MB)
2142    {
2143        let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
2144            ErrorCode::InvalidInputSyntax(format!(
2145                "{} must be greater than 0: {}",
2146                COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
2147            ))
2148        })?;
2149        if target_file_size_mb == 0 {
2150            bail!(format!(
2151                "{} must be greater than 0",
2152                COMPACTION_TARGET_FILE_SIZE_MB
2153            ));
2154        }
2155        sink_with.insert(
2156            COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
2157            target_file_size_mb.to_string(),
2158        );
2159        // remove from source options, otherwise it will be considered as an unknown field.
2160        source
2161            .as_mut()
2162            .map(|x| x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB));
2163    }
2164
2165    if let Some(compaction_type) = handler_args.with_options.get(COMPACTION_TYPE) {
2166        let compaction_type = CompactionType::try_from(compaction_type.as_str()).map_err(|_| {
2167            ErrorCode::InvalidInputSyntax(format!(
2168                "invalid compaction_type: {}, must be one of {:?}",
2169                compaction_type,
2170                &[
2171                    CompactionType::Full,
2172                    CompactionType::SmallFiles,
2173                    CompactionType::FilesWithDelete
2174                ]
2175            ))
2176        })?;
2177
2178        sink_with.insert(
2179            COMPACTION_TYPE.to_owned(),
2180            compaction_type.as_str().to_owned(),
2181        );
2182
2183        // remove from source options, otherwise it will be considered as an unknown field.
2184        source
2185            .as_mut()
2186            .map(|x| x.with_properties.remove(COMPACTION_TYPE));
2187    }
2188
2189    if let Some(write_parquet_compression) = handler_args
2190        .with_options
2191        .get(COMPACTION_WRITE_PARQUET_COMPRESSION)
2192    {
2193        sink_with.insert(
2194            COMPACTION_WRITE_PARQUET_COMPRESSION.to_owned(),
2195            write_parquet_compression.to_owned(),
2196        );
2197        // remove from source options, otherwise it will be considered as an unknown field.
2198        source.as_mut().map(|x| {
2199            x.with_properties
2200                .remove(COMPACTION_WRITE_PARQUET_COMPRESSION)
2201        });
2202    }
2203
2204    if let Some(write_parquet_max_row_group_rows) = handler_args
2205        .with_options
2206        .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2207    {
2208        let write_parquet_max_row_group_rows = write_parquet_max_row_group_rows
2209            .parse::<usize>()
2210            .map_err(|_| {
2211                ErrorCode::InvalidInputSyntax(format!(
2212                    "{} must be a positive integer: {}",
2213                    COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS, write_parquet_max_row_group_rows
2214                ))
2215            })?;
2216        if write_parquet_max_row_group_rows == 0 {
2217            bail!(format!(
2218                "{} must be greater than 0",
2219                COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS
2220            ));
2221        }
2222        sink_with.insert(
2223            COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS.to_owned(),
2224            write_parquet_max_row_group_rows.to_string(),
2225        );
2226        // remove from source options, otherwise it will be considered as an unknown field.
2227        source.as_mut().map(|x| {
2228            x.with_properties
2229                .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2230        });
2231    }
2232
2233    if let Some(write_parquet_max_row_group_bytes) = handler_args
2234        .with_options
2235        .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
2236    {
2237        let write_parquet_max_row_group_bytes = write_parquet_max_row_group_bytes
2238            .parse::<usize>()
2239            .map_err(|_| {
2240                ErrorCode::InvalidInputSyntax(format!(
2241                    "{} must be a positive integer: {}",
2242                    COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES, write_parquet_max_row_group_bytes
2243                ))
2244            })?;
2245        if write_parquet_max_row_group_bytes == 0 {
2246            bail!(format!(
2247                "{} must be greater than 0",
2248                COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES
2249            ));
2250        }
2251        sink_with.insert(
2252            COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES.to_owned(),
2253            write_parquet_max_row_group_bytes.to_string(),
2254        );
2255        source.as_mut().map(|x| {
2256            x.with_properties
2257                .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
2258        });
2259    }
2260
2261    let partition_by = handler_args
2262        .with_options
2263        .get("partition_by")
2264        .map(|v| v.to_owned());
2265
2266    if let Some(partition_by) = &partition_by {
2267        let mut partition_columns = vec![];
2268        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
2269            table
2270                .columns()
2271                .iter()
2272                .find(|col| col.name().eq_ignore_ascii_case(&column))
2273                .ok_or_else(|| {
2274                    ErrorCode::InvalidInputSyntax(format!(
2275                        "Partition source column does not exist in schema: {}",
2276                        column
2277                    ))
2278                })?;
2279
2280            partition_columns.push(column);
2281        }
2282
2283        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2284            |_| {
2285                ErrorCode::InvalidInputSyntax(
2286                    "The partition columns should be the prefix of the primary key".to_owned(),
2287                )
2288            },
2289        )?;
2290
2291        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2292
2293        // remove partition_by from source options, otherwise it will be considered as an unknown field.
2294        source
2295            .as_mut()
2296            .map(|x| x.with_properties.remove("partition_by"));
2297    }
2298
2299    let order_key = handler_args
2300        .with_options
2301        .get(ORDER_KEY)
2302        .map(|v| v.to_owned());
2303    if let Some(order_key) = &order_key {
2304        validate_order_key_columns(order_key, table.columns().iter().map(|col| col.name()))
2305            .map_err(|err| ErrorCode::InvalidInputSyntax(err.to_report_string()))?;
2306
2307        sink_with.insert(ORDER_KEY.to_owned(), order_key.to_owned());
2308
2309        source.as_mut().map(|x| x.with_properties.remove(ORDER_KEY));
2310    }
2311
2312    sink_handler_args.with_options =
2313        WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2314    let SinkPlanContext {
2315        sink_plan,
2316        sink_catalog,
2317        ..
2318    } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2319    let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2320
2321    let mut source_name = table_name.clone();
2322    *source_name.0.last_mut().unwrap() = Ident::from(
2323        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2324    );
2325    let create_source_stmt = CreateSourceStatement {
2326        temporary: false,
2327        if_not_exists: false,
2328        columns: vec![],
2329        source_name,
2330        wildcard_idx: Some(0),
2331        constraints: vec![],
2332        with_properties: WithProperties(vec![]),
2333        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2334        source_watermarks: vec![],
2335        include_column_options: vec![],
2336    };
2337
2338    let mut source_handler_args = handler_args.clone();
2339    let source_with = with_common;
2340    source_handler_args.with_options =
2341        WithOptions::new(source_with, Default::default(), connection_ref);
2342
2343    let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2344    let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2345    let (with_properties, refresh_mode) =
2346        bind_connector_props(&source_handler_args, &format_encode, true)?;
2347
2348    // Create iceberg sink table, used for iceberg source column binding. See `bind_columns_from_source_for_non_cdc` for more details.
2349    // TODO: We can derive the columns directly from table definition in the future, so that we don't need to pre-create the table catalog.
2350    let (iceberg_catalog, table_identifier) = {
2351        let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2352        let iceberg_sink = IcebergSink::try_from(sink_param)?;
2353        iceberg_sink.create_table_if_not_exists().await?;
2354
2355        let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2356        let table_identifier = iceberg_sink.config.full_table_name()?;
2357        (iceberg_catalog, table_identifier)
2358    };
2359
2360    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2361    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2362        &session,
2363        &format_encode,
2364        Either::Left(&with_properties),
2365        create_source_type,
2366    )
2367    .await?;
2368    let mut col_id_gen = ColumnIdGenerator::new_initial();
2369
2370    let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2371        source_handler_args,
2372        create_source_stmt.source_name,
2373        format_encode,
2374        with_properties,
2375        &create_source_stmt.columns,
2376        create_source_stmt.constraints,
2377        create_source_stmt.wildcard_idx,
2378        create_source_stmt.source_watermarks,
2379        columns_from_resolve_source,
2380        source_info,
2381        create_source_stmt.include_column_options,
2382        &mut col_id_gen,
2383        create_source_type,
2384        overwrite_options.source_rate_limit,
2385        SqlColumnStrategy::FollowChecked,
2386        refresh_mode,
2387    )
2388    .await?;
2389
2390    // before we create the table, ensure the JVM is initialized as we use jdbc catalog right now.
2391    // If JVM isn't initialized successfully, current not atomic ddl will result in a partially created iceberg engine table.
2392    let _ = Jvm::get_or_init()?;
2393
2394    let catalog_writer = session.catalog_writer()?;
2395    let action = match job_type {
2396        TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
2397        _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
2398    };
2399    let res = execute_with_long_running_notification(
2400        catalog_writer.create_iceberg_table(
2401            PbTableJobInfo {
2402                source,
2403                table: Some(table.to_prost()),
2404                fragment_graph: Some(graph),
2405                job_type: job_type as _,
2406            },
2407            PbSinkJobInfo {
2408                sink: Some(sink_catalog.to_proto()),
2409                fragment_graph: Some(sink_graph),
2410            },
2411            iceberg_source_catalog.to_prost(),
2412            if_not_exists,
2413        ),
2414        &session,
2415        "CREATE TABLE",
2416        action,
2417    )
2418    .await;
2419
2420    if res.is_err() {
2421        let _ = iceberg_catalog
2422            .drop_table(&table_identifier)
2423            .await
2424            .inspect_err(|err| {
2425                tracing::error!(
2426                    "failed to drop iceberg table {} after create iceberg engine table failed: {}",
2427                    table_identifier,
2428                    err.as_report()
2429                );
2430            });
2431        res?
2432    }
2433
2434    Ok(())
2435}
2436
2437pub fn check_create_table_with_source(
2438    with_options: &WithOptions,
2439    format_encode: Option<FormatEncodeOptions>,
2440    include_column_options: &IncludeOption,
2441    cdc_table_info: &Option<CdcTableInfo>,
2442) -> Result<Option<FormatEncodeOptions>> {
2443    // skip check for cdc table
2444    if cdc_table_info.is_some() {
2445        return Ok(format_encode);
2446    }
2447    let defined_source = with_options.is_source_connector();
2448
2449    if !include_column_options.is_empty() && !defined_source {
2450        return Err(ErrorCode::InvalidInputSyntax(
2451            "INCLUDE should be used with a connector".to_owned(),
2452        )
2453        .into());
2454    }
2455    if defined_source {
2456        format_encode.as_ref().ok_or_else(|| {
2457            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2458        })?;
2459    }
2460    Ok(format_encode)
2461}
2462
/// Checks that `partition_columns` is a (possibly empty) prefix of
/// `primary_key_columns`, returning a human-readable error message otherwise.
fn ensure_partition_columns_are_prefix_of_primary_key(
    partition_columns: &[String],
    primary_key_columns: &[String],
) -> std::result::Result<(), String> {
    // A prefix can never be longer than the sequence it prefixes.
    if partition_columns.len() > primary_key_columns.len() {
        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
    }

    // Pairwise-compare the partition columns against the leading primary key
    // columns; the first mismatch (if any) is reported.
    match partition_columns
        .iter()
        .zip(primary_key_columns.iter())
        .find(|(partition_col, pk_col)| partition_col != pk_col)
    {
        Some((mismatched, _)) => Err(format!(
            "Partition column '{}' is not a prefix of the primary key.",
            mismatched
        )),
        None => Ok(()),
    }
}
2482
/// Re-plans an existing table from a freshly parsed `CREATE TABLE` statement,
/// producing the new stream fragment graph, the new table catalog, the
/// (optional) new associated source catalog, and the table job type.
///
/// The returned catalogs keep the table's identity from `original_catalog`
/// (table ID, and the associated source ID when the connector is kept), so
/// callers can submit the result as a replacement job rather than a new table.
///
/// # Panics
///
/// Panics if `statement` is not a `Statement::CreateTable`.
#[allow(clippy::too_many_arguments)]
pub async fn generate_stream_graph_for_replace_table(
    _session: &Arc<SessionImpl>,
    table_name: ObjectName,
    original_catalog: &Arc<TableCatalog>,
    handler_args: HandlerArgs,
    statement: Statement,
    col_id_gen: ColumnIdGenerator,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(
    StreamFragmentGraph,
    TableCatalog,
    Option<SourceCatalog>,
    TableJobType,
)> {
    // Only `CREATE TABLE` statements are accepted here; anything else is a
    // caller bug, hence the panic rather than a user-facing error.
    let Statement::CreateTable {
        columns,
        constraints,
        source_watermarks,
        append_only,
        on_conflict,
        with_version_columns,
        wildcard_idx,
        cdc_table_info,
        format_encode,
        include_column_options,
        engine,
        with_options,
        ..
    } = statement
    else {
        panic!("unexpected statement type: {:?}", statement);
    };

    let format_encode = format_encode
        .clone()
        .map(|format_encode| format_encode.into_v2_with_warning());

    // Translate the parser-level engine variant into the catalog-level one.
    let engine = match engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    // "Dropping the connector": the original table had an associated source,
    // but the new definition carries no FORMAT/ENCODE clause. In that case the
    // statement should not contain any source-only clauses either.
    let is_drop_connector =
        original_catalog.associated_source_id().is_some() && format_encode.is_none();
    if is_drop_connector {
        debug_assert!(
            source_watermarks.is_empty()
                && include_column_options.is_empty()
                && with_options
                    .iter()
                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
        );
    }

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_columns: with_version_columns
            .iter()
            .map(|col| col.real_value())
            .collect(),
        // The webhook configuration is carried over unchanged from the
        // original catalog; it cannot be altered through this path.
        webhook_info: original_catalog.webhook_info.clone(),
        engine,
    };

    // Dispatch on (FORMAT/ENCODE clause, CDC table info) to pick the planning
    // path, mirroring the dispatch used when the table was first created.
    let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        // Connector-backed (non-CDC) table: re-bind columns from the source.
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                ExplainOptions::default(),
                table_name,
                columns,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                sql_column_strategy,
            )
            .await?,
            TableJobType::General,
        ),
        // Plain table without any connector.
        (None, None) => {
            let context = OptimizerContext::from_handler_args(handler_args);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name,
                columns,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                true,
            )?;
            ((plan, None, table), TableJobType::General)
        }
        // Table on a shared CDC source: re-derive the external table config
        // and plan against the (already existing) CDC source.
        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &columns,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            let session = &handler_args.session;
            let (source, resolved_table_name) =
                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;

            let (cdc_with_options, normalized_external_table_name) =
                derive_with_options_for_cdc_table(
                    &source.with_properties,
                    cdc_table.external_table_name.clone(),
                )?;

            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                normalized_external_table_name,
                columns,
                source_watermarks,
                column_catalogs,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_columns
                    .iter()
                    .map(|col| col.real_value())
                    .collect(),
                include_column_options,
                table_name,
                resolved_table_name,
                original_catalog.database_id,
                original_catalog.schema_id,
                original_catalog.id(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        // FORMAT/ENCODE together with a CDC source is contradictory.
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format doesn't apply to table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };

    // Replacing a table must not change its primary key.
    if table.pk_column_ids() != original_catalog.pk_column_ids() {
        Err(ErrorCode::InvalidInputSyntax(
            "alter primary key of table is not supported".to_owned(),
        ))?
    }

    let graph = build_graph(plan, Some(GraphJobType::Table))?;

    // Fill the original table ID.
    table.id = original_catalog.id();
    // When the connector is kept, re-link the (newly planned) source to the
    // original source ID and back to this table.
    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
        table.associated_source_id = Some(source_id);

        let source = source.as_mut().unwrap();
        source.id = source_id;
        source.associated_table_id = Some(table.id());
    }

    Ok((graph, table, source, job_type))
}
2662
2663fn get_source_and_resolved_table_name(
2664    session: &Arc<SessionImpl>,
2665    cdc_table: CdcTableInfo,
2666    table_name: ObjectName,
2667) -> Result<(Arc<SourceCatalog>, String)> {
2668    let db_name = &session.database();
2669    let (_, resolved_table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2670
2671    let (source_schema, source_name) =
2672        Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2673
2674    let source = {
2675        let catalog_reader = session.env().catalog_reader().read_guard();
2676        let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2677        let (source, _) = catalog_reader.get_source_by_name(
2678            db_name,
2679            SchemaPath::Name(schema_name.as_str()),
2680            source_name.as_str(),
2681        )?;
2682        source.clone()
2683    };
2684
2685    Ok((source, resolved_table_name))
2686}
2687
2688// validate the webhook_info and also bind the webhook_info to protobuf
2689fn bind_webhook_info(
2690    session: &Arc<SessionImpl>,
2691    column_defs: &[ColumnDef],
2692    webhook_info: WebhookSourceInfo,
2693) -> Result<PbWebhookSourceInfo> {
2694    let WebhookSourceInfo {
2695        secret_ref,
2696        signature_expr,
2697        wait_for_persistence,
2698        is_batched,
2699    } = webhook_info;
2700
2701    for column in column_defs {
2702        for option_def in &column.options {
2703            match option_def.option {
2704                ColumnOption::Null => {}
2705                ColumnOption::GeneratedColumns(_) => {
2706                    return Err(ErrorCode::InvalidInputSyntax(
2707                        "generated columns are not supported for webhook tables".to_owned(),
2708                    )
2709                    .into());
2710                }
2711                ColumnOption::DefaultValue(_) | ColumnOption::DefaultValueInternal { .. } => {
2712                    return Err(ErrorCode::InvalidInputSyntax(
2713                        "default values are not supported for webhook tables".to_owned(),
2714                    )
2715                    .into());
2716                }
2717                ColumnOption::NotNull
2718                | ColumnOption::Unique { .. }
2719                | ColumnOption::ForeignKey { .. }
2720                | ColumnOption::Check(_)
2721                | ColumnOption::DialectSpecific(_) => {
2722                    return Err(ErrorCode::InvalidInputSyntax(
2723                        "only NULL column option is supported for webhook tables".to_owned(),
2724                    )
2725                    .into());
2726                }
2727            }
2728        }
2729    }
2730
2731    // validate secret_ref
2732    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2733        let db_name = &session.database();
2734        let (schema_name, secret_name) =
2735            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2736        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2737        (
2738            Some(PbSecretRef {
2739                secret_id: secret_catalog.id,
2740                ref_as: match secret_ref.ref_as {
2741                    SecretRefAsType::Text => PbRefAsType::Text,
2742                    SecretRefAsType::File => PbRefAsType::File,
2743                }
2744                .into(),
2745            }),
2746            Some(secret_name),
2747        )
2748    } else {
2749        (None, None)
2750    };
2751
2752    let signature_expr = if let Some(signature_expr) = signature_expr {
2753        let payload_name = if column_defs.len() == 1
2754            && column_defs[0].data_type.as_ref() == Some(&AstDataType::Jsonb)
2755        {
2756            column_defs[0].name.real_value()
2757        } else {
2758            WEBHOOK_PAYLOAD_FIELD_NAME.to_owned()
2759        };
2760        let secure_compare_context = SecureCompareContext {
2761            payload_name,
2762            secret_name,
2763        };
2764        let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
2765        let expr = binder.bind_expr(&signature_expr)?;
2766
2767        // validate expr, ensuring it is SECURE_COMPARE()
2768        if expr.as_function_call().is_none()
2769            || expr.as_function_call().unwrap().func_type()
2770                != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2771        {
2772            return Err(ErrorCode::InvalidInputSyntax(
2773                "The signature verification function must be SECURE_COMPARE()".to_owned(),
2774            )
2775            .into());
2776        }
2777
2778        Some(expr.to_expr_proto())
2779    } else {
2780        session.notice_to_user(
2781            "VALIDATE clause is strongly recommended for safety or production usages",
2782        );
2783        None
2784    };
2785
2786    let pb_webhook_info = PbWebhookSourceInfo {
2787        secret_ref: pb_secret_ref,
2788        signature_expr,
2789        wait_for_persistence,
2790        is_batched,
2791    };
2792
2793    Ok(pb_webhook_info)
2794}
2795
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use super::*;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    // End-to-end: a plain CREATE TABLE yields the expected catalog columns,
    // including the hidden row-id and rw_timestamp columns.
    #[tokio::test]
    async fn test_create_table_handler() {
        let sql =
            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check table exists.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(table.name(), "t");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "v1" => DataType::Int16,
            "v2" => StructType::new(
                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
            )
            .with_ids([3, 4, 5].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };

        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    // Webhook tables may declare arbitrary user columns; only the declared
    // user columns should be DML-visible.
    #[tokio::test]
    async fn test_create_webhook_table_with_arbitrary_columns() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql("create schema ingest_schema;")
            .await
            .unwrap();
        frontend
            .run_sql(
                r#"
                create table ingest_schema.orders (
                    id int,
                    customer_name varchar,
                    amount double precision,
                    primary key (id)
                ) with (
                    connector = 'webhook'
                ) validate as secure_compare(
                    headers->>'x-rw-signature',
                    'sha256=' || encode(hmac('webhook-secret', payload, 'sha256'), 'hex')
                );
                "#,
            )
            .await
            .unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (table, _) = catalog_reader
            .get_created_table_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name("ingest_schema"),
                "orders",
            )
            .unwrap();

        assert!(table.webhook_info.is_some());
        // Only the three user-declared columns accept DML.
        assert_eq!(
            table
                .columns
                .iter()
                .filter(|column| column.can_dml())
                .count(),
            3
        );
    }

    // With a single JSONB column, the VALIDATE clause may reference the
    // payload by that column's name instead of the default payload field.
    #[tokio::test]
    async fn test_create_webhook_table_uses_single_jsonb_column_name_in_validate() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql(
                r#"
                create table webhook_single_column (
                    body jsonb
                ) with (
                    connector = 'webhook'
                ) validate as secure_compare(
                    headers->>'x-rw-signature',
                    'sha256=' || encode(hmac('webhook-secret', body, 'sha256'), 'hex')
                );
                "#,
            )
            .await
            .unwrap();
    }

    // Generated columns are rejected for webhook tables.
    #[tokio::test]
    async fn test_create_webhook_table_with_generated_columns() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
                create table webhook_generated_columns (
                    id int,
                    amount double precision,
                    amount_with_fee double precision as amount + 1.0
                ) with (
                    connector = 'webhook'
                );
                "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("generated columns are not supported for webhook tables"),
            "{err:?}"
        );
    }

    // Default values are rejected for webhook tables.
    #[tokio::test]
    async fn test_create_webhook_table_with_default_value() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
                create table webhook_default_value (
                    id int default 42,
                    amount double precision
                ) with (
                    connector = 'webhook'
                );
                "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("default values are not supported for webhook tables"),
            "{err:?}"
        );
    }

    // NOT NULL (and all options other than NULL) are rejected for webhook
    // tables.
    #[tokio::test]
    async fn test_create_webhook_table_with_not_null_option() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
                create table webhook_not_null (
                    id int not null,
                    amount double precision
                ) with (
                    connector = 'webhook'
                );
                "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("only NULL column option is supported for webhook tables"),
            "{err:?}"
        );
    }

    // Table-driven test of primary-key binding: each case maps a CREATE TABLE
    // statement to either the expected pk column IDs or an expected error
    // substring.
    #[test]
    fn test_bind_primary_key() {
        // Note: Column ID 0 is reserved for row ID column.

        for (sql, expected) in [
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ] {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be create table")
            };
            // Replicate the binding pipeline: bind columns, assign IDs, bind
            // pk names, then resolve pk/row-id columns.
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }

    // Options appearing in both WITH and FORMAT/ENCODE clauses are kept
    // separately: WITH values land in `with_properties`, FORMAT/ENCODE values
    // in `format_encode_options`, without merging or overriding.
    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // AwsAuth params exist in options.
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // AwsAuth params exist in props.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        // Options are not merged into props.
        assert!(!source.with_properties.contains_key("schema.location"));
    }
}