risingwave_frontend/handler/create_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::rc::Rc;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use clap::ValueEnum;
use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use prost::Message as _;
use risingwave_common::catalog::{
    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
    RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, TableId,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::license::Feature;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::jvm_runtime::JVM;
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
    DATABASE_NAME_KEY, ExternalTableConfig, ExternalTableImpl, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::connection_params::ConnectionType;
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::{PbSource, PbTable, PbWebhookSourceInfo, Table, WatermarkDesc};
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::secret::secret_ref::PbRefAsType;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::{IncludeOption, Parser};
use thiserror_ext::AsReport;

use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
use super::{RwPgResponse, create_sink, create_source};
use crate::binder::{Clause, SecureCompareContext, bind_data_type};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableVersion};
use crate::catalog::{ColumnId, DatabaseId, SchemaId, check_column_name_not_reserved};
use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::HandlerArgs;
use crate::handler::create_source::{
    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
    bind_source_watermark, handle_addition_columns,
};
use crate::handler::util::SourceSchemaCompatExt;
use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind};
use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::build_graph;
use crate::utils::OverwriteOptions;
use crate::{Binder, TableCatalog, WithOptions};

mod col_id_gen;
pub use col_id_gen::*;

fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
    for option_def in &c.options {
        match option_def.option {
            ColumnOption::GeneratedColumns(_) => {}
            ColumnOption::DefaultValue(_) => {}
            ColumnOption::DefaultValueInternal { .. } => {}
            ColumnOption::Unique { is_primary: true } => {}
            ColumnOption::Null => {}
            ColumnOption::NotNull => {}
            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
        }
    }
    Ok(())
}

/// Binds the column schemas declared in a CREATE statement into `ColumnCatalog`s.
///
/// Primary key options on the columns are not handled here; they are extracted
/// separately by [`bind_sql_pk_names`] and combined with table constraints.
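///
/// For example (illustrative, not a doctest), `CREATE TABLE t (v INT NOT NULL, s VARCHAR)`
/// yields two catalogs with placeholder column ids: a non-nullable `v` and a nullable `s`.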
pub fn bind_sql_columns(
    column_defs: &[ColumnDef],
    is_for_drop_table_connector: bool,
) -> Result<Vec<ColumnCatalog>> {
    let mut columns = Vec::with_capacity(column_defs.len());

    for column in column_defs {
        ensure_column_options_supported(column)?;
        // Destruct to make sure all fields are properly handled rather than ignored.
        // Do NOT use `..` to ignore fields you do not want to deal with.
        // Reject them with a clear NotImplemented error.
        let ColumnDef {
            name,
            data_type,
            collation,
            options,
        } = column;

        let data_type = data_type
            .clone()
            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
        if let Some(collation) = collation {
            // PostgreSQL limits the data types that collations can work on.
            // https://www.postgresql.org/docs/16/collation.html#COLLATION-CONCEPTS
            //   > The built-in collatable data types are `text`, `varchar`, and `char`.
            //
            // But we don't support real collation; we simply ignore it here.
            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
                bail_not_implemented!("collation other than `C` or `POSIX` is not implemented");
            }

            match data_type {
                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
                _ => {
                    return Err(ErrorCode::NotSupported(
                        format!("{} is not a collatable data type", data_type),
                        "The built-in collatable data types are `text`, `varchar`, and `char`, please check your type".into(),
                    ).into());
                }
            }
        }

        if !is_for_drop_table_connector {
            // An additional column name may have the prefix `_rw`.
            // When dropping the connector from a table, additional columns are converted to
            // normal columns and keep their original names, so we loosen the check for the
            // `_rw` prefix in that case.
            check_column_name_not_reserved(&name.real_value())?;
        }

        let nullable: bool = !options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull));

        columns.push(ColumnCatalog {
            column_desc: ColumnDesc {
                data_type: bind_data_type(&data_type)?,
                column_id: ColumnId::placeholder(),
                name: name.real_value(),
                generated_or_default_column: None,
                description: None,
                additional_column: AdditionalColumn { column_type: None },
                version: ColumnDescVersion::LATEST,
                system_column: None,
                nullable,
            },
            is_hidden: false,
        });
    }

    Ok(columns)
}

fn check_generated_column_constraints(
    column_name: &String,
    column_id: ColumnId,
    expr: &ExprImpl,
    column_catalogs: &[ColumnCatalog],
    generated_column_names: &[String],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let input_refs = expr.collect_input_refs(column_catalogs.len());
    for idx in input_refs.ones() {
        let referred_generated_column = &column_catalogs[idx].column_desc.name;
        if generated_column_names
            .iter()
            .any(|c| c == referred_generated_column)
        {
            return Err(ErrorCode::BindError(format!(
                "A generated column cannot reference another generated column. \
                But generated column \"{}\" references generated column \"{}\".",
                column_name, referred_generated_column
            ))
            .into());
        }
    }

    if pk_column_ids.contains(&column_id) && expr.is_impure() {
        return Err(ErrorCode::BindError(format!(
            "Generated columns with impure expressions should not be part of the primary key. \
            Here column \"{}\" is defined as part of the primary key.",
            column_name
        ))
        .into());
    }

    Ok(())
}

/// Binds constraints that can only be specified in column definitions,
/// currently generated columns and default columns.
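///
/// For example (illustrative), given `CREATE TABLE t (v INT, g INT AS v + 1, d INT DEFAULT 0)`,
/// this fills `generated_or_default_column` for `g` (the bound generated expression) and for
/// `d` (the default expression plus its evaluated snapshot value).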
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: Vec<ColumnDef>,
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let generated_column_names = {
        let mut names = vec![];
        for column in &columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name.clone(), column_catalogs)?;

    for column in columns {
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            // It's possible that we don't follow the user defined columns in SQL but take the
            // ones resolved from the source, thus missing some columns. Simply ignore them.
            continue;
        };

        for option_def in column.options {
            match option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "fail to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type().clone())?;

                    // Rewrite expressions to evaluate a snapshot value, used for missing values in the case of
                    // schema change.
                    //
                    // TODO: Currently we don't support impure expressions other than `now()` (like `random()`),
                    // so the rewritten expression should almost always be pure and we directly call `fold_const`
                    // here. Actually we do not require purity of the expression here since we're only to get a
                    // snapshot value.
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                // persist the original expression
                                expr: Some(expr_impl.to_expr_proto()),
                            }));
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                            Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // When a `DEFAULT INTERNAL` is used internally for schema change, the persisted value
                    // should already be set during purification. So if we encounter an empty value here, it
                    // means the user has specified it explicitly in the SQL statement, typically by
                    // directly copying the result of `SHOW CREATE TABLE` and executing it.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                             please specify a concrete default value"
                        );
                    }

                    let desc = DefaultColumnDesc::decode(&*persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                _ => {}
            }
        }
    }
    Ok(())
}

/// Currently we only support the PRIMARY KEY table constraint, so we just return the pk names if they exist.
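///
/// For example (illustrative), `PRIMARY KEY (a, b)` yields `["a", "b"]`, while any other
/// table constraint is rejected as not implemented.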
pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
    let mut pk_column_names = vec![];

    for constraint in table_constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary: true,
            } => {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
            }
            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
        }
    }
    Ok(pk_column_names)
}

pub fn bind_sql_pk_names(
    columns_defs: &[ColumnDef],
    pk_names_from_table_constraints: Vec<String>,
) -> Result<Vec<String>> {
    let mut pk_column_names = pk_names_from_table_constraints;

    for column in columns_defs {
        for option_def in &column.options {
            if let ColumnOption::Unique { is_primary: true } = option_def.option {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names.push(column.name.real_value());
            };
        }
    }

    Ok(pk_column_names)
}

fn multiple_pk_definition_err() -> RwError {
    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
}

/// Binds primary keys defined in SQL.
///
/// It returns the columns together with `pk_column_ids`, and an optional row id column index if
/// added.
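///
/// For example (illustrative), binding a single column `v` with no pk names and
/// `must_need_pk = true` appends the hidden `_row_id` column, makes it the sole
/// primary key, and returns its index as the row id index.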
pub fn bind_pk_and_row_id_on_relation(
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    must_need_pk: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
    for c in &columns {
        assert!(c.column_id() != ColumnId::placeholder());
    }

    // Mapping from column name to column id.
    let name_to_id = columns
        .iter()
        .map(|c| (c.name(), c.column_id()))
        .collect::<HashMap<_, _>>();

    let mut pk_column_ids: Vec<_> = pk_names
        .iter()
        .map(|name| {
            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
            })
        })
        .try_collect()?;

    // Add `_row_id` column if `pk_column_ids` is empty and must_need_pk
    let need_row_id = pk_column_ids.is_empty() && must_need_pk;

    let row_id_index = need_row_id.then(|| {
        let column = ColumnCatalog::row_id_column();
        let index = columns.len();
        pk_column_ids = vec![column.column_id()];
        columns.push(column);
        index
    });

    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
        Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{col}\" specified more than once"
        )))?;
    }

    Ok((columns, pk_column_ids, row_id_index))
}

/// `gen_create_table_plan_with_source` generates the plan for creating a table with an external
/// stream source.
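///
/// For example (illustrative), this backs statements like
/// `CREATE TABLE t (v INT) WITH (connector = 'kafka', ...) FORMAT PLAIN ENCODE JSON`.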
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append-only tables do not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        // if non-cdc source
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;

    // TODO: omit this step if `sql_column_strategy` is `Follow`.
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source_catalog = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
    )
    .await?;

    let pb_source = source_catalog.to_prost();

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source_catalog,
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(pb_source), table))
}

/// `gen_create_table_plan` generates the plan for creating a table without an external stream
/// source.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan(
    context: OptimizerContext,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    mut col_id_gen: ColumnIdGenerator,
    source_watermarks: Vec<SourceWatermark>,
    props: CreateTableProps,
    is_for_replace_plan: bool,
) -> Result<(PlanRef, PbTable)> {
    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
    if !secret_refs.is_empty() || !connection_refs.is_empty() {
        return Err(ErrorCode::InvalidParameterValue(
            "Secret reference and Connection reference are not allowed in options when \
             creating a table without an external source"
                .to_owned(),
        )
        .into());
    }

    gen_create_table_plan_without_source(
        context,
        table_name,
        columns,
        column_defs,
        constraints,
        source_watermarks,
        col_id_gen.into_version(),
        props,
    )
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, PbTable)> {
    // XXX: Why not bind outside?
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}

fn gen_table_plan_with_source(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    source_catalog: SourceCatalog,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, PbTable)> {
    let table_name = source_catalog.name.clone();

    let info = CreateTableInfo {
        columns: source_catalog.columns.clone(),
        pk_column_ids: source_catalog.pk_col_ids.clone(),
        row_id_index: source_catalog.row_id_index,
        watermark_descs: source_catalog.watermark_descs.clone(),
        source_catalog: Some(source_catalog),
        version,
    };

    gen_table_plan_inner(context, schema_name, table_name, info, props)
}

/// On-conflict behavior either from user input or existing table catalog.
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    Ast(Option<OnConflict>),
    Resolved(ConflictBehavior),
}

impl From<Option<OnConflict>> for EitherOnConflict {
    fn from(v: Option<OnConflict>) -> Self {
        Self::Ast(v)
    }
}

impl From<ConflictBehavior> for EitherOnConflict {
    fn from(v: ConflictBehavior) -> Self {
        Self::Resolved(v)
    }
}

impl EitherOnConflict {
    /// Resolves the conflict behavior based on the given information.
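    ///
    /// For example (illustrative), an append-only table with a user-defined primary key
    /// must use `ON CONFLICT DO NOTHING` (or omit the clause), resolving to
    /// `ConflictBehavior::IgnoreConflict`; a non-append-only table without an `ON CONFLICT`
    /// clause defaults to `ConflictBehavior::Overwrite`.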
    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
        let conflict_behavior = match self {
            EitherOnConflict::Ast(on_conflict) => {
                if append_only {
                    if row_id_as_pk {
                        // Primary key will be generated, no conflict check needed.
                        ConflictBehavior::NoCheck
                    } else {
                        // User defined PK on append-only table, enforce `DO NOTHING`.
                        if let Some(on_conflict) = on_conflict
                            && on_conflict != OnConflict::Nothing
                        {
                            return Err(ErrorCode::InvalidInputSyntax(
                                "When a PRIMARY KEY constraint is applied to an APPEND ONLY table, \
                                 the ON CONFLICT behavior must be DO NOTHING."
                                    .to_owned(),
                            )
                            .into());
                        }
                        ConflictBehavior::IgnoreConflict
                    }
                } else {
                    // Default to `UPDATE FULL` for non-append-only tables.
                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
                    }
                }
            }
            EitherOnConflict::Resolved(b) => b,
        };

        Ok(conflict_behavior)
    }
}

/// Arguments of the functions that generate a table plan, part 1.
///
/// Compared to [`CreateTableProps`], this struct contains fields that need some work of binding
/// or resolving based on the user input.
pub struct CreateTableInfo {
    pub columns: Vec<ColumnCatalog>,
    pub pk_column_ids: Vec<ColumnId>,
    pub row_id_index: Option<usize>,
    pub watermark_descs: Vec<WatermarkDesc>,
    pub source_catalog: Option<SourceCatalog>,
    pub version: TableVersion,
}

/// Arguments of the functions that generate a table plan, part 2.
///
/// Compared to [`CreateTableInfo`], this struct contains fields that can be (relatively) simply
/// obtained from the input or the context.
pub struct CreateTableProps {
    pub definition: String,
    pub append_only: bool,
    pub on_conflict: EitherOnConflict,
    pub with_version_column: Option<String>,
    pub webhook_info: Option<PbWebhookSourceInfo>,
    pub engine: Engine,
}

#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, PbTable)> {
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    let source_node: PlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    if !append_only && !watermark_descs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on a table requires the table to be append-only.".to_owned(),
            "Use the keywords `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    if !append_only && retention_seconds.is_some() {
        return Err(ErrorCode::NotSupported(
            "Defining retention seconds on a table requires the table to be append-only.".to_owned(),
            "Use the keywords `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().to_prost();

    table.owner = session.user_id();
    Ok((materialize.into(), table))
}

/// Generates the stream plan for a cdc table based on a shared cdc source.
///
/// In the replace workflow, `table_id` is the id of the table to be replaced;
/// in the create table workflow, `table_id` is a placeholder that will be filled in by the meta node.
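///
/// For example (illustrative), the plan generated here backs statements like
/// `CREATE TABLE t (...) FROM my_cdc_source TABLE 'mydb.orders'`.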
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String, // table name without schema prefix
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, PbTable)> {
    let session = context.session_ctx().clone();

    // append additional columns to the end
    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    // NOTE: in auto schema change, the default value is not provided in the column definition.
    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        // pk column id must exist in table columns.
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| !c.is_generated())
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();

    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id.into(), // id of cdc source streaming job
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");

    let options = CdcScanOptions::from_with_options(context.with_options())?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: PlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs: vec![],
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_column,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().to_prost();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.dependent_relations = vec![source.id];

    Ok((materialize.into(), table))
}

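/// Derives the `WITH` options for a cdc table from its upstream source's properties by
/// splitting `external_table_name` into connector-specific name keys. For example
/// (illustrative), with the Postgres cdc connector, `'public.orders'` becomes
/// `schema.name = 'public'` and `table.name = 'orders'`.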
fn derive_with_options_for_cdc_table(
    source_with_properties: &WithOptionsSecResolved,
    external_table_name: String,
) -> Result<WithOptionsSecResolved> {
    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
    // We need to strip the connector-specific prefix from `external_table_name`.
    let mut with_options = source_with_properties.clone();
    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
        match connector.as_str() {
            MYSQL_CDC_CONNECTOR => {
                // MySQL doesn't allow '.' in database name and table name, so we can split the
                // external table name by '.' to get the table name
                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
                })?;
                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            POSTGRES_CDC_CONNECTOR => {
                let (schema_name, table_name) = external_table_name
                    .split_once('.')
                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;

                // insert 'schema.name' into connect properties
                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            SQL_SERVER_CDC_CONNECTOR => {
                // SQL Server external table name is in 'databaseName.schemaName.tableName' pattern,
                // we remove the database name prefix and split the schema name and table name
                let schema_table_name = external_table_name
                    .split_once('.')
                    .ok_or_else(|| {
                        anyhow!("The upstream table name must be in 'database.schema.table' format")
                    })?
                    .1;

                let (schema_name, table_name) =
                    schema_table_name.split_once('.').ok_or_else(|| {
                        anyhow!("The table name must contain schema name prefix, e.g. 'dbo.table'")
                    })?;

                // insert 'schema.name' into connect properties
                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            _ => {
                return Err(RwError::from(anyhow!(
                    "connector {} is not supported for cdc table",
                    connector
                )));
            }
        };
    }
    Ok(with_options)
}

#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
    handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    format_encode: Option<FormatEncodeOptions>,
    cdc_table_info: Option<CdcTableInfo>,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    engine: Engine,
) -> Result<(PlanRef, Option<PbSource>, PbTable, TableJobType)> {
    let col_id_gen = ColumnIdGenerator::new_initial();
    let format_encode = check_create_table_with_source(
        &handler_args.with_options,
        format_encode,
        &include_column_options,
        &cdc_table_info,
    )?;
    let webhook_info = webhook_info
        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
        .transpose()?;

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_column: with_version_column.clone(),
        webhook_info,
        engine,
    };

    let ((plan, source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                explain_options,
                table_name.clone(),
                column_defs,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                SqlColumnStrategy::FollowChecked,
            )
            .await?,
            TableJobType::General,
        ),
        (None, None) => {
            let context = OptimizerContext::new(handler_args, explain_options);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name.clone(),
                column_defs,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                false,
            )?;

            ((plan, None, table), TableJobType::General)
        }

        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &column_defs,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            generated_columns_check_for_cdc_table(&column_defs)?;
            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;

            let session = &handler_args.session;
            let db_name = &session.database();
            let user_name = &session.user_name();
            let search_path = session.config().search_path();
            let (schema_name, resolved_table_name) =
                Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
            let (database_id, schema_id) =
                session.get_database_and_schema_id_for_create(schema_name.clone())?;

            // cdc table cannot be append-only
            let (source_schema_name, source_name) =
                Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;

            let source = {
                let catalog_reader = session.env().catalog_reader().read_guard();
                let schema_path =
                    SchemaPath::new(source_schema_name.as_deref(), &search_path, user_name);

                let (source, _) = catalog_reader.get_source_by_name(
                    db_name,
                    schema_path,
                    source_name.as_str(),
                )?;
                source.clone()
            };
            let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
                &source.with_properties,
                cdc_table.external_table_name.clone(),
            )?;

            let (columns, pk_names) = match wildcard_idx {
                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
                None => {
                    for column_def in &column_defs {
                        for option_def in &column_def.options {
                            if let ColumnOption::DefaultValue(_)
                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
                            {
                                return Err(ErrorCode::NotSupported(
                                    "Default value for columns defined on the table created from a CDC source".into(),
                                    "Remove the default value expression in the column definitions".into(),
                                )
                                .into());
                            }
                        }
                    }

                    let (columns, pk_names) =
                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
                    // read default value definition from external db
                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
                        .context("failed to extract external table config")?;

                    // NOTE: if the external table has a default column, we will only treat it as a normal column.
                    (columns, pk_names)
                }
            };

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, explain_options).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                cdc_table.external_table_name.clone(),
                column_defs,
                columns,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_column,
                include_column_options,
                table_name,
                resolved_table_name,
                database_id,
                schema_id,
                TableId::placeholder(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to a table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };
    Ok((plan, source, table, job_type))
}

// For both table from cdc source and table with cdc connector
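// For example (illustrative), `(v INT, g INT AS v + 1)` is accepted, while
// `(g INT AS v + 1, v INT)` is rejected because a non-generated column follows a
// generated one.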
fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
    let mut found_generated_column = false;
    for column in columns {
        let mut is_generated = false;

        for option_def in &column.options {
            if let ColumnOption::GeneratedColumns(_) = option_def.option {
                is_generated = true;
                break;
            }
        }

        if is_generated {
            found_generated_column = true;
        } else if found_generated_column {
            return Err(ErrorCode::NotSupported(
                "Non-generated column found after a generated column.".into(),
                "Ensure that all generated columns appear at the end of the cdc table definition."
                    .into(),
            )
            .into());
        }
    }
    Ok(())
}

// For both table from cdc source and table with cdc connector
fn not_null_check_for_cdc_table(
    wildcard_idx: &Option<usize>,
    column_defs: &Vec<ColumnDef>,
) -> Result<()> {
    if wildcard_idx.is_none()
        && column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table with NOT NULL constraint is not supported".to_owned(),
            "Please remove the NOT NULL constraint for columns".to_owned(),
        )
        .into());
    }
    Ok(())
}

// Only for table from cdc source
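// For example (illustrative), `CREATE TABLE t (*) FROM src TABLE 'db.tbl'` is accepted,
// while combining the wildcard with column definitions, using APPEND ONLY, or defining
// watermarks on such a table is rejected.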
fn sanity_check_for_table_on_cdc_source(
    append_only: bool,
    column_defs: &Vec<ColumnDef>,
    wildcard_idx: &Option<usize>,
    constraints: &Vec<TableConstraint>,
    source_watermarks: &Vec<SourceWatermark>,
) -> Result<()> {
    // wildcard cannot be used with column definitions
    if wildcard_idx.is_some() && !column_defs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "wildcard(*) and column definitions cannot be used together".to_owned(),
            "Remove the wildcard or column definitions".to_owned(),
        )
        .into());
    }

    // cdc table must have primary key constraint or primary key column
    if wildcard_idx.is_none()
        && !constraints.iter().any(|c| {
            matches!(
                c,
                TableConstraint::Unique {
                    is_primary: true,
                    ..
                }
            )
        })
        && !column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table without primary key constraint is not supported".to_owned(),
            "Please define a primary key".to_owned(),
        )
        .into());
    }

    if append_only {
        return Err(ErrorCode::NotSupported(
            "append only modifier on the table created from a CDC source".into(),
            "Remove the APPEND ONLY clause".into(),
        )
        .into());
    }

    if !source_watermarks.is_empty() {
        return Err(ErrorCode::NotSupported(
            "watermark defined on the table created from a CDC source".into(),
            "Remove the Watermark definitions".into(),
        )
        .into());
    }

    Ok(())
}

/// Derives the schema for a cdc table by connecting to the external database,
/// used when creating a new table or altering an existing one.
async fn bind_cdc_table_schema_externally(
    cdc_with_options: WithOptionsSecResolved,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    // Reading the cdc table schema from the external database requires a licensed feature.
    Feature::CdcTableSchemaMap.check_available().map_err(
        |err: risingwave_common::license::FeatureNotAvailable| {
            ErrorCode::NotSupported(
                err.to_report_string(),
                "Please define the schema manually".to_owned(),
            )
        },
    )?;
    let (options, secret_refs) = cdc_with_options.into_parts();
    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
        .context("failed to extract external table config")?;

    let table = ExternalTableImpl::connect(config)
        .await
        .context("failed to auto derive table schema")?;
    Ok((
        table
            .column_descs()
            .iter()
            .cloned()
            .map(|column_desc| ColumnCatalog {
                column_desc,
                is_hidden: false,
            })
            .collect(),
        table.pk_names().clone(),
    ))
}

/// Derives the schema for a cdc table from the column definitions and constraints in SQL,
/// used when creating a new table or altering an existing one.
fn bind_cdc_table_schema(
    column_defs: &Vec<ColumnDef>,
    constraints: &Vec<TableConstraint>,
    is_for_replace_plan: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;

    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
    Ok((columns, pk_names))
}

#[allow(clippy::too_many_arguments)]
pub async fn handle_create_table(
    handler_args: HandlerArgs,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    if_not_exists: bool,
    format_encode: Option<FormatEncodeOptions>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    cdc_table_info: Option<CdcTableInfo>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    ast_engine: risingwave_sqlparser::ast::Engine,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    if append_only {
        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
    }

    session.check_cluster_limits().await?;

    let engine = match ast_engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        table_name.clone(),
        StatementType::CREATE_TABLE,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (graph, source, table, job_type) = {
        let (plan, source, table, job_type) = handle_create_table_plan(
            handler_args.clone(),
            ExplainOptions::default(),
            format_encode,
            cdc_table_info,
            table_name.clone(),
            column_defs.clone(),
            wildcard_idx,
            constraints.clone(),
            source_watermarks,
            append_only,
            on_conflict,
            with_version_column,
            include_column_options,
            webhook_info,
            engine,
        )
        .await?;

        let graph = build_graph(plan)?;

        (graph, source, table, job_type)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        table_name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

    // Handle engine
    match engine {
        Engine::Hummock => {
            let catalog_writer = session.catalog_writer()?;
            catalog_writer
                .create_table(source, table, graph, job_type)
                .await?;
        }
        Engine::Iceberg => {
            create_iceberg_engine_table(
                session,
                handler_args,
                source,
                table,
                graph,
                job_type,
                column_defs,
                constraints,
                table_name,
            )
            .await?;
        }
    }

    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
}

#[allow(clippy::too_many_arguments)]
pub async fn create_iceberg_engine_table(
    session: Arc<SessionImpl>,
    handler_args: HandlerArgs,
    mut source: Option<PbSource>,
    table: PbTable,
    graph: StreamFragmentGraph,
    job_type: TableJobType,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    table_name: ObjectName,
) -> Result<()> {
    // 1. fetch iceberg engine options from the meta node. Or use iceberg engine connection provided by users.
    // 2. create a hummock table
    // 3. create an iceberg sink
    // 4. create an iceberg source

    let meta_client = session.env().meta_client();
    let system_params = meta_client.get_system_params().await?;
    let state_store_endpoint = system_params.state_store().to_owned();
    let data_directory = system_params.data_directory().to_owned();
    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;

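    // The meta store endpoint is a URL of the form
    // `<backend>://<user>:<password>@<host>:<port>/<database>`; pick it apart so the
    // pieces can be fed to the iceberg JDBC catalog below.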
    let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
        ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
    })?;
    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
    let meta_store_user = meta_store_endpoint.username().to_owned();
    let meta_store_password = meta_store_endpoint
        .password()
        .ok_or_else(|| {
            ErrorCode::InternalError("failed to parse password from meta store endpoint".to_owned())
        })?
        .to_owned();
    let meta_store_host = meta_store_endpoint
        .host_str()
        .ok_or_else(|| {
            ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
        })?
        .to_owned();
    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
        ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
    })?;
    let meta_store_database = meta_store_endpoint
        .path()
        .trim_start_matches('/')
        .to_owned();

    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
        bail!("failed to parse meta backend: {}", meta_store_backend);
    };

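    // Derive the JDBC catalog URI from the parsed endpoint, e.g.
    // `jdbc:postgresql://<host>:<port>/<database>` for a Postgres meta store.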
    let catalog_uri = match meta_backend {
        MetaBackend::Postgres => {
            format!(
                "jdbc:postgresql://{}:{}/{}",
                meta_store_host, meta_store_port, meta_store_database
            )
        }
        MetaBackend::Mysql => {
            format!(
                "jdbc:mysql://{}:{}/{}",
                meta_store_host, meta_store_port, meta_store_database
            )
        }
        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
            bail!(
                "Unsupported meta backend for iceberg engine table: {}",
                meta_store_backend
            );
        }
    };

    let rw_db_name = session
        .env()
        .catalog_reader()
        .read_guard()
        .get_database_by_id(&table.database_id)?
        .name()
        .to_owned();
    let rw_schema_name = session
        .env()
        .catalog_reader()
        .read_guard()
        .get_schema_by_id(&table.database_id, &table.schema_id)?
        .name()
        .clone();
    let iceberg_catalog_name = rw_db_name.clone();
    let iceberg_database_name = rw_schema_name.clone();
    let iceberg_table_name = table_name.0.last().unwrap().real_value();

    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
    let sink_decouple = session.config().sink_decouple();
    if matches!(sink_decouple, SinkDecouple::Disable) {
        bail!(
            "Iceberg engine tables require sink decouple, try `set sink_decouple = true` to resolve it"
        );
    }

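    // Build the common iceberg connector options. The state-store-derived path below is
    // kept behind `#[allow(dead_code)]`; in practice, the `iceberg_engine_connection`
    // session variable must be set and its connection supplies the options.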
    let mut connection_ref = BTreeMap::new();
    let with_common = if iceberg_engine_connection.is_empty() {
        #[allow(dead_code)]
        {
            let (s3_region, s3_endpoint, s3_ak, s3_sk, warehouse_path) = match state_store_endpoint
            {
                s3 if s3.starts_with("hummock+s3://") => {
                    let s3_region = if let Ok(s3_region) = std::env::var("AWS_REGION") {
                        s3_region
                    } else {
                        bail!(
                            "To create an iceberg engine table with the s3 backend, AWS_REGION needs to be set"
                        );
                    };
                    let s3_bucket = s3.strip_prefix("hummock+s3://").unwrap().to_owned();
                    (
                        Some(s3_region),
                        None,
                        None,
                        None,
                        Some(format!(
                            "s3://{}/{}/iceberg/{}",
                            s3_bucket, data_directory, iceberg_catalog_name
                        )),
                    )
                }
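                // A minio state store endpoint is encoded as
                // `hummock+minio://<access_key>:<secret_key>@<address>/<bucket>`,
                // optionally with an `http://` or `https://` scheme on the address.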
                minio if minio.starts_with("hummock+minio://") => {
                    let server = minio.strip_prefix("hummock+minio://").unwrap();
                    let (access_key_id, rest) = server.split_once(':').unwrap();
                    let (secret_access_key, mut rest) = rest.split_once('@').unwrap();
                    let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://")
                    {
                        rest = rest_stripped;
                        "https://"
                    } else if let Some(rest_stripped) = rest.strip_prefix("http://") {
                        rest = rest_stripped;
                        "http://"
                    } else {
                        "http://"
                    };
                    let (address, s3_bucket) = rest.split_once('/').unwrap();
                    (
                        Some("us-east-1".to_owned()),
                        Some(format!("{}{}", endpoint_prefix, address)),
                        Some(access_key_id.to_owned()),
                        Some(secret_access_key.to_owned()),
                        Some(format!(
                            "s3://{}/{}/iceberg/{}",
                            s3_bucket, data_directory, iceberg_catalog_name
                        )),
                    )
                }
                _ => {
                    bail!(
                        "iceberg engine can't operate with this state store endpoint: {}",
                        state_store_endpoint
                    );
                }
            };

            let mut with_common = BTreeMap::new();
            with_common.insert("enable_config_load".to_owned(), "true".to_owned());
            with_common.insert("connector".to_owned(), "iceberg".to_owned());
            with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
            if let Some(warehouse_path) = warehouse_path.clone() {
                with_common.insert("warehouse.path".to_owned(), warehouse_path.to_owned());
            }
            if let Some(s3_endpoint) = s3_endpoint.clone() {
                with_common.insert("s3.endpoint".to_owned(), s3_endpoint);
            }
            if let Some(s3_ak) = s3_ak.clone() {
                with_common.insert("s3.access.key".to_owned(), s3_ak.to_owned());
            }
            if let Some(s3_sk) = s3_sk.clone() {
                with_common.insert("s3.secret.key".to_owned(), s3_sk.to_owned());
            }
            if let Some(s3_region) = s3_region.clone() {
                with_common.insert("s3.region".to_owned(), s3_region.to_owned());
            }
            with_common.insert("catalog.uri".to_owned(), catalog_uri.to_owned());
            with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user.to_owned());
            with_common.insert(
                "catalog.jdbc.password".to_owned(),
                meta_store_password.clone(),
            );
            with_common.insert("catalog.name".to_owned(), iceberg_catalog_name.to_owned());
            with_common.insert("database.name".to_owned(), iceberg_database_name.to_owned());
            with_common.insert("table.name".to_owned(), iceberg_table_name.to_owned());
        }
        bail!("to use an iceberg engine table, the session variable `iceberg_engine_connection` must be set.");
    } else {
        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
        assert_eq!(parts.len(), 2);
        let connection_catalog =
            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
            if params.connection_type == ConnectionType::Iceberg as i32 {
                // With iceberg engine connection:
                connection_ref.insert(
                    "connection".to_owned(),
                    ConnectionRefValue {
                        connection_name: ObjectName::from(vec![
                            Ident::from(parts[0]),
                            Ident::from(parts[1]),
                        ]),
                    },
                );

                let mut with_common = BTreeMap::new();
                with_common.insert("connector".to_owned(), "iceberg".to_owned());
                with_common.insert("database.name".to_owned(), iceberg_database_name.to_owned());
                with_common.insert("table.name".to_owned(), iceberg_table_name.to_owned());
                with_common
            } else {
                return Err(RwError::from(ErrorCode::InvalidParameterValue(
                    "Only an iceberg connection can be used with the iceberg engine".to_owned(),
                )));
            }
        } else {
            return Err(RwError::from(ErrorCode::InvalidParameterValue(
                "Private Link Service has been deprecated. Please create a new connection instead."
                    .to_owned(),
            )));
        }
    };

    // Iceberg sinks require a primary key; if none is provided, we will use the `_row_id` column.
    // Fetch primary key from columns.
    let mut pks = column_defs
        .into_iter()
        .filter(|c| {
            c.options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Unique { is_primary: true }))
        })
        .map(|c| c.name.to_string())
        .collect::<Vec<String>>();

    // Fetch primary key from constraints
    if pks.is_empty() {
        pks = constraints
            .into_iter()
            .filter(|c| {
                matches!(
                    c,
                    TableConstraint::Unique {
                        is_primary: true,
                        ..
                    }
                )
            })
            .flat_map(|c| match c {
                TableConstraint::Unique { columns, .. } => columns
                    .into_iter()
                    .map(|c| c.to_string())
                    .collect::<Vec<String>>(),
                _ => vec![],
            })
            .collect::<Vec<String>>();
    }

    // For a table without a primary key, use `_row_id` as the primary key.
    let sink_from = if pks.is_empty() {
        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
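        // Expose the hidden row id to the sink, i.e.
        // `select <ROW_ID_COLUMN_NAME> as <RISINGWAVE_ICEBERG_ROW_ID>, * from <table>`.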
        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
            "select {} as {}, * from {}",
            ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
        ))
        .context("unable to parse query")?
        .try_into()
        .unwrap();

        let Statement::Query(query) = &stmt else {
            panic!("unexpected statement: {:?}", stmt);
        };
        CreateSink::AsQuery(query.clone())
    } else {
        CreateSink::From(table_name.clone())
    };

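    // The internal sink is named by prefixing the table name with `ICEBERG_SINK_PREFIX`.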
    let mut sink_name = table_name.clone();
    *sink_name.0.last_mut().unwrap() = Ident::from(
        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
    );
    let create_sink_stmt = CreateSinkStatement {
        if_not_exists: false,
        sink_name,
        with_properties: WithProperties(vec![]),
        sink_from,
        columns: vec![],
        emit_mode: None,
        sink_schema: None,
        into_table_name: None,
    };

    let mut sink_handler_args = handler_args.clone();

    let mut sink_with = with_common.clone();

    sink_with.insert("primary_key".to_owned(), pks.join(","));
    sink_with.insert("type".to_owned(), "upsert".to_owned());

    let commit_checkpoint_interval = handler_args
        .with_options
        .get(COMMIT_CHECKPOINT_INTERVAL)
        .map(|v| v.to_owned())
        .unwrap_or_else(|| "60".to_owned());
    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
        ErrorCode::InvalidInputSyntax(format!(
            "commit_checkpoint_interval must be a positive integer: {}",
            commit_checkpoint_interval
        ))
    })?;

    if commit_checkpoint_interval == 0 {
        bail!("commit_checkpoint_interval must be a positive integer: 0");
    }

    // Remove `commit_checkpoint_interval` from the source options, otherwise it will be
    // treated as an unknown field when the source is created.
    if let Some(source) = source.as_mut() {
        source.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL);
    }

    let sink_decouple = session.config().sink_decouple();
    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
        bail!(
            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
        )
    }

    sink_with.insert(
        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
        commit_checkpoint_interval.to_string(),
    );
    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());

    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());

    sink_handler_args.with_options =
        WithOptions::new(sink_with, Default::default(), connection_ref.clone());

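    // The internal source is named by prefixing the table name with `ICEBERG_SOURCE_PREFIX`.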
    let mut source_name = table_name.clone();
    *source_name.0.last_mut().unwrap() = Ident::from(
        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
    );
    let create_source_stmt = CreateSourceStatement {
        temporary: false,
        if_not_exists: false,
        columns: vec![],
        source_name,
        wildcard_idx: Some(0),
        constraints: vec![],
        with_properties: WithProperties(vec![]),
        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
        source_watermarks: vec![],
        include_column_options: vec![],
    };

    let mut source_handler_args = handler_args.clone();
    let source_with = with_common;
    source_handler_args.with_options =
        WithOptions::new(source_with, Default::default(), connection_ref);

    // Before creating the table, ensure the JVM is initialized, since we use the JDBC
    // catalog right now. If the JVM failed to initialize, the non-atomic DDL below would
    // otherwise leave a partially created iceberg engine table behind.
    let _ = JVM.get_or_init()?;

    let catalog_writer = session.catalog_writer()?;
    // TODO(iceberg): make iceberg engine table creation ddl atomic
    catalog_writer
        .create_table(source, table, graph, job_type)
        .await?;
    create_sink::handle_create_sink(sink_handler_args, create_sink_stmt).await?;
    create_source::handle_create_source(source_handler_args, create_source_stmt).await?;

    Ok(())
}

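/// Sanity-checks a `CREATE TABLE` statement that may carry connector options:
/// `INCLUDE` clauses are only allowed together with a connector, and a connector
/// requires a `FORMAT ... ENCODE ...` clause. CDC tables are exempt from the check.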
pub fn check_create_table_with_source(
    with_options: &WithOptions,
    format_encode: Option<FormatEncodeOptions>,
    include_column_options: &IncludeOption,
    cdc_table_info: &Option<CdcTableInfo>,
) -> Result<Option<FormatEncodeOptions>> {
    // skip check for cdc table
    if cdc_table_info.is_some() {
        return Ok(format_encode);
    }
    let defined_source = with_options.is_source_connector();

    if !include_column_options.is_empty() && !defined_source {
        return Err(ErrorCode::InvalidInputSyntax(
            "INCLUDE should be used with a connector".to_owned(),
        )
        .into());
    }
    if defined_source {
        format_encode.as_ref().ok_or_else(|| {
            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
        })?;
    }
    Ok(format_encode)
}

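/// Generates a new stream fragment graph for replacing an existing table (e.g. on
/// `ALTER TABLE`), reusing the original table ID and, when the connector is kept,
/// the associated source ID.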
#[allow(clippy::too_many_arguments)]
pub async fn generate_stream_graph_for_replace_table(
    _session: &Arc<SessionImpl>,
    table_name: ObjectName,
    original_catalog: &Arc<TableCatalog>,
    handler_args: HandlerArgs,
    statement: Statement,
    col_id_gen: ColumnIdGenerator,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
    use risingwave_pb::catalog::table::OptionalAssociatedSourceId;

    let Statement::CreateTable {
        columns,
        constraints,
        source_watermarks,
        append_only,
        on_conflict,
        with_version_column,
        wildcard_idx,
        cdc_table_info,
        format_encode,
        include_column_options,
        engine,
        with_options,
        ..
    } = statement
    else {
        panic!("unexpected statement type: {:?}", statement);
    };

    let format_encode = format_encode
        .clone()
        .map(|format_encode| format_encode.into_v2_with_warning());

    let engine = match engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    let is_drop_connector =
        original_catalog.associated_source_id().is_some() && format_encode.is_none();
    if is_drop_connector {
        debug_assert!(
            source_watermarks.is_empty()
                && include_column_options.is_empty()
                && with_options
                    .iter()
                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
        );
    }

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_column: with_version_column.as_ref().map(|x| x.real_value()),
        webhook_info: original_catalog.webhook_info.clone(),
        engine,
    };

    let ((plan, mut source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                ExplainOptions::default(),
                table_name,
                columns,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                sql_column_strategy,
            )
            .await?,
            TableJobType::General,
        ),
        (None, None) => {
            let context = OptimizerContext::from_handler_args(handler_args);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name,
                columns,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                true,
            )?;
            ((plan, None, table), TableJobType::General)
        }
        (None, Some(cdc_table)) => {
            let session = &handler_args.session;
            let (source, resolved_table_name, database_id, schema_id) =
                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;

            let cdc_with_options = derive_with_options_for_cdc_table(
                &source.with_properties,
                cdc_table.external_table_name.clone(),
            )?;

            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                cdc_table.external_table_name.clone(),
                columns,
                column_catalogs,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_column.map(|x| x.real_value()),
                IncludeOption::default(),
                table_name,
                resolved_table_name,
                database_id,
                schema_id,
                original_catalog.id(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to a table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };

    // TODO: avoid this backward conversion.
    if TableCatalog::from(&table).pk_column_ids() != original_catalog.pk_column_ids() {
        Err(ErrorCode::InvalidInputSyntax(
            "altering the primary key of a table is not supported".to_owned(),
        ))?
    }

    let graph = build_graph(plan)?;

    // Fill the original table ID.
    let mut table = Table {
        id: original_catalog.id().table_id(),
        ..table
    };
    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
        table.optional_associated_source_id = Some(OptionalAssociatedSourceId::AssociatedSourceId(
            source_id.table_id,
        ));
        source.as_mut().unwrap().id = source_id.table_id;
        source.as_mut().unwrap().optional_associated_table_id =
            Some(OptionalAssociatedTableId::AssociatedTableId(table.id))
    }

    Ok((graph, table, source, job_type))
}

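/// Resolves the backing CDC source catalog and the schema-qualified name of the table
/// to be created from it, along with the target database and schema IDs.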
fn get_source_and_resolved_table_name(
    session: &Arc<SessionImpl>,
    cdc_table: CdcTableInfo,
    table_name: ObjectName,
) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
    let db_name = &session.database();
    let (schema_name, resolved_table_name) =
        Binder::resolve_schema_qualified_name(db_name, table_name)?;
    let (database_id, schema_id) =
        session.get_database_and_schema_id_for_create(schema_name.clone())?;

    let (source_schema_name, source_name) =
        Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;

    let source = {
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_name = source_schema_name.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
        let (source, _) = catalog_reader.get_source_by_name(
            db_name,
            SchemaPath::Name(schema_name.as_str()),
            source_name.as_str(),
        )?;
        source.clone()
    };

    Ok((source, resolved_table_name, database_id, schema_id))
}

/// Validates the `webhook_info` of a webhook table and binds it into its protobuf form.
fn bind_webhook_info(
    session: &Arc<SessionImpl>,
    columns_defs: &[ColumnDef],
    webhook_info: WebhookSourceInfo,
) -> Result<PbWebhookSourceInfo> {
    // validate columns
    if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
    {
        return Err(ErrorCode::InvalidInputSyntax(
            "Table with webhook source should have exactly one JSONB column".to_owned(),
        )
        .into());
    }

    let WebhookSourceInfo {
        secret_ref,
        signature_expr,
        wait_for_persistence,
    } = webhook_info;

    // validate secret_ref
    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
        let db_name = &session.database();
        let (schema_name, secret_name) =
            Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
        (
            Some(PbSecretRef {
                secret_id: secret_catalog.id.secret_id(),
                ref_as: match secret_ref.ref_as {
                    SecretRefAsType::Text => PbRefAsType::Text,
                    SecretRefAsType::File => PbRefAsType::File,
                }
                .into(),
            }),
            Some(secret_name),
        )
    } else {
        (None, None)
    };

    let secure_compare_context = SecureCompareContext {
        column_name: columns_defs[0].name.real_value(),
        secret_name,
    };
    let mut binder = Binder::new_for_ddl_with_secure_compare(session, secure_compare_context);
    let expr = binder.bind_expr(signature_expr.clone())?;

    // validate expr, ensuring it is SECURE_COMPARE()
    let is_secure_compare = expr.as_function_call().is_some_and(|func| {
        func.func_type() == crate::optimizer::plan_node::generic::ExprType::SecureCompare
    });
    if !is_secure_compare {
        return Err(ErrorCode::InvalidInputSyntax(
            "The signature verification function must be SECURE_COMPARE()".to_owned(),
        )
        .into());
    }

    let pb_webhook_info = PbWebhookSourceInfo {
        secret_ref: pb_secret_ref,
        signature_expr: Some(expr.to_expr_proto()),
        wait_for_persistence,
    };

    Ok(pb_webhook_info)
}

#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use super::*;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_table_handler() {
        let sql =
            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check table exists.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(table.name(), "t");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "v1" => DataType::Int16,
            "v2" => StructType::new(
                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
            )
            .with_ids([3, 4, 5].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };

        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    #[test]
    fn test_bind_primary_key() {
        // Note: Column ID 0 is reserved for row ID column.

        for (sql, expected) in [
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ] {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be create table")
            };
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }

    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // AwsAuth params exist in options.
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // AwsAuth params exist in props.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        // Options are not merged into props.
        assert!(!source.with_properties.contains_key("schema.location"));
    }
}