risingwave_frontend/handler/create_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use either::Either;
use iceberg::arrow::type_to_arrow_type;
use iceberg::spec::Transform;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::bail;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, DatabaseId, ObjectId, Schema, SchemaId, UserId,
};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_connector::WithPropertiesExt;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    enforce_secret_sink,
};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::catalog::{PbSink, PbSource, Table};
use risingwave_pb::ddl_service::{ReplaceJobPlan, TableJobType, replace_job_plan};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{MergeNode, StreamFragmentGraph, StreamNode};
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    Query, Statement,
};

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::{SqlColumnStrategy, UPSTREAM_SOURCE_KEY};
use super::util::gen_query_from_table_name;
use crate::binder::Binder;
use crate::catalog::SinkId;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
use crate::handler::util::{check_connector_match_connection_type, ensure_connection_type_allowed};
use crate::optimizer::plan_node::{
    IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamProject, generic,
};
use crate::optimizer::{OptimizerContext, PlanRef, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

// Used to store the result of `gen_sink_plan`.
pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
    is_iceberg_engine_internal: bool,
) -> Result<SinkPlanContext> {
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?;

    let mut with_options = handler_args.with_options.clone();

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        enforce_secret_sink(&with_options)?;
    }

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    // If no connection is used, we don't need to check that the connector matches the connection type.
    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = resolved_with_options.get_connector() else {
            return Err(RwError::from(ErrorCode::ProtocolError(format!(
                "missing field '{}' in WITH clause",
                CONNECTOR_TYPE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    // Used for Debezium's table name.
    let sink_from_table_name;
    // `true` means that the sink statement has the form: `CREATE SINK s1 FROM ...`
    // `false` means that the sink statement has the form: `CREATE SINK s1 AS <query>`
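    // Illustrative examples of the two forms (hypothetical object names):
    //   e.g. `CREATE SINK s1 FROM mv1 WITH (...)`            -- direct sink (`direct_sink == true`)
    //   e.g. `CREATE SINK s1 AS SELECT a, b FROM t1 WITH (...)` -- sink over a query (`direct_sink == false`)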
    let direct_sink;
    let query = match stmt.sink_from {
        CreateSink::From(from_name) => {
            sink_from_table_name = from_name.0.last().unwrap().real_value();
            direct_sink = true;
            Box::new(gen_query_from_table_name(from_name))
        }
        CreateSink::AsQuery(query) => {
            sink_from_table_name = sink_table_name.clone();
            direct_sink = false;
            query
        }
    };

    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());

    let (sink_database_id, sink_schema_id) =
        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

    let (dependent_relations, dependent_udfs, bound) = {
        let mut binder = Binder::new_for_stream(session);
        let bound = binder.bind_query(*query.clone())?;
        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound,
        )
    };

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        // If column names are not specified, use the names in the bound query, which are equal to the plan root's original field names.
        get_column_names(&bound, stmt.columns)?
    };

    if sink_into_table_name.is_some() {
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "When sinking into a table, the 'connector' parameter should not be provided.".to_owned(),
            )));
        }
    }

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

    let format_desc = match stmt.sink_schema {
        // Case A: new syntax `format ... encode ...`
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            // Case B: old syntax `type = '...'`
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                f
            }),
            // Case C: no format + encode required
            None => None,
        },
    };

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = if is_iceberg_engine_internal {
        Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
    } else {
        Planner::new_for_stream(context.into()).plan_query(bound)?
    };
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    };

    let without_backfill = match resolved_with_options.remove(SINK_SNAPSHOT_OPTION) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink || is_iceberg_engine_internal {
                true
            } else {
                return Err(ErrorCode::BindError(
                    "`snapshot = false` is only supported for `CREATE SINK FROM MV or TABLE`".to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };

    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Any rows from the sink violating these constraints will be ignored silently.",
                target_table_catalog.name(),
            ));
        }
    }

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into();

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }
    tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());

    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
    // during binding instead of visiting the optimized plan.
    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .map(|id| id.table_id() as ObjectId)
            .chain(
                dependent_udfs
                    .into_iter()
                    .map(|id| id.function_id() as ObjectId),
            )
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        SchemaId::new(sink_schema_id),
        DatabaseId::new(sink_database_id),
        UserId::new(session.user_id()),
        connector_conn_ref.map(ConnectionId::from),
    );

    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
                    "cannot derive generated columns and the system column `_rw_timestamp` in a sink's catalog, but met one"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    };

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

// Returns the partition compute info for a sink. See `PartitionComputeInfo` for more details.
// Return:
// `Some(PartitionComputeInfo)` if the sink needs to compute the partition.
// `None` if the sink does not need to compute the partition.
pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

#[allow(clippy::unused_async)]
async fn get_partition_compute_info_for_iceberg(
    iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    // TODO: check whether the table exists
    if iceberg_config.create_table_if_not_exists {
        return Ok(None);
    }
    let table = iceberg_config.load_table().await?;
    let partition_spec = table.metadata().default_partition_spec();
    if partition_spec.is_unpartitioned() {
        return Ok(None);
    }

    // Separate the partition spec into two parts: sparse partitions and range partitions.
    // A sparse partition means that the data distribution is relatively sparse at a given time.
    // A range partition means that the data distribution is likely the same at a given time.
    // Only compute the partition values and shuffle by them for sparse partitions.
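    // Illustrative (hypothetical) examples: specs such as `bucket(16, user_id)` or
    // `truncate(10, name)` count as sparse partitions here, while `day(event_time)` or
    // `month(event_time)` count as range partitions.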
    let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
        // Sparse partition
        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
        // Range partition
        Transform::Year
        | Transform::Month
        | Transform::Day
        | Transform::Hour
        | Transform::Void
        | Transform::Unknown => false,
    });
    if !has_sparse_partition {
        return Ok(None);
    }

    let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
        table.metadata().default_partition_type().clone(),
    ))
    .map_err(|_| {
        RwError::from(ErrorCode::SinkError(
            "Failed to convert the Iceberg partition type to an Arrow type".into(),
        ))
    })?;
    let ArrowDataType::Struct(struct_fields) = arrow_type else {
        return Err(RwError::from(ErrorCode::SinkError(
            "The partition type of Iceberg should be a struct type".into(),
        )));
    };

    let schema = table.metadata().current_schema();
    let partition_fields = partition_spec
        .fields()
        .iter()
        .map(|f| {
            let source_f =
                schema
                    .field_by_id(f.source_id)
                    .ok_or(RwError::from(ErrorCode::SinkError(
                        "Failed to look up the Iceberg partition field".into(),
                    )))?;
            Ok((source_f.name.clone(), f.transform))
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
        partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
        partition_fields,
    })))
}

pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
    is_iceberg_engine_internal: bool,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        stmt.if_not_exists,
    )? {
        return Ok(resp);
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let graph = build_graph(plan, Some(GraphJobType::Sink))?;

        (sink, graph, target_table_catalog, dependencies)
    };

    let mut target_table_replace_plan = None;
    if let Some(table_catalog) = target_table_catalog {
        use crate::handler::alter_table_column::hijack_merger_for_target_table;

        let (mut graph, mut table, source, target_job_type) =
            reparse_table_for_sink(&session, &table_catalog).await?;

        sink.original_target_columns = table
            .columns
            .iter()
            .map(|col| ColumnCatalog::from(col.clone()))
            .collect_vec();

        table
            .incoming_sinks
            .clone_from(&table_catalog.incoming_sinks);

        let incoming_sink_ids: HashSet<_> = table_catalog.incoming_sinks.iter().copied().collect();
        let incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;

        let columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        for existing_sink in incoming_sinks {
            hijack_merger_for_target_table(
                &mut graph,
                &columns_without_rw_timestamp,
                &existing_sink,
                Some(&existing_sink.unique_identity()),
            )?;
        }

        // For the sink being created, we don't have a unique identity because the sink id is not generated yet.
        hijack_merger_for_target_table(&mut graph, &columns_without_rw_timestamp, &sink, None)?;

        target_table_replace_plan = Some(ReplaceJobPlan {
            replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
                replace_job_plan::ReplaceTable {
                    table: Some(table),
                    source,
                    job_type: target_job_type as _,
                },
            )),
            fragment_graph: Some(graph),
            table_col_index_mapping: None,
        });
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id.database_id,
                sink.schema_id.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_sink(
            sink.to_proto(),
            graph,
            target_table_replace_plan,
            dependencies,
        )
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    incoming_sink_ids: &HashSet<SinkId>,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let mut sinks = Vec::with_capacity(incoming_sink_ids.len());
    let db_name = &session.database();
    for schema in reader.iter_schemas(db_name)? {
        for sink in schema.iter_sink() {
            if incoming_sink_ids.contains(&sink.id.sink_id) {
                sinks.push(sink.clone());
            }
        }
    }

    Ok(sinks)
}

pub(crate) async fn reparse_table_for_sink(
    session: &Arc<SessionImpl>,
    table_catalog: &Arc<TableCatalog>,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
    // Retrieve the original table definition and parse it to an AST.
    let definition = table_catalog.create_sql_ast_purified()?;
    let Statement::CreateTable { name, .. } = &definition else {
        panic!("unexpected statement: {:?}", definition);
    };
    let table_name = name.clone();

    // Create handler args as if we're creating a new table with the altered definition.
    let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?;
    let col_id_gen = ColumnIdGenerator::new_alter(table_catalog);

    let (graph, table, source, job_type) = generate_stream_graph_for_replace_table(
        session,
        table_name,
        table_catalog,
        handler_args,
        definition,
        col_id_gen,
        SqlColumnStrategy::FollowUnchecked,
    )
    .await?;

    Ok((graph, table, source, job_type))
}

pub(crate) fn insert_merger_to_union_with_project(
    node: &mut StreamNode,
    project_node: &PbNodeBody,
    uniq_identity: Option<&str>,
) {
    if let Some(NodeBody::Union(_union_node)) = &mut node.node_body {
        // TODO: MergeNode is used as a placeholder, see issue #17658
        node.input.push(StreamNode {
            input: vec![StreamNode {
                node_body: Some(NodeBody::Merge(Box::new(MergeNode {
                    ..Default::default()
                }))),
                ..Default::default()
            }],
            identity: uniq_identity
                .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                .to_owned(),
            fields: node.fields.clone(),
            node_body: Some(project_node.clone()),
            ..Default::default()
        });

        return;
    }

    for input in &mut node.input {
        insert_merger_to_union_with_project(input, project_node, uniq_identity);
    }
}

fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if target_type != input_type {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        // If users specified the columns to be inserted, e.g. `CREATE SINK s INTO t(a, b)`, the expressions of `Project` will be generated accordingly.
        // The missing columns will be filled with default values (`null` if not explicitly defined).
        // Otherwise, e.g. `CREATE SINK s INTO t`, the columns will be matched by their order in the `select` query and the target table.
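        // Illustrative (hypothetical) example: for a target table `t(a, b, c)` and
        // `CREATE SINK s INTO t(c, a) AS SELECT x, y ...`, target column `c` maps to `x`,
        // `a` maps to `y`, and `b` falls back to its default expression.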
        #[allow(clippy::collapsible_else_if)]
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            };
        }
    }
    Ok(exprs)
}

/// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`.
/// This is an analogy to (part of) [`crate::handler::create_source::bind_columns_from_source`]
/// which transforms sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`.
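/// For example (illustrative only), a sink declared with `FORMAT UPSERT ENCODE JSON` is bound to
/// `SinkFormat::Upsert` + `SinkEncode::Json`; the remaining row options and resolved secret
/// references are carried in `options` / `secret_refs`.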
fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}

static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::{S3Sink, SnowflakeSink};
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
                GooglePubSubSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                ),
                KafkaSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Debezium => vec![Encode::Json],
                ),
                FileSink::<S3Sink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<GcsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<FsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                KinesisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                MqttSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf],
                ),
                PulsarSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                RedisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Template],
                    Format::Upsert => vec![Encode::Json, Encode::Template],
                ),
        ))
    });
pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    // Only allow the Kafka connector to work with `bytes` as the key encode.
    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
        return Err(ErrorCode::BindError(format!(
            "key encode bytes only works with kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
                    WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                        '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                        mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check that the source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check that the table exists.
        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Check that the sink exists.
        let (sink, _) = catalog_reader
            .get_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}