risingwave_frontend/handler/create_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use either::Either;
use iceberg::arrow::type_to_arrow_type;
use iceberg::spec::Transform;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::bail;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, DatabaseId, ObjectId, Schema, SchemaId, UserId,
};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    enforce_secret_sink,
};
use risingwave_connector::{AUTO_SCHEMA_CHANGE_KEY, WithPropertiesExt};
use risingwave_pb::catalog::PbSink;
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::ddl_service::{ReplaceJobPlan, TableJobType, replace_job_plan};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{MergeNode, StreamFragmentGraph, StreamNode};
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    ObjectName, Query, Statement,
};

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::{SqlColumnStrategy, UPSTREAM_SOURCE_KEY};
use super::util::gen_query_from_table_name;
use crate::binder::{Binder, Relation};
use crate::catalog::SinkId;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
use crate::handler::util::{check_connector_match_connection_type, ensure_connection_type_allowed};
use crate::optimizer::plan_node::{
    IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamPlanRef as PlanRef,
    StreamProject, generic,
};
use crate::optimizer::{OptimizerContext, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

/// Stores the result of [`gen_sink_plan`].
pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

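/// Generates the plan for a `CREATE SINK` statement: resolves secrets and connection
/// references in the WITH options, binds the source query (or the `FROM` relation),
/// plans it as a streaming job, and assembles the resulting [`SinkPlanContext`].
///
/// A minimal, illustrative call site (variable names are hypothetical, not from this file):
///
/// ```ignore
/// let SinkPlanContext { sink_plan, sink_catalog, .. } =
///     gen_sink_plan(handler_args, create_sink_stmt, None, false).await?;
/// ```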
pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
    is_iceberg_engine_internal: bool,
) -> Result<SinkPlanContext> {
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, &stmt.sink_name)?;

    let mut with_options = handler_args.with_options.clone();

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        enforce_secret_sink(&with_options)?;
    }

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    // If no connection is used, there is no need to check that the connector matches the connection type.
    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = resolved_with_options.get_connector() else {
            return Err(RwError::from(ErrorCode::ProtocolError(format!(
                "missing field '{}' in WITH clause",
                CONNECTOR_TYPE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    let is_auto_schema_change = resolved_with_options
        .remove(AUTO_SCHEMA_CHANGE_KEY)
        .map(|value| {
            value.parse::<bool>().map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "invalid value {} of '{}' option, expected a boolean",
                    value, AUTO_SCHEMA_CHANGE_KEY
                ))
            })
        })
        .transpose()?
        .unwrap_or(false);

    if is_auto_schema_change {
        Feature::SinkAutoSchemaChange.check_available()?;
    }
177
178    // Used for debezium's table name
179    let sink_from_table_name;
180    // `true` means that sink statement has the form: `CREATE SINK s1 FROM ...`
181    // `false` means that sink statement has the form: `CREATE SINK s1 AS <query>`
182    let direct_sink_from_name: Option<(ObjectName, bool)>;
183    let query = match stmt.sink_from {
184        CreateSink::From(from_name) => {
185            sink_from_table_name = from_name.0.last().unwrap().real_value();
186            direct_sink_from_name = Some((from_name.clone(), is_auto_schema_change));
187            if is_auto_schema_change && stmt.into_table_name.is_some() {
188                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
189                    "auto schema change not supported for sink-into-table".to_owned(),
190                )));
191            }
192            Box::new(gen_query_from_table_name(from_name))
193        }
194        CreateSink::AsQuery(query) => {
195            if is_auto_schema_change {
196                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
197                    "auto schema change not supported for CREATE SINK AS QUERY".to_owned(),
198                )));
199            }
200            sink_from_table_name = sink_table_name.clone();
201            direct_sink_from_name = None;
202            query
203        }
204    };
205
206    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());
207
208    let (sink_database_id, sink_schema_id) =
209        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;
210
211    let (dependent_relations, dependent_udfs, bound, auto_refresh_schema_from_table) = {
212        let mut binder = Binder::new_for_stream(session);
213        let auto_refresh_schema_from_table = if let Some((from_name, true)) = &direct_sink_from_name
214        {
215            let from_relation = binder.bind_relation_by_name(from_name, None, None, true)?;
216            if let Relation::BaseTable(table) = from_relation {
217                if table.table_catalog.table_type != TableType::Table {
218                    return Err(ErrorCode::InvalidInputSyntax(format!(
219                        "auto schema change only support on TABLE, but got {:?}",
220                        table.table_catalog.table_type
221                    ))
222                    .into());
223                }
224                if table.table_catalog.database_id != sink_database_id {
225                    return Err(ErrorCode::InvalidInputSyntax(
226                        "auto schema change sink does not support created from cross database table".to_owned()
227                    )
228                        .into());
229                }
230                for col in &table.table_catalog.columns {
231                    if !col.is_hidden() && (col.is_generated() || col.is_rw_sys_column()) {
232                        return Err(ErrorCode::InvalidInputSyntax(format!("auto schema change not supported for table with non-hidden generated column or sys column, but got {}", col.name())).into());
233                    }
234                }
235                Some(table.table_catalog)
236            } else {
237                return Err(RwError::from(ErrorCode::NotSupported(
238                    "auto schema change only supported for TABLE".to_owned(),
239                    "try recreating the sink from table".to_owned(),
240                )));
241            }
242        } else {
243            None
244        };
245        let bound = binder.bind_query(&query)?;
246
247        (
248            binder.included_relations().clone(),
249            binder.included_udfs().clone(),
250            bound,
251            auto_refresh_schema_from_table,
252        )
253    };

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        // If column names are not specified, use the names from the bound query, which match the plan root's original field names.
        get_column_names(&bound, stmt.columns)?
    };

    if sink_into_table_name.is_some() {
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "When sinking into a table, the 'connector' parameter should not be provided.".to_owned(),
            )));
        }
    }

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

    let format_desc = match stmt.sink_schema {
        // Case A: new syntax `format ... encode ...`
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            // Case B: old syntax `type = '...'`
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                f
            }),
            // Case C: no format + encode required
            None => None,
        },
    };

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = if is_iceberg_engine_internal {
        Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
    } else {
        Planner::new_for_stream(context.into()).plan_query(bound)?
    };
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    };

    let without_backfill = match resolved_with_options.remove(SINK_SNAPSHOT_OPTION) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink_from_name.is_some() || is_iceberg_engine_internal {
                true
            } else {
                return Err(ErrorCode::BindError(
                    "`snapshot = false` is only supported for `CREATE SINK FROM MV or TABLE`".to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };

    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Rows that violate these constraints will be silently ignored.",
                target_table_catalog.name(),
            ));
        }
    }

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
        auto_refresh_schema_from_table,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into();

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }
    tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());

    // TODO(rc): To be consistent with the UDF dependency check, we should collect relation dependencies
    // during binding instead of visiting the optimized plan.
    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .map(|id| id.table_id() as ObjectId)
            .chain(
                dependent_udfs
                    .into_iter()
                    .map(|id| id.function_id() as ObjectId),
            )
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        SchemaId::new(sink_schema_id),
        DatabaseId::new(sink_database_id),
        UserId::new(session.user_id()),
        connector_conn_ref.map(ConnectionId::from),
    );

    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
                    "generated columns and the system column `_rw_timestamp` must not appear in a sink's catalog, but one was found"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    };

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

/// Returns the partition compute info for a sink; see `PartitionComputeInfo` for details.
///
/// Returns `Some(PartitionComputeInfo)` if the sink needs to compute partitions,
/// or `None` if it does not.
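/// A minimal, illustrative call site (assumes the WITH options already have secrets and
/// connection references resolved; the variable name is hypothetical):
///
/// ```ignore
/// // For an Iceberg sink whose table uses a sparse partition transform
/// // (identity/bucket/truncate), this returns `Some(PartitionComputeInfo::Iceberg(..))`;
/// // for other connectors it returns `None`.
/// let partition_info = get_partition_compute_info(&resolved_with_options).await?;
/// ```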
pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

#[allow(clippy::unused_async)]
async fn get_partition_compute_info_for_iceberg(
    _iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    // TODO: check whether the table exists.
    if _iceberg_config.create_table_if_not_exists {
        return Ok(None);
    }
    let table = _iceberg_config.load_table().await?;
    let partition_spec = table.metadata().default_partition_spec();
    if partition_spec.is_unpartitioned() {
        return Ok(None);
    }

    // Separate the partition spec into two kinds: sparse partitions and range partitions.
    // A sparse partition means that data at a given time is spread across many partitions.
    // A range partition means that data at a given time likely falls into the same partition.
    // Only compute the partition and shuffle by it for sparse partitions.
    let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
        // Sparse partition
        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
        // Range partition
        Transform::Year
        | Transform::Month
        | Transform::Day
        | Transform::Hour
        | Transform::Void
        | Transform::Unknown => false,
    });
    if !has_sparse_partition {
        return Ok(None);
    }

    let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
        table.metadata().default_partition_type().clone(),
    ))
    .map_err(|_| {
        RwError::from(ErrorCode::SinkError(
            "Failed to convert the Iceberg partition type to an Arrow type".into(),
        ))
    })?;
    let ArrowDataType::Struct(struct_fields) = arrow_type else {
        return Err(RwError::from(ErrorCode::SinkError(
            "The partition type of Iceberg should be a struct type".into(),
        )));
    };

    let schema = table.metadata().current_schema();
    let partition_fields = partition_spec
        .fields()
        .iter()
        .map(|f| {
            let source_f =
                schema
                    .field_by_id(f.source_id)
                    .ok_or(RwError::from(ErrorCode::SinkError(
                        "Failed to look up the Iceberg partition field".into(),
                    )))?;
            Ok((source_f.name.clone(), f.transform))
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
        partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
        partition_fields,
    })))
}

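/// Handles a `CREATE SINK` statement end to end: checks cluster limits and name
/// duplication, generates the sink plan and stream graph, builds a replace-table plan
/// when sinking into a table, and submits everything to the catalog writer.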
pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
    is_iceberg_engine_internal: bool,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    let if_not_exists = stmt.if_not_exists;
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let graph = build_graph(plan, Some(GraphJobType::Sink))?;

        (sink, graph, target_table_catalog, dependencies)
    };

    let mut target_table_replace_plan = None;
    if let Some(table_catalog) = target_table_catalog {
        use crate::handler::alter_table_column::hijack_merger_for_target_table;

        let (mut graph, mut table, source, target_job_type) =
            reparse_table_for_sink(&session, &table_catalog).await?;

        sink.original_target_columns = table.columns.clone();

        table
            .incoming_sinks
            .clone_from(&table_catalog.incoming_sinks);

        let incoming_sink_ids: HashSet<_> = table_catalog.incoming_sinks.iter().copied().collect();
        let incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;

        let columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        for existing_sink in incoming_sinks {
            hijack_merger_for_target_table(
                &mut graph,
                &columns_without_rw_timestamp,
                &existing_sink,
                Some(&existing_sink.unique_identity()),
            )?;
        }

        // The sink being created has no unique identity yet because its sink id has not been generated.
        hijack_merger_for_target_table(&mut graph, &columns_without_rw_timestamp, &sink, None)?;

        target_table_replace_plan = Some(ReplaceJobPlan {
            replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
                replace_job_plan::ReplaceTable {
                    table: Some(table.to_prost()),
                    source: source.map(|x| x.to_prost()),
                    job_type: target_job_type as _,
                },
            )),
            fragment_graph: Some(graph),
        });
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id.database_id,
                sink.schema_id.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_sink(
            sink.to_proto(),
            graph,
            target_table_replace_plan,
            dependencies,
            if_not_exists,
        )
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

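/// Collects the catalogs of the sinks whose ids are in `incoming_sink_ids`, scanning
/// all schemas of the current database.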
pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    incoming_sink_ids: &HashSet<SinkId>,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let mut sinks = Vec::with_capacity(incoming_sink_ids.len());
    let db_name = &session.database();
    for schema in reader.iter_schemas(db_name)? {
        for sink in schema.iter_sink() {
            if incoming_sink_ids.contains(&sink.id.sink_id) {
                sinks.push(sink.clone());
            }
        }
    }

    Ok(sinks)
}

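/// Re-plans the target table of a sink-into-table statement: the table's original
/// `CREATE TABLE` definition is parsed again and a replacement stream graph is
/// generated, so that the new sink can later be merged into the table's union node.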
pub(crate) async fn reparse_table_for_sink(
    session: &Arc<SessionImpl>,
    table_catalog: &Arc<TableCatalog>,
) -> Result<(
    StreamFragmentGraph,
    TableCatalog,
    Option<SourceCatalog>,
    TableJobType,
)> {
    // Retrieve the original table definition and parse it to AST.
    let definition = table_catalog.create_sql_ast_purified()?;
    let Statement::CreateTable { name, .. } = &definition else {
        panic!("unexpected statement: {:?}", definition);
    };
    let table_name = name.clone();

    // Create handler args as if we're creating a new table with the altered definition.
    let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?;
    let col_id_gen = ColumnIdGenerator::new_alter(table_catalog);

    let (graph, table, source, job_type) = generate_stream_graph_for_replace_table(
        session,
        table_name,
        table_catalog,
        handler_args,
        definition,
        col_id_gen,
        SqlColumnStrategy::FollowUnchecked,
    )
    .await?;

    Ok((graph, table, source, job_type))
}

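/// Recursively walks the stream node tree and, for each `Union` node reached, appends a
/// new input consisting of a `Project` (built from `project_node`) fed by a placeholder
/// `Merge` node. A rough sketch of the appended subtree (illustrative only):
///
/// ```text
/// Union
/// └── Project (project_node, identity = uniq_identity or the creating-sink placeholder)
///     └── Merge (placeholder, see issue #17658)
/// ```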
pub(crate) fn insert_merger_to_union_with_project(
    node: &mut StreamNode,
    project_node: &PbNodeBody,
    uniq_identity: Option<&str>,
) {
    if let Some(NodeBody::Union(_union_node)) = &mut node.node_body {
        // TODO: MergeNode is used as a placeholder, see issue #17658.
        node.input.push(StreamNode {
            input: vec![StreamNode {
                node_body: Some(NodeBody::Merge(Box::new(MergeNode {
                    ..Default::default()
                }))),
                ..Default::default()
            }],
            identity: uniq_identity
                .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                .to_owned(),
            fields: node.fields.clone(),
            node_body: Some(project_node.clone()),
            ..Default::default()
        });

        return;
    }

    for input in &mut node.input {
        insert_merger_to_union_with_project(input, project_node, uniq_identity);
    }
}

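/// Builds the expression that forwards sink column `idx` into the target table column:
/// an `InputRef` if the types match exactly, or a bind error otherwise.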
fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if target_type != input_type {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

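/// Derives the `Project` expressions that map the sink's output columns onto the target
/// table's columns, filling unmatched columns with their default expressions.
///
/// An illustrative example (hypothetical table and sink, not from this file): given a
/// target table `t(a INT, b INT DEFAULT 1, c INT)` and a sink created with
/// `CREATE SINK s INTO t (a, c) ...`, columns `a` and `c` take their values from the
/// sink output (matched by name), while `b` falls back to its default expression `1`;
/// without an explicit column list, sink columns are matched to table columns by position.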
pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        // If the user specified the columns to be inserted, e.g. `CREATE SINK s INTO t(a, b)`,
        // the `Project` expressions are generated accordingly, and missing columns are filled
        // with their default values (`null` if not explicitly defined).
        // Otherwise, e.g. `CREATE SINK s INTO t`, columns are matched by their order in the
        // `SELECT` query and the target table.
        #[allow(clippy::collapsible_else_if)]
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            };
        }
    }
    Ok(exprs)
}

/// Transforms the (format, encode, options) from the sqlparser AST into the internal struct `SinkFormatDesc`.
/// This is analogous to (part of) [`crate::handler::create_source::bind_columns_from_source`],
/// which transforms the sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`.
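/// An illustrative mapping (exact options depend on the statement): for
/// `FORMAT UPSERT ENCODE JSON`, this returns a `SinkFormatDesc` with
/// `format = SinkFormat::Upsert` and `encode = SinkEncode::Json`, with the
/// `TimestamptzHandlingMode` option key defaulted to the frontend default if not provided.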
fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}

static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::{S3Sink, SnowflakeSink};
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
                GooglePubSubSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                ),
                KafkaSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Debezium => vec![Encode::Json],
                ),
                FileSink::<S3Sink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<GcsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<FsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                KinesisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                MqttSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf],
                ),
                PulsarSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                RedisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Template],
                    Format::Upsert => vec![Encode::Json, Encode::Template],
                ),
        ))
    });

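/// Checks the requested `FORMAT ... ENCODE ...` against `CONNECTORS_COMPATIBLE_FORMATS`.
///
/// For example (per the table above), `FORMAT DEBEZIUM ENCODE JSON` is accepted for the
/// kafka connector, while `FORMAT DEBEZIUM ENCODE AVRO` is rejected with a bind error.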
pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by the FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    // Only the Kafka connector is allowed to use `bytes` as the key encode.
    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
        return Err(ErrorCode::BindError(format!(
            "key encode bytes only works with the kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
                    WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                        '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                        mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Check sink exists.
        let (sink, _) = catalog_reader
            .get_created_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}