risingwave_frontend/handler/create_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use either::Either;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, DatabaseId, ObjectId, Schema, SchemaId, UserId,
};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_connector::WithPropertiesExt;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL,
};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::catalog::{PbSink, PbSource, Table};
use risingwave_pb::ddl_service::{ReplaceJobPlan, TableJobType, replace_job_plan};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{MergeNode, StreamFragmentGraph, StreamNode};
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    Query, Statement,
};

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::{SqlColumnStrategy, UPSTREAM_SOURCE_KEY};
use super::util::gen_query_from_table_name;
use crate::binder::Binder;
use crate::catalog::SinkId;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
use crate::handler::util::{check_connector_match_connection_type, ensure_connection_type_allowed};
use crate::optimizer::plan_node::{LogicalSource, PartitionComputeInfo, StreamProject, generic};
use crate::optimizer::{OptimizerContext, PlanRef, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::build_graph;
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

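/// Connection types that a sink's connector is allowed to reference via `connection = ...`.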
static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

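/// Connection types that the schema registry referenced in `FORMAT ... ENCODE ...` options may use.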
static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

// Used to store the result of `gen_sink_plan`.
pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

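/// Plans a `CREATE SINK` statement and returns the sink plan together with its catalog,
/// the optional target table (for `CREATE SINK ... INTO <table>`), and the set of objects
/// the sink depends on. Handles both
/// `CREATE SINK s1 FROM mv1 WITH (connector = '...', ...)` and
/// `CREATE SINK s1 AS <query> WITH (...)`.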
pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
) -> Result<SinkPlanContext> {
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?;

    let mut with_options = handler_args.with_options.clone();

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            TelemetryDatabaseObject::Sink,
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    // If no connection is used, we don't need to check whether the connector matches the connection type.
    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let connector = resolved_with_options.get_connector().unwrap();
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    // Used for Debezium's table name.
    let sink_from_table_name;
    // `true` means that the sink statement has the form `CREATE SINK s1 FROM ...`;
    // `false` means that the sink statement has the form `CREATE SINK s1 AS <query>`.
    let direct_sink;
    let query = match stmt.sink_from {
        CreateSink::From(from_name) => {
            sink_from_table_name = from_name.0.last().unwrap().real_value();
            direct_sink = true;
            Box::new(gen_query_from_table_name(from_name))
        }
        CreateSink::AsQuery(query) => {
            sink_from_table_name = sink_table_name.clone();
            direct_sink = false;
            query
        }
    };

    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());

    let (sink_database_id, sink_schema_id) =
        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

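    // Bind the query in streaming mode and record the relations and UDFs it references;
    // they later become part of the sink's dependencies.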
    let (dependent_relations, dependent_udfs, bound) = {
        let mut binder = Binder::new_for_stream(session);
        let bound = binder.bind_query(*query.clone())?;
        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound,
        )
    };

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        // If column names are not specified, use the names in the bound query, which are equal to the plan root's original field names.
        get_column_names(&bound, stmt.columns)?
    };

    if sink_into_table_name.is_some() {
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "In the case of sinking into table, the 'connector' parameter should not be provided.".to_owned(),
            )));
        }
    }

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

    let format_desc = match stmt.sink_schema {
        // Case A: new syntax `format ... encode ...`
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            // Case B: old syntax `type = '...'`
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                f
            }),
            // Case C: no format + encode required
            None => None,
        },
    };

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = Planner::new_for_stream(context.into()).plan_query(bound)?;
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    };

    let without_backfill = match resolved_with_options.remove(SINK_WITHOUT_BACKFILL) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink {
                true
            } else {
                return Err(ErrorCode::BindError(
                    "`snapshot = false` only support `CREATE SINK FROM MV or TABLE`".to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };

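    // For `CREATE SINK ... INTO <table>`, resolve the target table's catalog so the
    // user-specified columns can be validated against it.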
    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Any sinked rows violating the constraints will be ignored silently.",
                target_table_catalog.name(),
            ));
        }
    }

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into();

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }

    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
    // during binding instead of visiting the optimized plan.
    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .map(|id| id.table_id() as ObjectId)
            .chain(
                dependent_udfs
                    .into_iter()
                    .map(|id| id.function_id() as ObjectId),
            )
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        SchemaId::new(sink_schema_id),
        DatabaseId::new(sink_database_id),
        UserId::new(session.user_id()),
        connector_conn_ref.map(ConnectionId::from),
    );

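    // When sinking into a table, wrap the plan in projections that align the sink's columns
    // with the target table's columns, filling unspecified columns with their default values
    // and recomputing the table's generated columns.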
    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
                    "can not derive generated columns and system column `_rw_timestamp` in a sink's catalog, but meet one"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    };

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

/// Returns the partition compute info for a sink. See `PartitionComputeInfo` for more details.
///
/// Returns `Some(PartitionComputeInfo)` if the sink needs to compute partitions,
/// and `None` if it does not.
pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

#[allow(clippy::unused_async)]
async fn get_partition_compute_info_for_iceberg(
    _iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    // TODO: enable partition compute for iceberg after fixing the issue of sink decoupling.
    Ok(None)

    // TODO: migrate to iceberg-rust later
}

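/// Handles a `CREATE SINK` statement: plans the sink, builds its stream graph, and submits it
/// through the catalog writer. For `CREATE SINK ... INTO <table>` it additionally builds a
/// replace plan for the target table so the new sink's output is unioned into it.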
pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        stmt.if_not_exists,
    )? {
        return Ok(resp);
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let graph = build_graph(plan)?;

        (sink, graph, target_table_catalog, dependencies)
    };

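    // If the sink writes into a table, rebuild the table's stream graph and splice a merger
    // for every incoming sink (including the one being created) into its union node, then
    // ship the result as a replace-table plan alongside the sink.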
    let mut target_table_replace_plan = None;
    if let Some(table_catalog) = target_table_catalog {
        use crate::handler::alter_table_column::hijack_merger_for_target_table;

        let (mut graph, mut table, source) =
            reparse_table_for_sink(&session, &table_catalog).await?;

        sink.original_target_columns = table
            .columns
            .iter()
            .map(|col| ColumnCatalog::from(col.clone()))
            .collect_vec();

        table
            .incoming_sinks
            .clone_from(&table_catalog.incoming_sinks);

        let incoming_sink_ids: HashSet<_> = table_catalog.incoming_sinks.iter().copied().collect();
        let incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;

        let columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        for existing_sink in incoming_sinks {
            hijack_merger_for_target_table(
                &mut graph,
                &columns_without_rw_timestamp,
                &existing_sink,
                Some(&existing_sink.unique_identity()),
            )?;
        }

        // The sink being created has no unique identity yet because its sink id is not generated at this point.
        hijack_merger_for_target_table(&mut graph, &columns_without_rw_timestamp, &sink, None)?;

        target_table_replace_plan = Some(ReplaceJobPlan {
            replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
                replace_job_plan::ReplaceTable {
                    table: Some(table),
                    source,
                    job_type: TableJobType::General as _,
                },
            )),
            fragment_graph: Some(graph),
            table_col_index_mapping: None,
        });
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id.database_id,
                sink.schema_id.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_sink(
            sink.to_proto(),
            graph,
            target_table_replace_plan,
            dependencies,
        )
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

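/// Looks up the catalogs of the sinks whose ids are in `incoming_sink_ids` by scanning all
/// schemas of the current database.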
pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    incoming_sink_ids: &HashSet<SinkId>,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let mut sinks = Vec::with_capacity(incoming_sink_ids.len());
    let db_name = &session.database();
    for schema in reader.iter_schemas(db_name)? {
        for sink in schema.iter_sink() {
            if incoming_sink_ids.contains(&sink.id.sink_id) {
                sinks.push(sink.clone());
            }
        }
    }

    Ok(sinks)
}

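/// Re-plans the target table from its original definition, producing the stream fragment graph,
/// table, and optional source that a replace-table job needs.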
pub(crate) async fn reparse_table_for_sink(
    session: &Arc<SessionImpl>,
    table_catalog: &Arc<TableCatalog>,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>)> {
    // Retrieve the original table definition and parse it to AST.
    let definition = table_catalog.create_sql_ast_purified()?;
    let Statement::CreateTable { name, .. } = &definition else {
        panic!("unexpected statement: {:?}", definition);
    };
    let table_name = name.clone();

    // Create handler args as if we're creating a new table with the altered definition.
    let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?;
    let col_id_gen = ColumnIdGenerator::new_alter(table_catalog);

    let (graph, table, source, _) = generate_stream_graph_for_replace_table(
        session,
        table_name,
        table_catalog,
        handler_args,
        definition,
        col_id_gen,
        SqlColumnStrategy::FollowUnchecked,
    )
    .await?;

    Ok((graph, table, source))
}

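/// Recursively walks the stream node tree until it finds a union node, then appends a new input
/// that applies `project_node` on top of a placeholder merger, labeled with `uniq_identity`.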
pub(crate) fn insert_merger_to_union_with_project(
    node: &mut StreamNode,
    project_node: &PbNodeBody,
    uniq_identity: Option<&str>,
) {
    if let Some(NodeBody::Union(_union_node)) = &mut node.node_body {
        // TODO: MergeNode is used as a placeholder, see issue #17658
        node.input.push(StreamNode {
            input: vec![StreamNode {
                node_body: Some(NodeBody::Merge(Box::new(MergeNode {
                    ..Default::default()
                }))),
                ..Default::default()
            }],
            identity: uniq_identity
                .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                .to_owned(),
            fields: node.fields.clone(),
            node_body: Some(project_node.clone()),
            ..Default::default()
        });

        return;
    }

    for input in &mut node.input {
        insert_merger_to_union_with_project(input, project_node, uniq_identity);
    }
}

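/// Builds an `InputRef` expression that forwards sink column `idx` to the target table column,
/// erroring out if the two column types do not match.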
fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if target_type != input_type {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

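/// Derives the projection that maps the sink's output onto the target table's columns: each
/// insertable table column is taken from the matching sink column (by name if the user specified
/// a column list, otherwise by position), and any remaining column falls back to its default
/// expression.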
pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        // If users specified the columns to be inserted, e.g. `CREATE SINK s INTO t(a, b)`, the expressions of `Project` will be generated accordingly.
        // The missing columns will be filled with default values (`null` if not explicitly defined).
        // Otherwise, e.g. `CREATE SINK s INTO t`, the columns will be matched by their order in the `SELECT` query and the target table.
        #[allow(clippy::collapsible_else_if)]
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            };
        }
    }
    Ok(exprs)
}

/// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`.
/// This is an analogy to (part of) [`crate::handler::create_source::bind_columns_from_source`]
/// which transforms sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`.
fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            TelemetryDatabaseObject::Sink,
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}

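/// The FORMAT/ENCODE combinations each connector accepts via the `FORMAT ... ENCODE ...` syntax,
/// keyed by connector name. Consulted by `validate_compatibility`.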
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::{S3Sink, SnowflakeSink};
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
                GooglePubSubSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                ),
                KafkaSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Debezium => vec![Encode::Json],
                ),
                FileSink::<S3Sink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<GcsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<FsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                KinesisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                MqttSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf],
                ),
                PulsarSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                RedisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Template],
                    Format::Upsert => vec![Encode::Json, Encode::Template],
                ),
        ))
    });

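/// Checks that the connector supports the requested FORMAT/ENCODE combination (per
/// `CONNECTORS_COMPATIBLE_FORMATS`) and that `ENCODE BYTES` as the key encode is only used with
/// the Kafka connector.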
pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    // Only allow the Kafka connector to work with `bytes` as the key encode.
    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
        return Err(ErrorCode::BindError(format!(
            "key encode bytes only works with kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
                    WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                        '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                        mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Check sink exists.
        let (sink, _) = catalog_reader
            .get_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}