risingwave_frontend/handler/create_table.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::rc::Rc;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use clap::ValueEnum;
use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use percent_encoding::percent_decode_str;
use pgwire::pg_response::{PgResponse, StatementType};
use prost::Message as _;
use risingwave_common::catalog::{
    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
    ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
    TableId,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::global_jvm::Jvm;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
    DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
    SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::connection_params::ConnectionType;
use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::secret::secret_ref::PbRefAsType;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::{IncludeOption, Parser};
use thiserror_ext::AsReport;

use super::RwPgResponse;
use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
use crate::binder::{Clause, SecureCompareContext, bind_data_type};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::TableVersion;
use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::HandlerArgs;
use crate::handler::create_source::{
    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
    bind_source_watermark, handle_addition_columns,
};
use crate::handler::util::{
    LongRunningNotificationAction, SourceSchemaCompatExt, execute_with_long_running_notification,
};
use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
use crate::optimizer::plan_node::{
    LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::OverwriteOptions;
use crate::{Binder, Explain, TableCatalog, WithOptions};

mod col_id_gen;
pub use col_id_gen::*;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::{
    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
    COMPACTION_SMALL_FILES_THRESHOLD_MB, COMPACTION_TARGET_FILE_SIZE_MB,
    COMPACTION_TRIGGER_SNAPSHOT_COUNT, COMPACTION_TYPE, CompactionType, ENABLE_COMPACTION,
    ENABLE_SNAPSHOT_EXPIRATION, FORMAT_VERSION, ICEBERG_WRITE_MODE_COPY_ON_WRITE,
    ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink, IcebergWriteMode,
    SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
    SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
    parse_partition_by_exprs,
};
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};

use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};

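/// Rejects column options that are not supported yet; only generated columns, default values,
/// inline `PRIMARY KEY`, `NULL`, and `NOT NULL` are accepted here.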
fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
    for option_def in &c.options {
        match option_def.option {
            ColumnOption::GeneratedColumns(_) => {}
            ColumnOption::DefaultValue(_) => {}
            ColumnOption::DefaultValueInternal { .. } => {}
            ColumnOption::Unique { is_primary: true } => {}
            ColumnOption::Null => {}
            ColumnOption::NotNull => {}
            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
        }
    }
    Ok(())
}

/// Binds the column schemas declared in a CREATE statement into `ColumnCatalog`s.
/// Inline `PRIMARY KEY` column options are not resolved here; they are collected later by
/// [`bind_sql_pk_names`] and combined with table-level constraints.
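///
/// For example, `CREATE TABLE t (id INT PRIMARY KEY, v VARCHAR)` yields two `ColumnCatalog`s
/// with placeholder column ids; the `PRIMARY KEY` marker on `id` is handled separately.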
pub fn bind_sql_columns(
    column_defs: &[ColumnDef],
    is_for_drop_table_connector: bool,
) -> Result<Vec<ColumnCatalog>> {
    let mut columns = Vec::with_capacity(column_defs.len());

    for column in column_defs {
        ensure_column_options_supported(column)?;
        // Destruct to make sure all fields are properly handled rather than ignored.
        // Do NOT use `..` to ignore fields you do not want to deal with.
        // Reject them with a clear NotImplemented error.
        let ColumnDef {
            name,
            data_type,
            collation,
            options,
            ..
        } = column;

        let data_type = data_type
            .clone()
            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
        if let Some(collation) = collation {
            // PostgreSQL limits the data types that collation can work on.
            // https://www.postgresql.org/docs/16/collation.html#COLLATION-CONCEPTS
            //   > The built-in collatable data types are `text`, `varchar`, and `char`.
            //
            // We don't support real collation; we simply ignore it here.
            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
                bail_not_implemented!(
                    "Collation other than `C` or `POSIX` is not implemented"
                );
            }

            match data_type {
                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
                _ => {
                    return Err(ErrorCode::NotSupported(
                        format!("{} is not a collatable data type", data_type),
                        "The only built-in collatable data types are `text`, `varchar` and `char`, please check your type".into(),
                    ).into());
                }
            }
        }

        if !is_for_drop_table_connector {
            // Additional column names may have the `_rw` prefix. When dropping the connector
            // from a table, additional columns are converted to normal columns and keep their
            // original names, so we loosen the reserved-prefix check in that case.
            check_column_name_not_reserved(&name.real_value())?;
        }

        let nullable: bool = !options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull));

        columns.push(ColumnCatalog {
            column_desc: ColumnDesc {
                data_type: bind_data_type(&data_type)?,
                column_id: ColumnId::placeholder(),
                name: name.real_value(),
                generated_or_default_column: None,
                description: None,
                additional_column: AdditionalColumn { column_type: None },
                version: ColumnDescVersion::LATEST,
                system_column: None,
                nullable,
            },
            is_hidden: false,
        });
    }

    Ok(columns)
}

fn check_generated_column_constraints(
    column_name: &String,
    column_id: ColumnId,
    expr: &ExprImpl,
    column_catalogs: &[ColumnCatalog],
    generated_column_names: &[String],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let input_refs = expr.collect_input_refs(column_catalogs.len());
    for idx in input_refs.ones() {
        let referred_generated_column = &column_catalogs[idx].column_desc.name;
        if generated_column_names
            .iter()
            .any(|c| c == referred_generated_column)
        {
            return Err(ErrorCode::BindError(format!(
                "A generated column cannot reference another generated column. \
                But here generated column \"{}\" referenced another generated column \"{}\"",
                column_name, referred_generated_column
            ))
            .into());
        }
    }

    if pk_column_ids.contains(&column_id) && expr.is_impure() {
        return Err(ErrorCode::BindError(format!(
            "Generated columns with impure expressions should not be part of the primary key. \
            Here column \"{}\" is defined as part of the primary key.",
            column_name
        ))
        .into());
    }

    Ok(())
}

/// Binds constraints that can only be specified in column definitions,
/// currently generated columns and default columns.
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: &[ColumnDef],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let generated_column_names = {
        let mut names = vec![];
        for column in columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name, column_catalogs)?;

    for column in columns {
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            // It's possible that we don't follow the user defined columns in SQL but take the
            // ones resolved from the source, thus missing some columns. Simply ignore them.
            continue;
        };

        for option_def in &column.options {
            match &option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "failed to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type())?;

                    // Rewrite expressions to evaluate a snapshot value, used for missing values in the case of
                    // schema change.
                    //
                    // TODO: Currently we don't support impure expressions other than `now()` (like `random()`),
                    // so the rewritten expression should almost always be pure and we directly call `fold_const`
                    // here. Actually we do not require purity of the expression here since we're only to get a
                    // snapshot value.
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                expr: Some(expr_impl.to_expr_proto()),
                                // persist the original expression
                            }));
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                            Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // When a `DEFAULT INTERNAL` is used internally for schema change, the persisted value
                    // should already be set during purification. So if we encounter an empty value here, it
                    // means the user has specified it explicitly in the SQL statement, typically by
                    // directly copying the result of `SHOW CREATE TABLE` and executing it.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                             please specify a concrete default value"
                        );
                    }

                    let desc = DefaultColumnDesc::decode(&**persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                _ => {}
            }
        }
    }
    Ok(())
}

/// Currently we only support the PRIMARY KEY table constraint, so we just return the pk column
/// names if it exists.
pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
    let mut pk_column_names = vec![];

    for constraint in table_constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary: true,
            } => {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
            }
            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
        }
    }
    Ok(pk_column_names)
}

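/// Collects primary key column names from both table-level constraints (already bound by
/// [`bind_table_constraints`]) and inline `PRIMARY KEY` column options, rejecting statements
/// that define the primary key more than once.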
pub fn bind_sql_pk_names(
    columns_defs: &[ColumnDef],
    pk_names_from_table_constraints: Vec<String>,
) -> Result<Vec<String>> {
    let mut pk_column_names = pk_names_from_table_constraints;

    for column in columns_defs {
        for option_def in &column.options {
            if let ColumnOption::Unique { is_primary: true } = option_def.option {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names.push(column.name.real_value());
            };
        }
    }

    Ok(pk_column_names)
}

fn multiple_pk_definition_err() -> RwError {
    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
}

/// Binds primary keys defined in SQL.
///
/// It returns the columns together with `pk_column_ids`, and an optional row id column index if
/// added.
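///
/// If no primary key is given and `must_need_pk` is true, a hidden `_row_id` column is appended
/// and used as the primary key; its index in the returned columns is the third element of the
/// returned tuple.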
pub fn bind_pk_and_row_id_on_relation(
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    must_need_pk: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
    for c in &columns {
        assert!(c.column_id() != ColumnId::placeholder());
    }

    // Mapping from column name to column id.
    let name_to_id = columns
        .iter()
        .map(|c| (c.name(), c.column_id()))
        .collect::<HashMap<_, _>>();

    let mut pk_column_ids: Vec<_> = pk_names
        .iter()
        .map(|name| {
            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
            })
        })
        .try_collect()?;

    // Add `_row_id` column if `pk_column_ids` is empty and must_need_pk
    let need_row_id = pk_column_ids.is_empty() && must_need_pk;

    let row_id_index = need_row_id.then(|| {
        let column = ColumnCatalog::row_id_column();
        let index = columns.len();
        pk_column_ids = vec![column.column_id()];
        columns.push(column);
        index
    });

    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
        Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{col}\" specified more than once"
        )))?;
    }

    Ok((columns, pk_column_ids, row_id_index))
}

/// `gen_create_table_plan_with_source` generates the plan for creating a table with an external
/// stream source.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append only table does not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let (with_properties, refresh_mode) =
        bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        // if non-cdc source
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    // TODO: omit this step if `sql_column_strategy` is `Follow`.
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
        refresh_mode,
    )
    .await?;

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source.clone(),
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(source), table))
}

/// `gen_create_table_plan` generates the plan for creating a table without an external stream
/// source.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan(
    context: OptimizerContext,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    mut col_id_gen: ColumnIdGenerator,
    source_watermarks: Vec<SourceWatermark>,
    props: CreateTableProps,
    is_for_replace_plan: bool,
) -> Result<(PlanRef, TableCatalog)> {
    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
    if !secret_refs.is_empty() || !connection_refs.is_empty() {
        return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
    }

    gen_create_table_plan_without_source(
        context,
        table_name,
        columns,
        column_defs,
        constraints,
        source_watermarks,
        col_id_gen.into_version(),
        props,
    )
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    // XXX: Why not bind outside?
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}

fn gen_table_plan_with_source(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    source_catalog: SourceCatalog,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let table_name = source_catalog.name.clone();

    let info = CreateTableInfo {
        columns: source_catalog.columns.clone(),
        pk_column_ids: source_catalog.pk_col_ids.clone(),
        row_id_index: source_catalog.row_id_index,
        watermark_descs: source_catalog.watermark_descs.clone(),
        source_catalog: Some(source_catalog),
        version,
    };

    gen_table_plan_inner(context, schema_name, table_name, info, props)
}

/// On-conflict behavior either from user input or existing table catalog.
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    Ast(Option<OnConflict>),
    Resolved(ConflictBehavior),
}

impl From<Option<OnConflict>> for EitherOnConflict {
    fn from(v: Option<OnConflict>) -> Self {
        Self::Ast(v)
    }
}

impl From<ConflictBehavior> for EitherOnConflict {
    fn from(v: ConflictBehavior) -> Self {
        Self::Resolved(v)
    }
}

impl EitherOnConflict {
    /// Resolves the conflict behavior based on the given information.
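    ///
    /// For the AST case: an append-only table with a generated row id resolves to `NoCheck`;
    /// an append-only table with a user-defined primary key only accepts `DO NOTHING`
    /// (yielding `IgnoreConflict`); otherwise the user's choice is used, defaulting to
    /// `UPDATE FULL` (`Overwrite`).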
    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
        let conflict_behavior = match self {
            EitherOnConflict::Ast(on_conflict) => {
                if append_only {
                    if row_id_as_pk {
                        // Primary key will be generated, no conflict check needed.
                        ConflictBehavior::NoCheck
                    } else {
                        // User defined PK on append-only table, enforce `DO NOTHING`.
                        if let Some(on_conflict) = on_conflict
                            && on_conflict != OnConflict::Nothing
                        {
                            return Err(ErrorCode::InvalidInputSyntax(
                                "When a PRIMARY KEY constraint is applied to an APPEND ONLY table, \
                                     the ON CONFLICT behavior must be DO NOTHING."
                                    .to_owned(),
                            )
                            .into());
                        }
                        ConflictBehavior::IgnoreConflict
                    }
                } else {
                    // Default to `UPDATE FULL` for non-append-only tables.
                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
                    }
                }
            }
            EitherOnConflict::Resolved(b) => b,
        };

        Ok(conflict_behavior)
    }
}

/// Arguments of the functions that generate a table plan, part 1.
///
/// Compared to [`CreateTableProps`], this struct contains fields that need some work of binding
/// or resolving based on the user input.
pub struct CreateTableInfo {
    pub columns: Vec<ColumnCatalog>,
    pub pk_column_ids: Vec<ColumnId>,
    pub row_id_index: Option<usize>,
    pub watermark_descs: Vec<WatermarkDesc>,
    pub source_catalog: Option<SourceCatalog>,
    pub version: TableVersion,
}

/// Arguments of the functions that generate a table plan, part 2.
///
/// Compared to [`CreateTableInfo`], this struct contains fields that can be (relatively) simply
/// obtained from the input or the context.
pub struct CreateTableProps {
    pub definition: String,
    pub append_only: bool,
    pub on_conflict: EitherOnConflict,
    pub with_version_columns: Vec<String>,
    pub webhook_info: Option<PbWebhookSourceInfo>,
    pub engine: Engine,
}

#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    let source_node: LogicalPlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let has_non_ttl_watermark = watermark_descs.iter().any(|d| !d.with_ttl);

    if !append_only && has_non_ttl_watermark {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on table requires the table to be append only.".to_owned(),
            "Use the key words `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    if !append_only && retention_seconds.is_some() {
        if session
            .config()
            .unsafe_enable_storage_retention_for_non_append_only_tables()
        {
            tracing::warn!(
                "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
                table_name
            );
            const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
                                  This may lead to stream inconsistency and unrecoverable \
                                  node failure if there is any row INSERT/UPDATE/DELETE operation \
                                  corresponding to the TTLed primary keys";
            session.notice_to_user(NOTICE);
        } else {
            return Err(ErrorCode::NotSupported(
                "Defining retention seconds on table requires the table to be append only."
                    .to_owned(),
                "Use the key words `APPEND ONLY`".to_owned(),
            )
            .into());
        }
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    Ok((materialize.into(), table))
}

/// Generates the stream plan for a cdc table based on a shared source.
///
/// In the replace workflow, `table_id` is the id of the table being replaced; in the
/// create-table workflow, `table_id` is a placeholder that will be filled in by the meta service.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String, // table name without schema prefix
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, TableCatalog)> {
    let session = context.session_ctx().clone();

    // append additional columns to the end
    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    // NOTE: in auto schema change, default values are not provided in the column definitions.
    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        // pk column id must exist in table columns.
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

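    // Generated columns are computed in RisingWave and do not exist in the upstream table,
    // so they are excluded from the CDC table description consumed by the cdc scan.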
    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| !c.is_generated())
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();
    let cdc_table_type = ExternalCdcTableType::from_properties(&options);
    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id, // id of cdc source streaming job
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");
    let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: LogicalPlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs: vec![],
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_columns,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.cdc_table_type = Some(cdc_table_type);
    Ok((materialize.into(), table))
}

/// Derive connector properties and normalize `external_table_name` for CDC tables.
///
/// Returns (`connector_properties`, `normalized_external_table_name`) where:
/// - For SQL Server: Normalizes 'db.schema.table' (3 parts) to 'schema.table' (2 parts),
///   because users can optionally include database name for verification, but it needs to be
///   stripped to match the format returned by Debezium's `extract_table_name()`.
/// - For MySQL/Postgres: Returns the original `external_table_name` unchanged.
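///
/// For example, with a SQL Server source whose `database.name` is `mydb`, both `dbo.orders`
/// and `mydb.dbo.orders` normalize to `dbo.orders`, while a bare `orders` is rejected because
/// the schema name must be given explicitly.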
fn derive_with_options_for_cdc_table(
    source_with_properties: &WithOptionsSecResolved,
    external_table_name: String,
) -> Result<(WithOptionsSecResolved, String)> {
    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
    // we should remove the prefix from `full_table_name`
    let source_database_name: &str = source_with_properties
        .get("database.name")
        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
        .as_str();
    let mut with_options = source_with_properties.clone();
    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
        match connector.as_str() {
            MYSQL_CDC_CONNECTOR => {
                // MySQL doesn't allow '.' in database name and table name, so we can split the
                // external table name by '.' to get the table name
                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
                })?;
                // We allow multiple database names in the source definition
                if !source_database_name
                    .split(',')
                    .map(|s| s.trim())
                    .any(|name| name == db_name)
                {
                    return Err(anyhow!(
                        "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
                        db_name,
                        source_database_name
                    ).into());
                }
                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
                // Return original external_table_name unchanged for MySQL
                return Ok((with_options, external_table_name));
            }
            POSTGRES_CDC_CONNECTOR => {
                let (schema_name, table_name) = external_table_name
                    .split_once('.')
                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;

                // insert 'schema.name' and 'table.name' into the connector properties
                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
                // Return original external_table_name unchanged for Postgres
                return Ok((with_options, external_table_name));
            }
            SQL_SERVER_CDC_CONNECTOR => {
                // SQL Server external table name must be in one of two formats:
                // 1. 'schemaName.tableName' (2 parts) - database is already specified in source
                // 2. 'databaseName.schemaName.tableName' (3 parts) - for explicit verification
                //
                // We do NOT allow single table name (e.g., 't') because:
                // - Unlike database name (already in source), schema name is NOT pre-specified
                // - User must explicitly provide schema (even if it's 'dbo')
                let parts: Vec<&str> = external_table_name.split('.').collect();
                let (schema_name, table_name) = match parts.len() {
                    3 => {
                        // Format: database.schema.table
                        // Verify that the database name matches the one in source definition
                        let db_name = parts[0];
                        let schema_name = parts[1];
                        let table_name = parts[2];

                        if db_name != source_database_name {
                            return Err(anyhow!(
                                "The database name '{}' in FROM clause does not match the database name '{}' specified in source definition. \
                                 You can either use 'schema.table' format (recommended) or ensure the database name matches.",
                                db_name,
                                source_database_name
                            ).into());
                        }
                        (schema_name, table_name)
                    }
                    2 => {
                        // Format: schema.table (recommended)
                        // Database name is taken from source definition
                        let schema_name = parts[0];
                        let table_name = parts[1];
                        (schema_name, table_name)
                    }
                    1 => {
                        // Format: table only
                        // Reject with clear error message
                        return Err(anyhow!(
                            "Invalid table name format '{}'. For SQL Server CDC, you must specify the schema name. \
                             Use 'schema.table' format (e.g., 'dbo.{}') or 'database.schema.table' format (e.g., '{}.dbo.{}').",
                            external_table_name,
                            external_table_name,
                            source_database_name,
                            external_table_name
                        ).into());
                    }
                    _ => {
                        // Invalid format (4+ parts or empty)
                        return Err(anyhow!(
                            "Invalid table name format '{}'. Expected 'schema.table' or 'database.schema.table'.",
                            external_table_name
                        ).into());
                    }
                };

                // Insert schema and table names into connector properties
                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());

                // Normalize external_table_name to 'schema.table' format
                // This ensures consistency with extract_table_name() in message.rs
                let normalized_external_table_name = format!("{}.{}", schema_name, table_name);
                return Ok((with_options, normalized_external_table_name));
            }
            _ => {
                return Err(RwError::from(anyhow!(
                    "connector {} is not supported for cdc table",
                    connector
                )));
            }
        };
    }
    unreachable!("All valid CDC connectors should have returned by now")
}

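/// Dispatches `CREATE TABLE` planning to one of three paths: a table with a connector
/// (`format_encode` present), a plain table, or a table on top of a shared CDC source
/// (`cdc_table_info` present). Returns the plan, the optional source catalog, the table
/// catalog, the table job type, and the shared source id for the CDC case.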
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
    handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    format_encode: Option<FormatEncodeOptions>,
    cdc_table_info: Option<CdcTableInfo>,
    table_name: &ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    engine: Engine,
) -> Result<(
    PlanRef,
    Option<SourceCatalog>,
    TableCatalog,
    TableJobType,
    Option<SourceId>,
)> {
    let col_id_gen = ColumnIdGenerator::new_initial();
    let format_encode = check_create_table_with_source(
        &handler_args.with_options,
        format_encode,
        &include_column_options,
        &cdc_table_info,
    )?;
    let webhook_info = webhook_info
        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
        .transpose()?;

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_columns: with_version_columns.clone(),
        webhook_info,
        engine,
    };

    let ((plan, source, table), job_type, shared_source_id) = match (
        format_encode,
        cdc_table_info.as_ref(),
    ) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                explain_options,
                table_name.clone(),
                column_defs,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                SqlColumnStrategy::FollowChecked,
            )
            .await?,
            TableJobType::General,
            None,
        ),
        (None, None) => {
            let context = OptimizerContext::new(handler_args, explain_options);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name.clone(),
                column_defs,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                false,
            )?;

            ((plan, None, table), TableJobType::General, None)
        }

        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &column_defs,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            generated_columns_check_for_cdc_table(&column_defs)?;
            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;

            let session = &handler_args.session;
            let db_name = &session.database();
            let user_name = &session.user_name();
            let search_path = session.config().search_path();
            let (schema_name, resolved_table_name) =
                Binder::resolve_schema_qualified_name(db_name, table_name)?;
            let (database_id, schema_id) =
                session.get_database_and_schema_id_for_create(schema_name.clone())?;

            // cdc table cannot be append-only
            let (source_schema, source_name) =
                Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;

            let source = {
                let catalog_reader = session.env().catalog_reader().read_guard();
                let schema_path =
                    SchemaPath::new(source_schema.as_deref(), &search_path, user_name);

                let (source, _) = catalog_reader.get_source_by_name(
                    db_name,
                    schema_path,
                    source_name.as_str(),
                )?;
                source.clone()
            };
            let (cdc_with_options, normalized_external_table_name) =
                derive_with_options_for_cdc_table(
                    &source.with_properties,
                    cdc_table.external_table_name.clone(),
                )?;

            let (columns, pk_names) = match wildcard_idx {
                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
                None => {
                    for column_def in &column_defs {
                        for option_def in &column_def.options {
                            if let ColumnOption::DefaultValue(_)
                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
                            {
                                return Err(ErrorCode::NotSupported(
                                            "Default value for columns defined on the table created from a CDC source".into(),
                                            "Remove the default value expression in the column definitions".into(),
                                        )
                                            .into());
                            }
                        }
                    }

                    let (columns, pk_names) =
                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
                    // read default value definition from external db
                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
                        .context("failed to extract external table config")?;

                    (columns, pk_names)
                }
            };

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, explain_options).into();
            let shared_source_id = source.id;
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                normalized_external_table_name,
                column_defs,
                columns,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_columns,
                include_column_options,
                table_name.clone(),
                resolved_table_name,
                database_id,
                schema_id,
                TableId::placeholder(),
                engine,
            )?;

            (
                (plan, None, table),
                TableJobType::SharedCdcSource,
                Some(shared_source_id),
            )
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format doesn't apply to table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };
    Ok((plan, source, table, job_type, shared_source_id))
}

// Applies to both tables created from a cdc source and tables with a cdc connector.
fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
    let mut found_generated_column = false;
    for column in columns {
        let mut is_generated = false;

        for option_def in &column.options {
            if let ColumnOption::GeneratedColumns(_) = option_def.option {
                is_generated = true;
                break;
            }
        }

        if is_generated {
            found_generated_column = true;
        } else if found_generated_column {
            return Err(ErrorCode::NotSupported(
                "Non-generated column found after a generated column.".into(),
                "Ensure that all generated columns appear at the end of the cdc table definition."
                    .into(),
            )
            .into());
        }
    }
    Ok(())
}

// Applies to both tables created from a cdc source and tables with a cdc connector.
fn not_null_check_for_cdc_table(
    wildcard_idx: &Option<usize>,
    column_defs: &Vec<ColumnDef>,
) -> Result<()> {
    if wildcard_idx.is_none()
        && column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table with NOT NULL constraint is not supported".to_owned(),
            "Please remove the NOT NULL constraint for columns".to_owned(),
        )
        .into());
    }
    Ok(())
}

// Only for tables created from a cdc source.
fn sanity_check_for_table_on_cdc_source(
    append_only: bool,
    column_defs: &Vec<ColumnDef>,
    wildcard_idx: &Option<usize>,
    constraints: &Vec<TableConstraint>,
    source_watermarks: &Vec<SourceWatermark>,
) -> Result<()> {
    // wildcard cannot be used with column definitions
    if wildcard_idx.is_some() && !column_defs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "wildcard(*) and column definitions cannot be used together".to_owned(),
            "Remove the wildcard or column definitions".to_owned(),
        )
        .into());
    }

    // cdc table must have primary key constraint or primary key column
    if wildcard_idx.is_none()
        && !constraints.iter().any(|c| {
            matches!(
                c,
                TableConstraint::Unique {
                    is_primary: true,
                    ..
                }
            )
        })
        && !column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table without primary key constraint is not supported".to_owned(),
            "Please define a primary key".to_owned(),
        )
        .into());
    }

    if append_only {
        return Err(ErrorCode::NotSupported(
            "append only modifier on the table created from a CDC source".into(),
            "Remove the APPEND ONLY clause".into(),
        )
        .into());
    }

    if !source_watermarks.is_empty() {
        return Err(ErrorCode::NotSupported(
            "watermark defined on the table created from a CDC source".into(),
            "Remove the Watermark definitions".into(),
        )
        .into());
    }

    Ok(())
}
1392
1393/// Derive the schema of a CDC table by connecting to the external database (used when creating a new table or altering an existing one).
1394async fn bind_cdc_table_schema_externally(
1395    cdc_with_options: WithOptionsSecResolved,
1396) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1397    // Read the CDC table schema from the external database.
1398    let (options, secret_refs) = cdc_with_options.into_parts();
1399    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1400        .context("failed to extract external table config")?;
1401
1402    let table = ExternalTableImpl::connect(config)
1403        .await
1404        .context("failed to auto derive table schema")?;
1405
1406    Ok((
1407        table
1408            .column_descs()
1409            .iter()
1410            .cloned()
1411            .map(|column_desc| ColumnCatalog {
1412                column_desc,
1413                is_hidden: false,
1414            })
1415            .collect(),
1416        table.pk_names().clone(),
1417    ))
1418}
1419
1420/// Bind the schema of a CDC table from the SQL column definitions (used when creating a new table or altering an existing one).
1421fn bind_cdc_table_schema(
1422    column_defs: &Vec<ColumnDef>,
1423    constraints: &Vec<TableConstraint>,
1424    is_for_replace_plan: bool,
1425) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1426    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1427
1428    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1429    Ok((columns, pk_names))
1430}
1431
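/// Handles `CREATE TABLE` statements.
///
/// A couple of illustrative forms that end up here (simplified, not exhaustive):
///
/// ```sql
/// CREATE TABLE t (v1 INT, v2 VARCHAR, PRIMARY KEY (v1));
/// CREATE TABLE t (v1 INT, v2 VARCHAR) APPEND ONLY;
/// ```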
1432#[allow(clippy::too_many_arguments)]
1433pub async fn handle_create_table(
1434    handler_args: HandlerArgs,
1435    table_name: ObjectName,
1436    column_defs: Vec<ColumnDef>,
1437    wildcard_idx: Option<usize>,
1438    constraints: Vec<TableConstraint>,
1439    if_not_exists: bool,
1440    format_encode: Option<FormatEncodeOptions>,
1441    source_watermarks: Vec<SourceWatermark>,
1442    append_only: bool,
1443    on_conflict: Option<OnConflict>,
1444    with_version_columns: Vec<String>,
1445    cdc_table_info: Option<CdcTableInfo>,
1446    include_column_options: IncludeOption,
1447    webhook_info: Option<WebhookSourceInfo>,
1448    ast_engine: risingwave_sqlparser::ast::Engine,
1449) -> Result<RwPgResponse> {
1450    let session = handler_args.session.clone();
1451
1452    if append_only {
1453        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1454    }
1455
1456    session.check_cluster_limits().await?;
1457
1458    let engine = match ast_engine {
1459        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1460        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1461    };
1462
1463    if let Either::Right(resp) = session.check_relation_name_duplicated(
1464        table_name.clone(),
1465        StatementType::CREATE_TABLE,
1466        if_not_exists,
1467    )? {
1468        return Ok(resp);
1469    }
1470
1471    let (graph, source, hummock_table, job_type, shared_source_id) = {
1472        let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
1473            handler_args.clone(),
1474            ExplainOptions::default(),
1475            format_encode,
1476            cdc_table_info,
1477            &table_name,
1478            column_defs.clone(),
1479            wildcard_idx,
1480            constraints.clone(),
1481            source_watermarks,
1482            append_only,
1483            on_conflict,
1484            with_version_columns,
1485            include_column_options,
1486            webhook_info,
1487            engine,
1488        )
1489        .await?;
1490        tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1491
1492        let graph = build_graph(plan, Some(GraphJobType::Table))?;
1493
1494        (graph, source, table, job_type, shared_source_id)
1495    };
1496
1497    tracing::trace!(
1498        "name={}, graph=\n{}",
1499        table_name,
1500        serde_json::to_string_pretty(&graph).unwrap()
1501    );
1502
1503    let dependencies = shared_source_id
1504        .map(|id| HashSet::from([id.as_object_id()]))
1505        .unwrap_or_default();
1506
1507    // Handle engine
1508    match engine {
1509        Engine::Hummock => {
1510            let catalog_writer = session.catalog_writer()?;
1511            let action = match job_type {
1512                TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
1513                _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
1514            };
1515            execute_with_long_running_notification(
1516                catalog_writer.create_table(
1517                    source.map(|s| s.to_prost()),
1518                    hummock_table.to_prost(),
1519                    graph,
1520                    job_type,
1521                    if_not_exists,
1522                    dependencies,
1523                ),
1524                &session,
1525                "CREATE TABLE",
1526                action,
1527            )
1528            .await?;
1529        }
1530        Engine::Iceberg => {
1531            let hummock_table_name = hummock_table.name.clone();
1532            session.create_staging_table(hummock_table.clone());
1533            let res = create_iceberg_engine_table(
1534                session.clone(),
1535                handler_args,
1536                source.map(|s| s.to_prost()),
1537                hummock_table,
1538                graph,
1539                table_name,
1540                job_type,
1541                if_not_exists,
1542            )
1543            .await;
1544            session.drop_staging_table(&hummock_table_name);
1545            res?
1546        }
1547    }
1548
1549    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1550}
1551
1552/// An iceberg engine table is composed of a hummock table, an iceberg sink, and an iceberg source.
1553///
1554/// 1. Fetch the iceberg engine options from the meta node, or use the iceberg engine connection provided by the user.
1555/// 2. Create a hummock table.
1556/// 3. Create an iceberg sink.
1557/// 4. Create an iceberg source.
1558///
1559/// See <https://github.com/risingwavelabs/risingwave/issues/21586> for an architecture diagram.
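///
/// A minimal illustration of how such a table is typically created (the connection name is
/// hypothetical):
///
/// ```sql
/// SET iceberg_engine_connection = 'public.my_iceberg_conn';
/// CREATE TABLE t (id INT PRIMARY KEY, v INT) ENGINE = iceberg;
/// ```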
1560#[allow(clippy::too_many_arguments)]
1561pub async fn create_iceberg_engine_table(
1562    session: Arc<SessionImpl>,
1563    handler_args: HandlerArgs,
1564    mut source: Option<PbSource>,
1565    table: TableCatalog,
1566    graph: StreamFragmentGraph,
1567    table_name: ObjectName,
1568    job_type: PbTableJobType,
1569    if_not_exists: bool,
1570) -> Result<()> {
1571    let rw_db_name = session
1572        .env()
1573        .catalog_reader()
1574        .read_guard()
1575        .get_database_by_id(table.database_id)?
1576        .name()
1577        .to_owned();
1578    let rw_schema_name = session
1579        .env()
1580        .catalog_reader()
1581        .read_guard()
1582        .get_schema_by_id(table.database_id, table.schema_id)?
1583        .name()
1584        .clone();
1585    let iceberg_catalog_name = rw_db_name.clone();
1586    let iceberg_database_name = rw_schema_name.clone();
1587    let iceberg_table_name = table_name.0.last().unwrap().real_value();
1588
1589    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1590    let sink_decouple = session.config().sink_decouple();
1591    if matches!(sink_decouple, SinkDecouple::Disable) {
1592        bail!(
1593            "Iceberg engine tables are only supported with sink decouple enabled; try `set sink_decouple = true` to resolve it"
1594        );
1595    }
1596
1597    let mut connection_ref = BTreeMap::new();
1598    let with_common = if iceberg_engine_connection.is_empty() {
1599        bail!("To use an iceberg engine table, the session variable `iceberg_engine_connection` must be set.");
1600    } else {
1601        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1602        assert_eq!(parts.len(), 2);
1603        let connection_catalog =
1604            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1605        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1606            if params.connection_type == ConnectionType::Iceberg as i32 {
1607                // With iceberg engine connection:
1608                connection_ref.insert(
1609                    "connection".to_owned(),
1610                    ConnectionRefValue {
1611                        connection_name: ObjectName::from(vec![
1612                            Ident::from(parts[0]),
1613                            Ident::from(parts[1]),
1614                        ]),
1615                    },
1616                );
1617
1618                let mut with_common = BTreeMap::new();
1619                with_common.insert("connector".to_owned(), "iceberg".to_owned());
1620                with_common.insert("database.name".to_owned(), iceberg_database_name);
1621                with_common.insert("table.name".to_owned(), iceberg_table_name);
1622
1623                let hosted_catalog = params
1624                    .properties
1625                    .get("hosted_catalog")
1626                    .map(|s| s.eq_ignore_ascii_case("true"))
1627                    .unwrap_or(false);
1628                if hosted_catalog {
1629                    let meta_client = session.env().meta_client();
1630                    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1631
1632                    let meta_store_endpoint =
1633                        url::Url::parse(&meta_store_endpoint).map_err(|_| {
1634                            ErrorCode::InternalError(
1635                                "failed to parse the meta store endpoint".to_owned(),
1636                            )
1637                        })?;
1638                    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1639                    let meta_store_user = meta_store_endpoint.username().to_owned();
1640                    let meta_store_password = match meta_store_endpoint.password() {
1641                        Some(password) => percent_decode_str(password)
1642                            .decode_utf8()
1643                            .map_err(|_| {
1644                                ErrorCode::InternalError(
1645                                    "failed to parse password from meta store endpoint".to_owned(),
1646                                )
1647                            })?
1648                            .into_owned(),
1649                        None => "".to_owned(),
1650                    };
1651                    let meta_store_host = meta_store_endpoint
1652                        .host_str()
1653                        .ok_or_else(|| {
1654                            ErrorCode::InternalError(
1655                                "failed to parse host from meta store endpoint".to_owned(),
1656                            )
1657                        })?
1658                        .to_owned();
1659                    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1660                        ErrorCode::InternalError(
1661                            "failed to parse port from meta store endpoint".to_owned(),
1662                        )
1663                    })?;
1664                    let meta_store_database = meta_store_endpoint
1665                        .path()
1666                        .trim_start_matches('/')
1667                        .to_owned();
1668
1669                    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1670                        bail!("failed to parse meta backend: {}", meta_store_backend);
1671                    };
1672
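                    // Sketch of the mapping below (hypothetical endpoint): a meta store
                    // endpoint such as `postgres://rw:secret@meta-db:5432/metadata` would
                    // yield `catalog.uri = jdbc:postgresql://meta-db:5432/metadata`, with
                    // the user and password extracted above.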
1673                    let catalog_uri = match meta_backend {
1674                        MetaBackend::Postgres => {
1675                            format!(
1676                                "jdbc:postgresql://{}:{}/{}",
1677                                meta_store_host, meta_store_port, meta_store_database
1678                            )
1679                        }
1680                        MetaBackend::Mysql => {
1681                            format!(
1682                                "jdbc:mysql://{}:{}/{}",
1683                                meta_store_host, meta_store_port, meta_store_database
1684                            )
1685                        }
1686                        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1687                            bail!(
1688                                "Unsupported meta backend for iceberg engine table: {}",
1689                                meta_store_backend
1690                            );
1691                        }
1692                    };
1693
1694                    with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1695                    with_common.insert("catalog.uri".to_owned(), catalog_uri);
1696                    with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
1697                    with_common.insert("catalog.jdbc.password".to_owned(), meta_store_password);
1698                    with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
1699                }
1700
1701                with_common
1702            } else {
1703                return Err(RwError::from(ErrorCode::InvalidParameterValue(
1704                    "Only iceberg connection could be used in iceberg engine".to_owned(),
1705                )));
1706            }
1707        } else {
1708            return Err(RwError::from(ErrorCode::InvalidParameterValue(
1709                "Private Link Service has been deprecated. Please create a new connection instead."
1710                    .to_owned(),
1711            )));
1712        }
1713    };
1714
1715    // Iceberg sinks require a primary key; if none is provided, we will use the `_row_id` column.
1716    // Fetch the primary key from the table columns.
1717    let mut pks = table
1718        .pk_column_names()
1719        .iter()
1720        .map(|c| c.to_string())
1721        .collect::<Vec<String>>();
1722
1723    // For a table without a primary key, we use `_row_id` as the primary key.
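    // Illustrative shape of the generated statement (identifiers come from the constants used
    // below): `SELECT <ROW_ID_COLUMN_NAME> AS <RISINGWAVE_ICEBERG_ROW_ID>, * FROM <table_name>`.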
1724    let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1725        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1726        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1727            "select {} as {}, * from {}",
1728            ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1729        ))
1730        .context("unable to parse query")?
1731        .try_into()
1732        .unwrap();
1733
1734        let Statement::Query(query) = &stmt else {
1735            panic!("unexpected statement: {:?}", stmt);
1736        };
1737        CreateSink::AsQuery(query.clone())
1738    } else {
1739        CreateSink::From(table_name.clone())
1740    };
1741
1742    let mut sink_name = table_name.clone();
1743    *sink_name.0.last_mut().unwrap() = Ident::from(
1744        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1745    );
1746    let create_sink_stmt = CreateSinkStatement {
1747        if_not_exists: false,
1748        sink_name,
1749        with_properties: WithProperties(vec![]),
1750        sink_from,
1751        columns: vec![],
1752        emit_mode: None,
1753        sink_schema: None,
1754        into_table_name: None,
1755    };
1756
1757    let mut sink_handler_args = handler_args.clone();
1758
1759    let mut sink_with = with_common.clone();
1760
1761    if table.append_only {
1762        sink_with.insert("type".to_owned(), "append-only".to_owned());
1763    } else {
1764        sink_with.insert("primary_key".to_owned(), pks.join(","));
1765        sink_with.insert("type".to_owned(), "upsert".to_owned());
1766    }
1767    // sink_with.insert(SINK_SNAPSHOT_OPTION.to_owned(), "false".to_owned());
1768    //
1769    // Note: in theory, we don't need to backfill from the table to the sink,
1770    // but we don't have atomic DDL now https://github.com/risingwavelabs/risingwave/issues/21863
1771    // so it may have potential data loss problem on the first barrier.
1772    //
1773    // For non-append-only table, we can always solve it by the initial sink with backfill, since
1774    // data will be present in hummock table.
1775    //
1776    // For append-only table, we need to be more careful.
1777    //
1778    // The possible cases for a table:
1779    // - For table without connector: it doesn't matter, since there's no data before the table is created
1780    // - For a table with a connector: we worked around it by setting SOURCE_RATE_LIMIT to 0
1781    //   + If we support blocking DDL for table with connector, we need to be careful.
1782    // - For table with an upstream job: Specifically, CDC table from shared CDC source.
1783    //   + Data may come from both upstream connector, and CDC table backfill, so we need to pause both of them.
1784    //   + For now we don't support APPEND ONLY CDC table, so it's safe.
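    // A sketch of how the interval can be tuned when creating the table (the option key is the
    // `COMMIT_CHECKPOINT_INTERVAL` constant; 60 is the default applied below):
    //   CREATE TABLE t (id INT PRIMARY KEY) WITH (commit_checkpoint_interval = '120') ENGINE = iceberg;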
1785    let commit_checkpoint_interval = handler_args
1786        .with_options
1787        .get(COMMIT_CHECKPOINT_INTERVAL)
1788        .map(|v| v.to_owned())
1789        .unwrap_or_else(|| "60".to_owned());
1790    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1791        ErrorCode::InvalidInputSyntax(format!(
1792            "commit_checkpoint_interval must be greater than 0: {}",
1793            commit_checkpoint_interval
1794        ))
1795    })?;
1796
1797    if commit_checkpoint_interval == 0 {
1798        bail!("commit_checkpoint_interval must be greater than 0");
1799    }
1800
1801    // remove commit_checkpoint_interval from source options, otherwise it will be considered as an unknown field.
1802    source
1803        .as_mut()
1804        .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1805
1806    let sink_decouple = session.config().sink_decouple();
1807    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1808        bail!(
1809            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1810        )
1811    }
1812
1813    sink_with.insert(
1814        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1815        commit_checkpoint_interval.to_string(),
1816    );
1817    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1818
1819    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1820
1821    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1822        match enable_compaction.to_lowercase().as_str() {
1823            "true" => {
1824                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1825            }
1826            "false" => {
1827                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1828            }
1829            _ => {
1830                return Err(ErrorCode::InvalidInputSyntax(format!(
1831                    "enable_compaction must be true or false: {}",
1832                    enable_compaction
1833                ))
1834                .into());
1835            }
1836        }
1837
1838        // remove enable_compaction from source options, otherwise it will be considered as an unknown field.
1839        source
1840            .as_mut()
1841            .map(|x| x.with_properties.remove("enable_compaction"));
1842    } else {
1843        sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1844    }
1845
1846    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1847        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1848            ErrorCode::InvalidInputSyntax(format!(
1849                "compaction_interval_sec must be greater than 0: {}",
1850                compaction_interval_sec
1851            ))
1852        })?;
1853        if compaction_interval_sec == 0 {
1854            bail!("compaction_interval_sec must be greater than 0");
1855        }
1856        sink_with.insert(
1857            "compaction_interval_sec".to_owned(),
1858            compaction_interval_sec.to_string(),
1859        );
1860        // remove compaction_interval_sec from source options, otherwise it will be considered as an unknown field.
1861        source
1862            .as_mut()
1863            .map(|x| x.with_properties.remove("compaction_interval_sec"));
1864    }
1865
1866    let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1867        handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1868    {
1869        // remove enable_snapshot_expiration from source options, otherwise it will be considered as an unknown field.
1870        source
1871            .as_mut()
1872            .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1873        match enable_snapshot_expiration.to_lowercase().as_str() {
1874            "true" => {
1875                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1876                true
1877            }
1878            "false" => {
1879                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1880                false
1881            }
1882            _ => {
1883                return Err(ErrorCode::InvalidInputSyntax(format!(
1884                    "enable_snapshot_expiration must be true or false: {}",
1885                    enable_snapshot_expiration
1886                ))
1887                .into());
1888            }
1889        }
1890    } else {
1891        sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1892        true
1893    };
1894
1895    if has_enabled_snapshot_expiration {
1896        // configuration for snapshot expiration
1897        if let Some(snapshot_expiration_retain_last) = handler_args
1898            .with_options
1899            .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
1900        {
1901            sink_with.insert(
1902                SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
1903                snapshot_expiration_retain_last.to_owned(),
1904            );
1905            // remove snapshot_expiration_retain_last from source options, otherwise it will be considered as an unknown field.
1906            source
1907                .as_mut()
1908                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
1909        }
1910
1911        if let Some(snapshot_expiration_max_age) = handler_args
1912            .with_options
1913            .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
1914        {
1915            sink_with.insert(
1916                SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
1917                snapshot_expiration_max_age.to_owned(),
1918            );
1919            // remove snapshot_expiration_max_age from source options, otherwise it will be considered as an unknown field.
1920            source
1921                .as_mut()
1922                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
1923        }
1924
1925        if let Some(snapshot_expiration_clear_expired_files) = handler_args
1926            .with_options
1927            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1928        {
1929            sink_with.insert(
1930                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
1931                snapshot_expiration_clear_expired_files.to_owned(),
1932            );
1933            // remove snapshot_expiration_clear_expired_files from source options, otherwise it will be considered as an unknown field.
1934            source.as_mut().map(|x| {
1935                x.with_properties
1936                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1937            });
1938        }
1939
1940        if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
1941            .with_options
1942            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1943        {
1944            sink_with.insert(
1945                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
1946                snapshot_expiration_clear_expired_meta_data.to_owned(),
1947            );
1948            // remove snapshot_expiration_clear_expired_meta_data from source options, otherwise it will be considered as an unknown field.
1949            source.as_mut().map(|x| {
1950                x.with_properties
1951                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1952            });
1953        }
1954    }
1955
1956    if let Some(format_version) = handler_args.with_options.get(FORMAT_VERSION) {
1957        let format_version = format_version.parse::<u8>().map_err(|_| {
1958            ErrorCode::InvalidInputSyntax(format!(
1959                "format_version must be 1, 2 or 3: {}",
1960                format_version
1961            ))
1962        })?;
1963        if format_version != 1 && format_version != 2 && format_version != 3 {
1964            bail!("format_version must be 1, 2 or 3");
1965        }
1966        sink_with.insert(FORMAT_VERSION.to_owned(), format_version.to_string());
1967
1968        // remove format_version from source options, otherwise it will be considered as an unknown field.
1969        source
1970            .as_mut()
1971            .map(|x| x.with_properties.remove(FORMAT_VERSION));
1972    }
1973
1974    if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1975        let write_mode = IcebergWriteMode::try_from(write_mode.as_str()).map_err(|_| {
1976            ErrorCode::InvalidInputSyntax(format!(
1977                "invalid write_mode: {}, must be one of: {}, {}",
1978                write_mode, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
1979            ))
1980        })?;
1981
1982        match write_mode {
1983            IcebergWriteMode::MergeOnRead => {
1984                sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
1985            }
1986
1987            IcebergWriteMode::CopyOnWrite => {
1988                if table.append_only {
1989                    return Err(ErrorCode::NotSupported(
1990                        "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
1991                        "Please use MERGE ON READ instead".to_owned(),
1992                    )
1993                    .into());
1994                }
1995
1996                sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
1997            }
1998        }
1999
2000        // remove write_mode from source options, otherwise it will be considered as an unknown field.
2001        source
2002            .as_mut()
2003            .map(|x| x.with_properties.remove("write_mode"));
2004    } else {
2005        sink_with.insert(
2006            WRITE_MODE.to_owned(),
2007            ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2008        );
2009    }
2010
2011    if let Some(max_snapshots_num_before_compaction) =
2012        handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
2013    {
2014        let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
2015            .parse::<u32>()
2016            .map_err(|_| {
2017                ErrorCode::InvalidInputSyntax(format!(
2018                    "{} must be greater than 0: {}",
2019                    COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
2020                ))
2021            })?;
2022
2023        if max_snapshots_num_before_compaction == 0 {
2024            bail!(format!(
2025                "{} must be greater than 0",
2026                COMPACTION_MAX_SNAPSHOTS_NUM
2027            ));
2028        }
2029
2030        sink_with.insert(
2031            COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
2032            max_snapshots_num_before_compaction.to_string(),
2033        );
2034
2035        // remove from source options, otherwise it will be considered as an unknown field.
2036        source
2037            .as_mut()
2038            .map(|x| x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM));
2039    }
2040
2041    if let Some(small_files_threshold_mb) = handler_args
2042        .with_options
2043        .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2044    {
2045        let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
2046            ErrorCode::InvalidInputSyntax(format!(
2047                "{} must be greater than 0: {}",
2048                COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
2049            ))
2050        })?;
2051        if small_files_threshold_mb == 0 {
2052            bail!(format!(
2053                "{} must be greater than 0",
2054                COMPACTION_SMALL_FILES_THRESHOLD_MB
2055            ));
2056        }
2057        sink_with.insert(
2058            COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
2059            small_files_threshold_mb.to_string(),
2060        );
2061
2062        // remove from source options, otherwise it will be considered as an unknown field.
2063        source.as_mut().map(|x| {
2064            x.with_properties
2065                .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2066        });
2067    }
2068
2069    if let Some(delete_files_count_threshold) = handler_args
2070        .with_options
2071        .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2072    {
2073        let delete_files_count_threshold =
2074            delete_files_count_threshold.parse::<usize>().map_err(|_| {
2075                ErrorCode::InvalidInputSyntax(format!(
2076                    "{} must be greater than 0: {}",
2077                    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
2078                ))
2079            })?;
2080        if delete_files_count_threshold == 0 {
2081            bail!(format!(
2082                "{} must be greater than 0",
2083                COMPACTION_DELETE_FILES_COUNT_THRESHOLD
2084            ));
2085        }
2086        sink_with.insert(
2087            COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
2088            delete_files_count_threshold.to_string(),
2089        );
2090
2091        // remove from source options, otherwise it will be considered as an unknown field.
2092        source.as_mut().map(|x| {
2093            x.with_properties
2094                .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2095        });
2096    }
2097
2098    if let Some(trigger_snapshot_count) = handler_args
2099        .with_options
2100        .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
2101    {
2102        let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
2103            ErrorCode::InvalidInputSyntax(format!(
2104                "{} must be greater than 0: {}",
2105                COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
2106            ))
2107        })?;
2108        if trigger_snapshot_count == 0 {
2109            bail!(format!(
2110                "{} must be greater than 0",
2111                COMPACTION_TRIGGER_SNAPSHOT_COUNT
2112            ));
2113        }
2114        sink_with.insert(
2115            COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
2116            trigger_snapshot_count.to_string(),
2117        );
2118
2119        // remove from source options, otherwise it will be considered as an unknown field.
2120        source
2121            .as_mut()
2122            .map(|x| x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT));
2123    }
2124
2125    if let Some(target_file_size_mb) = handler_args
2126        .with_options
2127        .get(COMPACTION_TARGET_FILE_SIZE_MB)
2128    {
2129        let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
2130            ErrorCode::InvalidInputSyntax(format!(
2131                "{} must be greater than 0: {}",
2132                COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
2133            ))
2134        })?;
2135        if target_file_size_mb == 0 {
2136            bail!(format!(
2137                "{} must be greater than 0",
2138                COMPACTION_TARGET_FILE_SIZE_MB
2139            ));
2140        }
2141        sink_with.insert(
2142            COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
2143            target_file_size_mb.to_string(),
2144        );
2145        // remove from source options, otherwise it will be considered as an unknown field.
2146        source
2147            .as_mut()
2148            .map(|x| x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB));
2149    }
2150
2151    if let Some(compaction_type) = handler_args.with_options.get(COMPACTION_TYPE) {
2152        let compaction_type = CompactionType::try_from(compaction_type.as_str()).map_err(|_| {
2153            ErrorCode::InvalidInputSyntax(format!(
2154                "invalid compaction_type: {}, must be one of {:?}",
2155                compaction_type,
2156                &[
2157                    CompactionType::Full,
2158                    CompactionType::SmallFiles,
2159                    CompactionType::FilesWithDelete
2160                ]
2161            ))
2162        })?;
2163
2164        sink_with.insert(
2165            COMPACTION_TYPE.to_owned(),
2166            compaction_type.as_str().to_owned(),
2167        );
2168
2169        // remove from source options, otherwise it will be considered as an unknown field.
2170        source
2171            .as_mut()
2172            .map(|x| x.with_properties.remove(COMPACTION_TYPE));
2173    }
2174
2175    let partition_by = handler_args
2176        .with_options
2177        .get("partition_by")
2178        .map(|v| v.to_owned());
2179
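    // A sketch of what this option may look like (assumed syntax, subject to what
    // `parse_partition_by_exprs` accepts): WITH (partition_by = 'c1, bucket(16, c2)').
    // The referenced source columns must form a prefix of the primary key, as checked below.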
2180    if let Some(partition_by) = &partition_by {
2181        let mut partition_columns = vec![];
2182        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
2183            table
2184                .columns()
2185                .iter()
2186                .find(|col| col.name().eq_ignore_ascii_case(&column))
2187                .ok_or_else(|| {
2188                    ErrorCode::InvalidInputSyntax(format!(
2189                        "Partition source column does not exist in schema: {}",
2190                        column
2191                    ))
2192                })?;
2193
2194            partition_columns.push(column);
2195        }
2196
2197        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2198            |_| {
2199                ErrorCode::InvalidInputSyntax(
2200                    "The partition columns should be the prefix of the primary key".to_owned(),
2201                )
2202            },
2203        )?;
2204
2205        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2206
2207        // remove partition_by from source options, otherwise it will be considered as an unknown field.
2208        source
2209            .as_mut()
2210            .map(|x| x.with_properties.remove("partition_by"));
2211    }
2212
2213    sink_handler_args.with_options =
2214        WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2215    let SinkPlanContext {
2216        sink_plan,
2217        sink_catalog,
2218        ..
2219    } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2220    let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2221
2222    let mut source_name = table_name.clone();
2223    *source_name.0.last_mut().unwrap() = Ident::from(
2224        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2225    );
2226    let create_source_stmt = CreateSourceStatement {
2227        temporary: false,
2228        if_not_exists: false,
2229        columns: vec![],
2230        source_name,
2231        wildcard_idx: Some(0),
2232        constraints: vec![],
2233        with_properties: WithProperties(vec![]),
2234        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2235        source_watermarks: vec![],
2236        include_column_options: vec![],
2237    };
2238
2239    let mut source_handler_args = handler_args.clone();
2240    let source_with = with_common;
2241    source_handler_args.with_options =
2242        WithOptions::new(source_with, Default::default(), connection_ref);
2243
2244    let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2245    let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2246    let (with_properties, refresh_mode) =
2247        bind_connector_props(&source_handler_args, &format_encode, true)?;
2248
2249    // Create the iceberg table through the sink config, which is then used for iceberg source column binding. See `bind_columns_from_source_for_non_cdc` for more details.
2250    // TODO: We could derive the columns directly from the table definition in the future, so that we don't need to pre-create the table catalog.
2251    let (iceberg_catalog, table_identifier) = {
2252        let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2253        let iceberg_sink = IcebergSink::try_from(sink_param)?;
2254        iceberg_sink.create_table_if_not_exists().await?;
2255
2256        let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2257        let table_identifier = iceberg_sink.config.full_table_name()?;
2258        (iceberg_catalog, table_identifier)
2259    };
2260
2261    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2262    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2263        &session,
2264        &format_encode,
2265        Either::Left(&with_properties),
2266        create_source_type,
2267    )
2268    .await?;
2269    let mut col_id_gen = ColumnIdGenerator::new_initial();
2270
2271    let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2272        source_handler_args,
2273        create_source_stmt.source_name,
2274        format_encode,
2275        with_properties,
2276        &create_source_stmt.columns,
2277        create_source_stmt.constraints,
2278        create_source_stmt.wildcard_idx,
2279        create_source_stmt.source_watermarks,
2280        columns_from_resolve_source,
2281        source_info,
2282        create_source_stmt.include_column_options,
2283        &mut col_id_gen,
2284        create_source_type,
2285        overwrite_options.source_rate_limit,
2286        SqlColumnStrategy::FollowChecked,
2287        refresh_mode,
2288    )
2289    .await?;
2290
2291    // Before we create the table, ensure the JVM is initialized, since we use the JDBC catalog right now.
2292    // If the JVM fails to initialize, the currently non-atomic DDL would result in a partially created iceberg engine table.
2293    let _ = Jvm::get_or_init()?;
2294
2295    let catalog_writer = session.catalog_writer()?;
2296    let action = match job_type {
2297        TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
2298        _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
2299    };
2300    let res = execute_with_long_running_notification(
2301        catalog_writer.create_iceberg_table(
2302            PbTableJobInfo {
2303                source,
2304                table: Some(table.to_prost()),
2305                fragment_graph: Some(graph),
2306                job_type: job_type as _,
2307            },
2308            PbSinkJobInfo {
2309                sink: Some(sink_catalog.to_proto()),
2310                fragment_graph: Some(sink_graph),
2311            },
2312            iceberg_source_catalog.to_prost(),
2313            if_not_exists,
2314        ),
2315        &session,
2316        "CREATE TABLE",
2317        action,
2318    )
2319    .await;
2320
2321    if res.is_err() {
2322        let _ = iceberg_catalog
2323            .drop_table(&table_identifier)
2324            .await
2325            .inspect_err(|err| {
2326                tracing::error!(
2327                    "failed to drop iceberg table {} after create iceberg engine table failed: {}",
2328                    table_identifier,
2329                    err.as_report()
2330                );
2331            });
2332        res?
2333    }
2334
2335    Ok(())
2336}
2337
2338pub fn check_create_table_with_source(
2339    with_options: &WithOptions,
2340    format_encode: Option<FormatEncodeOptions>,
2341    include_column_options: &IncludeOption,
2342    cdc_table_info: &Option<CdcTableInfo>,
2343) -> Result<Option<FormatEncodeOptions>> {
2344    // skip check for cdc table
2345    if cdc_table_info.is_some() {
2346        return Ok(format_encode);
2347    }
2348    let defined_source = with_options.is_source_connector();
2349
2350    if !include_column_options.is_empty() && !defined_source {
2351        return Err(ErrorCode::InvalidInputSyntax(
2352            "INCLUDE should be used with a connector".to_owned(),
2353        )
2354        .into());
2355    }
2356    if defined_source {
2357        format_encode.as_ref().ok_or_else(|| {
2358            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2359        })?;
2360    }
2361    Ok(format_encode)
2362}
2363
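/// Checks that `partition_columns` form a prefix of `primary_key_columns`.
///
/// For example (hypothetical names), partition columns `["c1"]` against a primary key of
/// `["c1", "c2"]` pass the check, while `["c2"]` or `["c1", "c2", "c3"]` fail.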
2364fn ensure_partition_columns_are_prefix_of_primary_key(
2365    partition_columns: &[String],
2366    primary_key_columns: &[String],
2367) -> std::result::Result<(), String> {
2368    if partition_columns.len() > primary_key_columns.len() {
2369        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
2370    }
2371
2372    for (i, partition_col) in partition_columns.iter().enumerate() {
2373        if primary_key_columns.get(i) != Some(partition_col) {
2374            return Err(format!(
2375                "Partition column '{}' is not a prefix of the primary key.",
2376                partition_col
2377            ));
2378        }
2379    }
2380
2381    Ok(())
2382}
2383
2384#[allow(clippy::too_many_arguments)]
2385pub async fn generate_stream_graph_for_replace_table(
2386    _session: &Arc<SessionImpl>,
2387    table_name: ObjectName,
2388    original_catalog: &Arc<TableCatalog>,
2389    handler_args: HandlerArgs,
2390    statement: Statement,
2391    col_id_gen: ColumnIdGenerator,
2392    sql_column_strategy: SqlColumnStrategy,
2393) -> Result<(
2394    StreamFragmentGraph,
2395    TableCatalog,
2396    Option<SourceCatalog>,
2397    TableJobType,
2398)> {
2399    let Statement::CreateTable {
2400        columns,
2401        constraints,
2402        source_watermarks,
2403        append_only,
2404        on_conflict,
2405        with_version_columns,
2406        wildcard_idx,
2407        cdc_table_info,
2408        format_encode,
2409        include_column_options,
2410        engine,
2411        with_options,
2412        ..
2413    } = statement
2414    else {
2415        panic!("unexpected statement type: {:?}", statement);
2416    };
2417
2418    let format_encode = format_encode
2419        .clone()
2420        .map(|format_encode| format_encode.into_v2_with_warning());
2421
2422    let engine = match engine {
2423        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2424        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2425    };
2426
2427    let is_drop_connector =
2428        original_catalog.associated_source_id().is_some() && format_encode.is_none();
2429    if is_drop_connector {
2430        debug_assert!(
2431            source_watermarks.is_empty()
2432                && include_column_options.is_empty()
2433                && with_options
2434                    .iter()
2435                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2436        );
2437    }
2438
2439    let props = CreateTableProps {
2440        definition: handler_args.normalized_sql.clone(),
2441        append_only,
2442        on_conflict: on_conflict.into(),
2443        with_version_columns: with_version_columns
2444            .iter()
2445            .map(|col| col.real_value())
2446            .collect(),
2447        webhook_info: original_catalog.webhook_info.clone(),
2448        engine,
2449    };
2450
2451    let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2452        (Some(format_encode), None) => (
2453            gen_create_table_plan_with_source(
2454                handler_args,
2455                ExplainOptions::default(),
2456                table_name,
2457                columns,
2458                wildcard_idx,
2459                constraints,
2460                format_encode,
2461                source_watermarks,
2462                col_id_gen,
2463                include_column_options,
2464                props,
2465                sql_column_strategy,
2466            )
2467            .await?,
2468            TableJobType::General,
2469        ),
2470        (None, None) => {
2471            let context = OptimizerContext::from_handler_args(handler_args);
2472            let (plan, table) = gen_create_table_plan(
2473                context,
2474                table_name,
2475                columns,
2476                constraints,
2477                col_id_gen,
2478                source_watermarks,
2479                props,
2480                true,
2481            )?;
2482            ((plan, None, table), TableJobType::General)
2483        }
2484        (None, Some(cdc_table)) => {
2485            let session = &handler_args.session;
2486            let (source, resolved_table_name) =
2487                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2488
2489            let (cdc_with_options, normalized_external_table_name) =
2490                derive_with_options_for_cdc_table(
2491                    &source.with_properties,
2492                    cdc_table.external_table_name.clone(),
2493                )?;
2494
2495            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2496
2497            let context: OptimizerContextRef =
2498                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2499            let (plan, table) = gen_create_table_plan_for_cdc_table(
2500                context,
2501                source,
2502                normalized_external_table_name,
2503                columns,
2504                column_catalogs,
2505                pk_names,
2506                cdc_with_options,
2507                col_id_gen,
2508                on_conflict,
2509                with_version_columns
2510                    .iter()
2511                    .map(|col| col.real_value())
2512                    .collect(),
2513                include_column_options,
2514                table_name,
2515                resolved_table_name,
2516                original_catalog.database_id,
2517                original_catalog.schema_id,
2518                original_catalog.id(),
2519                engine,
2520            )?;
2521
2522            ((plan, None, table), TableJobType::SharedCdcSource)
2523        }
2524        (Some(_), Some(_)) => {
2525            return Err(ErrorCode::NotSupported(
2526                "Data format and encoding format don't apply to a table created from a CDC source"
2527                    .into(),
2528                "Remove the FORMAT and ENCODE specification".into(),
2529            )
2530            .into());
2531        }
2532    };
2533
2534    if table.pk_column_ids() != original_catalog.pk_column_ids() {
2535        Err(ErrorCode::InvalidInputSyntax(
2536            "alter primary key of table is not supported".to_owned(),
2537        ))?
2538    }
2539
2540    let graph = build_graph(plan, Some(GraphJobType::Table))?;
2541
2542    // Fill the original table ID.
2543    table.id = original_catalog.id();
2544    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2545        table.associated_source_id = Some(source_id);
2546
2547        let source = source.as_mut().unwrap();
2548        source.id = source_id;
2549        source.associated_table_id = Some(table.id());
2550    }
2551
2552    Ok((graph, table, source, job_type))
2553}
2554
2555fn get_source_and_resolved_table_name(
2556    session: &Arc<SessionImpl>,
2557    cdc_table: CdcTableInfo,
2558    table_name: ObjectName,
2559) -> Result<(Arc<SourceCatalog>, String)> {
2560    let db_name = &session.database();
2561    let (_, resolved_table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2562
2563    let (source_schema, source_name) =
2564        Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2565
2566    let source = {
2567        let catalog_reader = session.env().catalog_reader().read_guard();
2568        let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2569        let (source, _) = catalog_reader.get_source_by_name(
2570            db_name,
2571            SchemaPath::Name(schema_name.as_str()),
2572            source_name.as_str(),
2573        )?;
2574        source.clone()
2575    };
2576
2577    Ok((source, resolved_table_name))
2578}
2579
2580// Validate the webhook info and bind it to its protobuf representation.
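// A rough sketch of the kind of definition validated here (illustrative only; names are
// hypothetical and the exact syntax may differ):
//   CREATE TABLE wh (payload JSONB)
//   WITH (connector = 'webhook')
//   VALIDATE SECRET my_secret AS SECURE_COMPARE(headers->>'x-signature', my_secret);
// i.e. exactly one JSONB column, an optional secret reference, and a SECURE_COMPARE()
// signature expression.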
2581fn bind_webhook_info(
2582    session: &Arc<SessionImpl>,
2583    columns_defs: &[ColumnDef],
2584    webhook_info: WebhookSourceInfo,
2585) -> Result<PbWebhookSourceInfo> {
2586    // validate columns
2587    if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
2588    {
2589        return Err(ErrorCode::InvalidInputSyntax(
2590            "Table with webhook source should have exactly one JSONB column".to_owned(),
2591        )
2592        .into());
2593    }
2594
2595    let WebhookSourceInfo {
2596        secret_ref,
2597        signature_expr,
2598        wait_for_persistence,
2599        is_batched,
2600    } = webhook_info;
2601
2602    // validate secret_ref
2603    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2604        let db_name = &session.database();
2605        let (schema_name, secret_name) =
2606            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2607        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2608        (
2609            Some(PbSecretRef {
2610                secret_id: secret_catalog.id,
2611                ref_as: match secret_ref.ref_as {
2612                    SecretRefAsType::Text => PbRefAsType::Text,
2613                    SecretRefAsType::File => PbRefAsType::File,
2614                }
2615                .into(),
2616            }),
2617            Some(secret_name),
2618        )
2619    } else {
2620        (None, None)
2621    };
2622
2623    let signature_expr = if let Some(signature_expr) = signature_expr {
2624        let secure_compare_context = SecureCompareContext {
2625            column_name: columns_defs[0].name.real_value(),
2626            secret_name,
2627        };
2628        let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
2629        let expr = binder.bind_expr(&signature_expr)?;
2630
2631        // validate expr, ensuring it is SECURE_COMPARE()
2632        if expr.as_function_call().is_none()
2633            || expr.as_function_call().unwrap().func_type()
2634                != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2635        {
2636            return Err(ErrorCode::InvalidInputSyntax(
2637                "The signature verification function must be SECURE_COMPARE()".to_owned(),
2638            )
2639            .into());
2640        }
2641
2642        Some(expr.to_expr_proto())
2643    } else {
2644        session.notice_to_user(
2645            "A VALIDATE clause is strongly recommended for safety in production usage",
2646        );
2647        None
2648    };
2649
2650    let pb_webhook_info = PbWebhookSourceInfo {
2651        secret_ref: pb_secret_ref,
2652        signature_expr,
2653        wait_for_persistence,
2654        is_batched,
2655    };
2656
2657    Ok(pb_webhook_info)
2658}
2659
2660#[cfg(test)]
2661mod tests {
2662    use risingwave_common::catalog::{
2663        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2664    };
2665    use risingwave_common::types::{DataType, StructType};
2666
2667    use super::*;
2668    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2669
2670    #[tokio::test]
2671    async fn test_create_table_handler() {
2672        let sql =
2673            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2674        let frontend = LocalFrontend::new(Default::default()).await;
2675        frontend.run_sql(sql).await.unwrap();
2676
2677        let session = frontend.session_ref();
2678        let catalog_reader = session.env().catalog_reader().read_guard();
2679        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2680
2681        // Check table exists.
2682        let (table, _) = catalog_reader
2683            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2684            .unwrap();
2685        assert_eq!(table.name(), "t");
2686
2687        let columns = table
2688            .columns
2689            .iter()
2690            .map(|col| (col.name(), col.data_type().clone()))
2691            .collect::<HashMap<&str, DataType>>();
2692
2693        let expected_columns = maplit::hashmap! {
2694            ROW_ID_COLUMN_NAME => DataType::Serial,
2695            "v1" => DataType::Int16,
2696            "v2" => StructType::new(
2697                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
2698            )
2699            .with_ids([3, 4, 5].map(ColumnId::new))
2700            .into(),
2701            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
2702        };
2703
2704        assert_eq!(columns, expected_columns, "{columns:#?}");
2705    }
2706
2707    #[test]
2708    fn test_bind_primary_key() {
2709        // Note: Column ID 0 is reserved for row ID column.
2710
        for (sql, expected) in [
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ] {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be create table")
            };
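            // Mirror the handler's binding pipeline: bind the column definitions,
            // assign fresh column IDs, then resolve the primary key (a row ID
            // column serves as the key when none is declared).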
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpected an error but got: {actual:?}"),
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }

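    // When the same credential keys appear both in the WITH clause and in the
    // FORMAT ... ENCODE options, they must stay in their respective maps
    // (`with_properties` vs. `format_encode_options`) rather than being merged.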
    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // AwsAuth params exist in options.
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // AwsAuth params exist in props.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        // Options are not merged into props.
        assert!(!source.with_properties.contains_key("schema.location"));
    }
}