risingwave_frontend/handler/
create_sink.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::sync::{Arc, LazyLock};
17
18use anyhow::Context;
19use either::Either;
20use iceberg::arrow::type_to_arrow_type;
21use iceberg::spec::Transform;
22use itertools::Itertools;
23use maplit::{convert_args, hashmap, hashset};
24use pgwire::pg_response::{PgResponse, StatementType};
25use risingwave_common::array::arrow::IcebergArrowConvert;
26use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
27use risingwave_common::bail;
28use risingwave_common::catalog::{
29    ColumnCatalog, ICEBERG_SINK_PREFIX, ObjectId, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
30    Schema,
31};
32use risingwave_common::license::Feature;
33use risingwave_common::secret::LocalSecretManager;
34use risingwave_common::system_param::reader::SystemParamsRead;
35use risingwave_common::types::DataType;
36use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
37use risingwave_connector::sink::file_sink::s3::SnowflakeSink;
38use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
39use risingwave_connector::sink::kafka::KAFKA_SINK;
40use risingwave_connector::sink::snowflake_redshift::redshift::RedshiftSink;
41use risingwave_connector::sink::snowflake_redshift::snowflake::SnowflakeV2Sink;
42use risingwave_connector::sink::{
43    CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
44    SINK_USER_IGNORE_DELETE_OPTION, Sink, enforce_secret_sink,
45};
46use risingwave_connector::{
47    AUTO_SCHEMA_CHANGE_KEY, SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, SINK_INTERMEDIATE_TABLE_NAME,
48    SINK_TARGET_TABLE_NAME, WithPropertiesExt,
49};
50use risingwave_pb::catalog::connection_params::PbConnectionType;
51use risingwave_pb::telemetry::TelemetryDatabaseObject;
52use risingwave_sqlparser::ast::{
53    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
54    ObjectName, Query, Statement,
55};
56use risingwave_sqlparser::parser::Parser;
57
58use super::RwPgResponse;
59use super::create_mv::get_column_names;
60use super::create_source::UPSTREAM_SOURCE_KEY;
61use super::util::gen_query_from_table_name;
62use crate::binder::{Binder, Relation};
63use crate::catalog::table_catalog::TableType;
64use crate::error::{ErrorCode, Result, RwError};
65use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
66use crate::handler::HandlerArgs;
67use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
68use crate::handler::create_mv::parse_column_names;
69use crate::handler::util::{
70    LongRunningNotificationAction, check_connector_match_connection_type,
71    ensure_connection_type_allowed, execute_with_long_running_notification,
72    get_table_catalog_by_table_name, reject_internal_table_dependencies,
73};
74use crate::optimizer::backfill_order_strategy::plan_backfill_order;
75use crate::optimizer::plan_node::{
76    IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamPlanRef as PlanRef,
77    StreamProject, generic,
78};
79use crate::optimizer::{OptimizerContext, RelationCollectorVisitor};
80use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
81use crate::session::SessionImpl;
82use crate::session::current::notice_to_user;
83use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
84use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
85use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};
86
87static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
88    LazyLock::new(|| {
89        hashset! {
90            PbConnectionType::Unspecified,
91            PbConnectionType::Kafka,
92            PbConnectionType::Iceberg,
93            PbConnectionType::Elasticsearch,
94        }
95    });
96
97static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
98    LazyLock::new(|| {
99        hashset! {
100            PbConnectionType::Unspecified,
101            PbConnectionType::SchemaRegistry,
102        }
103    });
104
/// Result of [`gen_sink_plan`]: everything the caller needs to create (or explain)
/// the sink streaming job.
pub struct SinkPlanContext {
    /// The bound source query (`AS <query>`, or one generated from `FROM <table>`).
    pub query: Box<Query>,
    /// The stream plan whose root is the sink operator.
    pub sink_plan: PlanRef,
    /// The catalog entry describing the sink to be persisted.
    pub sink_catalog: SinkCatalog,
    /// Target table catalog when this is `CREATE SINK ... INTO <table>`.
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    /// Object ids (relations, UDFs, secrets) the sink depends on.
    pub dependencies: HashSet<ObjectId>,
}
113
114pub async fn gen_sink_plan(
115    handler_args: HandlerArgs,
116    stmt: CreateSinkStatement,
117    explain_options: Option<ExplainOptions>,
118    is_iceberg_engine_internal: bool,
119) -> Result<SinkPlanContext> {
120    let session = handler_args.session.clone();
121    let session = session.as_ref();
122    let user_specified_columns = !stmt.columns.is_empty();
123    let db_name = &session.database();
124    let (sink_schema_name, sink_table_name) =
125        Binder::resolve_schema_qualified_name(db_name, &stmt.sink_name)?;
126
127    let mut with_options = handler_args.with_options.clone();
128
129    if session
130        .env()
131        .system_params_manager()
132        .get_params()
133        .load()
134        .enforce_secret()
135        && Feature::SecretManagement.check_available().is_ok()
136    {
137        enforce_secret_sink(&with_options)?;
138    }
139
140    resolve_privatelink_in_with_option(&mut with_options)?;
141    let (mut resolved_with_options, connection_type, connector_conn_ref) =
142        resolve_connection_ref_and_secret_ref(
143            with_options,
144            session,
145            Some(TelemetryDatabaseObject::Sink),
146        )?;
147    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;
148
149    // if not using connection, we don't need to check connector match connection type
150    if !matches!(connection_type, PbConnectionType::Unspecified) {
151        let Some(connector) = resolved_with_options.get_connector() else {
152            return Err(RwError::from(ErrorCode::ProtocolError(format!(
153                "missing field '{}' in WITH clause",
154                CONNECTOR_TYPE_KEY
155            ))));
156        };
157        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
158    }
159
160    let partition_info = get_partition_compute_info(&resolved_with_options).await?;
161
162    let context = if let Some(explain_options) = explain_options {
163        OptimizerContext::new(handler_args.clone(), explain_options)
164    } else {
165        OptimizerContext::from_handler_args(handler_args.clone())
166    };
167
168    let is_auto_schema_change = resolved_with_options
169        .remove(AUTO_SCHEMA_CHANGE_KEY)
170        .map(|value| {
171            value.parse::<bool>().map_err(|_| {
172                ErrorCode::InvalidInputSyntax(format!(
173                    "invalid value {} of '{}' option, expect",
174                    value, AUTO_SCHEMA_CHANGE_KEY
175                ))
176            })
177        })
178        .transpose()?
179        .unwrap_or(false);
180
181    if is_auto_schema_change {
182        Feature::SinkAutoSchemaChange.check_available()?;
183    }
184
185    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());
186    if sink_into_table_name.is_some() {
187        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());
188
189        if prev.is_some() {
190            return Err(RwError::from(ErrorCode::BindError(
191                "In the case of sinking into table, the 'connector' parameter should not be provided.".to_owned(),
192            )));
193        }
194    }
195    let connector = resolved_with_options
196        .get(CONNECTOR_TYPE_KEY)
197        .cloned()
198        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;
199
200    // Used for debezium's table name
201    let sink_from_table_name;
202    // `true` means that sink statement has the form: `CREATE SINK s1 FROM ...`
203    // `false` means that sink statement has the form: `CREATE SINK s1 AS <query>`
204    let direct_sink_from_name: Option<(ObjectName, bool)>;
205    let mut query = match stmt.sink_from {
206        CreateSink::From(from_name) => {
207            sink_from_table_name = from_name.0.last().unwrap().real_value();
208            direct_sink_from_name = Some((from_name.clone(), is_auto_schema_change));
209            if is_auto_schema_change && sink_into_table_name.is_some() {
210                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
211                    "auto schema change not supported for sink-into-table".to_owned(),
212                )));
213            }
214            if resolved_with_options
215                .value_eq_ignore_case(SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, "true")
216                && connector == RedshiftSink::SINK_NAME
217                || connector == SnowflakeV2Sink::SINK_NAME
218            {
219                if let Some(table_name) = resolved_with_options.get(SINK_TARGET_TABLE_NAME) {
220                    // auto fill intermediate table name if target table name is specified
221                    if resolved_with_options
222                        .get(SINK_INTERMEDIATE_TABLE_NAME)
223                        .is_none()
224                    {
225                        // generate the intermediate table name with random value appended to the target table name
226                        let intermediate_table_name = format!(
227                            "rw_{}_{}_{}",
228                            sink_table_name,
229                            table_name,
230                            uuid::Uuid::new_v4()
231                        );
232                        resolved_with_options.insert(
233                            SINK_INTERMEDIATE_TABLE_NAME.to_owned(),
234                            intermediate_table_name,
235                        );
236                    }
237                } else {
238                    return Err(RwError::from(ErrorCode::BindError(
239                        "'table.name' option must be specified.".to_owned(),
240                    )));
241                }
242            }
243            Box::new(gen_query_from_table_name(from_name))
244        }
245        CreateSink::AsQuery(query) => {
246            if is_auto_schema_change {
247                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
248                    "auto schema change not supported for CREATE SINK AS QUERY".to_owned(),
249                )));
250            }
251            sink_from_table_name = sink_table_name.clone();
252            direct_sink_from_name = None;
253            query
254        }
255    };
256
257    if is_iceberg_engine_internal && let Some((from_name, _)) = &direct_sink_from_name {
258        let (table, _) = get_table_catalog_by_table_name(session, from_name)?;
259        let pk_names = table.pk_column_names();
260        if pk_names.len() == 1 && pk_names[0].eq(ROW_ID_COLUMN_NAME) {
261            let [stmt]: [_; 1] = Parser::parse_sql(&format!(
262                "select {} as {}, * from {}",
263                ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, from_name
264            ))
265            .context("unable to parse query")?
266            .try_into()
267            .unwrap();
268            let Statement::Query(parsed_query) = stmt else {
269                panic!("unexpected statement: {:?}", stmt);
270            };
271            query = parsed_query;
272        }
273    }
274
275    let (sink_database_id, sink_schema_id) =
276        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;
277
278    let (
279        dependent_relations,
280        dependent_udfs,
281        dependent_secrets,
282        bound,
283        auto_refresh_schema_from_table,
284    ) = {
285        let mut binder = Binder::new_for_stream(session);
286        let auto_refresh_schema_from_table = if let Some((from_name, true)) = &direct_sink_from_name
287        {
288            let from_relation = binder.bind_relation_by_name(from_name, None, None, true)?;
289            if let Relation::BaseTable(table) = from_relation {
290                if table.table_catalog.table_type != TableType::Table {
291                    return Err(ErrorCode::InvalidInputSyntax(format!(
292                        "auto schema change only support on TABLE, but got {:?}",
293                        table.table_catalog.table_type
294                    ))
295                    .into());
296                }
297                if table.table_catalog.database_id != sink_database_id {
298                    return Err(ErrorCode::InvalidInputSyntax(
299                        "auto schema change sink does not support created from cross database table".to_owned()
300                    )
301                        .into());
302                }
303                for col in &table.table_catalog.columns {
304                    if !col.is_hidden() && (col.is_generated() || col.is_rw_sys_column()) {
305                        return Err(ErrorCode::InvalidInputSyntax(format!("auto schema change not supported for table with non-hidden generated column or sys column, but got {}", col.name())).into());
306                    }
307                }
308                Some(table.table_catalog)
309            } else {
310                return Err(RwError::from(ErrorCode::NotSupported(
311                    "auto schema change only supported for TABLE".to_owned(),
312                    "try recreating the sink from table".to_owned(),
313                )));
314            }
315        } else {
316            None
317        };
318
319        let bound = binder.bind_query(&query)?;
320
321        (
322            binder.included_relations().clone(),
323            binder.included_udfs().clone(),
324            binder.included_secrets().clone(),
325            bound,
326            auto_refresh_schema_from_table,
327        )
328    };
329
330    reject_internal_table_dependencies(session, &dependent_relations, "CREATE SINK")?;
331
332    let col_names = if sink_into_table_name.is_some() {
333        parse_column_names(&stmt.columns)
334    } else {
335        // If column names not specified, use the name in the bound query, which is equal with the plan root's original field name.
336        get_column_names(&bound, stmt.columns)?
337    };
338
339    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
340    if emit_on_window_close {
341        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
342    }
343
344    let format_desc = match stmt.sink_schema {
345        // Case A: new syntax `format ... encode ...`
346        Some(f) => {
347            validate_compatibility(&connector, &f)?;
348            Some(bind_sink_format_desc(session,f)?)
349        }
350        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
351            // Case B: old syntax `type = '...'`
352            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
353                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
354                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
355                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
356                }
357                if let Some(v) = resolved_with_options.get(SINK_USER_IGNORE_DELETE_OPTION) {
358                    f.options.insert(SINK_USER_IGNORE_DELETE_OPTION.into(), v.into());
359                }
360                f
361            }),
362            // Case C: no format + encode required
363            None => None,
364        },
365    };
366
367    let definition = context.normalized_sql().to_owned();
368    let mut plan_root = if is_iceberg_engine_internal {
369        Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
370    } else {
371        Planner::new_for_stream(context.into()).plan_query(bound)?
372    };
373    if let Some(col_names) = &col_names {
374        plan_root.set_out_names(col_names.clone())?;
375    };
376
377    let without_backfill = match resolved_with_options.remove(SINK_SNAPSHOT_OPTION) {
378        Some(flag) if flag.eq_ignore_ascii_case("false") => {
379            if direct_sink_from_name.is_some() || is_iceberg_engine_internal {
380                true
381            } else {
382                return Err(ErrorCode::BindError(
383                    "`snapshot = false` only support `CREATE SINK FROM MV or TABLE`".to_owned(),
384                )
385                .into());
386            }
387        }
388        _ => false,
389    };
390
391    let target_table_catalog = stmt
392        .into_table_name
393        .as_ref()
394        .map(|table_name| fetch_table_catalog_for_alter(session, table_name).map(|t| t.0))
395        .transpose()?;
396
397    if let Some(target_table_catalog) = &target_table_catalog {
398        if let Some(col_names) = col_names {
399            let target_table_columns = target_table_catalog
400                .columns()
401                .iter()
402                .map(|c| c.name())
403                .collect::<BTreeSet<_>>();
404            for c in col_names {
405                if !target_table_columns.contains(c.as_str()) {
406                    return Err(RwError::from(ErrorCode::BindError(format!(
407                        "Column {} not found in table {}",
408                        c,
409                        target_table_catalog.name()
410                    ))));
411                }
412            }
413        }
414        if target_table_catalog
415            .columns()
416            .iter()
417            .any(|col| !col.nullable())
418        {
419            notice_to_user(format!(
420                "The target table `{}` contains columns with NOT NULL constraints. Any sinked rows violating the constraints will be ignored silently.",
421                target_table_catalog.name(),
422            ));
423        }
424    }
425
426    let allow_snapshot_backfill = target_table_catalog.is_none() && !is_iceberg_engine_internal;
427
428    let sink_plan = plan_root.gen_sink_plan(
429        sink_table_name,
430        definition,
431        resolved_with_options,
432        emit_on_window_close,
433        db_name.to_owned(),
434        sink_from_table_name,
435        format_desc,
436        without_backfill,
437        target_table_catalog.clone(),
438        partition_info,
439        user_specified_columns,
440        auto_refresh_schema_from_table,
441        allow_snapshot_backfill,
442    )?;
443
444    let sink_desc = sink_plan.sink_desc().clone();
445
446    let mut sink_plan: PlanRef = sink_plan.into_stream_plan()?;
447
448    let ctx = sink_plan.ctx();
449    let explain_trace = ctx.is_explain_trace();
450    if explain_trace {
451        ctx.trace("Create Sink:");
452        ctx.trace(sink_plan.explain_to_string());
453    }
454    tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());
455
456    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
457    // during binding instead of visiting the optimized plan.
458    let dependencies =
459        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
460            .into_iter()
461            .chain(dependent_udfs.iter().copied().map_into())
462            .chain(
463                dependent_secrets
464                    .iter()
465                    .copied()
466                    .map(|id| id.as_object_id()),
467            )
468            .collect();
469
470    let sink_catalog = sink_desc.into_catalog(
471        sink_schema_id,
472        sink_database_id,
473        session.user_id(),
474        connector_conn_ref,
475    );
476
477    if let Some(table_catalog) = &target_table_catalog {
478        for column in sink_catalog.full_columns() {
479            if !column.can_dml() {
480                unreachable!(
481                    "can not derive generated columns and system column `_rw_timestamp` in a sink's catalog, but meet one"
482                );
483            }
484        }
485
486        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
487        let exprs = derive_default_column_project_for_sink(
488            &sink_catalog,
489            sink_plan.schema(),
490            &table_columns_without_rw_timestamp,
491            user_specified_columns,
492        )?;
493
494        let logical_project = generic::Project::new(exprs, sink_plan);
495
496        sink_plan = StreamProject::new(logical_project).into();
497
498        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
499            &table_columns_without_rw_timestamp,
500        )?;
501
502        if let Some(exprs) = exprs {
503            let logical_project = generic::Project::new(exprs, sink_plan);
504            sink_plan = StreamProject::new(logical_project).into();
505        }
506    };
507
508    Ok(SinkPlanContext {
509        query,
510        sink_plan,
511        sink_catalog,
512        target_table_catalog,
513        dependencies,
514    })
515}
516
517// This function is used to return partition compute info for a sink. More details refer in `PartitionComputeInfo`.
518// Return:
519// `Some(PartitionComputeInfo)` if the sink need to compute partition.
520// `None` if the sink does not need to compute partition.
521pub async fn get_partition_compute_info(
522    with_options: &WithOptionsSecResolved,
523) -> Result<Option<PartitionComputeInfo>> {
524    let (options, secret_refs) = with_options.clone().into_parts();
525    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
526        return Ok(None);
527    };
528    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
529    match connector.as_str() {
530        ICEBERG_SINK => {
531            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
532            get_partition_compute_info_for_iceberg(&iceberg_config).await
533        }
534        _ => Ok(None),
535    }
536}
537
538#[allow(clippy::unused_async)]
539async fn get_partition_compute_info_for_iceberg(
540    _iceberg_config: &IcebergConfig,
541) -> Result<Option<PartitionComputeInfo>> {
542    // TODO: check table if exists
543    if _iceberg_config.create_table_if_not_exists {
544        return Ok(None);
545    }
546    let table = _iceberg_config.load_table().await?;
547    let partition_spec = table.metadata().default_partition_spec();
548    if partition_spec.is_unpartitioned() {
549        return Ok(None);
550    }
551
552    // Separate the partition spec into two parts: sparse partition and range partition.
553    // Sparse partition means that the data distribution is more sparse at a given time.
554    // Range partition means that the data distribution is likely same at a given time.
555    // Only compute the partition and shuffle by them for the sparse partition.
556    let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
557        // Sparse partition
558        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
559        // Range partition
560        Transform::Year
561        | Transform::Month
562        | Transform::Day
563        | Transform::Hour
564        | Transform::Void
565        | Transform::Unknown => false,
566    });
567    if !has_sparse_partition {
568        return Ok(None);
569    }
570
571    let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
572        table.metadata().default_partition_type().clone(),
573    ))
574    .map_err(|_| {
575        RwError::from(ErrorCode::SinkError(
576            "Fail to convert iceberg partition type to arrow type".into(),
577        ))
578    })?;
579    let ArrowDataType::Struct(struct_fields) = arrow_type else {
580        return Err(RwError::from(ErrorCode::SinkError(
581            "Partition type of iceberg should be a struct type".into(),
582        )));
583    };
584
585    let schema = table.metadata().current_schema();
586    let partition_fields = partition_spec
587        .fields()
588        .iter()
589        .map(|f| {
590            let source_f =
591                schema
592                    .field_by_id(f.source_id)
593                    .ok_or(RwError::from(ErrorCode::SinkError(
594                        "Fail to look up iceberg partition field".into(),
595                    )))?;
596            Ok((source_f.name.clone(), f.transform))
597        })
598        .collect::<Result<Vec<_>>>()?;
599
600    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
601        partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
602        partition_fields,
603    })))
604}
605
/// Handle a `CREATE SINK` statement end-to-end: plan it via [`gen_sink_plan`],
/// build the stream graph, and submit the job to the catalog writer.
///
/// `is_iceberg_engine_internal` marks sinks created internally for iceberg-engine
/// tables (they bypass some user-facing restrictions inside `gen_sink_plan`).
pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
    is_iceberg_engine_internal: bool,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    let if_not_exists = stmt.if_not_exists;
    // Early-return the canned response when the name already exists and
    // `IF NOT EXISTS` was specified; error otherwise.
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    // The prefix is reserved for internal iceberg-engine sinks.
    if stmt.sink_name.base_name().starts_with(ICEBERG_SINK_PREFIX) {
        return Err(RwError::from(ErrorCode::InvalidInputSyntax(format!(
            "Sink name cannot start with reserved prefix '{}'",
            ICEBERG_SINK_PREFIX
        ))));
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let backfill_order_strategy = handle_args.with_options.backfill_order_strategy();

        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;

        // A sink has no ordering guarantee; warn instead of silently dropping ORDER BY.
        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let backfill_order =
            plan_backfill_order(session.as_ref(), backfill_order_strategy, plan.clone())?;

        let graph =
            build_graph_with_strategy(plan, Some(GraphJobType::Sink), Some(backfill_order))?;

        (sink, graph, target_table_catalog, dependencies)
    };

    // For sink-into-table, remember the target columns at creation time so later
    // table schema changes can be reconciled.
    if let Some(table_catalog) = target_table_catalog {
        sink.original_target_columns = table_catalog.columns_without_rw_timestamp();
    }

    // Track the in-flight creation for the session's lifetime (dropped on return).
    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id,
                sink.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    // Long-running creation (e.g. backfill) periodically notifies the client.
    execute_with_long_running_notification(
        catalog_writer.create_sink(sink.to_proto(), graph, dependencies, if_not_exists),
        &session,
        "CREATE SINK",
        LongRunningNotificationAction::MonitorBackfillJob,
    )
    .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}
685
686pub fn fetch_incoming_sinks(
687    session: &Arc<SessionImpl>,
688    table: &TableCatalog,
689) -> Result<Vec<Arc<SinkCatalog>>> {
690    let reader = session.env().catalog_reader().read_guard();
691    let schema = reader.get_schema_by_id(table.database_id, table.schema_id)?;
692    let Some(incoming_sinks) = schema.table_incoming_sinks(table.id) else {
693        return Ok(vec![]);
694    };
695    let mut sinks = vec![];
696    for sink_id in incoming_sinks {
697        sinks.push(
698            schema
699                .get_sink_by_id(*sink_id)
700                .expect("should exist")
701                .clone(),
702        );
703    }
704    Ok(sinks)
705}
706
707fn derive_sink_to_table_expr(
708    sink_schema: &Schema,
709    idx: usize,
710    target_type: &DataType,
711) -> Result<ExprImpl> {
712    let input_type = &sink_schema.fields()[idx].data_type;
713
714    if !target_type.equals_datatype(input_type) {
715        bail!(
716            "column type mismatch: {:?} vs {:?}, column name: {:?}",
717            target_type,
718            input_type,
719            sink_schema.fields()[idx].name
720        );
721    } else {
722        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
723            idx,
724            input_type.clone(),
725        ))))
726    }
727}
728
729pub(crate) fn derive_default_column_project_for_sink(
730    sink: &SinkCatalog,
731    sink_schema: &Schema,
732    columns: &[ColumnCatalog],
733    user_specified_columns: bool,
734) -> Result<Vec<ExprImpl>> {
735    assert_eq!(sink.full_schema().len(), sink_schema.len());
736
737    let default_column_exprs = TableCatalog::default_column_exprs(columns);
738
739    let mut exprs = vec![];
740
741    let sink_visible_col_idxes = sink
742        .full_columns()
743        .iter()
744        .positions(|c| !c.is_hidden())
745        .collect_vec();
746    let sink_visible_col_idxes_by_name = sink
747        .full_columns()
748        .iter()
749        .enumerate()
750        .filter(|(_, c)| !c.is_hidden())
751        .map(|(i, c)| (c.name(), i))
752        .collect::<BTreeMap<_, _>>();
753
754    for (idx, column) in columns.iter().enumerate() {
755        if !column.can_dml() {
756            continue;
757        }
758
759        let default_col_expr =
760            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };
761
762        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
763            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
764        };
765
766        // If users specified the columns to be inserted e.g. `CREATE SINK s INTO t(a, b)`, the expressions of `Project` will be generated accordingly.
767        // The missing columns will be filled with default value (`null` if not explicitly defined).
768        // Otherwise, e.g. `CREATE SINK s INTO t`, the columns will be matched by their order in `select` query and the target table.
769        #[allow(clippy::collapsible_else_if)]
770        if user_specified_columns {
771            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
772                exprs.push(sink_col_expr(*idx)?);
773            } else {
774                exprs.push(default_col_expr());
775            }
776        } else {
777            if idx < sink_visible_col_idxes.len() {
778                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
779            } else {
780                exprs.push(default_col_expr());
781            };
782        }
783    }
784    Ok(exprs)
785}
786
787/// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`.
788/// This is an analogy to (part of) [`crate::handler::create_source::bind_columns_from_source`]
789/// which transforms sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`.
790fn bind_sink_format_desc(
791    session: &SessionImpl,
792    value: FormatEncodeOptions,
793) -> Result<SinkFormatDesc> {
794    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
795    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
796    use risingwave_sqlparser::ast::{Encode as E, Format as F};
797
798    let format = match value.format {
799        F::Plain => SinkFormat::AppendOnly,
800        F::Upsert => SinkFormat::Upsert,
801        F::Debezium => SinkFormat::Debezium,
802        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
803            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
804        }
805    };
806    let encode = match value.row_encode {
807        E::Json => SinkEncode::Json,
808        E::Protobuf => SinkEncode::Protobuf,
809        E::Avro => SinkEncode::Avro,
810        E::Template => SinkEncode::Template,
811        E::Parquet => SinkEncode::Parquet,
812        E::Bytes => SinkEncode::Bytes,
813        e @ (E::Native | E::Csv | E::None | E::Text) => {
814            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
815        }
816    };
817
818    let mut key_encode = None;
819    if let Some(encode) = value.key_encode {
820        match encode {
821            E::Text => key_encode = Some(SinkEncode::Text),
822            E::Bytes => key_encode = Some(SinkEncode::Bytes),
823            _ => {
824                return Err(ErrorCode::BindError(format!(
825                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
826                ))
827                .into());
828            }
829        }
830    }
831
832    let (props, connection_type_flag, schema_registry_conn_ref) =
833        resolve_connection_ref_and_secret_ref(
834            WithOptions::try_from(value.row_options.as_slice())?,
835            session,
836            Some(TelemetryDatabaseObject::Sink),
837        )?;
838    ensure_connection_type_allowed(
839        connection_type_flag,
840        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
841    )?;
842    let (mut options, secret_refs) = props.into_parts();
843
844    options
845        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
846        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());
847
848    Ok(SinkFormatDesc {
849        format,
850        encode,
851        options,
852        secret_refs,
853        key_encode,
854        connection_id: schema_registry_conn_ref,
855    })
856}
857
/// Static compatibility matrix: sink connector name -> supported FORMAT ->
/// the ENCODEs allowed under that format. Consulted by
/// [`validate_compatibility`] when binding a sink's `FORMAT ... ENCODE ...`
/// clause; connectors absent from this map reject that syntax entirely.
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::S3Sink;
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
                GooglePubSubSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                ),
                // Kafka is the most permissive connector; it is also the only
                // one allowed to use BYTES as the key encode (enforced in
                // `validate_compatibility`, not here).
                KafkaSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf, Encode::Bytes],
                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Debezium => vec![Encode::Json],
                ),
                // All file-based sinks below share the same append-only
                // Parquet/JSON support.
                FileSink::<S3Sink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<GcsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<FsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                KinesisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                MqttSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf],
                ),
                PulsarSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                RedisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Template],
                    Format::Upsert => vec![Encode::Json, Encode::Template],
                ),
        ))
    });
920
921pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
922    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
923        .get(connector)
924        .ok_or_else(|| {
925            ErrorCode::BindError(format!(
926                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
927                connector
928            ))
929        })?;
930    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
931        ErrorCode::BindError(format!(
932            "connector {} does not support format {:?}",
933            connector, format_desc.format
934        ))
935    })?;
936    if !compatible_encodes.contains(&format_desc.row_encode) {
937        return Err(ErrorCode::BindError(format!(
938            "connector {} does not support format {:?} with encode {:?}",
939            connector, format_desc.format, format_desc.row_encode
940        ))
941        .into());
942    }
943
944    // only allow Kafka connector work with `bytes` as key encode
945    if let Some(encode) = &format_desc.key_encode
946        && connector != KAFKA_SINK
947        && matches!(encode, Encode::Bytes)
948    {
949        return Err(ErrorCode::BindError(format!(
950            "key encode bytes only works with kafka connector, but found {}",
951            connector
952        ))
953        .into());
954    }
955
956    Ok(())
957}
958
#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// End-to-end smoke test for `CREATE SINK`: builds a Kafka source, a
    /// materialized view on top of it, and a JDBC sink from the view, then
    /// verifies all three objects are visible through the catalog reader.
    /// NOTE: the statements below must run in this order — each object
    /// depends on the previous one.
    #[tokio::test]
    async fn test_create_sink_handler() {
        // Protobuf schema file for the source's ENCODE PROTOBUF clause.
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        // Placeholder JDBC connection parameters; the frontend binds the sink
        // without contacting the external system.
        let sql = r#"CREATE SINK snk1 FROM mv1
                    WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                        '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                        mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Check sink exists.
        let (sink, _) = catalog_reader
            .get_created_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}