risingwave_frontend/handler/
create_table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use percent_encoding::percent_decode_str;
25use pgwire::pg_response::{PgResponse, StatementType};
26use prost::Message as _;
27use risingwave_common::catalog::{
28    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29    ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
30    TableId,
31};
32use risingwave_common::config::MetaBackend;
33use risingwave_common::global_jvm::Jvm;
34use risingwave_common::session_config::sink_decouple::SinkDecouple;
35use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
36use risingwave_common::util::value_encoding::DatumToProtoExt;
37use risingwave_common::{bail, bail_not_implemented};
38use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
39use risingwave_connector::source::cdc::build_cdc_table_id;
40use risingwave_connector::source::cdc::external::{
41    DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
42    SCHEMA_NAME_KEY, TABLE_NAME_KEY,
43};
44use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
45use risingwave_pb::catalog::connection::Info as ConnectionInfo;
46use risingwave_pb::catalog::connection_params::ConnectionType;
47use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
48use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
49use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
50use risingwave_pb::plan_common::{
51    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
52};
53use risingwave_pb::secret::PbSecretRef;
54use risingwave_pb::secret::secret_ref::PbRefAsType;
55use risingwave_pb::stream_plan::StreamFragmentGraph;
56use risingwave_sqlparser::ast::{
57    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
58    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
59    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
60    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
61};
62use risingwave_sqlparser::parser::{IncludeOption, Parser};
63use thiserror_ext::AsReport;
64
65use super::RwPgResponse;
66use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
67use crate::binder::{Clause, SecureCompareContext, bind_data_type};
68use crate::catalog::root_catalog::SchemaPath;
69use crate::catalog::source_catalog::SourceCatalog;
70use crate::catalog::table_catalog::TableVersion;
71use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
72use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
73use crate::expr::{Expr, ExprImpl, ExprRewriter};
74use crate::handler::HandlerArgs;
75use crate::handler::create_source::{
76    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
77    bind_source_watermark, handle_addition_columns,
78};
79use crate::handler::util::SourceSchemaCompatExt;
80use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
81use crate::optimizer::plan_node::{
82    LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
83};
84use crate::optimizer::property::{Order, RequiredDist};
85use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
86use crate::session::SessionImpl;
87use crate::session::current::notice_to_user;
88use crate::stream_fragmenter::{GraphJobType, build_graph};
89use crate::utils::OverwriteOptions;
90use crate::{Binder, Explain, TableCatalog, WithOptions};
91
92mod col_id_gen;
93pub use col_id_gen::*;
94use risingwave_connector::sink::SinkParam;
95use risingwave_connector::sink::iceberg::{
96    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
97    COMPACTION_SMALL_FILES_THRESHOLD_MB, COMPACTION_TARGET_FILE_SIZE_MB,
98    COMPACTION_TRIGGER_SNAPSHOT_COUNT, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
99    ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink,
100    SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
101    SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
102    parse_partition_by_exprs,
103};
104use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
105
106use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};
107
108fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
109    for option_def in &c.options {
110        match option_def.option {
111            ColumnOption::GeneratedColumns(_) => {}
112            ColumnOption::DefaultValue(_) => {}
113            ColumnOption::DefaultValueInternal { .. } => {}
114            ColumnOption::Unique { is_primary: true } => {}
115            ColumnOption::Null => {}
116            ColumnOption::NotNull => {}
117            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
118        }
119    }
120    Ok(())
121}
122
123/// Binds the column schemas declared in CREATE statement into `ColumnDesc`.
124/// If a column is marked as `primary key`, its `ColumnId` is also returned.
125/// This primary key is not combined with table constraints yet.
126pub fn bind_sql_columns(
127    column_defs: &[ColumnDef],
128    is_for_drop_table_connector: bool,
129) -> Result<Vec<ColumnCatalog>> {
130    let mut columns = Vec::with_capacity(column_defs.len());
131
132    for column in column_defs {
133        ensure_column_options_supported(column)?;
134        // Destruct to make sure all fields are properly handled rather than ignored.
135        // Do NOT use `..` to ignore fields you do not want to deal with.
136        // Reject them with a clear NotImplemented error.
137        let ColumnDef {
138            name,
139            data_type,
140            collation,
141            options,
142            ..
143        } = column;
144
145        let data_type = data_type
146            .clone()
147            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
148        if let Some(collation) = collation {
149            // PostgreSQL will limit the datatypes that collate can work on.
150            // https://www.postgresql.org/docs/16/collation.html#COLLATION-CONCEPTS
151            //   > The built-in collatable data types are `text`, `varchar`, and `char`.
152            //
153            // But we don't support real collation, we simply ignore it here.
154            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
155                bail_not_implemented!(
156                    "Collate collation other than `C` or `POSIX` is not implemented"
157                );
158            }
159
160            match data_type {
161                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
162                _ => {
163                    return Err(ErrorCode::NotSupported(
164                        format!("{} is not a collatable data type", data_type),
165                        "The only built-in collatable data types are `varchar`, please check your type".into(),
166                    ).into());
167                }
168            }
169        }
170
171        if !is_for_drop_table_connector {
172            // additional column name may have prefix _rw
173            // When converting dropping the connector from table, the additional columns are converted to normal columns and keep the original name.
174            // Under this case, we loosen the check for _rw prefix.
175            check_column_name_not_reserved(&name.real_value())?;
176        }
177
178        let nullable: bool = !options
179            .iter()
180            .any(|def| matches!(def.option, ColumnOption::NotNull));
181
182        columns.push(ColumnCatalog {
183            column_desc: ColumnDesc {
184                data_type: bind_data_type(&data_type)?,
185                column_id: ColumnId::placeholder(),
186                name: name.real_value(),
187                generated_or_default_column: None,
188                description: None,
189                additional_column: AdditionalColumn { column_type: None },
190                version: ColumnDescVersion::LATEST,
191                system_column: None,
192                nullable,
193            },
194            is_hidden: false,
195        });
196    }
197
198    Ok(columns)
199}
200
201fn check_generated_column_constraints(
202    column_name: &String,
203    column_id: ColumnId,
204    expr: &ExprImpl,
205    column_catalogs: &[ColumnCatalog],
206    generated_column_names: &[String],
207    pk_column_ids: &[ColumnId],
208) -> Result<()> {
209    let input_refs = expr.collect_input_refs(column_catalogs.len());
210    for idx in input_refs.ones() {
211        let referred_generated_column = &column_catalogs[idx].column_desc.name;
212        if generated_column_names
213            .iter()
214            .any(|c| c == referred_generated_column)
215        {
216            return Err(ErrorCode::BindError(format!(
217                "Generated can not reference another generated column. \
218                But here generated column \"{}\" referenced another generated column \"{}\"",
219                column_name, referred_generated_column
220            ))
221            .into());
222        }
223    }
224
225    if pk_column_ids.contains(&column_id) && expr.is_impure() {
226        return Err(ErrorCode::BindError(format!(
227            "Generated columns with impure expressions should not be part of the primary key. \
228            Here column \"{}\" is defined as part of the primary key.",
229            column_name
230        ))
231        .into());
232    }
233
234    Ok(())
235}
236
/// Binds constraints that can only be specified in column definitions: currently generated
/// columns, `DEFAULT` values and the internal `DEFAULT INTERNAL` form.
///
/// Expressions are bound with `column_catalogs` registered in the binder context under
/// `table_name`. Each result is written into the matching catalog's
/// `generated_or_default_column`. Column definitions are matched to catalogs by name, and a
/// definition with no matching catalog is skipped.
///
/// `pk_column_ids` is used to reject impure generated columns that are part of the primary key.
///
/// # Errors
///
/// - a generated column that fails to bind, references another generated column, or is impure
///   while being part of the primary key;
/// - a `DEFAULT` expression that fails to bind or cast to the column type, or that cannot be
///   folded to a constant;
/// - a `DEFAULT INTERNAL` without a persisted value.
///
/// # Panics
///
/// If a non-empty persisted `DEFAULT INTERNAL` value fails to decode as `DefaultColumnDesc`.
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: &[ColumnDef],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    // Names of all columns declared with a generated-column option. Used to forbid one generated
    // column from referencing another.
    let generated_column_names = {
        let mut names = vec![];
        for column in columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    // All expressions below are bound with the table's own columns in scope.
    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name, column_catalogs)?;

    for column in columns {
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            // It's possible that we don't follow the user defined columns in SQL but take the
            // ones resolved from the source, thus missing some columns. Simply ignore them.
            continue;
        };

        for option_def in &column.options {
            match &option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    // Bind under the `GeneratedColumn` clause; it is reset to `None` once the
                    // expression has been stored.
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "fail to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    // The default expression is cast (assignment cast) to the column's type.
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type())?;

                    // Rewrite expressions to evaluate a snapshot value, used for missing values in the case of
                    // schema change.
                    //
                    // TODO: Currently we don't support impure expressions other than `now()` (like `random()`),
                    // so the rewritten expression should almost always be pure and we directly call `fold_const`
                    // here. Actually we do not require purity of the expression here since we're only to get a
                    // snapshot value.
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    // `try_fold_const` yields `None` when the expression is not constant, e.g.
                    // when it references other columns.
                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                // persist the original (not rewritten) expression
                                expr: Some(expr_impl.to_expr_proto()),
                            }));
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                            Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // When a `DEFAULT INTERNAL` is used internally for schema change, the persisted value
                    // should already be set during purification. So if we encounter an empty value here, it
                    // means the user has specified it explicitly in the SQL statement, typically by
                    // directly copying the result of `SHOW CREATE TABLE` and executing it.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                             please specify a concrete default value"
                        );
                    }

                    // A non-empty value is expected to be an encoded `DefaultColumnDesc`;
                    // decoding failure is treated as a broken invariant (panic).
                    let desc = DefaultColumnDesc::decode(&**persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                // Other supported options (`NULL`/`NOT NULL`, inline `PRIMARY KEY`) are not
                // handled by this function.
                _ => {}
            }
        }
    }
    Ok(())
}
359
360/// Currently we only support Primary key table constraint, so just return pk names if it exists
361pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
362    let mut pk_column_names = vec![];
363
364    for constraint in table_constraints {
365        match constraint {
366            TableConstraint::Unique {
367                name: _,
368                columns,
369                is_primary: true,
370            } => {
371                if !pk_column_names.is_empty() {
372                    return Err(multiple_pk_definition_err());
373                }
374                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
375            }
376            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
377        }
378    }
379    Ok(pk_column_names)
380}
381
382pub fn bind_sql_pk_names(
383    columns_defs: &[ColumnDef],
384    pk_names_from_table_constraints: Vec<String>,
385) -> Result<Vec<String>> {
386    let mut pk_column_names = pk_names_from_table_constraints;
387
388    for column in columns_defs {
389        for option_def in &column.options {
390            if let ColumnOption::Unique { is_primary: true } = option_def.option {
391                if !pk_column_names.is_empty() {
392                    return Err(multiple_pk_definition_err());
393                }
394                pk_column_names.push(column.name.real_value());
395            };
396        }
397    }
398
399    Ok(pk_column_names)
400}
401
402fn multiple_pk_definition_err() -> RwError {
403    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
404}
405
406/// Binds primary keys defined in SQL.
407///
408/// It returns the columns together with `pk_column_ids`, and an optional row id column index if
409/// added.
410pub fn bind_pk_and_row_id_on_relation(
411    mut columns: Vec<ColumnCatalog>,
412    pk_names: Vec<String>,
413    must_need_pk: bool,
414) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
415    for c in &columns {
416        assert!(c.column_id() != ColumnId::placeholder());
417    }
418
419    // Mapping from column name to column id.
420    let name_to_id = columns
421        .iter()
422        .map(|c| (c.name(), c.column_id()))
423        .collect::<HashMap<_, _>>();
424
425    let mut pk_column_ids: Vec<_> = pk_names
426        .iter()
427        .map(|name| {
428            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
429                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
430            })
431        })
432        .try_collect()?;
433
434    // Add `_row_id` column if `pk_column_ids` is empty and must_need_pk
435    let need_row_id = pk_column_ids.is_empty() && must_need_pk;
436
437    let row_id_index = need_row_id.then(|| {
438        let column = ColumnCatalog::row_id_column();
439        let index = columns.len();
440        pk_column_ids = vec![column.column_id()];
441        columns.push(column);
442        index
443    });
444
445    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
446        Err(ErrorCode::InvalidInputSyntax(format!(
447            "column \"{col}\" specified more than once"
448        )))?;
449    }
450
451    Ok((columns, pk_column_ids, row_id_index))
452}
453
/// `gen_create_table_plan_with_source` generates the plan for creating a table with an external
/// stream source.
///
/// Steps, in order:
/// 1. Validate the format for append-only tables.
/// 2. Bind connector properties and run connector-specific column checks.
/// 3. Resolve columns from the source.
/// 4. Bind the source catalog (applying the source rate limit from the overwrite options).
/// 5. Build the table plan on top of that catalog.
///
/// Returns the plan, the bound source catalog (always `Some` on this path), and the table
/// catalog.
///
/// # Errors
///
/// - `APPEND ONLY` combined with a format other than `PLAIN` or `NATIVE`;
/// - any error from binding connector properties, the CDC column checks, schema resolution,
///   source column resolution, source binding, or plan generation.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
    // Append-only tables only accept the PLAIN and NATIVE formats.
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append only table does not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let (with_properties, refresh_mode) =
        bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        // NOTE(review): these helpers are defined outside this view; presumably they validate
        // generated columns and NOT NULL usage for CDC tables — confirm.
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        // if non-cdc source
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    // TODO: omit this step if `sql_column_strategy` is `Follow`.
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    // Overwrite options take `&mut handler_args`; only the source rate limit is used here.
    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
        refresh_mode,
    )
    .await?;

    let context = OptimizerContext::new(handler_args, explain_options);

    // The table's columns, keys and watermarks all come from the bound source catalog.
    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source.clone(),
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(source), table))
}
545
546/// `gen_create_table_plan` generates the plan for creating a table without an external stream
547/// source.
548#[allow(clippy::too_many_arguments)]
549pub(crate) fn gen_create_table_plan(
550    context: OptimizerContext,
551    table_name: ObjectName,
552    column_defs: Vec<ColumnDef>,
553    constraints: Vec<TableConstraint>,
554    mut col_id_gen: ColumnIdGenerator,
555    source_watermarks: Vec<SourceWatermark>,
556    props: CreateTableProps,
557    is_for_replace_plan: bool,
558) -> Result<(PlanRef, TableCatalog)> {
559    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
560    for c in &mut columns {
561        col_id_gen.generate(c)?;
562    }
563
564    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
565    if !secret_refs.is_empty() || !connection_refs.is_empty() {
566        return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
567    }
568
569    gen_create_table_plan_without_source(
570        context,
571        table_name,
572        columns,
573        column_defs,
574        constraints,
575        source_watermarks,
576        col_id_gen.into_version(),
577        props,
578    )
579}
580
/// Binds keys, watermarks and column constraints for a table without a connector source, then
/// generates its plan.
///
/// `columns` must already have ids assigned (`bind_pk_and_row_id_on_relation` asserts this). A
/// row-id column is appended as the key when no primary key is declared. The table gets no
/// source catalog.
///
/// # Errors
///
/// Propagates errors from primary-key binding, watermark binding, column-constraint binding,
/// schema resolution, and plan generation.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    // XXX: Why not bind outside?
    // Key names come from the table-level constraint first, then inline column options.
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    // `must_need_pk = true`: tables always get a key, falling back to a row-id column.
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    // Needs `pk_column_ids` to reject impure generated columns in the primary key.
    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}
627
628fn gen_table_plan_with_source(
629    context: OptimizerContextRef,
630    schema_name: Option<String>,
631    source_catalog: SourceCatalog,
632    version: TableVersion,
633    props: CreateTableProps,
634) -> Result<(PlanRef, TableCatalog)> {
635    let table_name = source_catalog.name.clone();
636
637    let info = CreateTableInfo {
638        columns: source_catalog.columns.clone(),
639        pk_column_ids: source_catalog.pk_col_ids.clone(),
640        row_id_index: source_catalog.row_id_index,
641        watermark_descs: source_catalog.watermark_descs.clone(),
642        source_catalog: Some(source_catalog),
643        version,
644    };
645
646    gen_table_plan_inner(context, schema_name, table_name, info, props)
647}
648
649/// On-conflict behavior either from user input or existing table catalog.
650#[derive(Clone, Copy)]
651pub enum EitherOnConflict {
652    Ast(Option<OnConflict>),
653    Resolved(ConflictBehavior),
654}
655
656impl From<Option<OnConflict>> for EitherOnConflict {
657    fn from(v: Option<OnConflict>) -> Self {
658        Self::Ast(v)
659    }
660}
661
662impl From<ConflictBehavior> for EitherOnConflict {
663    fn from(v: ConflictBehavior) -> Self {
664        Self::Resolved(v)
665    }
666}
667
668impl EitherOnConflict {
669    /// Resolves the conflict behavior based on the given information.
670    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
671        let conflict_behavior = match self {
672            EitherOnConflict::Ast(on_conflict) => {
673                if append_only {
674                    if row_id_as_pk {
675                        // Primary key will be generated, no conflict check needed.
676                        ConflictBehavior::NoCheck
677                    } else {
678                        // User defined PK on append-only table, enforce `DO NOTHING`.
679                        if let Some(on_conflict) = on_conflict
680                            && on_conflict != OnConflict::Nothing
681                        {
682                            return Err(ErrorCode::InvalidInputSyntax(
683                                "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
684                                     the ON CONFLICT behavior must be DO NOTHING."
685                                    .to_owned(),
686                            )
687                            .into());
688                        }
689                        ConflictBehavior::IgnoreConflict
690                    }
691                } else {
692                    // Default to `UPDATE FULL` for non-append-only tables.
693                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
694                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
695                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
696                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
697                    }
698                }
699            }
700            EitherOnConflict::Resolved(b) => b,
701        };
702
703        Ok(conflict_behavior)
704    }
705}
706
/// Arguments of the functions that generate a table plan, part 1.
///
/// Compared to [`CreateTableProps`], this struct contains fields that need some work of binding
/// or resolving based on the user input.
pub struct CreateTableInfo {
    /// Columns of the table, including the row-id column when one was appended as the key.
    pub columns: Vec<ColumnCatalog>,
    /// Ids of the primary-key columns (the row-id column's id when no key was declared).
    pub pk_column_ids: Vec<ColumnId>,
    /// Index in `columns` of the appended row-id column, if any.
    pub row_id_index: Option<usize>,
    /// Watermarks bound from the statement, or taken from the source catalog.
    pub watermark_descs: Vec<WatermarkDesc>,
    /// The connector source backing the table; `None` for tables created without a source.
    pub source_catalog: Option<SourceCatalog>,
    /// Table schema version, produced by the column id generator (`into_version`).
    pub version: TableVersion,
}
719
/// Arguments of the functions that generate a table plan, part 2.
///
/// Compared to [`CreateTableInfo`], this struct contains fields that can be (relatively) simply
/// obtained from the input or the context.
pub struct CreateTableProps {
    /// NOTE(review): presumably the SQL definition text of the table — confirm with callers.
    pub definition: String,
    /// Whether the table is `APPEND ONLY`. Defining watermarks requires it, and so does
    /// retention unless an unsafe session option is set. With a connector source, it also
    /// restricts the format to `PLAIN` or `NATIVE`.
    pub append_only: bool,
    /// The `ON CONFLICT` clause, or an already-resolved behavior; see
    /// [`EitherOnConflict::to_behavior`].
    pub on_conflict: EitherOnConflict,
    /// NOTE(review): presumably the column names given as version columns — confirm semantics.
    pub with_version_columns: Vec<String>,
    /// Webhook source information, if any.
    pub webhook_info: Option<PbWebhookSourceInfo>,
    /// Storage engine selected for the table.
    pub engine: Engine,
}
732
/// Shared tail of table-plan generation, used both with and without a connector source.
///
/// Processing steps:
/// 1. Build a `LogicalSource` of kind `CreateTable` over `info.columns` and wrap it in a
///    [`PlanRoot`].
/// 2. Validate append-only requirements.
/// 3. Let the plan root generate the table plan.
///
/// The returned [`TableCatalog`] is the materialize node's table, with its owner set to the
/// current session user.
///
/// # Errors
///
/// - watermarks defined on a non-append-only table;
/// - `retention_seconds` in the `WITH` options of a non-append-only table, unless the session
///   config `unsafe_enable_storage_retention_for_non_append_only_tables` is on (then a warning
///   is logged and a notice is sent to the user instead);
/// - any error from resolving the database/schema ids, building the source node, or generating
///   the table plan.
// NOTE(review): this function takes only five parameters, so the allow below appears unnecessary.
#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    // Borrow only what is needed before `info` is moved into `gen_table_plan` below.
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    let source_node: LogicalPlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    // An all-clear bit set as wide as the column list.
    // NOTE(review): presumably "no required output columns" for the root — confirm.
    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    if !append_only && !watermark_descs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on table requires the table to be append only.".to_owned(),
            "Use the key words `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    if !append_only && retention_seconds.is_some() {
        // Retention on non-append-only tables is only allowed behind an explicit unsafe
        // session option, and then only with a warning and a user notice.
        if session
            .config()
            .unsafe_enable_storage_retention_for_non_append_only_tables()
        {
            tracing::warn!(
                "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
                table_name
            );
            const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
                                  This may lead to stream inconsistency and unrecoverable \
                                  node failure if there is any row INSERT/UPDATE/DELETE operation \
                                  corresponding to the TTLed primary keys";
            session.notice_to_user(NOTICE);
        } else {
            return Err(ErrorCode::NotSupported(
                "Defining retention seconds on table requires the table to be append only."
                    .to_owned(),
                "Use the key words `APPEND ONLY`".to_owned(),
            )
            .into());
        }
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    Ok((materialize.into(), table))
}
816
817/// Generate stream plan for cdc table based on shared source.
818/// In replace workflow, the `table_id` is the id of the table to be replaced
819/// in create table workflow, the `table_id` is a placeholder will be filled in the Meta
820#[allow(clippy::too_many_arguments)]
821pub(crate) fn gen_create_table_plan_for_cdc_table(
822    context: OptimizerContextRef,
823    source: Arc<SourceCatalog>,
824    external_table_name: String,
825    column_defs: Vec<ColumnDef>,
826    mut columns: Vec<ColumnCatalog>,
827    pk_names: Vec<String>,
828    cdc_with_options: WithOptionsSecResolved,
829    mut col_id_gen: ColumnIdGenerator,
830    on_conflict: Option<OnConflict>,
831    with_version_columns: Vec<String>,
832    include_column_options: IncludeOption,
833    table_name: ObjectName,
834    resolved_table_name: String, // table name without schema prefix
835    database_id: DatabaseId,
836    schema_id: SchemaId,
837    table_id: TableId,
838    engine: Engine,
839) -> Result<(PlanRef, TableCatalog)> {
840    let session = context.session_ctx().clone();
841
842    // append additional columns to the end
843    handle_addition_columns(
844        None,
845        &cdc_with_options,
846        include_column_options,
847        &mut columns,
848        true,
849    )?;
850
851    for c in &mut columns {
852        col_id_gen.generate(c)?;
853    }
854
855    let (mut columns, pk_column_ids, _row_id_index) =
856        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
857
858    // NOTES: In auto schema change, default value is not provided in column definition.
859    bind_sql_column_constraints(
860        context.session_ctx(),
861        table_name.real_value(),
862        &mut columns,
863        &column_defs,
864        &pk_column_ids,
865    )?;
866
867    let definition = context.normalized_sql().to_owned();
868
869    let pk_column_indices = {
870        let mut id_to_idx = HashMap::new();
871        columns.iter().enumerate().for_each(|(idx, c)| {
872            id_to_idx.insert(c.column_id(), idx);
873        });
874        // pk column id must exist in table columns.
875        pk_column_ids
876            .iter()
877            .map(|c| id_to_idx.get(c).copied().unwrap())
878            .collect_vec()
879    };
880    let table_pk = pk_column_indices
881        .iter()
882        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
883        .collect();
884
885    let (options, secret_refs) = cdc_with_options.into_parts();
886
887    let non_generated_column_descs = columns
888        .iter()
889        .filter(|&c| !c.is_generated())
890        .map(|c| c.column_desc.clone())
891        .collect_vec();
892    let non_generated_column_num = non_generated_column_descs.len();
893    let cdc_table_type = ExternalCdcTableType::from_properties(&options);
894    let cdc_table_desc = CdcTableDesc {
895        table_id,
896        source_id: source.id, // id of cdc source streaming job
897        external_table_name: external_table_name.clone(),
898        pk: table_pk,
899        columns: non_generated_column_descs,
900        stream_key: pk_column_indices,
901        connect_properties: options,
902        secret_refs,
903    };
904
905    tracing::debug!(?cdc_table_desc, "create cdc table");
906    let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;
907
908    let logical_scan = LogicalCdcScan::create(
909        external_table_name.clone(),
910        Rc::new(cdc_table_desc),
911        context.clone(),
912        options,
913    );
914
915    let scan_node: LogicalPlanRef = logical_scan.into();
916    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
917    let plan_root = PlanRoot::new_with_logical_plan(
918        scan_node,
919        RequiredDist::Any,
920        Order::any(),
921        required_cols,
922        vec![],
923    );
924
925    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
926    let materialize = plan_root.gen_table_plan(
927        context,
928        resolved_table_name,
929        database_id,
930        schema_id,
931        CreateTableInfo {
932            columns,
933            pk_column_ids,
934            row_id_index: None,
935            watermark_descs: vec![],
936            source_catalog: Some((*source).clone()),
937            version: col_id_gen.into_version(),
938        },
939        CreateTableProps {
940            definition,
941            append_only: false,
942            on_conflict: on_conflict.into(),
943            with_version_columns,
944            webhook_info: None,
945            engine,
946        },
947    )?;
948
949    let mut table = materialize.table().clone();
950    table.owner = session.user_id();
951    table.cdc_table_id = Some(cdc_table_id);
952    table.cdc_table_type = Some(cdc_table_type);
953    Ok((materialize.into(), table))
954}
955
956fn derive_with_options_for_cdc_table(
957    source_with_properties: &WithOptionsSecResolved,
958    external_table_name: String,
959) -> Result<WithOptionsSecResolved> {
960    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
961    // we should remove the prefix from `full_table_name`
962    let source_database_name: &str = source_with_properties
963        .get("database.name")
964        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
965        .as_str();
966    let mut with_options = source_with_properties.clone();
967    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
968        match connector.as_str() {
969            MYSQL_CDC_CONNECTOR => {
970                // MySQL doesn't allow '.' in database name and table name, so we can split the
971                // external table name by '.' to get the table name
972                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
973                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
974                })?;
975                // We allow multiple database names in the source definition
976                if !source_database_name
977                    .split(',')
978                    .map(|s| s.trim())
979                    .any(|name| name == db_name)
980                {
981                    return Err(anyhow!(
982                        "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
983                        db_name,
984                        source_database_name
985                    ).into());
986                }
987                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
988                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
989            }
990            POSTGRES_CDC_CONNECTOR => {
991                let (schema_name, table_name) = external_table_name
992                    .split_once('.')
993                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;
994
995                // insert 'schema.name' into connect properties
996                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
997                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
998            }
999            SQL_SERVER_CDC_CONNECTOR => {
1000                // SQL Server external table name can be in different formats:
1001                // 1. 'databaseName.schemaName.tableName' (full format)
1002                // 2. 'schemaName.tableName' (schema and table only)
1003                // 3. 'tableName' (table only, will use default schema 'dbo')
1004                // We will auto-fill missing parts from source configuration
1005                let parts: Vec<&str> = external_table_name.split('.').collect();
1006                let (_, schema_name, table_name) = match parts.len() {
1007                    3 => {
1008                        // Full format: database.schema.table
1009                        let db_name = parts[0];
1010                        let schema_name = parts[1];
1011                        let table_name = parts[2];
1012
1013                        // Verify database name matches source configuration
1014                        if db_name != source_database_name {
1015                            return Err(anyhow!(
1016                                "The database name `{}` in the FROM clause is not the same as the database name `{}` in source definition",
1017                                db_name,
1018                                source_database_name
1019                            ).into());
1020                        }
1021                        (db_name, schema_name, table_name)
1022                    }
1023                    2 => {
1024                        // Schema and table only: schema.table
1025                        let schema_name = parts[0];
1026                        let table_name = parts[1];
1027                        (source_database_name, schema_name, table_name)
1028                    }
1029                    1 => {
1030                        // Table only: table (use default schema 'dbo')
1031                        let table_name = parts[0];
1032                        (source_database_name, "dbo", table_name)
1033                    }
1034                    _ => {
1035                        return Err(anyhow!(
1036                            "The upstream table name must be in one of these formats: 'database.schema.table', 'schema.table', or 'table'"
1037                        ).into());
1038                    }
1039                };
1040
1041                // insert 'schema.name' into connect properties
1042                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1043                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1044            }
1045            _ => {
1046                return Err(RwError::from(anyhow!(
1047                    "connector {} is not supported for cdc table",
1048                    connector
1049                )));
1050            }
1051        };
1052    }
1053    Ok(with_options)
1054}
1055
1056#[allow(clippy::too_many_arguments)]
1057pub(super) async fn handle_create_table_plan(
1058    handler_args: HandlerArgs,
1059    explain_options: ExplainOptions,
1060    format_encode: Option<FormatEncodeOptions>,
1061    cdc_table_info: Option<CdcTableInfo>,
1062    table_name: &ObjectName,
1063    column_defs: Vec<ColumnDef>,
1064    wildcard_idx: Option<usize>,
1065    constraints: Vec<TableConstraint>,
1066    source_watermarks: Vec<SourceWatermark>,
1067    append_only: bool,
1068    on_conflict: Option<OnConflict>,
1069    with_version_columns: Vec<String>,
1070    include_column_options: IncludeOption,
1071    webhook_info: Option<WebhookSourceInfo>,
1072    engine: Engine,
1073) -> Result<(
1074    PlanRef,
1075    Option<SourceCatalog>,
1076    TableCatalog,
1077    TableJobType,
1078    Option<SourceId>,
1079)> {
1080    let col_id_gen = ColumnIdGenerator::new_initial();
1081    let format_encode = check_create_table_with_source(
1082        &handler_args.with_options,
1083        format_encode,
1084        &include_column_options,
1085        &cdc_table_info,
1086    )?;
1087    let webhook_info = webhook_info
1088        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1089        .transpose()?;
1090
1091    let props = CreateTableProps {
1092        definition: handler_args.normalized_sql.clone(),
1093        append_only,
1094        on_conflict: on_conflict.into(),
1095        with_version_columns: with_version_columns.clone(),
1096        webhook_info,
1097        engine,
1098    };
1099
1100    let ((plan, source, table), job_type, shared_shource_id) = match (
1101        format_encode,
1102        cdc_table_info.as_ref(),
1103    ) {
1104        (Some(format_encode), None) => (
1105            gen_create_table_plan_with_source(
1106                handler_args,
1107                explain_options,
1108                table_name.clone(),
1109                column_defs,
1110                wildcard_idx,
1111                constraints,
1112                format_encode,
1113                source_watermarks,
1114                col_id_gen,
1115                include_column_options,
1116                props,
1117                SqlColumnStrategy::FollowChecked,
1118            )
1119            .await?,
1120            TableJobType::General,
1121            None,
1122        ),
1123        (None, None) => {
1124            let context = OptimizerContext::new(handler_args, explain_options);
1125            let (plan, table) = gen_create_table_plan(
1126                context,
1127                table_name.clone(),
1128                column_defs,
1129                constraints,
1130                col_id_gen,
1131                source_watermarks,
1132                props,
1133                false,
1134            )?;
1135
1136            ((plan, None, table), TableJobType::General, None)
1137        }
1138
1139        (None, Some(cdc_table)) => {
1140            sanity_check_for_table_on_cdc_source(
1141                append_only,
1142                &column_defs,
1143                &wildcard_idx,
1144                &constraints,
1145                &source_watermarks,
1146            )?;
1147
1148            generated_columns_check_for_cdc_table(&column_defs)?;
1149            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1150
1151            let session = &handler_args.session;
1152            let db_name = &session.database();
1153            let user_name = &session.user_name();
1154            let search_path = session.config().search_path();
1155            let (schema_name, resolved_table_name) =
1156                Binder::resolve_schema_qualified_name(db_name, table_name)?;
1157            let (database_id, schema_id) =
1158                session.get_database_and_schema_id_for_create(schema_name.clone())?;
1159
1160            // cdc table cannot be append-only
1161            let (format_encode, source_name) =
1162                Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
1163
1164            let source = {
1165                let catalog_reader = session.env().catalog_reader().read_guard();
1166                let schema_path =
1167                    SchemaPath::new(format_encode.as_deref(), &search_path, user_name);
1168
1169                let (source, _) = catalog_reader.get_source_by_name(
1170                    db_name,
1171                    schema_path,
1172                    source_name.as_str(),
1173                )?;
1174                source.clone()
1175            };
1176            let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
1177                &source.with_properties,
1178                cdc_table.external_table_name.clone(),
1179            )?;
1180
1181            let (columns, pk_names) = match wildcard_idx {
1182                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1183                None => {
1184                    for column_def in &column_defs {
1185                        for option_def in &column_def.options {
1186                            if let ColumnOption::DefaultValue(_)
1187                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
1188                            {
1189                                return Err(ErrorCode::NotSupported(
1190                                            "Default value for columns defined on the table created from a CDC source".into(),
1191                                            "Remove the default value expression in the column definitions".into(),
1192                                        )
1193                                            .into());
1194                            }
1195                        }
1196                    }
1197
1198                    let (columns, pk_names) =
1199                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
1200                    // read default value definition from external db
1201                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
1202                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1203                        .context("failed to extract external table config")?;
1204
1205                    (columns, pk_names)
1206                }
1207            };
1208
1209            let context: OptimizerContextRef =
1210                OptimizerContext::new(handler_args, explain_options).into();
1211            let shared_source_id = source.id;
1212            let (plan, table) = gen_create_table_plan_for_cdc_table(
1213                context,
1214                source,
1215                cdc_table.external_table_name.clone(),
1216                column_defs,
1217                columns,
1218                pk_names,
1219                cdc_with_options,
1220                col_id_gen,
1221                on_conflict,
1222                with_version_columns,
1223                include_column_options,
1224                table_name.clone(),
1225                resolved_table_name,
1226                database_id,
1227                schema_id,
1228                TableId::placeholder(),
1229                engine,
1230            )?;
1231
1232            (
1233                (plan, None, table),
1234                TableJobType::SharedCdcSource,
1235                Some(shared_source_id),
1236            )
1237        }
1238        (Some(_), Some(_)) => {
1239            return Err(ErrorCode::NotSupported(
1240                "Data format and encoding format doesn't apply to table created from a CDC source"
1241                    .into(),
1242                "Remove the FORMAT and ENCODE specification".into(),
1243            )
1244            .into());
1245        }
1246    };
1247    Ok((plan, source, table, job_type, shared_shource_id))
1248}
1249
1250// For both table from cdc source and table with cdc connector
1251fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1252    let mut found_generated_column = false;
1253    for column in columns {
1254        let mut is_generated = false;
1255
1256        for option_def in &column.options {
1257            if let ColumnOption::GeneratedColumns(_) = option_def.option {
1258                is_generated = true;
1259                break;
1260            }
1261        }
1262
1263        if is_generated {
1264            found_generated_column = true;
1265        } else if found_generated_column {
1266            return Err(ErrorCode::NotSupported(
1267                "Non-generated column found after a generated column.".into(),
1268                "Ensure that all generated columns appear at the end of the cdc table definition."
1269                    .into(),
1270            )
1271            .into());
1272        }
1273    }
1274    Ok(())
1275}
1276
1277// For both table from cdc source and table with cdc connector
1278fn not_null_check_for_cdc_table(
1279    wildcard_idx: &Option<usize>,
1280    column_defs: &Vec<ColumnDef>,
1281) -> Result<()> {
1282    if !wildcard_idx.is_some()
1283        && column_defs.iter().any(|col| {
1284            col.options
1285                .iter()
1286                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1287        })
1288    {
1289        return Err(ErrorCode::NotSupported(
1290            "CDC table with NOT NULL constraint is not supported".to_owned(),
1291            "Please remove the NOT NULL constraint for columns".to_owned(),
1292        )
1293        .into());
1294    }
1295    Ok(())
1296}
1297
1298// Only for table from cdc source
1299fn sanity_check_for_table_on_cdc_source(
1300    append_only: bool,
1301    column_defs: &Vec<ColumnDef>,
1302    wildcard_idx: &Option<usize>,
1303    constraints: &Vec<TableConstraint>,
1304    source_watermarks: &Vec<SourceWatermark>,
1305) -> Result<()> {
1306    // wildcard cannot be used with column definitions
1307    if wildcard_idx.is_some() && !column_defs.is_empty() {
1308        return Err(ErrorCode::NotSupported(
1309            "wildcard(*) and column definitions cannot be used together".to_owned(),
1310            "Remove the wildcard or column definitions".to_owned(),
1311        )
1312        .into());
1313    }
1314
1315    // cdc table must have primary key constraint or primary key column
1316    if !wildcard_idx.is_some()
1317        && !constraints.iter().any(|c| {
1318            matches!(
1319                c,
1320                TableConstraint::Unique {
1321                    is_primary: true,
1322                    ..
1323                }
1324            )
1325        })
1326        && !column_defs.iter().any(|col| {
1327            col.options
1328                .iter()
1329                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1330        })
1331    {
1332        return Err(ErrorCode::NotSupported(
1333            "CDC table without primary key constraint is not supported".to_owned(),
1334            "Please define a primary key".to_owned(),
1335        )
1336        .into());
1337    }
1338
1339    if append_only {
1340        return Err(ErrorCode::NotSupported(
1341            "append only modifier on the table created from a CDC source".into(),
1342            "Remove the APPEND ONLY clause".into(),
1343        )
1344        .into());
1345    }
1346
1347    if !source_watermarks.is_empty() {
1348        return Err(ErrorCode::NotSupported(
1349            "watermark defined on the table created from a CDC source".into(),
1350            "Remove the Watermark definitions".into(),
1351        )
1352        .into());
1353    }
1354
1355    Ok(())
1356}
1357
1358/// Derive schema for cdc table when create a new Table or alter an existing Table
1359async fn bind_cdc_table_schema_externally(
1360    cdc_with_options: WithOptionsSecResolved,
1361) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1362    // read cdc table schema from external db or parsing the schema from SQL definitions
1363    let (options, secret_refs) = cdc_with_options.into_parts();
1364    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1365        .context("failed to extract external table config")?;
1366
1367    let table = ExternalTableImpl::connect(config)
1368        .await
1369        .context("failed to auto derive table schema")?;
1370
1371    Ok((
1372        table
1373            .column_descs()
1374            .iter()
1375            .cloned()
1376            .map(|column_desc| ColumnCatalog {
1377                column_desc,
1378                is_hidden: false,
1379            })
1380            .collect(),
1381        table.pk_names().clone(),
1382    ))
1383}
1384
1385/// Derive schema for cdc table when create a new Table or alter an existing Table
1386fn bind_cdc_table_schema(
1387    column_defs: &Vec<ColumnDef>,
1388    constraints: &Vec<TableConstraint>,
1389    is_for_replace_plan: bool,
1390) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1391    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1392
1393    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1394    Ok((columns, pk_names))
1395}
1396
1397#[allow(clippy::too_many_arguments)]
1398pub async fn handle_create_table(
1399    handler_args: HandlerArgs,
1400    table_name: ObjectName,
1401    column_defs: Vec<ColumnDef>,
1402    wildcard_idx: Option<usize>,
1403    constraints: Vec<TableConstraint>,
1404    if_not_exists: bool,
1405    format_encode: Option<FormatEncodeOptions>,
1406    source_watermarks: Vec<SourceWatermark>,
1407    append_only: bool,
1408    on_conflict: Option<OnConflict>,
1409    with_version_columns: Vec<String>,
1410    cdc_table_info: Option<CdcTableInfo>,
1411    include_column_options: IncludeOption,
1412    webhook_info: Option<WebhookSourceInfo>,
1413    ast_engine: risingwave_sqlparser::ast::Engine,
1414) -> Result<RwPgResponse> {
1415    let session = handler_args.session.clone();
1416
1417    if append_only {
1418        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1419    }
1420
1421    session.check_cluster_limits().await?;
1422
1423    let engine = match ast_engine {
1424        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1425        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1426    };
1427
1428    if let Either::Right(resp) = session.check_relation_name_duplicated(
1429        table_name.clone(),
1430        StatementType::CREATE_TABLE,
1431        if_not_exists,
1432    )? {
1433        return Ok(resp);
1434    }
1435
1436    let (graph, source, hummock_table, job_type, shared_source_id) = {
1437        let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
1438            handler_args.clone(),
1439            ExplainOptions::default(),
1440            format_encode,
1441            cdc_table_info,
1442            &table_name,
1443            column_defs.clone(),
1444            wildcard_idx,
1445            constraints.clone(),
1446            source_watermarks,
1447            append_only,
1448            on_conflict,
1449            with_version_columns,
1450            include_column_options,
1451            webhook_info,
1452            engine,
1453        )
1454        .await?;
1455        tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1456
1457        let graph = build_graph(plan, Some(GraphJobType::Table))?;
1458
1459        (graph, source, table, job_type, shared_source_id)
1460    };
1461
1462    tracing::trace!(
1463        "name={}, graph=\n{}",
1464        table_name,
1465        serde_json::to_string_pretty(&graph).unwrap()
1466    );
1467
1468    let dependencies = shared_source_id
1469        .map(|id| HashSet::from([id.as_object_id()]))
1470        .unwrap_or_default();
1471
1472    // Handle engine
1473    match engine {
1474        Engine::Hummock => {
1475            let catalog_writer = session.catalog_writer()?;
1476            catalog_writer
1477                .create_table(
1478                    source.map(|s| s.to_prost()),
1479                    hummock_table.to_prost(),
1480                    graph,
1481                    job_type,
1482                    if_not_exists,
1483                    dependencies,
1484                )
1485                .await?;
1486        }
1487        Engine::Iceberg => {
1488            let hummock_table_name = hummock_table.name.clone();
1489            session.create_staging_table(hummock_table.clone());
1490            let res = create_iceberg_engine_table(
1491                session.clone(),
1492                handler_args,
1493                source.map(|s| s.to_prost()),
1494                hummock_table,
1495                graph,
1496                table_name,
1497                job_type,
1498                if_not_exists,
1499            )
1500            .await;
1501            session.drop_staging_table(&hummock_table_name);
1502            res?
1503        }
1504    }
1505
1506    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1507}
1508
1509/// Iceberg table engine is composed of hummock table, iceberg sink and iceberg source.
1510///
1511/// 1. fetch iceberg engine options from the meta node. Or use iceberg engine connection provided by users.
1512/// 2. create a hummock table
1513/// 3. create an iceberg sink
1514/// 4. create an iceberg source
1515///
1516/// See <https://github.com/risingwavelabs/risingwave/issues/21586> for an architecture diagram.
1517#[allow(clippy::too_many_arguments)]
1518pub async fn create_iceberg_engine_table(
1519    session: Arc<SessionImpl>,
1520    handler_args: HandlerArgs,
1521    mut source: Option<PbSource>,
1522    table: TableCatalog,
1523    graph: StreamFragmentGraph,
1524    table_name: ObjectName,
1525    job_type: PbTableJobType,
1526    if_not_exists: bool,
1527) -> Result<()> {
1528    let meta_client = session.env().meta_client();
1529    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1530
1531    let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
1532        ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
1533    })?;
1534    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1535    let meta_store_user = meta_store_endpoint.username().to_owned();
1536    let meta_store_password = match meta_store_endpoint.password() {
1537        Some(password) => percent_decode_str(password)
1538            .decode_utf8()
1539            .map_err(|_| {
1540                ErrorCode::InternalError(
1541                    "failed to parse password from meta store endpoint".to_owned(),
1542                )
1543            })?
1544            .into_owned(),
1545        None => "".to_owned(),
1546    };
1547    let meta_store_host = meta_store_endpoint
1548        .host_str()
1549        .ok_or_else(|| {
1550            ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
1551        })?
1552        .to_owned();
1553    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1554        ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
1555    })?;
1556    let meta_store_database = meta_store_endpoint
1557        .path()
1558        .trim_start_matches('/')
1559        .to_owned();
1560
1561    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1562        bail!("failed to parse meta backend: {}", meta_store_backend);
1563    };
1564
1565    let catalog_uri = match meta_backend {
1566        MetaBackend::Postgres => {
1567            format!(
1568                "jdbc:postgresql://{}:{}/{}",
1569                meta_store_host.clone(),
1570                meta_store_port.clone(),
1571                meta_store_database.clone()
1572            )
1573        }
1574        MetaBackend::Mysql => {
1575            format!(
1576                "jdbc:mysql://{}:{}/{}",
1577                meta_store_host.clone(),
1578                meta_store_port.clone(),
1579                meta_store_database.clone()
1580            )
1581        }
1582        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1583            bail!(
1584                "Unsupported meta backend for iceberg engine table: {}",
1585                meta_store_backend
1586            );
1587        }
1588    };
1589
1590    let rw_db_name = session
1591        .env()
1592        .catalog_reader()
1593        .read_guard()
1594        .get_database_by_id(table.database_id)?
1595        .name()
1596        .to_owned();
1597    let rw_schema_name = session
1598        .env()
1599        .catalog_reader()
1600        .read_guard()
1601        .get_schema_by_id(table.database_id, table.schema_id)?
1602        .name()
1603        .clone();
1604    let iceberg_catalog_name = rw_db_name.clone();
1605    let iceberg_database_name = rw_schema_name.clone();
1606    let iceberg_table_name = table_name.0.last().unwrap().real_value();
1607
1608    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1609    let sink_decouple = session.config().sink_decouple();
1610    if matches!(sink_decouple, SinkDecouple::Disable) {
1611        bail!(
1612            "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1613        );
1614    }
1615
1616    let mut connection_ref = BTreeMap::new();
1617    let with_common = if iceberg_engine_connection.is_empty() {
1618        bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1619    } else {
1620        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1621        assert_eq!(parts.len(), 2);
1622        let connection_catalog =
1623            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1624        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1625            if params.connection_type == ConnectionType::Iceberg as i32 {
1626                // With iceberg engine connection:
1627                connection_ref.insert(
1628                    "connection".to_owned(),
1629                    ConnectionRefValue {
1630                        connection_name: ObjectName::from(vec![
1631                            Ident::from(parts[0]),
1632                            Ident::from(parts[1]),
1633                        ]),
1634                    },
1635                );
1636
1637                let mut with_common = BTreeMap::new();
1638                with_common.insert("connector".to_owned(), "iceberg".to_owned());
1639                with_common.insert("database.name".to_owned(), iceberg_database_name);
1640                with_common.insert("table.name".to_owned(), iceberg_table_name);
1641
1642                if let Some(s) = params.properties.get("hosted_catalog")
1643                    && s.eq_ignore_ascii_case("true")
1644                {
1645                    with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1646                    with_common.insert("catalog.uri".to_owned(), catalog_uri);
1647                    with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
1648                    with_common.insert(
1649                        "catalog.jdbc.password".to_owned(),
1650                        meta_store_password.clone(),
1651                    );
1652                    with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
1653                }
1654
1655                with_common
1656            } else {
1657                return Err(RwError::from(ErrorCode::InvalidParameterValue(
1658                    "Only iceberg connection could be used in iceberg engine".to_owned(),
1659                )));
1660            }
1661        } else {
1662            return Err(RwError::from(ErrorCode::InvalidParameterValue(
1663                "Private Link Service has been deprecated. Please create a new connection instead."
1664                    .to_owned(),
1665            )));
1666        }
1667    };
1668
1669    // Iceberg sinks require a primary key, if none is provided, we will use the _row_id column
1670    // Fetch primary key from columns
1671    let mut pks = table
1672        .pk_column_names()
1673        .iter()
1674        .map(|c| c.to_string())
1675        .collect::<Vec<String>>();
1676
1677    // For the table without primary key. We will use `_row_id` as primary key
1678    let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1679        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1680        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1681            "select {} as {}, * from {}",
1682            ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1683        ))
1684        .context("unable to parse query")?
1685        .try_into()
1686        .unwrap();
1687
1688        let Statement::Query(query) = &stmt else {
1689            panic!("unexpected statement: {:?}", stmt);
1690        };
1691        CreateSink::AsQuery(query.clone())
1692    } else {
1693        CreateSink::From(table_name.clone())
1694    };
1695
1696    let mut sink_name = table_name.clone();
1697    *sink_name.0.last_mut().unwrap() = Ident::from(
1698        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1699    );
1700    let create_sink_stmt = CreateSinkStatement {
1701        if_not_exists: false,
1702        sink_name,
1703        with_properties: WithProperties(vec![]),
1704        sink_from,
1705        columns: vec![],
1706        emit_mode: None,
1707        sink_schema: None,
1708        into_table_name: None,
1709    };
1710
1711    let mut sink_handler_args = handler_args.clone();
1712
1713    let mut sink_with = with_common.clone();
1714
1715    if table.append_only {
1716        sink_with.insert("type".to_owned(), "append-only".to_owned());
1717    } else {
1718        sink_with.insert("primary_key".to_owned(), pks.join(","));
1719        sink_with.insert("type".to_owned(), "upsert".to_owned());
1720    }
1721    // sink_with.insert(SINK_SNAPSHOT_OPTION.to_owned(), "false".to_owned());
1722    //
1723    // Note: in theory, we don't need to backfill from the table to the sink,
1724    // but we don't have atomic DDL now https://github.com/risingwavelabs/risingwave/issues/21863
1725    // so it may have potential data loss problem on the first barrier.
1726    //
1727    // For non-append-only table, we can always solve it by the initial sink with backfill, since
1728    // data will be present in hummock table.
1729    //
1730    // For append-only table, we need to be more careful.
1731    //
1732    // The possible cases for a table:
1733    // - For table without connector: it doesn't matter, since there's no data before the table is created
1734    // - For table with connector: we workarounded it by setting SOURCE_RATE_LIMIT to 0
1735    //   + If we support blocking DDL for table with connector, we need to be careful.
1736    // - For table with an upstream job: Specifically, CDC table from shared CDC source.
1737    //   + Data may come from both upstream connector, and CDC table backfill, so we need to pause both of them.
1738    //   + For now we don't support APPEND ONLY CDC table, so it's safe.
1739    let commit_checkpoint_interval = handler_args
1740        .with_options
1741        .get(COMMIT_CHECKPOINT_INTERVAL)
1742        .map(|v| v.to_owned())
1743        .unwrap_or_else(|| "60".to_owned());
1744    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1745        ErrorCode::InvalidInputSyntax(format!(
1746            "commit_checkpoint_interval must be greater than 0: {}",
1747            commit_checkpoint_interval
1748        ))
1749    })?;
1750
1751    if commit_checkpoint_interval == 0 {
1752        bail!("commit_checkpoint_interval must be greater than 0");
1753    }
1754
1755    // remove commit_checkpoint_interval from source options, otherwise it will be considered as an unknown field.
1756    source
1757        .as_mut()
1758        .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1759
1760    let sink_decouple = session.config().sink_decouple();
1761    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1762        bail!(
1763            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1764        )
1765    }
1766
1767    sink_with.insert(
1768        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1769        commit_checkpoint_interval.to_string(),
1770    );
1771    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1772
1773    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1774
1775    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1776        match enable_compaction.to_lowercase().as_str() {
1777            "true" => {
1778                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1779            }
1780            "false" => {
1781                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1782            }
1783            _ => {
1784                return Err(ErrorCode::InvalidInputSyntax(format!(
1785                    "enable_compaction must be true or false: {}",
1786                    enable_compaction
1787                ))
1788                .into());
1789            }
1790        }
1791
1792        // remove enable_compaction from source options, otherwise it will be considered as an unknown field.
1793        source
1794            .as_mut()
1795            .map(|x| x.with_properties.remove("enable_compaction"));
1796    } else {
1797        sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1798    }
1799
1800    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1801        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1802            ErrorCode::InvalidInputSyntax(format!(
1803                "compaction_interval_sec must be greater than 0: {}",
1804                commit_checkpoint_interval
1805            ))
1806        })?;
1807        if compaction_interval_sec == 0 {
1808            bail!("compaction_interval_sec must be greater than 0");
1809        }
1810        sink_with.insert(
1811            "compaction_interval_sec".to_owned(),
1812            compaction_interval_sec.to_string(),
1813        );
1814        // remove compaction_interval_sec from source options, otherwise it will be considered as an unknown field.
1815        source
1816            .as_mut()
1817            .map(|x| x.with_properties.remove("compaction_interval_sec"));
1818    }
1819
1820    let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1821        handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1822    {
1823        // remove enable_snapshot_expiration from source options, otherwise it will be considered as an unknown field.
1824        source
1825            .as_mut()
1826            .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1827        match enable_snapshot_expiration.to_lowercase().as_str() {
1828            "true" => {
1829                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1830                true
1831            }
1832            "false" => {
1833                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1834                false
1835            }
1836            _ => {
1837                return Err(ErrorCode::InvalidInputSyntax(format!(
1838                    "enable_snapshot_expiration must be true or false: {}",
1839                    enable_snapshot_expiration
1840                ))
1841                .into());
1842            }
1843        }
1844    } else {
1845        sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1846        true
1847    };
1848
1849    if has_enabled_snapshot_expiration {
1850        // configuration for snapshot expiration
1851        if let Some(snapshot_expiration_retain_last) = handler_args
1852            .with_options
1853            .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
1854        {
1855            sink_with.insert(
1856                SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
1857                snapshot_expiration_retain_last.to_owned(),
1858            );
1859            // remove snapshot_expiration_retain_last from source options, otherwise it will be considered as an unknown field.
1860            source
1861                .as_mut()
1862                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
1863        }
1864
1865        if let Some(snapshot_expiration_max_age) = handler_args
1866            .with_options
1867            .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
1868        {
1869            sink_with.insert(
1870                SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
1871                snapshot_expiration_max_age.to_owned(),
1872            );
1873            // remove snapshot_expiration_max_age from source options, otherwise it will be considered as an unknown field.
1874            source
1875                .as_mut()
1876                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
1877        }
1878
1879        if let Some(snapshot_expiration_clear_expired_files) = handler_args
1880            .with_options
1881            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1882        {
1883            sink_with.insert(
1884                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
1885                snapshot_expiration_clear_expired_files.to_owned(),
1886            );
1887            // remove snapshot_expiration_clear_expired_files from source options, otherwise it will be considered as an unknown field.
1888            source.as_mut().map(|x| {
1889                x.with_properties
1890                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1891            });
1892        }
1893
1894        if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
1895            .with_options
1896            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1897        {
1898            sink_with.insert(
1899                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
1900                snapshot_expiration_clear_expired_meta_data.to_owned(),
1901            );
1902            // remove snapshot_expiration_clear_expired_meta_data from source options, otherwise it will be considered as an unknown field.
1903            source.as_mut().map(|x| {
1904                x.with_properties
1905                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1906            });
1907        }
1908    }
1909
1910    if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1911        match write_mode.to_lowercase().as_str() {
1912            ICEBERG_WRITE_MODE_MERGE_ON_READ => {
1913                sink_with.insert(
1914                    WRITE_MODE.to_owned(),
1915                    ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1916                );
1917            }
1918
1919            ICEBERG_WRITE_MODE_COPY_ON_WRITE => {
1920                if table.append_only {
1921                    return Err(ErrorCode::NotSupported(
1922                        "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
1923                        "Please use MERGE ON READ instead".to_owned(),
1924                    )
1925                    .into());
1926                }
1927
1928                sink_with.insert(
1929                    WRITE_MODE.to_owned(),
1930                    ICEBERG_WRITE_MODE_COPY_ON_WRITE.to_owned(),
1931                );
1932            }
1933
1934            _ => {
1935                return Err(ErrorCode::InvalidInputSyntax(format!(
1936                    "write_mode must be one of: {}, {}",
1937                    ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
1938                ))
1939                .into());
1940            }
1941        }
1942
1943        // remove write_mode from source options, otherwise it will be considered as an unknown field.
1944        source
1945            .as_mut()
1946            .map(|x| x.with_properties.remove("write_mode"));
1947    } else {
1948        sink_with.insert(
1949            WRITE_MODE.to_owned(),
1950            ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1951        );
1952    }
1953
1954    if let Some(max_snapshots_num_before_compaction) =
1955        handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
1956    {
1957        let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
1958            .parse::<u32>()
1959            .map_err(|_| {
1960                ErrorCode::InvalidInputSyntax(format!(
1961                    "{} must be greater than 0: {}",
1962                    COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
1963                ))
1964            })?;
1965        if max_snapshots_num_before_compaction == 0 {
1966            bail!(format!(
1967                "{} must be greater than 0",
1968                COMPACTION_MAX_SNAPSHOTS_NUM
1969            ));
1970        }
1971        sink_with.insert(
1972            COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
1973            max_snapshots_num_before_compaction.to_string(),
1974        );
1975
1976        // remove from source options, otherwise it will be considered as an unknown field.
1977        source
1978            .as_mut()
1979            .map(|x| x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM));
1980    }
1981
1982    if let Some(small_files_threshold_mb) = handler_args
1983        .with_options
1984        .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
1985    {
1986        let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
1987            ErrorCode::InvalidInputSyntax(format!(
1988                "{} must be greater than 0: {}",
1989                COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
1990            ))
1991        })?;
1992        if small_files_threshold_mb == 0 {
1993            bail!(format!(
1994                "{} must be a greater than 0",
1995                COMPACTION_SMALL_FILES_THRESHOLD_MB
1996            ));
1997        }
1998        sink_with.insert(
1999            COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
2000            small_files_threshold_mb.to_string(),
2001        );
2002
2003        // remove from source options, otherwise it will be considered as an unknown field.
2004        source.as_mut().map(|x| {
2005            x.with_properties
2006                .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2007        });
2008    }
2009
2010    if let Some(delete_files_count_threshold) = handler_args
2011        .with_options
2012        .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2013    {
2014        let delete_files_count_threshold =
2015            delete_files_count_threshold.parse::<usize>().map_err(|_| {
2016                ErrorCode::InvalidInputSyntax(format!(
2017                    "{} must be greater than 0: {}",
2018                    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
2019                ))
2020            })?;
2021        if delete_files_count_threshold == 0 {
2022            bail!(format!(
2023                "{} must be greater than 0",
2024                COMPACTION_DELETE_FILES_COUNT_THRESHOLD
2025            ));
2026        }
2027        sink_with.insert(
2028            COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
2029            delete_files_count_threshold.to_string(),
2030        );
2031
2032        // remove from source options, otherwise it will be considered as an unknown field.
2033        source.as_mut().map(|x| {
2034            x.with_properties
2035                .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2036        });
2037    }
2038
2039    if let Some(trigger_snapshot_count) = handler_args
2040        .with_options
2041        .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
2042    {
2043        let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
2044            ErrorCode::InvalidInputSyntax(format!(
2045                "{} must be greater than 0: {}",
2046                COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
2047            ))
2048        })?;
2049        if trigger_snapshot_count == 0 {
2050            bail!(format!(
2051                "{} must be greater than 0",
2052                COMPACTION_TRIGGER_SNAPSHOT_COUNT
2053            ));
2054        }
2055        sink_with.insert(
2056            COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
2057            trigger_snapshot_count.to_string(),
2058        );
2059
2060        // remove from source options, otherwise it will be considered as an unknown field.
2061        source
2062            .as_mut()
2063            .map(|x| x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT));
2064    }
2065
2066    if let Some(target_file_size_mb) = handler_args
2067        .with_options
2068        .get(COMPACTION_TARGET_FILE_SIZE_MB)
2069    {
2070        let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
2071            ErrorCode::InvalidInputSyntax(format!(
2072                "{} must be greater than 0: {}",
2073                COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
2074            ))
2075        })?;
2076        if target_file_size_mb == 0 {
2077            bail!(format!(
2078                "{} must be greater than 0",
2079                COMPACTION_TARGET_FILE_SIZE_MB
2080            ));
2081        }
2082        sink_with.insert(
2083            COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
2084            target_file_size_mb.to_string(),
2085        );
2086        // remove from source options, otherwise it will be considered as an unknown field.
2087        source
2088            .as_mut()
2089            .map(|x| x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB));
2090    }
2091
2092    let partition_by = handler_args
2093        .with_options
2094        .get("partition_by")
2095        .map(|v| v.to_owned());
2096
2097    if let Some(partition_by) = &partition_by {
2098        let mut partition_columns = vec![];
2099        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
2100            table
2101                .columns()
2102                .iter()
2103                .find(|col| col.name().eq_ignore_ascii_case(&column))
2104                .ok_or_else(|| {
2105                    ErrorCode::InvalidInputSyntax(format!(
2106                        "Partition source column does not exist in schema: {}",
2107                        column
2108                    ))
2109                })?;
2110
2111            partition_columns.push(column);
2112        }
2113
2114        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2115            |_| {
2116                ErrorCode::InvalidInputSyntax(
2117                    "The partition columns should be the prefix of the primary key".to_owned(),
2118                )
2119            },
2120        )?;
2121
2122        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2123
2124        // remove partition_by from source options, otherwise it will be considered as an unknown field.
2125        source
2126            .as_mut()
2127            .map(|x| x.with_properties.remove("partition_by"));
2128    }
2129
2130    sink_handler_args.with_options =
2131        WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2132    let SinkPlanContext {
2133        sink_plan,
2134        sink_catalog,
2135        ..
2136    } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2137    let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2138
2139    let mut source_name = table_name.clone();
2140    *source_name.0.last_mut().unwrap() = Ident::from(
2141        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2142    );
2143    let create_source_stmt = CreateSourceStatement {
2144        temporary: false,
2145        if_not_exists: false,
2146        columns: vec![],
2147        source_name,
2148        wildcard_idx: Some(0),
2149        constraints: vec![],
2150        with_properties: WithProperties(vec![]),
2151        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2152        source_watermarks: vec![],
2153        include_column_options: vec![],
2154    };
2155
2156    let mut source_handler_args = handler_args.clone();
2157    let source_with = with_common;
2158    source_handler_args.with_options =
2159        WithOptions::new(source_with, Default::default(), connection_ref);
2160
2161    let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2162    let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2163    let (with_properties, refresh_mode) =
2164        bind_connector_props(&source_handler_args, &format_encode, true)?;
2165
2166    // Create iceberg sink table, used for iceberg source column binding. See `bind_columns_from_source_for_non_cdc` for more details.
2167    // TODO: We can derive the columns directly from table definition in the future, so that we don't need to pre-create the table catalog.
2168    let (iceberg_catalog, table_identifier) = {
2169        let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2170        let iceberg_sink = IcebergSink::try_from(sink_param)?;
2171        iceberg_sink.create_table_if_not_exists().await?;
2172
2173        let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2174        let table_identifier = iceberg_sink.config.full_table_name()?;
2175        (iceberg_catalog, table_identifier)
2176    };
2177
2178    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2179    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2180        &session,
2181        &format_encode,
2182        Either::Left(&with_properties),
2183        create_source_type,
2184    )
2185    .await?;
2186    let mut col_id_gen = ColumnIdGenerator::new_initial();
2187
2188    let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2189        source_handler_args,
2190        create_source_stmt.source_name,
2191        format_encode,
2192        with_properties,
2193        &create_source_stmt.columns,
2194        create_source_stmt.constraints,
2195        create_source_stmt.wildcard_idx,
2196        create_source_stmt.source_watermarks,
2197        columns_from_resolve_source,
2198        source_info,
2199        create_source_stmt.include_column_options,
2200        &mut col_id_gen,
2201        create_source_type,
2202        overwrite_options.source_rate_limit,
2203        SqlColumnStrategy::FollowChecked,
2204        refresh_mode,
2205    )
2206    .await?;
2207
2208    // before we create the table, ensure the JVM is initialized as we use jdbc catalog right now.
2209    // If JVM isn't initialized successfully, current not atomic ddl will result in a partially created iceberg engine table.
2210    let _ = Jvm::get_or_init()?;
2211
2212    let catalog_writer = session.catalog_writer()?;
2213    let res = catalog_writer
2214        .create_iceberg_table(
2215            PbTableJobInfo {
2216                source,
2217                table: Some(table.to_prost()),
2218                fragment_graph: Some(graph),
2219                job_type: job_type as _,
2220            },
2221            PbSinkJobInfo {
2222                sink: Some(sink_catalog.to_proto()),
2223                fragment_graph: Some(sink_graph),
2224            },
2225            iceberg_source_catalog.to_prost(),
2226            if_not_exists,
2227        )
2228        .await;
2229    if res.is_err() {
2230        let _ = iceberg_catalog
2231            .drop_table(&table_identifier)
2232            .await
2233            .inspect_err(|err| {
2234                tracing::error!(
2235                    "failed to drop iceberg table {} after create iceberg engine table failed: {}",
2236                    table_identifier,
2237                    err.as_report()
2238                );
2239            });
2240        res?
2241    }
2242
2243    Ok(())
2244}
2245
2246pub fn check_create_table_with_source(
2247    with_options: &WithOptions,
2248    format_encode: Option<FormatEncodeOptions>,
2249    include_column_options: &IncludeOption,
2250    cdc_table_info: &Option<CdcTableInfo>,
2251) -> Result<Option<FormatEncodeOptions>> {
2252    // skip check for cdc table
2253    if cdc_table_info.is_some() {
2254        return Ok(format_encode);
2255    }
2256    let defined_source = with_options.is_source_connector();
2257
2258    if !include_column_options.is_empty() && !defined_source {
2259        return Err(ErrorCode::InvalidInputSyntax(
2260            "INCLUDE should be used with a connector".to_owned(),
2261        )
2262        .into());
2263    }
2264    if defined_source {
2265        format_encode.as_ref().ok_or_else(|| {
2266            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2267        })?;
2268    }
2269    Ok(format_encode)
2270}
2271
/// Checks that `partition_columns` form a leading prefix of `primary_key_columns`.
///
/// Column names are compared exactly, position by position. An empty partition list is
/// always accepted.
///
/// # Errors
/// Returns a human-readable message describing the first violation found:
/// - the partition list is longer than the primary key, or
/// - a partition column differs from the primary-key column at the same position.
fn ensure_partition_columns_are_prefix_of_primary_key(
    partition_columns: &[String],
    primary_key_columns: &[String],
) -> std::result::Result<(), String> {
    if primary_key_columns.len() < partition_columns.len() {
        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
    }

    // The length check above guarantees `zip` visits every partition column.
    let first_mismatch = partition_columns
        .iter()
        .zip(primary_key_columns.iter())
        .find(|(partition_col, pk_col)| partition_col != pk_col);

    match first_mismatch {
        Some((offending, _)) => Err(format!(
            "Partition column '{}' is not a prefix of the primary key.",
            offending
        )),
        None => Ok(()),
    }
}
2291
2292#[allow(clippy::too_many_arguments)]
2293pub async fn generate_stream_graph_for_replace_table(
2294    _session: &Arc<SessionImpl>,
2295    table_name: ObjectName,
2296    original_catalog: &Arc<TableCatalog>,
2297    handler_args: HandlerArgs,
2298    statement: Statement,
2299    col_id_gen: ColumnIdGenerator,
2300    sql_column_strategy: SqlColumnStrategy,
2301) -> Result<(
2302    StreamFragmentGraph,
2303    TableCatalog,
2304    Option<SourceCatalog>,
2305    TableJobType,
2306)> {
2307    let Statement::CreateTable {
2308        columns,
2309        constraints,
2310        source_watermarks,
2311        append_only,
2312        on_conflict,
2313        with_version_columns,
2314        wildcard_idx,
2315        cdc_table_info,
2316        format_encode,
2317        include_column_options,
2318        engine,
2319        with_options,
2320        ..
2321    } = statement
2322    else {
2323        panic!("unexpected statement type: {:?}", statement);
2324    };
2325
2326    let format_encode = format_encode
2327        .clone()
2328        .map(|format_encode| format_encode.into_v2_with_warning());
2329
2330    let engine = match engine {
2331        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2332        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2333    };
2334
2335    let is_drop_connector =
2336        original_catalog.associated_source_id().is_some() && format_encode.is_none();
2337    if is_drop_connector {
2338        debug_assert!(
2339            source_watermarks.is_empty()
2340                && include_column_options.is_empty()
2341                && with_options
2342                    .iter()
2343                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2344        );
2345    }
2346
2347    let props = CreateTableProps {
2348        definition: handler_args.normalized_sql.clone(),
2349        append_only,
2350        on_conflict: on_conflict.into(),
2351        with_version_columns: with_version_columns
2352            .iter()
2353            .map(|col| col.real_value())
2354            .collect(),
2355        webhook_info: original_catalog.webhook_info.clone(),
2356        engine,
2357    };
2358
2359    let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2360        (Some(format_encode), None) => (
2361            gen_create_table_plan_with_source(
2362                handler_args,
2363                ExplainOptions::default(),
2364                table_name,
2365                columns,
2366                wildcard_idx,
2367                constraints,
2368                format_encode,
2369                source_watermarks,
2370                col_id_gen,
2371                include_column_options,
2372                props,
2373                sql_column_strategy,
2374            )
2375            .await?,
2376            TableJobType::General,
2377        ),
2378        (None, None) => {
2379            let context = OptimizerContext::from_handler_args(handler_args);
2380            let (plan, table) = gen_create_table_plan(
2381                context,
2382                table_name,
2383                columns,
2384                constraints,
2385                col_id_gen,
2386                source_watermarks,
2387                props,
2388                true,
2389            )?;
2390            ((plan, None, table), TableJobType::General)
2391        }
2392        (None, Some(cdc_table)) => {
2393            let session = &handler_args.session;
2394            let (source, resolved_table_name, database_id, schema_id) =
2395                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2396
2397            let cdc_with_options = derive_with_options_for_cdc_table(
2398                &source.with_properties,
2399                cdc_table.external_table_name.clone(),
2400            )?;
2401
2402            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2403
2404            let context: OptimizerContextRef =
2405                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2406            let (plan, table) = gen_create_table_plan_for_cdc_table(
2407                context,
2408                source,
2409                cdc_table.external_table_name.clone(),
2410                columns,
2411                column_catalogs,
2412                pk_names,
2413                cdc_with_options,
2414                col_id_gen,
2415                on_conflict,
2416                with_version_columns
2417                    .iter()
2418                    .map(|col| col.real_value())
2419                    .collect(),
2420                include_column_options,
2421                table_name,
2422                resolved_table_name,
2423                database_id,
2424                schema_id,
2425                original_catalog.id(),
2426                engine,
2427            )?;
2428
2429            ((plan, None, table), TableJobType::SharedCdcSource)
2430        }
2431        (Some(_), Some(_)) => {
2432            return Err(ErrorCode::NotSupported(
2433                "Data format and encoding format doesn't apply to table created from a CDC source"
2434                    .into(),
2435                "Remove the FORMAT and ENCODE specification".into(),
2436            )
2437            .into());
2438        }
2439    };
2440
2441    if table.pk_column_ids() != original_catalog.pk_column_ids() {
2442        Err(ErrorCode::InvalidInputSyntax(
2443            "alter primary key of table is not supported".to_owned(),
2444        ))?
2445    }
2446
2447    let graph = build_graph(plan, Some(GraphJobType::Table))?;
2448
2449    // Fill the original table ID.
2450    table.id = original_catalog.id();
2451    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2452        table.associated_source_id = Some(source_id);
2453
2454        let source = source.as_mut().unwrap();
2455        source.id = source_id;
2456        source.associated_table_id = Some(table.id());
2457    }
2458
2459    Ok((graph, table, source, job_type))
2460}
2461
2462fn get_source_and_resolved_table_name(
2463    session: &Arc<SessionImpl>,
2464    cdc_table: CdcTableInfo,
2465    table_name: ObjectName,
2466) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
2467    let db_name = &session.database();
2468    let (schema_name, resolved_table_name) =
2469        Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2470    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
2471
2472    let (format_encode, source_name) =
2473        Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2474
2475    let source = {
2476        let catalog_reader = session.env().catalog_reader().read_guard();
2477        let schema_name = format_encode.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2478        let (source, _) = catalog_reader.get_source_by_name(
2479            db_name,
2480            SchemaPath::Name(schema_name.as_str()),
2481            source_name.as_str(),
2482        )?;
2483        source.clone()
2484    };
2485
2486    Ok((source, resolved_table_name, database_id, schema_id))
2487}
2488
2489// validate the webhook_info and also bind the webhook_info to protobuf
2490fn bind_webhook_info(
2491    session: &Arc<SessionImpl>,
2492    columns_defs: &[ColumnDef],
2493    webhook_info: WebhookSourceInfo,
2494) -> Result<PbWebhookSourceInfo> {
2495    // validate columns
2496    if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
2497    {
2498        return Err(ErrorCode::InvalidInputSyntax(
2499            "Table with webhook source should have exactly one JSONB column".to_owned(),
2500        )
2501        .into());
2502    }
2503
2504    let WebhookSourceInfo {
2505        secret_ref,
2506        signature_expr,
2507        wait_for_persistence,
2508        is_batched,
2509    } = webhook_info;
2510
2511    // validate secret_ref
2512    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2513        let db_name = &session.database();
2514        let (schema_name, secret_name) =
2515            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2516        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2517        (
2518            Some(PbSecretRef {
2519                secret_id: secret_catalog.id,
2520                ref_as: match secret_ref.ref_as {
2521                    SecretRefAsType::Text => PbRefAsType::Text,
2522                    SecretRefAsType::File => PbRefAsType::File,
2523                }
2524                .into(),
2525            }),
2526            Some(secret_name),
2527        )
2528    } else {
2529        (None, None)
2530    };
2531
2532    let secure_compare_context = SecureCompareContext {
2533        column_name: columns_defs[0].name.real_value(),
2534        secret_name,
2535    };
2536    let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
2537    let expr = binder.bind_expr(&signature_expr)?;
2538
2539    // validate expr, ensuring it is SECURE_COMPARE()
2540    if expr.as_function_call().is_none()
2541        || expr.as_function_call().unwrap().func_type()
2542            != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2543    {
2544        return Err(ErrorCode::InvalidInputSyntax(
2545            "The signature verification function must be SECURE_COMPARE()".to_owned(),
2546        )
2547        .into());
2548    }
2549
2550    let pb_webhook_info = PbWebhookSourceInfo {
2551        secret_ref: pb_secret_ref,
2552        signature_expr: Some(expr.to_expr_proto()),
2553        wait_for_persistence,
2554        is_batched,
2555    };
2556
2557    Ok(pb_webhook_info)
2558}
2559
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use super::*;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Creating an append-only table registers it in the default schema with the declared
    /// columns plus the hidden row-id and RW timestamp columns.
    #[tokio::test]
    async fn test_create_table_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let sql =
            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let reader = session.env().catalog_reader().read_guard();

        // The table must be visible in the default schema of the default database.
        let (table, _) = reader
            .get_created_table_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
                "t",
            )
            .unwrap();
        assert_eq!(table.name(), "t");

        let mut columns: HashMap<&str, DataType> = HashMap::new();
        for column in table.columns.iter() {
            columns.insert(column.name(), column.data_type().clone());
        }

        // Nested struct fields receive their own column IDs following the top-level columns.
        let struct_type: DataType = StructType::new(vec![
            ("v3", DataType::Int64),
            ("v4", DataType::Float64),
            ("v5", DataType::Float64),
        ])
        .with_ids([3, 4, 5].map(ColumnId::new))
        .into();

        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "v1" => DataType::Int16,
            "v2" => struct_type,
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };

        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// Checks primary-key binding for column- and table-level `PRIMARY KEY` constraints.
    #[test]
    fn test_bind_primary_key() {
        // Column ID 0 is reserved for the row-id column, so user columns start from 1.
        let cases = [
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ];

        for (sql, expected) in cases {
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = risingwave_sqlparser::parser::Parser::parse_sql(sql)
                .unwrap()
                .remove(0)
            else {
                panic!("test case should be create table")
            };

            // Bind columns, assign IDs, then bind the primary key on top of them.
            let actual: Result<_> = (|| {
                let mut bound_columns = bind_sql_columns(&column_defs, false)?;
                let mut id_gen = ColumnIdGenerator::new_initial();
                for column in bound_columns.iter_mut() {
                    id_gen.generate(column)?;
                }

                let bound_constraints = bind_table_constraints(&constraints)?;
                let pk_names = bind_sql_pk_names(&column_defs, bound_constraints)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(bound_columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();

            match expected {
                Ok(expected) => match actual {
                    Ok(actual) => assert_eq!(
                        expected.iter().copied().map(ColumnId::new).collect_vec(),
                        actual,
                        "sql: {sql}"
                    ),
                    Err(actual) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                },
                Err(expected) => match actual {
                    Ok(actual) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
                    Err(actual) => assert!(
                        actual.to_string().contains(expected),
                        "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                    ),
                },
            }
        }
    }

    /// Identically named options in WITH (...) and in FORMAT ... ENCODE (...) are kept
    /// separately, and FORMAT/ENCODE options are not merged into the WITH properties.
    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let proto_path = proto_file.path().to_str().unwrap();
        let sql = format!(
            r#"CREATE TABLE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_path
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let reader = session.env().catalog_reader().read_guard();

        // The table's associated source must exist under the same name.
        let (source, _) = reader
            .get_source_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
                "t",
            )
            .unwrap();
        assert_eq!(source.name, "t");

        // The FORMAT/ENCODE options carry the second set of AWS credentials.
        for (key, expected) in [
            ("aws.credentials.access_key_id", "your_access_key_2"),
            ("aws.credentials.secret_access_key", "your_secret_key_2"),
        ] {
            assert_eq!(source.info.format_encode_options.get(key).unwrap(), expected);
        }

        // The WITH properties carry the first set of AWS credentials.
        for (key, expected) in [
            ("aws.credentials.access_key_id", "your_access_key_1"),
            ("aws.credentials.secret_access_key", "your_secret_key_1"),
        ] {
            assert_eq!(source.with_properties.get(key).unwrap(), expected);
        }

        // FORMAT/ENCODE-only options do not leak into the WITH properties.
        assert!(!source.with_properties.contains_key("schema.location"));
    }
}