risingwave_frontend/handler/create_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use either::Either;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, DatabaseId, ObjectId, Schema, SchemaId, UserId,
};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_connector::WithPropertiesExt;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    SINK_WITHOUT_BACKFILL, enforce_secret_sink,
};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::catalog::{PbSink, PbSource, Table};
use risingwave_pb::ddl_service::{ReplaceJobPlan, TableJobType, replace_job_plan};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{MergeNode, StreamFragmentGraph, StreamNode};
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    Query, Statement,
};

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::{SqlColumnStrategy, UPSTREAM_SOURCE_KEY};
use super::util::gen_query_from_table_name;
use crate::binder::Binder;
use crate::catalog::SinkId;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
use crate::handler::util::{check_connector_match_connection_type, ensure_connection_type_allowed};
use crate::optimizer::plan_node::{LogicalSource, PartitionComputeInfo, StreamProject, generic};
use crate::optimizer::{OptimizerContext, PlanRef, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

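/// Connection types that a `CREATE SINK` statement may reference through its WITH options.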
static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

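/// Connection types that the `FORMAT ... ENCODE ...` clause of a sink may reference, e.g. for a schema registry.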
static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

/// Used to store the result of [`gen_sink_plan`].
pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

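/// Generates the sink plan for a `CREATE SINK` statement: binds the source query (or the
/// `FROM` relation), resolves WITH options, connections and secrets, binds the
/// `FORMAT ... ENCODE ...` clause, and builds the streaming plan, together with the target
/// table catalog (for `CREATE SINK ... INTO ...`) and the collected dependencies.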
pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
) -> Result<SinkPlanContext> {
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?;

    let mut with_options = handler_args.with_options.clone();

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        enforce_secret_sink(&with_options)?;
    }

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    // If a connection is referenced, the connector must match the connection type; otherwise there is nothing to check.
    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let connector = resolved_with_options.get_connector().unwrap();
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    // Used for debezium's table name
    let sink_from_table_name;
    // `true` means that sink statement has the form: `CREATE SINK s1 FROM ...`
    // `false` means that sink statement has the form: `CREATE SINK s1 AS <query>`
    let direct_sink;
    let query = match stmt.sink_from {
        CreateSink::From(from_name) => {
            sink_from_table_name = from_name.0.last().unwrap().real_value();
            direct_sink = true;
            Box::new(gen_query_from_table_name(from_name))
        }
        CreateSink::AsQuery(query) => {
            sink_from_table_name = sink_table_name.clone();
            direct_sink = false;
            query
        }
    };

    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());

    let (sink_database_id, sink_schema_id) =
        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

    let (dependent_relations, dependent_udfs, bound) = {
        let mut binder = Binder::new_for_stream(session);
        let bound = binder.bind_query(*query.clone())?;
        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound,
        )
    };

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        // If column names are not specified, use the names from the bound query, which match the plan root's original field names.
        get_column_names(&bound, stmt.columns)?
    };

    if sink_into_table_name.is_some() {
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "In the case of sinking into table, the 'connector' parameter should not be provided.".to_owned(),
            )));
        }
    }

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

    let format_desc = match stmt.sink_schema {
        // Case A: new syntax `format ... encode ...`
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            // Case B: old syntax `type = '...'`
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                f
            }),
            // Case C: no format + encode required
            None => None,
        },
    };

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = Planner::new_for_stream(context.into()).plan_query(bound)?;
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    };

    let without_backfill = match resolved_with_options.remove(SINK_WITHOUT_BACKFILL) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink {
                true
            } else {
                return Err(ErrorCode::BindError(
                    "`snapshot = false` only support `CREATE SINK FROM MV or TABLE`".to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };

    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Any sinked rows violating the constraints will be ignored silently.",
                target_table_catalog.name(),
            ));
        }
    }

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into();

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }

    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
    // during binding instead of visiting the optimized plan.
    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .map(|id| id.table_id() as ObjectId)
            .chain(
                dependent_udfs
                    .into_iter()
                    .map(|id| id.function_id() as ObjectId),
            )
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        SchemaId::new(sink_schema_id),
        DatabaseId::new(sink_database_id),
        UserId::new(session.user_id()),
        connector_conn_ref.map(ConnectionId::from),
    );

    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
                    "can not derive generated columns and system column `_rw_timestamp` in a sink's catalog, but meet one"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    };

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

/// Returns the partition compute info for a sink. See [`PartitionComputeInfo`] for more details.
///
/// Returns `Some(PartitionComputeInfo)` if the sink needs to compute partitions,
/// or `None` if it does not.
pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

#[allow(clippy::unused_async)]
async fn get_partition_compute_info_for_iceberg(
    _iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    // TODO: enable partition compute for iceberg after fixing the issue of sink decoupling.
    Ok(None)

    // TODO: migrate to iceberg-rust later
}

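/// Handles a `CREATE SINK` statement: generates the sink plan, builds the stream fragment
/// graph, prepares a replace-table plan when sinking into a table, and submits everything
/// to the catalog writer.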
pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        stmt.if_not_exists,
    )? {
        return Ok(resp);
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let graph = build_graph(plan, Some(GraphJobType::Sink))?;

        (sink, graph, target_table_catalog, dependencies)
    };

    let mut target_table_replace_plan = None;
    if let Some(table_catalog) = target_table_catalog {
        use crate::handler::alter_table_column::hijack_merger_for_target_table;

        let (mut graph, mut table, source, target_job_type) =
            reparse_table_for_sink(&session, &table_catalog).await?;

        sink.original_target_columns = table
            .columns
            .iter()
            .map(|col| ColumnCatalog::from(col.clone()))
            .collect_vec();

        table
            .incoming_sinks
            .clone_from(&table_catalog.incoming_sinks);

        let incoming_sink_ids: HashSet<_> = table_catalog.incoming_sinks.iter().copied().collect();
        let incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;

        let columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        for existing_sink in incoming_sinks {
            hijack_merger_for_target_table(
                &mut graph,
                &columns_without_rw_timestamp,
                &existing_sink,
                Some(&existing_sink.unique_identity()),
            )?;
        }

        // For the sink being created, there is no unique identity yet because its sink id has not been generated.
        hijack_merger_for_target_table(&mut graph, &columns_without_rw_timestamp, &sink, None)?;

        target_table_replace_plan = Some(ReplaceJobPlan {
            replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
                replace_job_plan::ReplaceTable {
                    table: Some(table),
                    source,
                    job_type: target_job_type as _,
                },
            )),
            fragment_graph: Some(graph),
            table_col_index_mapping: None,
        });
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id.database_id,
                sink.schema_id.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_sink(
            sink.to_proto(),
            graph,
            target_table_replace_plan,
            dependencies,
        )
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

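/// Collects the catalogs of all sinks in the current database whose ids appear in `incoming_sink_ids`.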
pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    incoming_sink_ids: &HashSet<SinkId>,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let mut sinks = Vec::with_capacity(incoming_sink_ids.len());
    let db_name = &session.database();
    for schema in reader.iter_schemas(db_name)? {
        for sink in schema.iter_sink() {
            if incoming_sink_ids.contains(&sink.id.sink_id) {
                sinks.push(sink.clone());
            }
        }
    }

    Ok(sinks)
}

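/// Re-plans the target table of a `CREATE SINK ... INTO ...` statement from its purified
/// definition, returning the stream fragment graph, table, optional source and job type
/// needed to build the replace-table plan.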
pub(crate) async fn reparse_table_for_sink(
    session: &Arc<SessionImpl>,
    table_catalog: &Arc<TableCatalog>,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
    // Retrieve the original table definition and parse it to AST.
    let definition = table_catalog.create_sql_ast_purified()?;
    let Statement::CreateTable { name, .. } = &definition else {
        panic!("unexpected statement: {:?}", definition);
    };
    let table_name = name.clone();

    // Create handler args as if we're creating a new table with the altered definition.
    let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?;
    let col_id_gen = ColumnIdGenerator::new_alter(table_catalog);

    let (graph, table, source, job_type) = generate_stream_graph_for_replace_table(
        session,
        table_name,
        table_catalog,
        handler_args,
        definition,
        col_id_gen,
        SqlColumnStrategy::FollowUnchecked,
    )
    .await?;

    Ok((graph, table, source, job_type))
}

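/// Recursively looks for the `Union` node in the target table's stream graph and appends a
/// new input branch consisting of a placeholder `Merge` node wrapped by the given project
/// node.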
pub(crate) fn insert_merger_to_union_with_project(
    node: &mut StreamNode,
    project_node: &PbNodeBody,
    uniq_identity: Option<&str>,
) {
    if let Some(NodeBody::Union(_union_node)) = &mut node.node_body {
        // TODO: MergeNode is used as a placeholder, see issue #17658
        node.input.push(StreamNode {
            input: vec![StreamNode {
                node_body: Some(NodeBody::Merge(Box::new(MergeNode {
                    ..Default::default()
                }))),
                ..Default::default()
            }],
            identity: uniq_identity
                .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                .to_owned(),
            fields: node.fields.clone(),
            node_body: Some(project_node.clone()),
            ..Default::default()
        });

        return;
    }

    for input in &mut node.input {
        insert_merger_to_union_with_project(input, project_node, uniq_identity);
    }
}

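/// Builds an `InputRef` pointing at column `idx` of the sink schema, or errors out if its
/// type does not match the target table column's type.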
fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if target_type != input_type {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

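/// Derives the projection that maps the sink's output columns onto the target table's
/// columns. Table columns not covered by the sink are filled with their default
/// expressions, with `now()` rewritten to proctime (see [`rewrite_now_to_proctime`]).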
pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        // If the user specified the columns to insert, e.g. `CREATE SINK s INTO t(a, b)`, the `Project` expressions are generated accordingly.
        // Missing columns are filled with their default values (`null` if not explicitly defined).
        // Otherwise, e.g. `CREATE SINK s INTO t`, columns are matched by their order in the `SELECT` query and the target table.
        #[allow(clippy::collapsible_else_if)]
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            };
        }
    }
    Ok(exprs)
}

/// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`.
/// This is an analogy to (part of) [`crate::handler::create_source::bind_columns_from_source`]
/// which transforms sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`.
fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}

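/// For each sink connector that supports the `FORMAT ... ENCODE ...` syntax, the formats it
/// accepts and the encodes that are valid for each format.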
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::{S3Sink, SnowflakeSink};
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
                GooglePubSubSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                ),
                KafkaSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Debezium => vec![Encode::Json],
                ),
                FileSink::<S3Sink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<GcsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<FsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                KinesisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                MqttSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf],
                ),
                PulsarSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                RedisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Template],
                    Format::Upsert => vec![Encode::Json, Encode::Template],
                ),
        ))
    });

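/// Checks that the given connector supports the requested `FORMAT ... ENCODE ...`
/// combination, including the restriction that only Kafka may use `bytes` as the key encode.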
pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    // Only the Kafka connector is allowed to use `bytes` as the key encode.
    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
        return Err(ErrorCode::BindError(format!(
            "key encode bytes only works with kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
                    WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                        '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                        mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Check sink exists.
        let (sink, _) = catalog_reader
            .get_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}