risingwave_frontend/handler/create_sink.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use either::Either;
use iceberg::arrow::type_to_arrow_type;
use iceberg::spec::Transform;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ICEBERG_SINK_PREFIX, ObjectId, Schema, UserId};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::file_sink::s3::SnowflakeSink;
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::snowflake_redshift::redshift::RedshiftSink;
use risingwave_connector::sink::snowflake_redshift::snowflake::SnowflakeV2Sink;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    SINK_USER_IGNORE_DELETE_OPTION, Sink, enforce_secret_sink,
};
use risingwave_connector::{
    AUTO_SCHEMA_CHANGE_KEY, SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, SINK_INTERMEDIATE_TABLE_NAME,
    SINK_TARGET_TABLE_NAME, WithPropertiesExt,
};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    ObjectName, Query,
};

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::UPSTREAM_SOURCE_KEY;
use super::util::gen_query_from_table_name;
use crate::binder::{Binder, Relation};
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::util::{
    LongRunningNotificationAction, check_connector_match_connection_type,
    ensure_connection_type_allowed, execute_with_long_running_notification,
};
use crate::optimizer::backfill_order_strategy::plan_backfill_order;
use crate::optimizer::plan_node::{
    IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamPlanRef as PlanRef,
    StreamProject, generic,
};
use crate::optimizer::{OptimizerContext, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

// Used to store the result of `gen_sink_plan`.
pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
    is_iceberg_engine_internal: bool,
) -> Result<SinkPlanContext> {
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, &stmt.sink_name)?;

    let mut with_options = handler_args.with_options.clone();

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        enforce_secret_sink(&with_options)?;
    }

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    // If no connection is used, we don't need to check whether the connector matches the
    // connection type.
    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = resolved_with_options.get_connector() else {
            return Err(RwError::from(ErrorCode::ProtocolError(format!(
                "missing field '{}' in WITH clause",
                CONNECTOR_TYPE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    let is_auto_schema_change = resolved_with_options
        .remove(AUTO_SCHEMA_CHANGE_KEY)
        .map(|value| {
            value.parse::<bool>().map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "invalid value {} of '{}' option, expect a boolean value",
                    value, AUTO_SCHEMA_CHANGE_KEY
                ))
            })
        })
        .transpose()?
        .unwrap_or(false);

    if is_auto_schema_change {
        Feature::SinkAutoSchemaChange.check_available()?;
    }
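    // For illustration only: auto schema change is requested through the WITH clause of
    // `CREATE SINK`. The option key is `AUTO_SCHEMA_CHANGE_KEY`; its literal spelling below is
    // an assumed example, and the value must parse as a boolean:
    //
    //     CREATE SINK s FROM t WITH (connector = 'kafka', auto.schema.change = 'true', ...);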

    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());
    if sink_into_table_name.is_some() {
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "When sinking into a table, the 'connector' parameter should not be provided.".to_owned(),
            )));
        }
    }
    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

    // Used for Debezium's table name.
    let sink_from_table_name;
    // `true` means the sink statement has the form `CREATE SINK s1 FROM ...`.
    // `false` means the sink statement has the form `CREATE SINK s1 AS <query>`.
    let direct_sink_from_name: Option<(ObjectName, bool)>;
    let query = match stmt.sink_from {
        CreateSink::From(from_name) => {
            sink_from_table_name = from_name.0.last().unwrap().real_value();
            direct_sink_from_name = Some((from_name.clone(), is_auto_schema_change));
            if is_auto_schema_change && sink_into_table_name.is_some() {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change not supported for sink-into-table".to_owned(),
                )));
            }
            if resolved_with_options
                .value_eq_ignore_case(SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, "true")
                && (connector == RedshiftSink::SINK_NAME
                    || connector == SnowflakeV2Sink::SINK_NAME)
            {
                if let Some(table_name) = resolved_with_options.get(SINK_TARGET_TABLE_NAME) {
                    // auto fill intermediate table name if target table name is specified
                    if resolved_with_options
                        .get(SINK_INTERMEDIATE_TABLE_NAME)
                        .is_none()
                    {
                        // generate the intermediate table name with random value appended to the target table name
                        let intermediate_table_name = format!(
                            "rw_{}_{}_{}",
                            sink_table_name,
                            table_name,
                            uuid::Uuid::new_v4()
                        );
                        resolved_with_options.insert(
                            SINK_INTERMEDIATE_TABLE_NAME.to_owned(),
                            intermediate_table_name,
                        );
                    }
                } else {
                    return Err(RwError::from(ErrorCode::BindError(
                        "'table.name' option must be specified.".to_owned(),
                    )));
                }
            }
            Box::new(gen_query_from_table_name(from_name))
        }
        CreateSink::AsQuery(query) => {
            if is_auto_schema_change {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change not supported for CREATE SINK AS QUERY".to_owned(),
                )));
            }
            sink_from_table_name = sink_table_name.clone();
            direct_sink_from_name = None;
            query
        }
    };

    let (sink_database_id, sink_schema_id) =
        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

    let (dependent_relations, dependent_udfs, bound, auto_refresh_schema_from_table) = {
        let mut binder = Binder::new_for_stream(session);
        let auto_refresh_schema_from_table = if let Some((from_name, true)) = &direct_sink_from_name
        {
            let from_relation = binder.bind_relation_by_name(from_name, None, None, true)?;
            if let Relation::BaseTable(table) = from_relation {
                if table.table_catalog.table_type != TableType::Table {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "auto schema change is only supported on TABLE, but got {:?}",
                        table.table_catalog.table_type
                    ))
                    .into());
                }
                if table.table_catalog.database_id != sink_database_id {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "auto schema change sink cannot be created from a cross-database table".to_owned()
                    )
                        .into());
                }
                for col in &table.table_catalog.columns {
                    if !col.is_hidden() && (col.is_generated() || col.is_rw_sys_column()) {
                        return Err(ErrorCode::InvalidInputSyntax(format!("auto schema change is not supported for tables with a non-hidden generated column or system column, but got {}", col.name())).into());
                    }
                }
                Some(table.table_catalog)
            } else {
                return Err(RwError::from(ErrorCode::NotSupported(
                    "auto schema change is only supported for TABLE".to_owned(),
                    "try recreating the sink from a table".to_owned(),
                )));
            }
        } else {
            None
        };

        let bound = binder.bind_query(&query)?;

        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound,
            auto_refresh_schema_from_table,
        )
    };

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        // If column names are not specified, use the names from the bound query, which match
        // the plan root's original field names.
        get_column_names(&bound, stmt.columns)?
    };

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let format_desc = match stmt.sink_schema {
        // Case A: new syntax `format ... encode ...`
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            // Case B: old syntax `type = '...'`
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                if let Some(v) = resolved_with_options.get(SINK_USER_IGNORE_DELETE_OPTION) {
                    f.options.insert(SINK_USER_IGNORE_DELETE_OPTION.into(), v.into());
                }
                f
            }),
            // Case C: no format + encode required
            None => None,
        },
    };
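    // For illustration, the two syntaxes handled above (connector options elided):
    //
    //     -- Case A: `FORMAT ... ENCODE ...`
    //     CREATE SINK s1 FROM mv WITH (connector = 'kafka', ...) FORMAT UPSERT ENCODE JSON;
    //
    //     -- Case B: legacy `type = '...'`
    //     CREATE SINK s2 FROM mv WITH (connector = 'kafka', type = 'append-only', force_append_only = 'true', ...);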

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = if is_iceberg_engine_internal {
        Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
    } else {
        Planner::new_for_stream(context.into()).plan_query(bound)?
    };
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    };

    let without_backfill = match resolved_with_options.remove(SINK_SNAPSHOT_OPTION) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink_from_name.is_some() || is_iceberg_engine_internal {
                true
            } else {
                return Err(ErrorCode::BindError(
                    "`snapshot = false` is only supported for `CREATE SINK FROM MV or TABLE`".to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };
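    // For illustration: `snapshot = 'false'` (the literal key of `SINK_SNAPSHOT_OPTION` is
    // assumed here) skips the initial snapshot backfill and sinks only new changes, e.g.:
    //
    //     CREATE SINK s FROM mv WITH (connector = 'kafka', snapshot = 'false', ...);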

    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name).map(|t| t.0))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Any rows sunk into it that violate these constraints will be silently ignored.",
                target_table_catalog.name(),
            ));
        }
    }

    let allow_snapshot_backfill = target_table_catalog.is_none() && !is_iceberg_engine_internal;

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
        auto_refresh_schema_from_table,
        allow_snapshot_backfill,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into();

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }
    tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());

    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
    // during binding instead of visiting the optimized plan.
    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .chain(dependent_udfs.iter().copied().map_into())
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        sink_schema_id,
        sink_database_id,
        UserId::new(session.user_id()),
        connector_conn_ref,
    );

    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
                    "can not derive generated columns or the system column `_rw_timestamp` in a sink's catalog, but met one"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    };

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

// Returns the partition compute info for a sink. See `PartitionComputeInfo` for more details.
// Return:
// `Some(PartitionComputeInfo)` if the sink needs to compute partitions.
// `None` if the sink does not need to compute partitions.
pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

#[allow(clippy::unused_async)]
async fn get_partition_compute_info_for_iceberg(
    _iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    // TODO: check whether the table exists
    if _iceberg_config.create_table_if_not_exists {
        return Ok(None);
    }
    let table = _iceberg_config.load_table().await?;
    let partition_spec = table.metadata().default_partition_spec();
    if partition_spec.is_unpartitioned() {
        return Ok(None);
    }

    // Separate the partition spec into two parts: sparse partitions and range partitions.
    // A sparse partition means that the data distribution is sparse at a given point in time.
    // A range partition means that the data distribution is likely the same at a given point in time.
    // Only compute partitions and shuffle by them for sparse partitions.
    let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
        // Sparse partition
        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
        // Range partition
        Transform::Year
        | Transform::Month
        | Transform::Day
        | Transform::Hour
        | Transform::Void
        | Transform::Unknown => false,
    });
    if !has_sparse_partition {
        return Ok(None);
    }
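    // For illustration (hypothetical Iceberg table): a spec partitioned by `bucket(16, user_id)`
    // or `truncate(4, name)` counts as sparse and keeps partition computation enabled, while a
    // spec partitioned only by `day(event_time)` or `month(event_time)` is range-only and
    // returns `None` here.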

    let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
        table.metadata().default_partition_type().clone(),
    ))
    .map_err(|_| {
        RwError::from(ErrorCode::SinkError(
            "Failed to convert iceberg partition type to arrow type".into(),
        ))
    })?;
    let ArrowDataType::Struct(struct_fields) = arrow_type else {
        return Err(RwError::from(ErrorCode::SinkError(
            "Partition type of iceberg should be a struct type".into(),
        )));
    };

    let schema = table.metadata().current_schema();
    let partition_fields = partition_spec
        .fields()
        .iter()
        .map(|f| {
            let source_f =
                schema
                    .field_by_id(f.source_id)
                    .ok_or(RwError::from(ErrorCode::SinkError(
                        "Failed to look up iceberg partition field".into(),
                    )))?;
            Ok((source_f.name.clone(), f.transform))
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
        partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
        partition_fields,
    })))
}

pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
    is_iceberg_engine_internal: bool,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    let if_not_exists = stmt.if_not_exists;
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    if stmt.sink_name.base_name().starts_with(ICEBERG_SINK_PREFIX) {
        return Err(RwError::from(ErrorCode::InvalidInputSyntax(format!(
            "Sink name cannot start with reserved prefix '{}'",
            ICEBERG_SINK_PREFIX
        ))));
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let backfill_order_strategy = handle_args.with_options.backfill_order_strategy();

        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let backfill_order =
            plan_backfill_order(session.as_ref(), backfill_order_strategy, plan.clone())?;

        let graph =
            build_graph_with_strategy(plan, Some(GraphJobType::Sink), Some(backfill_order))?;

        (sink, graph, target_table_catalog, dependencies)
    };

    if let Some(table_catalog) = target_table_catalog {
        sink.original_target_columns = table_catalog.columns_without_rw_timestamp();
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id,
                sink.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    execute_with_long_running_notification(
        catalog_writer.create_sink(sink.to_proto(), graph, dependencies, if_not_exists),
        &session,
        "CREATE SINK",
        LongRunningNotificationAction::MonitorBackfillJob,
    )
    .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    table: &TableCatalog,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let schema = reader.get_schema_by_id(table.database_id, table.schema_id)?;
    let Some(incoming_sinks) = schema.table_incoming_sinks(table.id) else {
        return Ok(vec![]);
    };
    let mut sinks = vec![];
    for sink_id in incoming_sinks {
        sinks.push(
            schema
                .get_sink_by_id(*sink_id)
                .expect("should exist")
                .clone(),
        );
    }
    Ok(sinks)
}

fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if !target_type.equals_datatype(input_type) {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        // If the user specified the columns to be inserted, e.g. `CREATE SINK s INTO t(a, b)`,
        // the expressions of `Project` are generated accordingly, and any missing column is
        // filled with its default value (`null` if not explicitly defined).
        // Otherwise, e.g. `CREATE SINK s INTO t`, the columns are matched by their order in
        // the `SELECT` query and the target table.
        #[allow(clippy::collapsible_else_if)]
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            };
        }
    }
    Ok(exprs)
}
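// For illustration (hypothetical target table `t(a int, b int, c int)` with a default of `0` on `c`):
// - `CREATE SINK s INTO t (c, a) AS SELECT ...` matches sink columns to `c` and `a` by name,
//   while `b` is filled with its default expression (`null`, since none is defined for it).
// - `CREATE SINK s INTO t AS SELECT ...` matches by position, so `a` and `b` take the first two
//   visible sink columns and `c` falls back to its default of `0`.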

/// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`.
/// This is an analogy to (part of) [`crate::handler::create_source::bind_columns_from_source`]
/// which transforms sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`.
fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        E::Bytes => SinkEncode::Bytes,
        e @ (E::Native | E::Csv | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}
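// For illustration: a clause such as `FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://...')
// KEY ENCODE TEXT` (the `schema.registry` option name is an assumed example) binds to
// `SinkFormat::Upsert` and `SinkEncode::Avro` with `key_encode = Some(SinkEncode::Text)`, with any
// connection or secret references in the options resolved above.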

static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::S3Sink;
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
                GooglePubSubSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                ),
                KafkaSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf, Encode::Bytes],
                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Debezium => vec![Encode::Json],
                ),
                FileSink::<S3Sink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<GcsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                FileSink::<FsSink>::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Parquet, Encode::Json],
                ),
                KinesisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                MqttSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf],
                ),
                PulsarSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json],
                    Format::Upsert => vec![Encode::Json],
                    Format::Debezium => vec![Encode::Json],
                ),
                RedisSink::SINK_NAME => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Template],
                    Format::Upsert => vec![Encode::Json, Encode::Template],
                ),
        ))
    });

pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    // Only the Kafka connector is allowed to use `bytes` as the key encode.
    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
        return Err(ErrorCode::BindError(format!(
            "key encode bytes only works with kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}
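// For illustration, based on `CONNECTORS_COMPATIBLE_FORMATS` above: `kafka` with
// `FORMAT UPSERT ENCODE AVRO` passes, `kafka` with `FORMAT DEBEZIUM ENCODE AVRO` is rejected
// (Debezium only allows JSON there), and `KEY ENCODE BYTES` is rejected for every connector
// other than Kafka.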

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
                    WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                        '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                        mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Check sink exists.
        let (sink, _) = catalog_reader
            .get_created_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}