risingwave_frontend/handler/create_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use either::Either;
use iceberg::arrow::type_to_arrow_type;
use iceberg::spec::Transform;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::bail;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, DatabaseId, ObjectId, Schema, SchemaId, UserId,
};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_connector::WithPropertiesExt;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    enforce_secret_sink,
};
use risingwave_pb::catalog::PbSink;
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::ddl_service::{ReplaceJobPlan, TableJobType, replace_job_plan};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{MergeNode, StreamFragmentGraph, StreamNode};
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    Query, Statement,
};

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::{SqlColumnStrategy, UPSTREAM_SOURCE_KEY};
use super::util::gen_query_from_table_name;
use crate::binder::Binder;
use crate::catalog::SinkId;
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
use crate::handler::util::{check_connector_match_connection_type, ensure_connection_type_allowed};
use crate::optimizer::plan_node::{
    IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamProject, generic,
};
use crate::optimizer::{OptimizerContext, PlanRef, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

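/// Connection types that a connection referenced in a sink's `WITH` options is allowed to have.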
static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

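/// Connection types that a schema registry connection referenced in a sink's
/// `FORMAT ... ENCODE ...` options is allowed to have.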
static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

/// Stores the result of [`gen_sink_plan`].
pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

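/// Binds and plans a `CREATE SINK` statement, returning the sink plan together with the sink
/// catalog, the optional target table (for `CREATE SINK ... INTO ...`), and the set of objects the
/// sink depends on.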
pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
    is_iceberg_engine_internal: bool,
) -> Result<SinkPlanContext> {
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?;

    let mut with_options = handler_args.with_options.clone();

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        enforce_secret_sink(&with_options)?;
    }

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    // If no connection is referenced, there is no need to check that the connector matches the connection type.
    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = resolved_with_options.get_connector() else {
            return Err(RwError::from(ErrorCode::ProtocolError(format!(
                "missing field '{}' in WITH clause",
                CONNECTOR_TYPE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    // Used for Debezium's table name.
    let sink_from_table_name;
    // `true` means the sink statement has the form `CREATE SINK s1 FROM ...`;
    // `false` means the sink statement has the form `CREATE SINK s1 AS <query>`.
    let direct_sink;
    let query = match stmt.sink_from {
        CreateSink::From(from_name) => {
            sink_from_table_name = from_name.0.last().unwrap().real_value();
            direct_sink = true;
            Box::new(gen_query_from_table_name(from_name))
        }
        CreateSink::AsQuery(query) => {
            sink_from_table_name = sink_table_name.clone();
            direct_sink = false;
            query
        }
    };

    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());

    let (sink_database_id, sink_schema_id) =
        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

    let (dependent_relations, dependent_udfs, bound) = {
        let mut binder = Binder::new_for_stream(session);
        let bound = binder.bind_query(*query.clone())?;
        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound,
        )
    };

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        // If column names are not specified, use the names from the bound query, which match the plan root's original field names.
        get_column_names(&bound, stmt.columns)?
    };

    if sink_into_table_name.is_some() {
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "When sinking into a table, the 'connector' parameter should not be provided.".to_owned(),
            )));
        }
    }

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

    let format_desc = match stmt.sink_schema {
        // Case A: new syntax `format ... encode ...`
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            // Case B: old syntax `type = '...'`
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                f
            }),
            // Case C: no format + encode required
            None => None,
        },
    };

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = if is_iceberg_engine_internal {
        Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
    } else {
        Planner::new_for_stream(context.into()).plan_query(bound)?
    };
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    };

    let without_backfill = match resolved_with_options.remove(SINK_SNAPSHOT_OPTION) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink || is_iceberg_engine_internal {
                true
            } else {
                return Err(ErrorCode::BindError(
                    "`snapshot = false` only supports `CREATE SINK FROM MV or TABLE`".to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };

    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Rows that violate these constraints will be silently ignored.",
                target_table_catalog.name(),
            ));
        }
    }

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into();

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }
    tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());

    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
    // during binding instead of visiting the optimized plan.
    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .map(|id| id.table_id() as ObjectId)
            .chain(
                dependent_udfs
                    .into_iter()
                    .map(|id| id.function_id() as ObjectId),
            )
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        SchemaId::new(sink_schema_id),
        DatabaseId::new(sink_database_id),
        UserId::new(session.user_id()),
        connector_conn_ref.map(ConnectionId::from),
    );

    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
                    "generated columns and the system column `_rw_timestamp` cannot appear in a sink's catalog, but one was found"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    };

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

/// Returns the partition compute info for a sink. See [`PartitionComputeInfo`] for more details.
///
/// Returns `Some(PartitionComputeInfo)` if the sink needs to compute partitions, and `None`
/// otherwise.
pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

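/// Derives the partition compute info from an Iceberg table's default partition spec. Returns
/// `None` when the table is created by the sink itself, is unpartitioned, or has no sparse
/// partition fields.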
#[allow(clippy::unused_async)]
async fn get_partition_compute_info_for_iceberg(
    _iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    // TODO: check whether the table exists
    if _iceberg_config.create_table_if_not_exists {
        return Ok(None);
    }
    let table = _iceberg_config.load_table().await?;
    let partition_spec = table.metadata().default_partition_spec();
    if partition_spec.is_unpartitioned() {
        return Ok(None);
    }

    // Separate the partition spec into two kinds: sparse partitions and range partitions.
    // A sparse partition means that the data distribution is sparse at a given point in time.
    // A range partition means that the data distribution is likely the same at a given point in time.
    // Only compute partition values and shuffle by them for sparse partitions.
    let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
        // Sparse partition
        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
        // Range partition
        Transform::Year
        | Transform::Month
        | Transform::Day
        | Transform::Hour
        | Transform::Void
        | Transform::Unknown => false,
    });
    if !has_sparse_partition {
        return Ok(None);
    }

    let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
        table.metadata().default_partition_type().clone(),
    ))
    .map_err(|_| {
        RwError::from(ErrorCode::SinkError(
            "Failed to convert Iceberg partition type to Arrow type".into(),
        ))
    })?;
    let ArrowDataType::Struct(struct_fields) = arrow_type else {
        return Err(RwError::from(ErrorCode::SinkError(
            "Partition type of Iceberg should be a struct type".into(),
        )));
    };

    let schema = table.metadata().current_schema();
    let partition_fields = partition_spec
        .fields()
        .iter()
        .map(|f| {
            let source_f =
                schema
                    .field_by_id(f.source_id)
                    .ok_or(RwError::from(ErrorCode::SinkError(
                        "Failed to look up Iceberg partition field".into(),
                    )))?;
            Ok((source_f.name.clone(), f.transform))
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
        partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
        partition_fields,
    })))
}

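/// Handles a `CREATE SINK` statement: plans the sink, builds its stream fragment graph, prepares a
/// table replacement plan when sinking into a table, and submits the job to the catalog writer.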
pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
    is_iceberg_engine_internal: bool,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    let if_not_exists = stmt.if_not_exists;
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let graph = build_graph(plan, Some(GraphJobType::Sink))?;

        (sink, graph, target_table_catalog, dependencies)
    };

    let mut target_table_replace_plan = None;
    if let Some(table_catalog) = target_table_catalog {
        use crate::handler::alter_table_column::hijack_merger_for_target_table;

        let (mut graph, mut table, source, target_job_type) =
            reparse_table_for_sink(&session, &table_catalog).await?;

        sink.original_target_columns = table.columns.clone();

        table
            .incoming_sinks
            .clone_from(&table_catalog.incoming_sinks);

        let incoming_sink_ids: HashSet<_> = table_catalog.incoming_sinks.iter().copied().collect();
        let incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;

        let columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        for existing_sink in incoming_sinks {
            hijack_merger_for_target_table(
                &mut graph,
                &columns_without_rw_timestamp,
                &existing_sink,
                Some(&existing_sink.unique_identity()),
            )?;
        }

        // For the sink being created, there is no unique identity yet because the sink id has not been generated.
        hijack_merger_for_target_table(&mut graph, &columns_without_rw_timestamp, &sink, None)?;

        target_table_replace_plan = Some(ReplaceJobPlan {
            replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
                replace_job_plan::ReplaceTable {
                    table: Some(table.to_prost()),
                    source: source.map(|x| x.to_prost()),
                    job_type: target_job_type as _,
                },
            )),
            fragment_graph: Some(graph),
        });
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id.database_id,
                sink.schema_id.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_sink(
            sink.to_proto(),
            graph,
            target_table_replace_plan,
            dependencies,
            if_not_exists,
        )
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

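/// Fetches the catalogs of the sinks whose ids appear in `incoming_sink_ids` from the current
/// database.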
pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    incoming_sink_ids: &HashSet<SinkId>,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let mut sinks = Vec::with_capacity(incoming_sink_ids.len());
    let db_name = &session.database();
    for schema in reader.iter_schemas(db_name)? {
        for sink in schema.iter_sink() {
            if incoming_sink_ids.contains(&sink.id.sink_id) {
                sinks.push(sink.clone());
            }
        }
    }

    Ok(sinks)
}

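/// Re-parses the target table's purified definition and regenerates its stream graph as a
/// replacement job, so that a new sink can be wired into the table.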
pub(crate) async fn reparse_table_for_sink(
    session: &Arc<SessionImpl>,
    table_catalog: &Arc<TableCatalog>,
) -> Result<(
    StreamFragmentGraph,
    TableCatalog,
    Option<SourceCatalog>,
    TableJobType,
)> {
    // Retrieve the original table definition and parse it into an AST.
    let definition = table_catalog.create_sql_ast_purified()?;
    let Statement::CreateTable { name, .. } = &definition else {
        panic!("unexpected statement: {:?}", definition);
    };
    let table_name = name.clone();

    // Create handler args as if we're creating a new table with the altered definition.
    let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?;
    let col_id_gen = ColumnIdGenerator::new_alter(table_catalog);

    let (graph, table, source, job_type) = generate_stream_graph_for_replace_table(
        session,
        table_name,
        table_catalog,
        handler_args,
        definition,
        col_id_gen,
        SqlColumnStrategy::FollowUnchecked,
    )
    .await?;

    Ok((graph, table, source, job_type))
}

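/// Recursively locates the `Union` node in the target table's stream graph and appends a new input
/// branch consisting of the given project node on top of a placeholder `Merge` node.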
pub(crate) fn insert_merger_to_union_with_project(
    node: &mut StreamNode,
    project_node: &PbNodeBody,
    uniq_identity: Option<&str>,
) {
    if let Some(NodeBody::Union(_union_node)) = &mut node.node_body {
        // TODO: MergeNode is used as a placeholder, see issue #17658
        node.input.push(StreamNode {
            input: vec![StreamNode {
                node_body: Some(NodeBody::Merge(Box::new(MergeNode {
                    ..Default::default()
                }))),
                ..Default::default()
            }],
            identity: uniq_identity
                .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                .to_owned(),
            fields: node.fields.clone(),
            node_body: Some(project_node.clone()),
            ..Default::default()
        });

        return;
    }

    for input in &mut node.input {
        insert_merger_to_union_with_project(input, project_node, uniq_identity);
    }
}

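/// Builds an `InputRef` expression that maps the sink column at `idx` to a target table column,
/// returning an error if the data types do not match exactly.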
fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if target_type != input_type {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

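/// Derives the `Project` expressions that map the sink's visible output columns onto the target
/// table's columns. Columns that are not matched fall back to their default expressions, with
/// `now()` rewritten to `proctime()`.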
pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        // If the user specified the columns to insert, e.g. `CREATE SINK s INTO t(a, b)`, the `Project` expressions are generated accordingly.
        // Missing columns are filled with their default values (`null` if not explicitly defined).
        // Otherwise, e.g. `CREATE SINK s INTO t`, columns are matched by their order in the `SELECT` query and the target table.
        #[allow(clippy::collapsible_else_if)]
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            };
        }
    }
    Ok(exprs)
}

/// Transforms the (format, encode, options) from the sqlparser AST into the internal struct
/// `SinkFormatDesc`. This is analogous to (part of)
/// [`crate::handler::create_source::bind_columns_from_source`],
/// which transforms the sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`.
fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}

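/// For each connector supporting the `FORMAT ... ENCODE ...` syntax, the accepted formats and the
/// encodes compatible with each format.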
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::{S3Sink, SnowflakeSink};
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
                GooglePubSubSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                ),
                KafkaSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Debezium => vec![Encode::Json],
                ),
                FileSink::<S3Sink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<GcsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<FsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                KinesisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                MqttSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf],
                ),
                PulsarSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                RedisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Template],
                    Format::Upsert => vec![Encode::Json, Encode::Template],
                ),
        ))
    });

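/// Checks that the connector supports the requested `FORMAT ... ENCODE ...` combination, including
/// the restriction that only Kafka may use `bytes` as the key encode.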
pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    // Only the Kafka connector is allowed to use `bytes` as the key encode.
    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
        return Err(ErrorCode::BindError(format!(
            "key encode bytes only works with kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
                    WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                        '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                        mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Check sink exists.
        let (sink, _) = catalog_reader
            .get_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}