risingwave_frontend/handler/create_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use either::Either;
use iceberg::arrow::type_to_arrow_type;
use iceberg::spec::Transform;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::bail;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, DatabaseId, ObjectId, Schema, SchemaId, UserId,
};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::file_sink::s3::SnowflakeSink;
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::snowflake_redshift::redshift::RedshiftSink;
use risingwave_connector::sink::snowflake_redshift::snowflake::SnowflakeV2Sink;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    Sink, enforce_secret_sink,
};
use risingwave_connector::{
    AUTO_SCHEMA_CHANGE_KEY, SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, SINK_INTERMEDIATE_TABLE_NAME,
    SINK_TARGET_TABLE_NAME, WithPropertiesExt,
};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    ObjectName, Query,
};

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::UPSTREAM_SOURCE_KEY;
use super::util::gen_query_from_table_name;
use crate::binder::{Binder, Relation};
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::util::{check_connector_match_connection_type, ensure_connection_type_allowed};
use crate::optimizer::plan_node::{
    IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamPlanRef as PlanRef,
    StreamProject, generic,
};
use crate::optimizer::{OptimizerContext, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

/// Used to store the result of `gen_sink_plan`.
pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
    is_iceberg_engine_internal: bool,
) -> Result<SinkPlanContext> {
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, &stmt.sink_name)?;

    let mut with_options = handler_args.with_options.clone();

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        enforce_secret_sink(&with_options)?;
    }

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    // if no connection is used, we don't need to check whether the connector matches the connection type
    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = resolved_with_options.get_connector() else {
            return Err(RwError::from(ErrorCode::ProtocolError(format!(
                "missing field '{}' in WITH clause",
                CONNECTOR_TYPE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    let is_auto_schema_change = resolved_with_options
        .remove(AUTO_SCHEMA_CHANGE_KEY)
        .map(|value| {
            value.parse::<bool>().map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "invalid value {} of '{}' option, expected a boolean",
                    value, AUTO_SCHEMA_CHANGE_KEY
                ))
            })
        })
        .transpose()?
        .unwrap_or(false);

    if is_auto_schema_change {
        Feature::SinkAutoSchemaChange.check_available()?;
    }

    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());
    if sink_into_table_name.is_some() {
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "In the case of sinking into table, the 'connector' parameter should not be provided.".to_owned(),
            )));
        }
    }
    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

    // Used for Debezium's table name
    let sink_from_table_name;
    // `Some((from_name, is_auto_schema_change))` means the sink statement has the form `CREATE SINK s1 FROM ...`;
    // `None` means it has the form `CREATE SINK s1 AS <query>`.
    let direct_sink_from_name: Option<(ObjectName, bool)>;
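    // Illustrative sketch of the two forms handled by the match below (object names are
    // hypothetical, not taken from this module):
    //   CREATE SINK s1 FROM mv1 WITH (connector = 'kafka', ...);              -- `CreateSink::From`
    //   CREATE SINK s1 AS SELECT v1 FROM t1 WITH (connector = 'kafka', ...);  -- `CreateSink::AsQuery`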
    let query = match stmt.sink_from {
        CreateSink::From(from_name) => {
            sink_from_table_name = from_name.0.last().unwrap().real_value();
            direct_sink_from_name = Some((from_name.clone(), is_auto_schema_change));
            if is_auto_schema_change && sink_into_table_name.is_some() {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change not supported for sink-into-table".to_owned(),
                )));
            }
            if resolved_with_options
                .value_eq_ignore_case(SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, "true")
                && (connector == RedshiftSink::SINK_NAME
                    || connector == SnowflakeV2Sink::SINK_NAME)
            {
                if let Some(table_name) = resolved_with_options.get(SINK_TARGET_TABLE_NAME) {
                    // auto fill intermediate table name if target table name is specified
                    if resolved_with_options
                        .get(SINK_INTERMEDIATE_TABLE_NAME)
                        .is_none()
                    {
                        // generate the intermediate table name with random value appended to the target table name
                        let intermediate_table_name = format!(
                            "rw_{}_{}_{}",
                            sink_table_name,
                            table_name,
                            uuid::Uuid::new_v4()
                        );
                        resolved_with_options.insert(
                            SINK_INTERMEDIATE_TABLE_NAME.to_owned(),
                            intermediate_table_name,
                        );
                    }
                } else {
                    return Err(RwError::from(ErrorCode::BindError(
                        "'table.name' option must be specified.".to_owned(),
                    )));
                }
            }
            Box::new(gen_query_from_table_name(from_name))
        }
        CreateSink::AsQuery(query) => {
            if is_auto_schema_change {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change not supported for CREATE SINK AS QUERY".to_owned(),
                )));
            }
            sink_from_table_name = sink_table_name.clone();
            direct_sink_from_name = None;
            query
        }
    };

    let (sink_database_id, sink_schema_id) =
        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

    let (dependent_relations, dependent_udfs, bound, auto_refresh_schema_from_table) = {
        let mut binder = Binder::new_for_stream(session);
        let auto_refresh_schema_from_table = if let Some((from_name, true)) = &direct_sink_from_name
        {
            let from_relation = binder.bind_relation_by_name(from_name, None, None, true)?;
            if let Relation::BaseTable(table) = from_relation {
                if table.table_catalog.table_type != TableType::Table {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "auto schema change is only supported on TABLE, but got {:?}",
                        table.table_catalog.table_type
                    ))
                    .into());
                }
                if table.table_catalog.database_id != sink_database_id {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "auto schema change sink cannot be created from a cross-database table"
                            .to_owned(),
                    )
                    .into());
                }
                for col in &table.table_catalog.columns {
                    if !col.is_hidden() && (col.is_generated() || col.is_rw_sys_column()) {
                        return Err(ErrorCode::InvalidInputSyntax(format!(
                            "auto schema change is not supported for tables with non-hidden generated or system columns, but got {}",
                            col.name()
                        ))
                        .into());
                    }
                }
                Some(table.table_catalog)
            } else {
                return Err(RwError::from(ErrorCode::NotSupported(
                    "auto schema change only supported for TABLE".to_owned(),
                    "try recreating the sink from table".to_owned(),
                )));
            }
        } else {
            None
        };

        let bound = binder.bind_query(&query)?;

        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound,
            auto_refresh_schema_from_table,
        )
    };

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        // If column names are not specified, use the names from the bound query, which equal the plan root's original field names.
        get_column_names(&bound, stmt.columns)?
    };

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

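    // A hedged sketch of the three cases handled below; option values are illustrative only:
    //   Case A: CREATE SINK ... WITH (connector = 'kafka', ...) FORMAT PLAIN ENCODE JSON;
    //   Case B: CREATE SINK ... WITH (connector = 'kafka', type = 'append-only', force_append_only = 'true');
    //   Case C: neither a FORMAT ... ENCODE ... clause nor a `type` option, so `format_desc` stays `None`.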
    let format_desc = match stmt.sink_schema {
        // Case A: new syntax `format ... encode ...`
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            // Case B: old syntax `type = '...'`
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                f
            }),
            // Case C: no format + encode required
            None => None,
        },
    };

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = if is_iceberg_engine_internal {
        Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
    } else {
        Planner::new_for_stream(context.into()).plan_query(bound)?
    };
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    };

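    // Illustrative WITH option consumed below (names hypothetical); it skips the initial snapshot
    // backfill and is only accepted when sinking directly from a table or MV:
    //   CREATE SINK s1 FROM mv1 WITH (connector = 'kafka', snapshot = 'false', ...);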
    let without_backfill = match resolved_with_options.remove(SINK_SNAPSHOT_OPTION) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink_from_name.is_some() || is_iceberg_engine_internal {
                true
            } else {
                return Err(ErrorCode::BindError(
                    "`snapshot = false` is only supported for `CREATE SINK FROM MV or TABLE`".to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };

    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name).map(|t| t.0))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Rows from the sink that violate these constraints will be silently ignored.",
                target_table_catalog.name(),
            ));
        }
    }

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
        auto_refresh_schema_from_table,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into();

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }
    tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());

    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
    // during binding instead of visiting the optimized plan.
    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .map(|id| id.table_id() as ObjectId)
            .chain(
                dependent_udfs
                    .into_iter()
                    .map(|id| id.function_id() as ObjectId),
            )
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        SchemaId::new(sink_schema_id),
        DatabaseId::new(sink_database_id),
        UserId::new(session.user_id()),
        connector_conn_ref.map(ConnectionId::from),
    );

    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
                    "a sink's catalog should not contain generated columns or the system column `_rw_timestamp`, but met one"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    };

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

/// Returns the partition compute info for a sink. See `PartitionComputeInfo` for more details.
///
/// Returns:
/// - `Some(PartitionComputeInfo)` if the sink needs to compute partitions.
/// - `None` if the sink does not need to compute partitions.
pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

#[allow(clippy::unused_async)]
async fn get_partition_compute_info_for_iceberg(
    iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    // TODO: check whether the table exists
    if iceberg_config.create_table_if_not_exists {
        return Ok(None);
    }
    let table = iceberg_config.load_table().await?;
    let partition_spec = table.metadata().default_partition_spec();
    if partition_spec.is_unpartitioned() {
        return Ok(None);
    }

    // Separate the partition spec into two kinds: sparse partitions and range partitions.
    // A sparse partition means that the data distribution is sparse at a given point in time.
    // A range partition means that the data distribution is likely the same at a given point in time.
    // We only compute the partition values and shuffle by them for sparse partitions.
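    // For example (hypothetical spec), a table partitioned by `bucket(16, id)` and `day(ts)` has
    // one sparse field (`Bucket`) and one range field (`Day`), so `has_sparse_partition` below is
    // `true` and the sink plan will compute and shuffle by the partition value.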
    let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
        // Sparse partition
        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
        // Range partition
        Transform::Year
        | Transform::Month
        | Transform::Day
        | Transform::Hour
        | Transform::Void
        | Transform::Unknown => false,
    });
    if !has_sparse_partition {
        return Ok(None);
    }

    let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
        table.metadata().default_partition_type().clone(),
    ))
    .map_err(|_| {
        RwError::from(ErrorCode::SinkError(
            "Fail to convert iceberg partition type to arrow type".into(),
        ))
    })?;
    let ArrowDataType::Struct(struct_fields) = arrow_type else {
        return Err(RwError::from(ErrorCode::SinkError(
            "Partition type of iceberg should be a struct type".into(),
        )));
    };

    let schema = table.metadata().current_schema();
    let partition_fields = partition_spec
        .fields()
        .iter()
        .map(|f| {
            let source_f =
                schema
                    .field_by_id(f.source_id)
                    .ok_or(RwError::from(ErrorCode::SinkError(
                        "Fail to look up iceberg partition field".into(),
                    )))?;
            Ok((source_f.name.clone(), f.transform))
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
        partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
        partition_fields,
    })))
}

pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
    is_iceberg_engine_internal: bool,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    let if_not_exists = stmt.if_not_exists;
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let graph = build_graph(plan, Some(GraphJobType::Sink))?;

        (sink, graph, target_table_catalog, dependencies)
    };

    if let Some(table_catalog) = target_table_catalog {
        sink.original_target_columns = table_catalog.columns_without_rw_timestamp();
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id.database_id,
                sink.schema_id.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_sink(sink.to_proto(), graph, dependencies, if_not_exists)
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    table: &TableCatalog,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let schema = reader.get_schema_by_id(&table.database_id, &table.schema_id)?;
    let Some(incoming_sinks) = schema.table_incoming_sinks(table.id) else {
        return Ok(vec![]);
    };
    let mut sinks = vec![];
    for sink_id in incoming_sinks {
        sinks.push(
            schema
                .get_sink_by_id(sink_id)
                .expect("should exist")
                .clone(),
        );
    }
    Ok(sinks)
}

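/// Maps sink output column `idx` onto a target table column of `target_type`.
///
/// A minimal sketch of the behavior (types are illustrative): sinking an `Int32` field into an
/// `Int32` target column yields `InputRef(idx)`, while sinking it into a `Varchar` column bails
/// with a type-mismatch error; no implicit cast is inserted here.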
fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if target_type != input_type {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        // If users specified the columns to be inserted, e.g. `CREATE SINK s INTO t(a, b)`, the expressions of `Project` will be generated accordingly.
        // The missing columns will be filled with default values (`null` if not explicitly defined).
        // Otherwise, e.g. `CREATE SINK s INTO t`, the columns will be matched by their order in the `SELECT` query and the target table.
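        // Illustrative example (hypothetical schemas): with target table `t(a, b, c)`,
        // `CREATE SINK s INTO t(b) AS SELECT v FROM src` maps `b` to the sink's only visible
        // column and fills `a` and `c` with their default expressions, while
        // `CREATE SINK s INTO t AS SELECT v1, v2 FROM src` matches `a` and `b` by position and
        // fills `c` with its default.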
        #[allow(clippy::collapsible_else_if)]
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            };
        }
    }
    Ok(exprs)
}

/// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`.
/// This is an analogy to (part of) [`crate::handler::create_source::bind_columns_from_source`]
/// which transforms sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`.
fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}

static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::S3Sink;
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
                GooglePubSubSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                ),
                KafkaSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Debezium => vec![Encode::Json],
                ),
                FileSink::<S3Sink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<GcsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<FsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                KinesisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                MqttSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf],
                ),
                PulsarSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                RedisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Template],
                    Format::Upsert => vec![Encode::Json, Encode::Template],
                ),
        ))
    });

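/// Checks the requested `FORMAT ... ENCODE ...` against `CONNECTORS_COMPATIBLE_FORMATS`.
///
/// A hedged example based on the table above: for the Kafka sink, `FORMAT DEBEZIUM ENCODE JSON`
/// passes, while `FORMAT DEBEZIUM ENCODE AVRO` is rejected because only JSON is listed for the
/// Debezium format.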
pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    // only allow the Kafka connector to work with `bytes` as the key encode
    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
        return Err(ErrorCode::BindError(format!(
            "key encode bytes only works with kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
                    WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                        '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                        mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Check sink exists.
        let (sink, _) = catalog_reader
            .get_created_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}