risingwave_frontend/handler/create_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use either::Either;
use iceberg::arrow::type_to_arrow_type;
use iceberg::spec::Transform;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ConnectionId, ObjectId, Schema, UserId};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::file_sink::s3::SnowflakeSink;
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::snowflake_redshift::redshift::RedshiftSink;
use risingwave_connector::sink::snowflake_redshift::snowflake::SnowflakeV2Sink;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    Sink, enforce_secret_sink,
};
use risingwave_connector::{
    AUTO_SCHEMA_CHANGE_KEY, SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, SINK_INTERMEDIATE_TABLE_NAME,
    SINK_TARGET_TABLE_NAME, WithPropertiesExt,
};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    ObjectName, Query,
};

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::UPSTREAM_SOURCE_KEY;
use super::util::gen_query_from_table_name;
use crate::binder::{Binder, Relation};
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::util::{check_connector_match_connection_type, ensure_connection_type_allowed};
use crate::optimizer::plan_node::{
    IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamPlanRef as PlanRef,
    StreamProject, generic,
};
use crate::optimizer::{OptimizerContext, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

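// Connection types that a sink's `connection = ...` reference may resolve to, for the connector
// itself and (separately) for its schema registry. `Unspecified` is included because sinks
// created without a named connection resolve to that variant.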
static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

/// Used to store the result of [`gen_sink_plan`].
pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

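/// Binds and plans a `CREATE SINK` statement, producing its streaming plan, sink catalog,
/// optional sink-into-table target, and dependency set. For illustration only (the object names
/// and connector options below are made up), a statement handled here looks like:
///
/// ```sql
/// CREATE SINK s1 FROM mv1
/// WITH (connector = 'kafka', topic = 'events', properties.bootstrap.server = 'broker:9092')
/// FORMAT PLAIN ENCODE JSON;
/// ```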
pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
    is_iceberg_engine_internal: bool,
) -> Result<SinkPlanContext> {
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, &stmt.sink_name)?;

    let mut with_options = handler_args.with_options.clone();

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        enforce_secret_sink(&with_options)?;
    }

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    // If no connection is referenced, there is no connection type to match the connector against.
    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = resolved_with_options.get_connector() else {
            return Err(RwError::from(ErrorCode::ProtocolError(format!(
                "missing field '{}' in WITH clause",
                CONNECTOR_TYPE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    let is_auto_schema_change = resolved_with_options
        .remove(AUTO_SCHEMA_CHANGE_KEY)
        .map(|value| {
            value.parse::<bool>().map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "invalid value {} for '{}' option, expected 'true' or 'false'",
                    value, AUTO_SCHEMA_CHANGE_KEY
                ))
            })
        })
        .transpose()?
        .unwrap_or(false);

    if is_auto_schema_change {
        Feature::SinkAutoSchemaChange.check_available()?;
    }

    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());
    if sink_into_table_name.is_some() {
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "When sinking into a table, the 'connector' parameter should not be provided.".to_owned(),
            )));
        }
    }
    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

    // Used for Debezium's table name.
    let sink_from_table_name;
    // `Some(..)` means the statement has the form `CREATE SINK s1 FROM ...`; the boolean records
    // whether auto schema change is enabled. `None` means the form `CREATE SINK s1 AS <query>`.
    let direct_sink_from_name: Option<(ObjectName, bool)>;
    let query = match stmt.sink_from {
        CreateSink::From(from_name) => {
            sink_from_table_name = from_name.0.last().unwrap().real_value();
            direct_sink_from_name = Some((from_name.clone(), is_auto_schema_change));
            if is_auto_schema_change && sink_into_table_name.is_some() {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change is not supported for sink-into-table".to_owned(),
                )));
            }
            if resolved_with_options
                .value_eq_ignore_case(SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, "true")
                && (connector == RedshiftSink::SINK_NAME
                    || connector == SnowflakeV2Sink::SINK_NAME)
            {
                if let Some(table_name) = resolved_with_options.get(SINK_TARGET_TABLE_NAME) {
                    // Auto-fill the intermediate table name if only the target table name is specified.
                    if resolved_with_options
                        .get(SINK_INTERMEDIATE_TABLE_NAME)
                        .is_none()
                    {
                        // Derive the intermediate table name from the sink name, the target table
                        // name, and a random UUID suffix.
                        let intermediate_table_name = format!(
                            "rw_{}_{}_{}",
                            sink_table_name,
                            table_name,
                            uuid::Uuid::new_v4()
                        );
                        resolved_with_options.insert(
                            SINK_INTERMEDIATE_TABLE_NAME.to_owned(),
                            intermediate_table_name,
                        );
                    }
                } else {
                    return Err(RwError::from(ErrorCode::BindError(
                        "'table.name' option must be specified.".to_owned(),
                    )));
                }
            }
            Box::new(gen_query_from_table_name(from_name))
        }
        CreateSink::AsQuery(query) => {
            if is_auto_schema_change {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change is not supported for CREATE SINK AS QUERY".to_owned(),
                )));
            }
            sink_from_table_name = sink_table_name.clone();
            direct_sink_from_name = None;
            query
        }
    };

    let (sink_database_id, sink_schema_id) =
        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

    let (dependent_relations, dependent_udfs, bound, auto_refresh_schema_from_table) = {
        let mut binder = Binder::new_for_stream(session);
        let auto_refresh_schema_from_table = if let Some((from_name, true)) = &direct_sink_from_name
        {
            let from_relation = binder.bind_relation_by_name(from_name, None, None, true)?;
            if let Relation::BaseTable(table) = from_relation {
                if table.table_catalog.table_type != TableType::Table {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "auto schema change is only supported on TABLE, but got {:?}",
                        table.table_catalog.table_type
                    ))
                    .into());
                }
                if table.table_catalog.database_id != sink_database_id {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "auto schema change is not supported for sinks created from a cross-database table".to_owned()
                    )
                        .into());
                }
                for col in &table.table_catalog.columns {
                    if !col.is_hidden() && (col.is_generated() || col.is_rw_sys_column()) {
                        return Err(ErrorCode::InvalidInputSyntax(format!(
                            "auto schema change is not supported for tables with non-hidden generated or system columns, but found column {}",
                            col.name()
                        ))
                        .into());
                    }
                }
                Some(table.table_catalog)
            } else {
                return Err(RwError::from(ErrorCode::NotSupported(
                    "auto schema change is only supported for TABLE".to_owned(),
                    "try recreating the sink from a table".to_owned(),
                )));
            }
        } else {
            None
        };

        let bound = binder.bind_query(&query)?;

        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound,
            auto_refresh_schema_from_table,
        )
    };

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        // If column names are not specified, use the names from the bound query, which are equal
        // to the plan root's original field names.
        get_column_names(&bound, stmt.columns)?
    };

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let format_desc = match stmt.sink_schema {
        // Case A: new syntax `format ... encode ...`
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            // Case B: old syntax `type = '...'`
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                f
            }),
            // Case C: no format + encode required
            None => None,
        },
    };

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = if is_iceberg_engine_internal {
        Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
    } else {
        Planner::new_for_stream(context.into()).plan_query(bound)?
    };
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    };

    let without_backfill = match resolved_with_options.remove(SINK_SNAPSHOT_OPTION) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink_from_name.is_some() || is_iceberg_engine_internal {
                true
            } else {
                return Err(ErrorCode::BindError(
                    "`snapshot = false` is only supported for `CREATE SINK FROM MV or TABLE`".to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };

    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name).map(|t| t.0))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Any sink rows violating the constraints will be silently ignored.",
                target_table_catalog.name(),
            ));
        }
    }

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
        auto_refresh_schema_from_table,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into();

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }
    tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());

    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
    // during binding instead of visiting the optimized plan.
    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .map(|id| id.as_raw_id() as ObjectId)
            .chain(
                dependent_udfs
                    .into_iter()
                    .map(|id| id.function_id() as ObjectId),
            )
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        sink_schema_id,
        sink_database_id,
        UserId::new(session.user_id()),
        connector_conn_ref.map(ConnectionId::from),
    );

    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
                    "generated columns and the system column `_rw_timestamp` cannot appear in a sink's catalog, but one was found"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    };

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

/// Returns the partition compute info for a sink; see `PartitionComputeInfo` for more details.
///
/// Returns `Some(PartitionComputeInfo)` if the sink needs to compute partitions,
/// and `None` otherwise.
pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

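/// Iceberg-specific part of [`get_partition_compute_info`]: loads the target table and returns
/// partition info only when the table has at least one "sparse" partition transform
/// (identity / truncate / bucket); otherwise returns `None`.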
#[allow(clippy::unused_async)]
async fn get_partition_compute_info_for_iceberg(
    iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    // TODO: check whether the table exists.
    if iceberg_config.create_table_if_not_exists {
        return Ok(None);
    }
    let table = iceberg_config.load_table().await?;
    let partition_spec = table.metadata().default_partition_spec();
    if partition_spec.is_unpartitioned() {
        return Ok(None);
    }

    // Separate the partition spec into two kinds: sparse partitions and range partitions.
    // With a sparse partition transform, rows arriving at a given time are spread across many
    // partitions; with a range partition transform, rows arriving at a given time likely fall
    // into the same partition. Only compute partition values, and shuffle by them, for the
    // sparse case.
    let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
        // Sparse partition
        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
        // Range partition
        Transform::Year
        | Transform::Month
        | Transform::Day
        | Transform::Hour
        | Transform::Void
        | Transform::Unknown => false,
    });
    if !has_sparse_partition {
        return Ok(None);
    }

    let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
        table.metadata().default_partition_type().clone(),
    ))
    .map_err(|_| {
        RwError::from(ErrorCode::SinkError(
            "Failed to convert iceberg partition type to arrow type".into(),
        ))
    })?;
    let ArrowDataType::Struct(struct_fields) = arrow_type else {
        return Err(RwError::from(ErrorCode::SinkError(
            "Partition type of iceberg should be a struct type".into(),
        )));
    };

    let schema = table.metadata().current_schema();
    let partition_fields = partition_spec
        .fields()
        .iter()
        .map(|f| {
            let source_f =
                schema
                    .field_by_id(f.source_id)
                    .ok_or(RwError::from(ErrorCode::SinkError(
                        "Failed to look up iceberg partition field".into(),
                    )))?;
            Ok((source_f.name.clone(), f.transform))
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
        partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
        partition_fields,
    })))
}

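/// Handles `CREATE SINK` end to end: plans the sink via [`gen_sink_plan`], builds the streaming
/// fragment graph, and registers the new sink with the catalog writer.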
pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
    is_iceberg_engine_internal: bool,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    let if_not_exists = stmt.if_not_exists;
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let graph = build_graph(plan, Some(GraphJobType::Sink))?;

        (sink, graph, target_table_catalog, dependencies)
    };

    if let Some(table_catalog) = target_table_catalog {
        sink.original_target_columns = table_catalog.columns_without_rw_timestamp();
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id,
                sink.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_sink(sink.to_proto(), graph, dependencies, if_not_exists)
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

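/// Collects the catalogs of all sinks that currently sink into the given table
/// (i.e. sinks created with `CREATE SINK ... INTO <table>`).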
pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    table: &TableCatalog,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let schema = reader.get_schema_by_id(table.database_id, table.schema_id)?;
    let Some(incoming_sinks) = schema.table_incoming_sinks(table.id) else {
        return Ok(vec![]);
    };
    let mut sinks = vec![];
    for sink_id in incoming_sinks {
        sinks.push(
            schema
                .get_sink_by_id(sink_id)
                .expect("should exist")
                .clone(),
        );
    }
    Ok(sinks)
}

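/// Builds an `InputRef` expression that forwards sink column `idx` into the corresponding
/// target-table column, failing if the data types do not match exactly (no implicit cast is
/// inserted here).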
fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if target_type != input_type {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

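/// Derives the `Project` expressions that map the sink's output columns onto the target table's
/// columns for `CREATE SINK ... INTO <table>`. Illustrative examples (table and column names are
/// hypothetical):
///
/// * `CREATE SINK s INTO t (b, a) AS SELECT ...` matches sink columns to `b` and `a` by name and
///   fills every other insertable column of `t` with its default expression.
/// * `CREATE SINK s INTO t AS SELECT ...` matches the sink's visible columns to `t`'s columns
///   by position, again falling back to defaults for the remainder.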
pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        // If users specified the columns to be inserted e.g. `CREATE SINK s INTO t(a, b)`, the expressions of `Project` will be generated accordingly.
        // The missing columns will be filled with default value (`null` if not explicitly defined).
        // Otherwise, e.g. `CREATE SINK s INTO t`, the columns will be matched by their order in `select` query and the target table.
        #[allow(clippy::collapsible_else_if)]
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            };
        }
    }
    Ok(exprs)
}

/// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`.
/// This is an analogy to (part of) [`crate::handler::create_source::bind_columns_from_source`]
/// which transforms sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`.
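/// For example (illustrative), `FORMAT UPSERT ENCODE JSON` maps to `SinkFormat::Upsert` with
/// `SinkEncode::Json`; any parenthesized key-value options are carried over into
/// `SinkFormatDesc::options`.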
fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}

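/// Compatibility matrix consumed by [`validate_compatibility`]: for each connector that accepts
/// the `FORMAT ... ENCODE ...` clause, the supported formats and, per format, the allowed encodes.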
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::S3Sink;
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
                GooglePubSubSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                ),
                KafkaSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Debezium => vec![Encode::Json],
                ),
                FileSink::<S3Sink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<GcsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<FsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                KinesisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                MqttSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf],
                ),
                PulsarSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                RedisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Template],
                    Format::Upsert => vec![Encode::Json, Encode::Template],
                ),
        ))
    });

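/// Checks that the requested `FORMAT ... ENCODE ...` combination is supported by the given
/// connector according to `CONNECTORS_COMPATIBLE_FORMATS`. For example (illustrative), `kafka`
/// with `FORMAT DEBEZIUM ENCODE JSON` passes, while `FORMAT DEBEZIUM ENCODE AVRO` is rejected.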
pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    // Only the Kafka connector is allowed to use `bytes` as the key encode.
    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
        return Err(ErrorCode::BindError(format!(
            "key encode bytes only works with kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
                    WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                        '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                        mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Check sink exists.
        let (sink, _) = catalog_reader
            .get_created_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}