risingwave_frontend/handler/create_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::rc::Rc;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use clap::ValueEnum;
use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use percent_encoding::percent_decode_str;
use pgwire::pg_response::{PgResponse, StatementType};
use prost::Message as _;
use risingwave_common::catalog::{
    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
    ObjectId, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, TableId,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::global_jvm::Jvm;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
    DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
    SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::connection_params::ConnectionType;
use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
use risingwave_pb::meta::PbThrottleTarget;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::secret::secret_ref::PbRefAsType;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::{IncludeOption, Parser};

use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
use super::{RwPgResponse, alter_streaming_rate_limit, create_sink, create_source};
use crate::binder::{Clause, SecureCompareContext, bind_data_type};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableVersion};
use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::HandlerArgs;
use crate::handler::create_source::{
    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
    bind_source_watermark, handle_addition_columns,
};
use crate::handler::util::SourceSchemaCompatExt;
use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
use crate::optimizer::plan_node::{
    LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::OverwriteOptions;
use crate::{Binder, Explain, TableCatalog, WithOptions};

mod col_id_gen;
pub use col_id_gen::*;
use risingwave_connector::sink::iceberg::{
    COMPACTION_INTERVAL_SEC, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
    ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ,
    SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
    SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
    parse_partition_by_exprs,
};

use crate::handler::drop_table::handle_drop_table;

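/// Ensures that only supported column options are used in a column definition
/// (generated columns, default values, primary key, NULL / NOT NULL); any other
/// column constraint is rejected with a "not implemented" error.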
fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
    for option_def in &c.options {
        match option_def.option {
            ColumnOption::GeneratedColumns(_) => {}
            ColumnOption::DefaultValue(_) => {}
            ColumnOption::DefaultValueInternal { .. } => {}
            ColumnOption::Unique { is_primary: true } => {}
            ColumnOption::Null => {}
            ColumnOption::NotNull => {}
            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
        }
    }
    Ok(())
}

/// Binds the column schemas declared in the CREATE statement into `ColumnDesc`s.
/// If a column is marked as `primary key`, its `ColumnId` is also returned.
/// This primary key is not combined with table constraints yet.
pub fn bind_sql_columns(
    column_defs: &[ColumnDef],
    is_for_drop_table_connector: bool,
) -> Result<Vec<ColumnCatalog>> {
    let mut columns = Vec::with_capacity(column_defs.len());

    for column in column_defs {
        ensure_column_options_supported(column)?;
        // Destructure to make sure all fields are properly handled rather than ignored.
        // Do NOT use `..` to ignore fields you do not want to deal with.
        // Reject them with a clear NotImplemented error.
        let ColumnDef {
            name,
            data_type,
            collation,
            options,
            ..
        } = column;

        let data_type = data_type
            .clone()
            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
        if let Some(collation) = collation {
            // PostgreSQL limits the data types that collation can work on.
            // https://www.postgresql.org/docs/16/collation.html#COLLATION-CONCEPTS
            //   > The built-in collatable data types are `text`, `varchar`, and `char`.
            //
            // But we don't support real collation; we simply ignore it here.
            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
                bail_not_implemented!(
                    "Collation other than `C` or `POSIX` is not implemented"
                );
            }

            match data_type {
                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
                _ => {
                    return Err(ErrorCode::NotSupported(
                        format!("{} is not a collatable data type", data_type),
                        "The built-in collatable data types are `text`, `varchar`, and `char`; please check your type".into(),
                    ).into());
                }
            }
        }

        if !is_for_drop_table_connector {
            // Additional column names may have the `_rw` prefix.
            // When dropping the connector from a table, additional columns are converted to normal columns and keep their original names.
            // In this case, we loosen the check for the `_rw` prefix.
            check_column_name_not_reserved(&name.real_value())?;
        }

        let nullable: bool = !options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull));

        columns.push(ColumnCatalog {
            column_desc: ColumnDesc {
                data_type: bind_data_type(&data_type)?,
                column_id: ColumnId::placeholder(),
                name: name.real_value(),
                generated_or_default_column: None,
                description: None,
                additional_column: AdditionalColumn { column_type: None },
                version: ColumnDescVersion::LATEST,
                system_column: None,
                nullable,
            },
            is_hidden: false,
        });
    }

    Ok(columns)
}

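/// Checks the restrictions on a bound generated-column expression: it must not
/// reference another generated column, and if the column is part of the primary
/// key its expression must be pure.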
fn check_generated_column_constraints(
    column_name: &String,
    column_id: ColumnId,
    expr: &ExprImpl,
    column_catalogs: &[ColumnCatalog],
    generated_column_names: &[String],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let input_refs = expr.collect_input_refs(column_catalogs.len());
    for idx in input_refs.ones() {
        let referred_generated_column = &column_catalogs[idx].column_desc.name;
        if generated_column_names
            .iter()
            .any(|c| c == referred_generated_column)
        {
            return Err(ErrorCode::BindError(format!(
                "Generated columns cannot reference other generated columns. \
                Here generated column \"{}\" references another generated column \"{}\"",
                column_name, referred_generated_column
            ))
            .into());
        }
    }

    if pk_column_ids.contains(&column_id) && expr.is_impure() {
        return Err(ErrorCode::BindError(format!(
            "Generated columns with impure expressions should not be part of the primary key. \
            Here column \"{}\" is defined as part of the primary key.",
            column_name
        ))
        .into());
    }

    Ok(())
}

/// Binds constraints that can only be specified in column definitions,
/// currently generated columns and default columns.
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: &[ColumnDef],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let generated_column_names = {
        let mut names = vec![];
        for column in columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name.clone(), column_catalogs)?;

    for column in columns {
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            // It's possible that we don't follow the user defined columns in SQL but take the
            // ones resolved from the source, thus missing some columns. Simply ignore them.
            continue;
        };

        for option_def in &column.options {
            match &option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "fail to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type())?;

                    // Rewrite expressions to evaluate a snapshot value, used for missing values in the case of
                    // schema change.
                    //
                    // TODO: Currently we don't support impure expressions other than `now()` (like `random()`),
                    // so the rewritten expression should almost always be pure and we directly call `fold_const`
                    // here. Actually we do not require purity of the expression here since we only need a
                    // snapshot value.
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                expr: Some(expr_impl.to_expr_proto()),
                                // persist the original expression
                            }));
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                            Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // When a `DEFAULT INTERNAL` is used internally for schema change, the persisted value
                    // should already be set during purification. So if we encounter an empty value here, it
                    // means the user has specified it explicitly in the SQL statement, typically by
                    // directly copying the result of `SHOW CREATE TABLE` and executing it.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                             please specify a concrete default value"
                        );
                    }

                    let desc = DefaultColumnDesc::decode(&**persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                _ => {}
            }
        }
    }
    Ok(())
}

/// Currently we only support the PRIMARY KEY table constraint, so this just returns the PK column names if present.
pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
    let mut pk_column_names = vec![];

    for constraint in table_constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary: true,
            } => {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
            }
            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
        }
    }
    Ok(pk_column_names)
}

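/// Collects primary key column names from column-level `PRIMARY KEY` options and
/// merges them with the names already derived from table-level constraints,
/// rejecting statements that define a primary key more than once.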
pub fn bind_sql_pk_names(
    columns_defs: &[ColumnDef],
    pk_names_from_table_constraints: Vec<String>,
) -> Result<Vec<String>> {
    let mut pk_column_names = pk_names_from_table_constraints;

    for column in columns_defs {
        for option_def in &column.options {
            if let ColumnOption::Unique { is_primary: true } = option_def.option {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names.push(column.name.real_value());
            };
        }
    }

    Ok(pk_column_names)
}

fn multiple_pk_definition_err() -> RwError {
    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
}

/// Binds primary keys defined in SQL.
///
/// It returns the columns together with `pk_column_ids`, and an optional row id column index if
/// added.
pub fn bind_pk_and_row_id_on_relation(
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    must_need_pk: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
    for c in &columns {
        assert!(c.column_id() != ColumnId::placeholder());
    }

    // Mapping from column name to column id.
    let name_to_id = columns
        .iter()
        .map(|c| (c.name(), c.column_id()))
        .collect::<HashMap<_, _>>();

    let mut pk_column_ids: Vec<_> = pk_names
        .iter()
        .map(|name| {
            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
            })
        })
        .try_collect()?;

    // Add `_row_id` column if `pk_column_ids` is empty and must_need_pk
    let need_row_id = pk_column_ids.is_empty() && must_need_pk;

    let row_id_index = need_row_id.then(|| {
        let column = ColumnCatalog::row_id_column();
        let index = columns.len();
        pk_column_ids = vec![column.column_id()];
        columns.push(column);
        index
    });

    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
        Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{col}\" specified more than once"
        )))?;
    }

    Ok((columns, pk_column_ids, row_id_index))
}

/// `gen_create_table_plan_with_source` generates the plan for creating a table with an external
/// stream source.
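///
/// This typically corresponds to a statement like
/// `CREATE TABLE t (..) WITH (connector = '..', ..) FORMAT .. ENCODE ..`.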
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append only table does not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        // if non-cdc source
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    // TODO: omit this step if `sql_column_strategy` is `Follow`.
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
    )
    .await?;

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source.clone(),
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(source), table))
}

/// `gen_create_table_plan` generates the plan for creating a table without an external stream
/// source.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan(
    context: OptimizerContext,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    mut col_id_gen: ColumnIdGenerator,
    source_watermarks: Vec<SourceWatermark>,
    props: CreateTableProps,
    is_for_replace_plan: bool,
) -> Result<(PlanRef, TableCatalog)> {
    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
    if !secret_refs.is_empty() || !connection_refs.is_empty() {
        return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
    }

    gen_create_table_plan_without_source(
        context,
        table_name,
        columns,
        column_defs,
        constraints,
        source_watermarks,
        col_id_gen.into_version(),
        props,
    )
}

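/// Generates the plan for creating a table from already-bound columns, without an external
/// source: binds primary keys (adding `_row_id` if needed), watermarks and column constraints,
/// then builds the table plan.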
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    // XXX: Why not bind outside?
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}

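/// Generates the table plan from a bound source catalog, reusing the columns, primary key,
/// row id index and watermarks that were resolved when binding the source.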
fn gen_table_plan_with_source(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    source_catalog: SourceCatalog,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let table_name = source_catalog.name.clone();

    let info = CreateTableInfo {
        columns: source_catalog.columns.clone(),
        pk_column_ids: source_catalog.pk_col_ids.clone(),
        row_id_index: source_catalog.row_id_index,
        watermark_descs: source_catalog.watermark_descs.clone(),
        source_catalog: Some(source_catalog),
        version,
    };

    gen_table_plan_inner(context, schema_name, table_name, info, props)
}

/// On-conflict behavior either from user input or existing table catalog.
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    Ast(Option<OnConflict>),
    Resolved(ConflictBehavior),
}

impl From<Option<OnConflict>> for EitherOnConflict {
    fn from(v: Option<OnConflict>) -> Self {
        Self::Ast(v)
    }
}

impl From<ConflictBehavior> for EitherOnConflict {
    fn from(v: ConflictBehavior) -> Self {
        Self::Resolved(v)
    }
}

impl EitherOnConflict {
    /// Resolves the conflict behavior based on the given information.
    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
        let conflict_behavior = match self {
            EitherOnConflict::Ast(on_conflict) => {
                if append_only {
                    if row_id_as_pk {
                        // Primary key will be generated, no conflict check needed.
                        ConflictBehavior::NoCheck
                    } else {
                        // User defined PK on append-only table, enforce `DO NOTHING`.
                        if let Some(on_conflict) = on_conflict
                            && on_conflict != OnConflict::Nothing
                        {
                            return Err(ErrorCode::InvalidInputSyntax(
                                "When the PRIMARY KEY constraint is applied to an APPEND ONLY table, \
                                     the ON CONFLICT behavior must be DO NOTHING."
                                    .to_owned(),
                            )
                            .into());
                        }
                        ConflictBehavior::IgnoreConflict
                    }
                } else {
                    // Default to `UPDATE FULL` for non-append-only tables.
                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
                    }
                }
            }
            EitherOnConflict::Resolved(b) => b,
        };

        Ok(conflict_behavior)
    }
}

/// Arguments of the functions that generate a table plan, part 1.
///
/// Compared to [`CreateTableProps`], this struct contains fields that need some work of binding
/// or resolving based on the user input.
pub struct CreateTableInfo {
    pub columns: Vec<ColumnCatalog>,
    pub pk_column_ids: Vec<ColumnId>,
    pub row_id_index: Option<usize>,
    pub watermark_descs: Vec<WatermarkDesc>,
    pub source_catalog: Option<SourceCatalog>,
    pub version: TableVersion,
}

/// Arguments of the functions that generate a table plan, part 2.
///
/// Compared to [`CreateTableInfo`], this struct contains fields that can be (relatively) simply
/// obtained from the input or the context.
pub struct CreateTableProps {
    pub definition: String,
    pub append_only: bool,
    pub on_conflict: EitherOnConflict,
    pub with_version_columns: Vec<String>,
    pub webhook_info: Option<PbWebhookSourceInfo>,
    pub engine: Engine,
}

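/// Shared tail of table plan generation: builds a `LogicalSource` node for the table,
/// validates that watermarks and retention only appear on append-only tables, and turns the
/// plan root into a materialize plan whose table catalog is owned by the current user.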
#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    let source_node: LogicalPlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    if !append_only && !watermark_descs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on table requires the table to be append only.".to_owned(),
            "Use the key words `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    if !append_only && retention_seconds.is_some() {
        return Err(ErrorCode::NotSupported(
            "Defining retention seconds on table requires the table to be append only.".to_owned(),
            "Use the key words `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    Ok((materialize.into(), table))
}

/// Generates the stream plan for a cdc table based on a shared source.
/// In the replace workflow, `table_id` is the id of the table to be replaced;
/// in the create table workflow, `table_id` is a placeholder that will be filled in by the meta service.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String, // table name without schema prefix
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, TableCatalog)> {
    let session = context.session_ctx().clone();

    // append additional columns to the end
    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    // NOTE: In auto schema change, default values are not provided in the column definitions.
    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        // pk column id must exist in table columns.
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| (!c.is_generated()))
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();
    let cdc_table_type = ExternalCdcTableType::from_properties(&options);
    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id.into(), // id of cdc source streaming job
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");
    let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: LogicalPlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs: vec![],
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_columns,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.cdc_table_type = Some(cdc_table_type);
    Ok((materialize.into(), table))
}

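/// Derives the per-table connector options for a cdc table from the shared cdc source's
/// properties, filling in the database / schema / table name keys according to the
/// connector-specific format of `external_table_name`.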
fn derive_with_options_for_cdc_table(
    source_with_properties: &WithOptionsSecResolved,
    external_table_name: String,
) -> Result<WithOptionsSecResolved> {
    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
    // we should remove the prefix from `full_table_name`
    let source_database_name: &str = source_with_properties
        .get("database.name")
        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
        .as_str();
    let mut with_options = source_with_properties.clone();
    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
        match connector.as_str() {
            MYSQL_CDC_CONNECTOR => {
                // MySQL doesn't allow '.' in database name and table name, so we can split the
                // external table name by '.' to get the table name
                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
                })?;
                // We allow multiple database names in the source definition
                if !source_database_name
                    .split(',')
                    .map(|s| s.trim())
                    .any(|name| name == db_name)
                {
                    return Err(anyhow!(
                        "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
                        db_name,
                        source_database_name
                    ).into());
                }
                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            POSTGRES_CDC_CONNECTOR => {
                let (schema_name, table_name) = external_table_name
                    .split_once('.')
                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;

                // insert 'schema.name' into connect properties
                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            SQL_SERVER_CDC_CONNECTOR => {
                // SQL Server external table name can be in different formats:
                // 1. 'databaseName.schemaName.tableName' (full format)
                // 2. 'schemaName.tableName' (schema and table only)
                // 3. 'tableName' (table only, will use default schema 'dbo')
                // We will auto-fill missing parts from source configuration
                let parts: Vec<&str> = external_table_name.split('.').collect();
                let (_, schema_name, table_name) = match parts.len() {
                    3 => {
                        // Full format: database.schema.table
                        let db_name = parts[0];
                        let schema_name = parts[1];
                        let table_name = parts[2];

                        // Verify database name matches source configuration
                        if db_name != source_database_name {
                            return Err(anyhow!(
                                "The database name `{}` in the FROM clause is not the same as the database name `{}` in source definition",
                                db_name,
                                source_database_name
                            ).into());
                        }
                        (db_name, schema_name, table_name)
                    }
                    2 => {
                        // Schema and table only: schema.table
                        let schema_name = parts[0];
                        let table_name = parts[1];
                        (source_database_name, schema_name, table_name)
                    }
                    1 => {
                        // Table only: table (use default schema 'dbo')
                        let table_name = parts[0];
                        (source_database_name, "dbo", table_name)
                    }
                    _ => {
                        return Err(anyhow!(
                            "The upstream table name must be in one of these formats: 'database.schema.table', 'schema.table', or 'table'"
                        ).into());
                    }
                };

                // insert 'schema.name' into connect properties
                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            _ => {
                return Err(RwError::from(anyhow!(
                    "connector {} is not supported for cdc table",
                    connector
                )));
            }
        };
    }
    Ok(with_options)
}

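/// Generates the plan and job metadata for `CREATE TABLE`, dispatching on whether the
/// statement declares a connector (`FORMAT ... ENCODE ...`), refers to a cdc source
/// (`FROM source TABLE '...'`), or is a plain table. Returns the plan, the optional source
/// catalog, the table catalog, the table job type and the shared cdc source id (if any).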
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
    handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    format_encode: Option<FormatEncodeOptions>,
    cdc_table_info: Option<CdcTableInfo>,
    table_name: &ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    engine: Engine,
) -> Result<(
    PlanRef,
    Option<SourceCatalog>,
    TableCatalog,
    TableJobType,
    Option<SourceId>,
)> {
    let col_id_gen = ColumnIdGenerator::new_initial();
    let format_encode = check_create_table_with_source(
        &handler_args.with_options,
        format_encode,
        &include_column_options,
        &cdc_table_info,
    )?;
    let webhook_info = webhook_info
        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
        .transpose()?;

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_columns: with_version_columns.clone(),
        webhook_info,
        engine,
    };

    let ((plan, source, table), job_type, shared_source_id) = match (
        format_encode,
        cdc_table_info.as_ref(),
    ) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                explain_options,
                table_name.clone(),
                column_defs,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                SqlColumnStrategy::FollowChecked,
            )
            .await?,
            TableJobType::General,
            None,
        ),
        (None, None) => {
            let context = OptimizerContext::new(handler_args, explain_options);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name.clone(),
                column_defs,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                false,
            )?;

            ((plan, None, table), TableJobType::General, None)
        }

        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &column_defs,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            generated_columns_check_for_cdc_table(&column_defs)?;
            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;

            let session = &handler_args.session;
            let db_name = &session.database();
            let user_name = &session.user_name();
            let search_path = session.config().search_path();
            let (schema_name, resolved_table_name) =
                Binder::resolve_schema_qualified_name(db_name, table_name)?;
            let (database_id, schema_id) =
                session.get_database_and_schema_id_for_create(schema_name.clone())?;

            // cdc table cannot be append-only
            let (source_schema_name, source_name) =
                Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;

            let source = {
                let catalog_reader = session.env().catalog_reader().read_guard();
                let schema_path =
                    SchemaPath::new(source_schema_name.as_deref(), &search_path, user_name);

                let (source, _) = catalog_reader.get_source_by_name(
                    db_name,
                    schema_path,
                    source_name.as_str(),
                )?;
                source.clone()
            };
            let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
                &source.with_properties,
                cdc_table.external_table_name.clone(),
            )?;

            let (columns, pk_names) = match wildcard_idx {
                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
                None => {
                    for column_def in &column_defs {
                        for option_def in &column_def.options {
                            if let ColumnOption::DefaultValue(_)
                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
                            {
                                return Err(ErrorCode::NotSupported(
                                            "Default value for columns defined on the table created from a CDC source".into(),
                                            "Remove the default value expression in the column definitions".into(),
                                        )
                                            .into());
                            }
                        }
                    }

                    let (columns, pk_names) =
                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
                    // read default value definition from external db
                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
                        .context("failed to extract external table config")?;

                    (columns, pk_names)
                }
            };

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, explain_options).into();
            let shared_source_id = source.id;
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                cdc_table.external_table_name.clone(),
                column_defs,
                columns,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_columns,
                include_column_options,
                table_name.clone(),
                resolved_table_name,
                database_id,
                schema_id,
                TableId::placeholder(),
                engine,
            )?;

            (
                (plan, None, table),
                TableJobType::SharedCdcSource,
                Some(shared_source_id),
            )
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to a table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };
    Ok((plan, source, table, job_type, shared_source_id))
}

// For both tables created from a cdc source and tables with a cdc connector
fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
    let mut found_generated_column = false;
    for column in columns {
        let mut is_generated = false;

        for option_def in &column.options {
            if let ColumnOption::GeneratedColumns(_) = option_def.option {
                is_generated = true;
                break;
            }
        }

        if is_generated {
            found_generated_column = true;
        } else if found_generated_column {
            return Err(ErrorCode::NotSupported(
                "Non-generated column found after a generated column.".into(),
                "Ensure that all generated columns appear at the end of the cdc table definition."
                    .into(),
            )
            .into());
        }
    }
    Ok(())
}

// For both tables created from a cdc source and tables with a cdc connector
fn not_null_check_for_cdc_table(
    wildcard_idx: &Option<usize>,
    column_defs: &Vec<ColumnDef>,
) -> Result<()> {
    if wildcard_idx.is_none()
        && column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table with NOT NULL constraint is not supported".to_owned(),
            "Please remove the NOT NULL constraint for columns".to_owned(),
        )
        .into());
    }
    Ok(())
}

// Only for tables created from a cdc source
fn sanity_check_for_table_on_cdc_source(
    append_only: bool,
    column_defs: &Vec<ColumnDef>,
    wildcard_idx: &Option<usize>,
    constraints: &Vec<TableConstraint>,
    source_watermarks: &Vec<SourceWatermark>,
) -> Result<()> {
    // wildcard cannot be used with column definitions
    if wildcard_idx.is_some() && !column_defs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "wildcard(*) and column definitions cannot be used together".to_owned(),
            "Remove the wildcard or column definitions".to_owned(),
        )
        .into());
    }

    // cdc table must have primary key constraint or primary key column
    if wildcard_idx.is_none()
        && !constraints.iter().any(|c| {
            matches!(
                c,
                TableConstraint::Unique {
                    is_primary: true,
                    ..
                }
            )
        })
        && !column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table without primary key constraint is not supported".to_owned(),
            "Please define a primary key".to_owned(),
        )
        .into());
    }

    if append_only {
        return Err(ErrorCode::NotSupported(
            "append only modifier on the table created from a CDC source".into(),
            "Remove the APPEND ONLY clause".into(),
        )
        .into());
    }

    if !source_watermarks.is_empty() {
        return Err(ErrorCode::NotSupported(
            "watermark defined on the table created from a CDC source".into(),
            "Remove the Watermark definitions".into(),
        )
        .into());
    }

    Ok(())
}

/// Derives the schema of a cdc table by connecting to the external database, used when the
/// table is created or altered with a wildcard (`*`) column list.
async fn bind_cdc_table_schema_externally(
    cdc_with_options: WithOptionsSecResolved,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    // read the cdc table schema from the external database
    let (options, secret_refs) = cdc_with_options.into_parts();
    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
        .context("failed to extract external table config")?;

    let table = ExternalTableImpl::connect(config)
        .await
        .context("failed to auto derive table schema")?;

    Ok((
        table
            .column_descs()
            .iter()
            .cloned()
            .map(|column_desc| ColumnCatalog {
                column_desc,
                is_hidden: false,
            })
            .collect(),
        table.pk_names().clone(),
    ))
}

/// Derives the schema of a cdc table from the column definitions and constraints in the SQL
/// statement, used when creating a new table or altering an existing one.
fn bind_cdc_table_schema(
    column_defs: &Vec<ColumnDef>,
    constraints: &Vec<TableConstraint>,
    is_for_replace_plan: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;

    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
    Ok((columns, pk_names))
}

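/// Entry point of the `CREATE TABLE` handler: checks cluster limits and name conflicts,
/// generates the plan and stream fragment graph, and then submits the catalog changes,
/// dispatching to the Hummock or Iceberg table engine.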
1374#[allow(clippy::too_many_arguments)]
1375pub async fn handle_create_table(
1376    mut handler_args: HandlerArgs,
1377    table_name: ObjectName,
1378    column_defs: Vec<ColumnDef>,
1379    wildcard_idx: Option<usize>,
1380    constraints: Vec<TableConstraint>,
1381    if_not_exists: bool,
1382    format_encode: Option<FormatEncodeOptions>,
1383    source_watermarks: Vec<SourceWatermark>,
1384    append_only: bool,
1385    on_conflict: Option<OnConflict>,
1386    with_version_columns: Vec<String>,
1387    cdc_table_info: Option<CdcTableInfo>,
1388    include_column_options: IncludeOption,
1389    webhook_info: Option<WebhookSourceInfo>,
1390    ast_engine: risingwave_sqlparser::ast::Engine,
1391) -> Result<RwPgResponse> {
1392    let session = handler_args.session.clone();
1393
1394    if append_only {
1395        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1396    }
1397
1398    session.check_cluster_limits().await?;
1399
1400    let engine = match ast_engine {
1401        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1402        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1403    };
1404    if engine == Engine::Iceberg && handler_args.with_options.get_connector().is_some() {
1405        // HACK: since we don't have atomic DDL, table with connector may lose data.
1406        // FIXME: remove this after https://github.com/risingwavelabs/risingwave/issues/21863
1407        if let Some(_rate_limit) = handler_args.with_options.insert(
1408            OverwriteOptions::SOURCE_RATE_LIMIT_KEY.to_owned(),
1409            "0".to_owned(),
1410        ) {
1411            // Reject a user-specified rate limit, which would conflict with the workaround above.
1412            return Err(ErrorCode::NotSupported(
1413                "source_rate_limit for iceberg table engine during table creation".to_owned(),
1414                "Please remove source_rate_limit from WITH options.".to_owned(),
1415            )
1416            .into());
1417        }
1418    }
1419
1420    if let Either::Right(resp) = session.check_relation_name_duplicated(
1421        table_name.clone(),
1422        StatementType::CREATE_TABLE,
1423        if_not_exists,
1424    )? {
1425        return Ok(resp);
1426    }
1427
1428    let (graph, source, hummock_table, job_type, shared_source_id) = {
1429        let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
1430            handler_args.clone(),
1431            ExplainOptions::default(),
1432            format_encode,
1433            cdc_table_info,
1434            &table_name,
1435            column_defs.clone(),
1436            wildcard_idx,
1437            constraints.clone(),
1438            source_watermarks,
1439            append_only,
1440            on_conflict,
1441            with_version_columns,
1442            include_column_options,
1443            webhook_info,
1444            engine,
1445        )
1446        .await?;
1447        tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1448
1449        let graph = build_graph(plan, Some(GraphJobType::Table))?;
1450
1451        (graph, source, table, job_type, shared_source_id)
1452    };
1453
1454    tracing::trace!(
1455        "name={}, graph=\n{}",
1456        table_name,
1457        serde_json::to_string_pretty(&graph).unwrap()
1458    );
1459
1460    let dependencies = shared_source_id
1461        .map(|id| HashSet::from([id as ObjectId]))
1462        .unwrap_or_default();
1463
1464    // Handle engine
1465    match engine {
1466        Engine::Hummock => {
1467            let catalog_writer = session.catalog_writer()?;
1468            catalog_writer
1469                .create_table(
1470                    source.map(|s| s.to_prost()),
1471                    hummock_table.to_prost(),
1472                    graph,
1473                    job_type,
1474                    if_not_exists,
1475                    dependencies,
1476                )
1477                .await?;
1478        }
1479        Engine::Iceberg => {
1480            create_iceberg_engine_table(
1481                session,
1482                handler_args,
1483                source.map(|s| s.to_prost()),
1484                hummock_table,
1485                graph,
1486                table_name,
1487                job_type,
1488                if_not_exists,
1489            )
1490            .await?;
1491        }
1492    }
1493
1494    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1495}
1496
1497/// The iceberg table engine is composed of a hummock table, an iceberg sink, and an iceberg source.
1498///
1499/// 1. Fetch the iceberg engine options from the meta node, or use the iceberg engine connection provided by the user.
1500/// 2. Create a hummock table.
1501/// 3. Create an iceberg sink.
1502/// 4. Create an iceberg source.
1503///
1504/// See <https://github.com/risingwavelabs/risingwave/issues/21586> for an architecture diagram.
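///
/// # Example
///
/// An illustrative sketch of how this path is reached from SQL; the connection name
/// `public.my_iceberg_conn` is a placeholder:
///
/// ```sql
/// SET iceberg_engine_connection = 'public.my_iceberg_conn';
/// CREATE TABLE t (id INT PRIMARY KEY, v INT) ENGINE = iceberg;
/// ```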
1505#[allow(clippy::too_many_arguments)]
1506pub async fn create_iceberg_engine_table(
1507    session: Arc<SessionImpl>,
1508    handler_args: HandlerArgs,
1509    mut source: Option<PbSource>,
1510    table: TableCatalog,
1511    graph: StreamFragmentGraph,
1512    table_name: ObjectName,
1513    job_type: PbTableJobType,
1514    if_not_exists: bool,
1515) -> Result<()> {
1516    let meta_client = session.env().meta_client();
1517    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1518
1519    let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
1520        ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
1521    })?;
1522    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1523    let meta_store_user = meta_store_endpoint.username().to_owned();
1524    let meta_store_password = match meta_store_endpoint.password() {
1525        Some(password) => percent_decode_str(password)
1526            .decode_utf8()
1527            .map_err(|_| {
1528                ErrorCode::InternalError(
1529                    "failed to parse password from meta store endpoint".to_owned(),
1530                )
1531            })?
1532            .into_owned(),
1533        None => "".to_owned(),
1534    };
1535    let meta_store_host = meta_store_endpoint
1536        .host_str()
1537        .ok_or_else(|| {
1538            ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
1539        })?
1540        .to_owned();
1541    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1542        ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
1543    })?;
1544    let meta_store_database = meta_store_endpoint
1545        .path()
1546        .trim_start_matches('/')
1547        .to_owned();
1548
1549    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1550        bail!("failed to parse meta backend: {}", meta_store_backend);
1551    };
1552
1553    let catalog_uri = match meta_backend {
1554        MetaBackend::Postgres => {
1555            format!(
1556                "jdbc:postgresql://{}:{}/{}",
1557                meta_store_host.clone(),
1558                meta_store_port.clone(),
1559                meta_store_database.clone()
1560            )
1561        }
1562        MetaBackend::Mysql => {
1563            format!(
1564                "jdbc:mysql://{}:{}/{}",
1565                meta_store_host.clone(),
1566                meta_store_port.clone(),
1567                meta_store_database.clone()
1568            )
1569        }
1570        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1571            bail!(
1572                "Unsupported meta backend for iceberg engine table: {}",
1573                meta_store_backend
1574            );
1575        }
1576    };
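    // For example (illustrative values), a Postgres meta store at `meta-host:5432/risingwave`
    // yields `jdbc:postgresql://meta-host:5432/risingwave` as the hosted JDBC catalog URI.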
1577
1578    let rw_db_name = session
1579        .env()
1580        .catalog_reader()
1581        .read_guard()
1582        .get_database_by_id(&table.database_id)?
1583        .name()
1584        .to_owned();
1585    let rw_schema_name = session
1586        .env()
1587        .catalog_reader()
1588        .read_guard()
1589        .get_schema_by_id(&table.database_id, &table.schema_id)?
1590        .name()
1591        .clone();
1592    let iceberg_catalog_name = rw_db_name.clone();
1593    let iceberg_database_name = rw_schema_name.clone();
1594    let iceberg_table_name = table_name.0.last().unwrap().real_value();
1595
1596    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1597    let sink_decouple = session.config().sink_decouple();
1598    if matches!(sink_decouple, SinkDecouple::Disable) {
1599        bail!(
1600            "Iceberg engine tables require sink decouple to be enabled; try `set sink_decouple = true` to resolve it"
1601        );
1602    }
1603
1604    let mut connection_ref = BTreeMap::new();
1605    let with_common = if iceberg_engine_connection.is_empty() {
1606        bail!("to use an iceberg engine table, the session variable `iceberg_engine_connection` must be set.");
1607    } else {
1608        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1609        assert_eq!(parts.len(), 2);
1610        let connection_catalog =
1611            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1612        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1613            if params.connection_type == ConnectionType::Iceberg as i32 {
1614                // With iceberg engine connection:
1615                connection_ref.insert(
1616                    "connection".to_owned(),
1617                    ConnectionRefValue {
1618                        connection_name: ObjectName::from(vec![
1619                            Ident::from(parts[0]),
1620                            Ident::from(parts[1]),
1621                        ]),
1622                    },
1623                );
1624
1625                let mut with_common = BTreeMap::new();
1626                with_common.insert("connector".to_owned(), "iceberg".to_owned());
1627                with_common.insert("database.name".to_owned(), iceberg_database_name.to_owned());
1628                with_common.insert("table.name".to_owned(), iceberg_table_name.to_owned());
1629
1630                if let Some(s) = params.properties.get("hosted_catalog")
1631                    && s.eq_ignore_ascii_case("true")
1632                {
1633                    with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1634                    with_common.insert("catalog.uri".to_owned(), catalog_uri.to_owned());
1635                    with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user.to_owned());
1636                    with_common.insert(
1637                        "catalog.jdbc.password".to_owned(),
1638                        meta_store_password.clone(),
1639                    );
1640                    with_common.insert("catalog.name".to_owned(), iceberg_catalog_name.to_owned());
1641                }
1642
1643                with_common
1644            } else {
1645                return Err(RwError::from(ErrorCode::InvalidParameterValue(
1646                    "Only an iceberg connection can be used with the iceberg table engine".to_owned(),
1647                )));
1648            }
1649        } else {
1650            return Err(RwError::from(ErrorCode::InvalidParameterValue(
1651                "Private Link Service has been deprecated. Please create a new connection instead."
1652                    .to_owned(),
1653            )));
1654        }
1655    };
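    // At this point `with_common` roughly holds (illustrative):
    //   connector = 'iceberg', database.name = '<rw schema>', table.name = '<table name>',
    //   plus `catalog.*` entries when the connection uses the hosted (JDBC) catalog.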
1656
1657    // Iceberg sinks require a primary key; if none is provided, we fall back to the hidden `_row_id` column.
1658    // Fetch the primary key from the table's columns.
1659    let mut pks = table
1660        .pk_column_names()
1661        .iter()
1662        .map(|c| c.to_string())
1663        .collect::<Vec<String>>();
1664
1665    // For a table without a primary key, use `_row_id` as the primary key.
1666    let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1667        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1668        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1669            "select {} as {}, * from {}",
1670            ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1671        ))
1672        .context("unable to parse query")?
1673        .try_into()
1674        .unwrap();
1675
1676        let Statement::Query(query) = &stmt else {
1677            panic!("unexpected statement: {:?}", stmt);
1678        };
1679        CreateSink::AsQuery(query.clone())
1680    } else {
1681        CreateSink::From(table_name.clone())
1682    };
1683
1684    let mut sink_name = table_name.clone();
1685    *sink_name.0.last_mut().unwrap() = Ident::from(
1686        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1687    );
1688    let create_sink_stmt = CreateSinkStatement {
1689        if_not_exists: false,
1690        sink_name,
1691        with_properties: WithProperties(vec![]),
1692        sink_from,
1693        columns: vec![],
1694        emit_mode: None,
1695        sink_schema: None,
1696        into_table_name: None,
1697    };
1698
1699    let mut sink_handler_args = handler_args.clone();
1700
1701    let mut sink_with = with_common.clone();
1702
1703    if table.append_only {
1704        sink_with.insert("type".to_owned(), "append-only".to_owned());
1705    } else {
1706        sink_with.insert("primary_key".to_owned(), pks.join(","));
1707        sink_with.insert("type".to_owned(), "upsert".to_owned());
1708    }
1709    // sink_with.insert(SINK_SNAPSHOT_OPTION.to_owned(), "false".to_owned());
1710    //
1711    // Note: in theory, we don't need to backfill from the table to the sink,
1712    // but we don't have atomic DDL now https://github.com/risingwavelabs/risingwave/issues/21863
1713    // so it may have a potential data-loss problem on the first barrier.
1714    //
1715    // For non-append-only table, we can always solve it by the initial sink with backfill, since
1716    // data will be present in hummock table.
1717    //
1718    // For append-only table, we need to be more careful.
1719    //
1720    // The possible cases for a table:
1721    // - For table without connector: it doesn't matter, since there's no data before the table is created
1722    // - For a table with a connector: we worked around it by setting SOURCE_RATE_LIMIT to 0
1723    //   + If we support blocking DDL for table with connector, we need to be careful.
1724    // - For table with an upstream job: Specifically, CDC table from shared CDC source.
1725    //   + Data may come from both upstream connector, and CDC table backfill, so we need to pause both of them.
1726    //   + For now we don't support APPEND ONLY CDC table, so it's safe.
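    // As a rough sketch (values depend on the user's WITH options), for a non-append-only
    // table the sink options assembled below end up looking like:
    //   type = 'upsert', primary_key = '<pk columns>', commit_checkpoint_interval = '60',
    //   create_table_if_not_exists = 'true', is_exactly_once = 'true', ...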
1727    let commit_checkpoint_interval = handler_args
1728        .with_options
1729        .get(COMMIT_CHECKPOINT_INTERVAL)
1730        .map(|v| v.to_owned())
1731        .unwrap_or_else(|| "60".to_owned());
1732    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1733        ErrorCode::InvalidInputSyntax(format!(
1734            "commit_checkpoint_interval must be a positive integer: {}",
1735            commit_checkpoint_interval
1736        ))
1737    })?;
1738
1739    if commit_checkpoint_interval == 0 {
1740        bail!("commit_checkpoint_interval must be a positive integer: 0");
1741    }
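    // e.g. (illustrative) `WITH (commit_checkpoint_interval = '120')` on the CREATE TABLE
    // statement makes the iceberg sink commit every 120 checkpoints instead of the default 60.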
1742
1743    // remove commit_checkpoint_interval from source options, otherwise it will be considered as an unknown field.
1744    source
1745        .as_mut()
1746        .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1747
1748    let sink_decouple = session.config().sink_decouple();
1749    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1750        bail!(
1751            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1752        )
1753    }
1754
1755    sink_with.insert(
1756        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1757        commit_checkpoint_interval.to_string(),
1758    );
1759    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1760
1761    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1762
1763    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1764        match enable_compaction.to_lowercase().as_str() {
1765            "true" => {
1766                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1767            }
1768            "false" => {
1769                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1770            }
1771            _ => {
1772                return Err(ErrorCode::InvalidInputSyntax(format!(
1773                    "enable_compaction must be true or false: {}",
1774                    enable_compaction
1775                ))
1776                .into());
1777            }
1778        }
1779
1780        // remove enable_compaction from source options, otherwise it will be considered as an unknown field.
1781        source
1782            .as_mut()
1783            .map(|x| x.with_properties.remove("enable_compaction"));
1784    } else {
1785        sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1786    }
1787
1788    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1789        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1790            ErrorCode::InvalidInputSyntax(format!(
1791                "compaction_interval_sec must be a positive integer: {}",
1792                compaction_interval_sec
1793            ))
1794        })?;
1795        if compaction_interval_sec == 0 {
1796            bail!("compaction_interval_sec must be a positive integer: 0");
1797        }
1798        sink_with.insert(
1799            "compaction_interval_sec".to_owned(),
1800            compaction_interval_sec.to_string(),
1801        );
1802        // remove compaction_interval_sec from source options, otherwise it will be considered as an unknown field.
1803        source
1804            .as_mut()
1805            .map(|x| x.with_properties.remove("compaction_interval_sec"));
1806    }
1807
1808    let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1809        handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1810    {
1811        // remove enable_snapshot_expiration from source options, otherwise it will be considered as an unknown field.
1812        source
1813            .as_mut()
1814            .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1815        match enable_snapshot_expiration.to_lowercase().as_str() {
1816            "true" => {
1817                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1818                true
1819            }
1820            "false" => {
1821                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1822                false
1823            }
1824            _ => {
1825                return Err(ErrorCode::InvalidInputSyntax(format!(
1826                    "enable_snapshot_expiration must be true or false: {}",
1827                    enable_snapshot_expiration
1828                ))
1829                .into());
1830            }
1831        }
1832    } else {
1833        sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1834        true
1835    };
1836
1837    if has_enabled_snapshot_expiration {
1838        // configuration for snapshot expiration
1839        if let Some(snapshot_expiration_retain_last) = handler_args
1840            .with_options
1841            .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
1842        {
1843            sink_with.insert(
1844                SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
1845                snapshot_expiration_retain_last.to_owned(),
1846            );
1847            // remove snapshot_expiration_retain_last from source options, otherwise it will be considered as an unknown field.
1848            source
1849                .as_mut()
1850                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
1851        }
1852
1853        if let Some(snapshot_expiration_max_age) = handler_args
1854            .with_options
1855            .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
1856        {
1857            sink_with.insert(
1858                SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
1859                snapshot_expiration_max_age.to_owned(),
1860            );
1861            // remove snapshot_expiration_max_age from source options, otherwise it will be considered as an unknown field.
1862            source
1863                .as_mut()
1864                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
1865        }
1866
1867        if let Some(snapshot_expiration_clear_expired_files) = handler_args
1868            .with_options
1869            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1870        {
1871            sink_with.insert(
1872                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
1873                snapshot_expiration_clear_expired_files.to_owned(),
1874            );
1875            // remove snapshot_expiration_clear_expired_files from source options, otherwise it will be considered as an unknown field.
1876            source.as_mut().map(|x| {
1877                x.with_properties
1878                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1879            });
1880        }
1881
1882        if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
1883            .with_options
1884            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1885        {
1886            sink_with.insert(
1887                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
1888                snapshot_expiration_clear_expired_meta_data.to_owned(),
1889            );
1890            // remove snapshot_expiration_clear_expired_meta_data from source options, otherwise it will be considered as an unknown field.
1891            source.as_mut().map(|x| {
1892                x.with_properties
1893                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1894            });
1895        }
1896    }
1897
1898    if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1899        match write_mode.to_lowercase().as_str() {
1900            ICEBERG_WRITE_MODE_MERGE_ON_READ => {
1901                sink_with.insert(
1902                    WRITE_MODE.to_owned(),
1903                    ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1904                );
1905            }
1906
1907            ICEBERG_WRITE_MODE_COPY_ON_WRITE => {
1908                if table.append_only {
1909                    return Err(ErrorCode::NotSupported(
1910                        "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
1911                        "Please use MERGE ON READ instead".to_owned(),
1912                    )
1913                    .into());
1914                }
1915
1916                sink_with.insert(
1917                    WRITE_MODE.to_owned(),
1918                    ICEBERG_WRITE_MODE_COPY_ON_WRITE.to_owned(),
1919                );
1920            }
1921
1922            _ => {
1923                return Err(ErrorCode::InvalidInputSyntax(format!(
1924                    "write_mode must be one of: {}, {}",
1925                    ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
1926                ))
1927                .into());
1928            }
1929        }
1930
1931        // remove write_mode from source options, otherwise it will be considered as an unknown field.
1932        source
1933            .as_mut()
1934            .map(|x| x.with_properties.remove("write_mode"));
1935    } else {
1936        sink_with.insert(
1937            WRITE_MODE.to_owned(),
1938            ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1939        );
1940    }
1941
1942    let partition_by = handler_args
1943        .with_options
1944        .get("partition_by")
1945        .map(|v| v.to_owned());
1946
1947    if let Some(partition_by) = &partition_by {
1948        let mut partition_columns = vec![];
1949        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
1950            table
1951                .columns()
1952                .iter()
1953                .find(|col| col.name().eq_ignore_ascii_case(&column))
1954                .ok_or_else(|| {
1955                    ErrorCode::InvalidInputSyntax(format!(
1956                        "Partition source column does not exist in schema: {}",
1957                        column
1958                    ))
1959                })?;
1960
1961            partition_columns.push(column.to_owned());
1962        }
1963
1964        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
1965            |_| {
1966                ErrorCode::InvalidInputSyntax(
1967                    "The partition columns must be a prefix of the primary key".to_owned(),
1968                )
1969            },
1970        )?;
1971
1972        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
1973
1974        // remove partition_by from source options, otherwise it will be considered as an unknown field.
1975        source
1976            .as_mut()
1977            .map(|x| x.with_properties.remove("partition_by"));
1978    }
1979
1980    sink_handler_args.with_options =
1981        WithOptions::new(sink_with, Default::default(), connection_ref.clone());
1982
1983    let mut source_name = table_name.clone();
1984    *source_name.0.last_mut().unwrap() = Ident::from(
1985        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
1986    );
1987    let create_source_stmt = CreateSourceStatement {
1988        temporary: false,
1989        if_not_exists: false,
1990        columns: vec![],
1991        source_name,
1992        wildcard_idx: Some(0),
1993        constraints: vec![],
1994        with_properties: WithProperties(vec![]),
1995        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
1996        source_watermarks: vec![],
1997        include_column_options: vec![],
1998    };
1999
2000    let mut source_handler_args = handler_args.clone();
2001    let source_with = with_common;
2002    source_handler_args.with_options =
2003        WithOptions::new(source_with, Default::default(), connection_ref);
2004
2005    // Before creating the table, ensure the JVM is initialized, since we currently use the JDBC catalog.
2006    // If the JVM fails to initialize, the non-atomic DDL would otherwise leave a partially created iceberg engine table behind.
2007    let _ = Jvm::get_or_init()?;
2008
2009    let catalog_writer = session.catalog_writer()?;
2010    // TODO(iceberg): make iceberg engine table creation ddl atomic
2011    let has_connector = source.is_some();
2012    catalog_writer
2013        .create_table(
2014            source,
2015            table.to_prost(),
2016            graph,
2017            job_type,
2018            if_not_exists,
2019            HashSet::default(),
2020        )
2021        .await?;
2022    let res = create_sink::handle_create_sink(sink_handler_args, create_sink_stmt, true).await;
2023    if res.is_err() {
2024        // Since we don't support DDL atomicity, we need to drop the partially created table.
2025        handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
2026        res?;
2027    }
2028    let res = create_source::handle_create_source(source_handler_args, create_source_stmt).await;
2029    if res.is_err() {
2030        // Since we don't support DDL atomicity, we need to drop the partially created table.
2031        handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
2032        res?;
2033    }
2034
2035    if has_connector {
2036        alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
2037            handler_args,
2038            PbThrottleTarget::TableWithSource,
2039            table_name,
2040            -1,
2041        )
2042        .await?;
2043    }
2044
2045    Ok(())
2046}
2047
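/// Validates the combination of WITH options, FORMAT/ENCODE clause and INCLUDE options for
/// `CREATE TABLE`: INCLUDE requires a connector, and a connector requires FORMAT/ENCODE.
/// CDC tables skip this check.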
2048pub fn check_create_table_with_source(
2049    with_options: &WithOptions,
2050    format_encode: Option<FormatEncodeOptions>,
2051    include_column_options: &IncludeOption,
2052    cdc_table_info: &Option<CdcTableInfo>,
2053) -> Result<Option<FormatEncodeOptions>> {
2054    // skip check for cdc table
2055    if cdc_table_info.is_some() {
2056        return Ok(format_encode);
2057    }
2058    let defined_source = with_options.is_source_connector();
2059
2060    if !include_column_options.is_empty() && !defined_source {
2061        return Err(ErrorCode::InvalidInputSyntax(
2062            "INCLUDE should be used with a connector".to_owned(),
2063        )
2064        .into());
2065    }
2066    if defined_source {
2067        format_encode.as_ref().ok_or_else(|| {
2068            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2069        })?;
2070    }
2071    Ok(format_encode)
2072}
2073
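/// Checks that the iceberg `partition_by` columns form a prefix of the primary key columns.
///
/// For example (illustrative), partition columns `["a", "b"]` are a valid prefix of the
/// primary key `["a", "b", "c"]`, while `["b"]` or `["a", "c"]` are not.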
2074fn ensure_partition_columns_are_prefix_of_primary_key(
2075    partition_columns: &[String],
2076    primary_key_columns: &[String],
2077) -> std::result::Result<(), String> {
2078    if partition_columns.len() > primary_key_columns.len() {
2079        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
2080    }
2081
2082    for (i, partition_col) in partition_columns.iter().enumerate() {
2083        if primary_key_columns.get(i) != Some(partition_col) {
2084            return Err(format!(
2085                "Partition column '{}' is not a prefix of the primary key.",
2086                partition_col
2087            ));
2088        }
2089    }
2090
2091    Ok(())
2092}
2093
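/// Regenerates the stream fragment graph and catalogs for an existing table from a (possibly
/// modified) `CREATE TABLE` statement, e.g. when replacing the table for `ALTER TABLE`.
/// The primary key must stay unchanged, and the original table/source IDs are reused.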
2094#[allow(clippy::too_many_arguments)]
2095pub async fn generate_stream_graph_for_replace_table(
2096    _session: &Arc<SessionImpl>,
2097    table_name: ObjectName,
2098    original_catalog: &Arc<TableCatalog>,
2099    handler_args: HandlerArgs,
2100    statement: Statement,
2101    col_id_gen: ColumnIdGenerator,
2102    sql_column_strategy: SqlColumnStrategy,
2103) -> Result<(
2104    StreamFragmentGraph,
2105    TableCatalog,
2106    Option<SourceCatalog>,
2107    TableJobType,
2108)> {
2109    let Statement::CreateTable {
2110        columns,
2111        constraints,
2112        source_watermarks,
2113        append_only,
2114        on_conflict,
2115        with_version_columns,
2116        wildcard_idx,
2117        cdc_table_info,
2118        format_encode,
2119        include_column_options,
2120        engine,
2121        with_options,
2122        ..
2123    } = statement
2124    else {
2125        panic!("unexpected statement type: {:?}", statement);
2126    };
2127
2128    let format_encode = format_encode
2129        .clone()
2130        .map(|format_encode| format_encode.into_v2_with_warning());
2131
2132    let engine = match engine {
2133        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2134        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2135    };
2136
2137    let is_drop_connector =
2138        original_catalog.associated_source_id().is_some() && format_encode.is_none();
2139    if is_drop_connector {
2140        debug_assert!(
2141            source_watermarks.is_empty()
2142                && include_column_options.is_empty()
2143                && with_options
2144                    .iter()
2145                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2146        );
2147    }
2148
2149    let props = CreateTableProps {
2150        definition: handler_args.normalized_sql.clone(),
2151        append_only,
2152        on_conflict: on_conflict.into(),
2153        with_version_columns: with_version_columns
2154            .iter()
2155            .map(|col| col.real_value())
2156            .collect(),
2157        webhook_info: original_catalog.webhook_info.clone(),
2158        engine,
2159    };
2160
2161    let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2162        (Some(format_encode), None) => (
2163            gen_create_table_plan_with_source(
2164                handler_args,
2165                ExplainOptions::default(),
2166                table_name,
2167                columns,
2168                wildcard_idx,
2169                constraints,
2170                format_encode,
2171                source_watermarks,
2172                col_id_gen,
2173                include_column_options,
2174                props,
2175                sql_column_strategy,
2176            )
2177            .await?,
2178            TableJobType::General,
2179        ),
2180        (None, None) => {
2181            let context = OptimizerContext::from_handler_args(handler_args);
2182            let (plan, table) = gen_create_table_plan(
2183                context,
2184                table_name,
2185                columns,
2186                constraints,
2187                col_id_gen,
2188                source_watermarks,
2189                props,
2190                true,
2191            )?;
2192            ((plan, None, table), TableJobType::General)
2193        }
2194        (None, Some(cdc_table)) => {
2195            let session = &handler_args.session;
2196            let (source, resolved_table_name, database_id, schema_id) =
2197                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2198
2199            let cdc_with_options = derive_with_options_for_cdc_table(
2200                &source.with_properties,
2201                cdc_table.external_table_name.clone(),
2202            )?;
2203
2204            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2205
2206            let context: OptimizerContextRef =
2207                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2208            let (plan, table) = gen_create_table_plan_for_cdc_table(
2209                context,
2210                source,
2211                cdc_table.external_table_name.clone(),
2212                columns,
2213                column_catalogs,
2214                pk_names,
2215                cdc_with_options,
2216                col_id_gen,
2217                on_conflict,
2218                with_version_columns
2219                    .iter()
2220                    .map(|col| col.real_value())
2221                    .collect(),
2222                include_column_options,
2223                table_name,
2224                resolved_table_name,
2225                database_id,
2226                schema_id,
2227                original_catalog.id(),
2228                engine,
2229            )?;
2230
2231            ((plan, None, table), TableJobType::SharedCdcSource)
2232        }
2233        (Some(_), Some(_)) => {
2234            return Err(ErrorCode::NotSupported(
2235                "Data format and encoding format don't apply to a table created from a CDC source"
2236                    .into(),
2237                "Remove the FORMAT and ENCODE specification".into(),
2238            )
2239            .into());
2240        }
2241    };
2242
2243    if table.pk_column_ids() != original_catalog.pk_column_ids() {
2244        Err(ErrorCode::InvalidInputSyntax(
2245            "alter primary key of table is not supported".to_owned(),
2246        ))?
2247    }
2248
2249    let graph = build_graph(plan, Some(GraphJobType::Table))?;
2250
2251    // Fill the original table ID.
2252    table.id = original_catalog.id();
2253    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2254        table.associated_source_id = Some(source_id);
2255
2256        let source = source.as_mut().unwrap();
2257        source.id = source_id.table_id;
2258        source.associated_table_id = Some(table.id());
2259    }
2260
2261    Ok((graph, table, source, job_type))
2262}
2263
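/// Resolves the shared CDC source catalog referenced by `cdc_table.source_name`, along with
/// the target table name and the database/schema IDs the new table will be created in.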
2264fn get_source_and_resolved_table_name(
2265    session: &Arc<SessionImpl>,
2266    cdc_table: CdcTableInfo,
2267    table_name: ObjectName,
2268) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
2269    let db_name = &session.database();
2270    let (schema_name, resolved_table_name) =
2271        Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2272    let (database_id, schema_id) =
2273        session.get_database_and_schema_id_for_create(schema_name.clone())?;
2274
2275    let (format_encode, source_name) =
2276        Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2277
2278    let source = {
2279        let catalog_reader = session.env().catalog_reader().read_guard();
2280        let schema_name = format_encode.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2281        let (source, _) = catalog_reader.get_source_by_name(
2282            db_name,
2283            SchemaPath::Name(schema_name.as_str()),
2284            source_name.as_str(),
2285        )?;
2286        source.clone()
2287    };
2288
2289    Ok((source, resolved_table_name, database_id, schema_id))
2290}
2291
2292/// Validates the `webhook_info` and binds it to its protobuf representation.
2293fn bind_webhook_info(
2294    session: &Arc<SessionImpl>,
2295    columns_defs: &[ColumnDef],
2296    webhook_info: WebhookSourceInfo,
2297) -> Result<PbWebhookSourceInfo> {
2298    // validate columns
2299    if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
2300    {
2301        return Err(ErrorCode::InvalidInputSyntax(
2302            "Table with webhook source should have exactly one JSONB column".to_owned(),
2303        )
2304        .into());
2305    }
2306
2307    let WebhookSourceInfo {
2308        secret_ref,
2309        signature_expr,
2310        wait_for_persistence,
2311        is_batched,
2312    } = webhook_info;
2313
2314    // validate secret_ref
2315    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2316        let db_name = &session.database();
2317        let (schema_name, secret_name) =
2318            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2319        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2320        (
2321            Some(PbSecretRef {
2322                secret_id: secret_catalog.id.secret_id(),
2323                ref_as: match secret_ref.ref_as {
2324                    SecretRefAsType::Text => PbRefAsType::Text,
2325                    SecretRefAsType::File => PbRefAsType::File,
2326                }
2327                .into(),
2328            }),
2329            Some(secret_name),
2330        )
2331    } else {
2332        (None, None)
2333    };
2334
2335    let secure_compare_context = SecureCompareContext {
2336        column_name: columns_defs[0].name.real_value(),
2337        secret_name,
2338    };
2339    let mut binder = Binder::new_for_ddl_with_secure_compare(session, secure_compare_context);
2340    let expr = binder.bind_expr(&signature_expr)?;
2341
2342    // validate expr, ensuring it is SECURE_COMPARE()
2343    if expr.as_function_call().is_none()
2344        || expr.as_function_call().unwrap().func_type()
2345            != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2346    {
2347        return Err(ErrorCode::InvalidInputSyntax(
2348            "The signature verification function must be SECURE_COMPARE()".to_owned(),
2349        )
2350        .into());
2351    }
2352
2353    let pb_webhook_info = PbWebhookSourceInfo {
2354        secret_ref: pb_secret_ref,
2355        signature_expr: Some(expr.to_expr_proto()),
2356        wait_for_persistence,
2357        is_batched,
2358    };
2359
2360    Ok(pb_webhook_info)
2361}
2362
2363#[cfg(test)]
2364mod tests {
2365    use risingwave_common::catalog::{
2366        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2367    };
2368    use risingwave_common::types::{DataType, StructType};
2369
2370    use super::*;
2371    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2372
2373    #[tokio::test]
2374    async fn test_create_table_handler() {
2375        let sql =
2376            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2377        let frontend = LocalFrontend::new(Default::default()).await;
2378        frontend.run_sql(sql).await.unwrap();
2379
2380        let session = frontend.session_ref();
2381        let catalog_reader = session.env().catalog_reader().read_guard();
2382        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2383
2384        // Check table exists.
2385        let (table, _) = catalog_reader
2386            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2387            .unwrap();
2388        assert_eq!(table.name(), "t");
2389
2390        let columns = table
2391            .columns
2392            .iter()
2393            .map(|col| (col.name(), col.data_type().clone()))
2394            .collect::<HashMap<&str, DataType>>();
2395
2396        let expected_columns = maplit::hashmap! {
2397            ROW_ID_COLUMN_NAME => DataType::Serial,
2398            "v1" => DataType::Int16,
2399            "v2" => StructType::new(
2400                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
2401            )
2402            .with_ids([3, 4, 5].map(ColumnId::new))
2403            .into(),
2404            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
2405        };
2406
2407        assert_eq!(columns, expected_columns, "{columns:#?}");
2408    }
2409
2410    #[test]
2411    fn test_bind_primary_key() {
2412        // Note: Column ID 0 is reserved for row ID column.
2413
2414        for (sql, expected) in [
2415            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
2416            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
2417            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
2418            (
2419                "create table t (v1 int primary key, v2 int primary key)",
2420                Err("multiple primary keys are not allowed"),
2421            ),
2422            (
2423                "create table t (v1 int primary key primary key, v2 int)",
2424                Err("multiple primary keys are not allowed"),
2425            ),
2426            (
2427                "create table t (v1 int, v2 int, primary key (v1))",
2428                Ok(&[1]),
2429            ),
2430            (
2431                "create table t (v1 int, primary key (v2), v2 int)",
2432                Ok(&[2]),
2433            ),
2434            (
2435                "create table t (primary key (v2, v1), v1 int, v2 int)",
2436                Ok(&[2, 1]),
2437            ),
2438            (
2439                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
2440                Err("multiple primary keys are not allowed"),
2441            ),
2442            (
2443                "create table t (v1 int primary key, primary key (v1), v2 int)",
2444                Err("multiple primary keys are not allowed"),
2445            ),
2446            (
2447                "create table t (v1 int, primary key (V3), v2 int)",
2448                Err("column \"v3\" named in key does not exist"),
2449            ),
2450        ] {
2451            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
2452            let risingwave_sqlparser::ast::Statement::CreateTable {
2453                columns: column_defs,
2454                constraints,
2455                ..
2456            } = ast.remove(0)
2457            else {
2458                panic!("test case should be create table")
2459            };
2460            let actual: Result<_> = (|| {
2461                let mut columns = bind_sql_columns(&column_defs, false)?;
2462                let mut col_id_gen = ColumnIdGenerator::new_initial();
2463                for c in &mut columns {
2464                    col_id_gen.generate(c)?;
2465                }
2466
2467                let pk_names =
2468                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
2469                let (_, pk_column_ids, _) =
2470                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
2471                Ok(pk_column_ids)
2472            })();
2473            match (expected, actual) {
2474                (Ok(expected), Ok(actual)) => assert_eq!(
2475                    expected.iter().copied().map(ColumnId::new).collect_vec(),
2476                    actual,
2477                    "sql: {sql}"
2478                ),
2479                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
2480                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
2481                (Err(expected), Err(actual)) => assert!(
2482                    actual.to_string().contains(expected),
2483                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
2484                ),
2485            }
2486        }
2487    }
2488
2489    #[tokio::test]
2490    async fn test_duplicate_props_options() {
2491        let proto_file = create_proto_file(PROTO_FILE_DATA);
2492        let sql = format!(
2493            r#"CREATE TABLE t
2494    WITH (
2495        connector = 'kinesis',
2496        aws.region='user_test_topic',
2497        endpoint='172.10.1.1:9090,172.10.1.2:9090',
2498        aws.credentials.access_key_id = 'your_access_key_1',
2499        aws.credentials.secret_access_key = 'your_secret_key_1'
2500    )
2501    FORMAT PLAIN ENCODE PROTOBUF (
2502        message = '.test.TestRecord',
2503        aws.credentials.access_key_id = 'your_access_key_2',
2504        aws.credentials.secret_access_key = 'your_secret_key_2',
2505        schema.location = 'file://{}',
2506    )"#,
2507            proto_file.path().to_str().unwrap()
2508        );
2509        let frontend = LocalFrontend::new(Default::default()).await;
2510        frontend.run_sql(sql).await.unwrap();
2511
2512        let session = frontend.session_ref();
2513        let catalog_reader = session.env().catalog_reader().read_guard();
2514        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2515
2516        // Check source exists.
2517        let (source, _) = catalog_reader
2518            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2519            .unwrap();
2520        assert_eq!(source.name, "t");
2521
2522        // AwsAuth params exist in options.
2523        assert_eq!(
2524            source
2525                .info
2526                .format_encode_options
2527                .get("aws.credentials.access_key_id")
2528                .unwrap(),
2529            "your_access_key_2"
2530        );
2531        assert_eq!(
2532            source
2533                .info
2534                .format_encode_options
2535                .get("aws.credentials.secret_access_key")
2536                .unwrap(),
2537            "your_secret_key_2"
2538        );
2539
2540        // AwsAuth params exist in props.
2541        assert_eq!(
2542            source
2543                .with_properties
2544                .get("aws.credentials.access_key_id")
2545                .unwrap(),
2546            "your_access_key_1"
2547        );
2548        assert_eq!(
2549            source
2550                .with_properties
2551                .get("aws.credentials.secret_access_key")
2552                .unwrap(),
2553            "your_secret_key_1"
2554        );
2555
2556        // Options are not merged into props.
2557        assert!(!source.with_properties.contains_key("schema.location"));
2558    }
2559}