use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use either::Either;
use iceberg::arrow::type_to_arrow_type;
use iceberg::spec::Transform;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ConnectionId, ObjectId, Schema, UserId};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::file_sink::s3::SnowflakeSink;
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::snowflake_redshift::redshift::RedshiftSink;
use risingwave_connector::sink::snowflake_redshift::snowflake::SnowflakeV2Sink;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    Sink, enforce_secret_sink,
};
use risingwave_connector::{
    AUTO_SCHEMA_CHANGE_KEY, SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, SINK_INTERMEDIATE_TABLE_NAME,
    SINK_TARGET_TABLE_NAME, WithPropertiesExt,
};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    ObjectName, Query,
};

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::UPSTREAM_SOURCE_KEY;
use super::util::gen_query_from_table_name;
use crate::binder::{Binder, Relation};
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::util::{check_connector_match_connection_type, ensure_connection_type_allowed};
use crate::optimizer::plan_node::{
    IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamPlanRef as PlanRef,
    StreamProject, generic,
};
use crate::optimizer::{OptimizerContext, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

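/// Connection types that a sink's top-level `connection = ...` reference is allowed to resolve to.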
static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

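/// Connection types that a schema-registry reference in the `FORMAT ... ENCODE ...` options is allowed to resolve to.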
static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

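/// Everything produced by [`gen_sink_plan`] that the `CREATE SINK` handler needs afterwards.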
pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

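/// Binds and plans a `CREATE SINK` statement into a streaming sink plan.
///
/// This resolves connection and secret references in the WITH clause, validates the
/// `FORMAT ... ENCODE ...` (or legacy `type = ...`) options, and, for sink-into-table,
/// appends projections that align the sink's output with the target table's columns.
/// When `explain_options` is provided, it is used to build the `OptimizerContext`
/// instead of the defaults derived from `handler_args`; `is_iceberg_engine_internal`
/// marks sinks created internally by the iceberg table engine.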
pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
    is_iceberg_engine_internal: bool,
) -> Result<SinkPlanContext> {
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, &stmt.sink_name)?;

    let mut with_options = handler_args.with_options.clone();

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        enforce_secret_sink(&with_options)?;
    }

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = resolved_with_options.get_connector() else {
            return Err(RwError::from(ErrorCode::ProtocolError(format!(
                "missing field '{}' in WITH clause",
                CONNECTOR_TYPE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    let is_auto_schema_change = resolved_with_options
        .remove(AUTO_SCHEMA_CHANGE_KEY)
        .map(|value| {
            value.parse::<bool>().map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "invalid value {} of '{}' option, expect true or false",
                    value, AUTO_SCHEMA_CHANGE_KEY
                ))
            })
        })
        .transpose()?
        .unwrap_or(false);

    if is_auto_schema_change {
        Feature::SinkAutoSchemaChange.check_available()?;
    }

    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());
    if sink_into_table_name.is_some() {
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "When sinking into a table, the 'connector' parameter should not be provided.".to_owned(),
            )));
        }
    }
    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

    let sink_from_table_name;
    let direct_sink_from_name: Option<(ObjectName, bool)>;
    let query = match stmt.sink_from {
        CreateSink::From(from_name) => {
            sink_from_table_name = from_name.0.last().unwrap().real_value();
            direct_sink_from_name = Some((from_name.clone(), is_auto_schema_change));
            if is_auto_schema_change && sink_into_table_name.is_some() {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change not supported for sink-into-table".to_owned(),
                )));
            }
            if resolved_with_options
                .value_eq_ignore_case(SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, "true")
                && (connector == RedshiftSink::SINK_NAME
                    || connector == SnowflakeV2Sink::SINK_NAME)
            {
                if let Some(table_name) = resolved_with_options.get(SINK_TARGET_TABLE_NAME) {
                    if resolved_with_options
                        .get(SINK_INTERMEDIATE_TABLE_NAME)
                        .is_none()
                    {
                        let intermediate_table_name = format!(
                            "rw_{}_{}_{}",
                            sink_table_name,
                            table_name,
                            uuid::Uuid::new_v4()
                        );
                        resolved_with_options.insert(
                            SINK_INTERMEDIATE_TABLE_NAME.to_owned(),
                            intermediate_table_name,
                        );
                    }
                } else {
                    return Err(RwError::from(ErrorCode::BindError(
                        "'table.name' option must be specified.".to_owned(),
                    )));
                }
            }
            Box::new(gen_query_from_table_name(from_name))
        }
        CreateSink::AsQuery(query) => {
            if is_auto_schema_change {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change not supported for CREATE SINK AS QUERY".to_owned(),
                )));
            }
            sink_from_table_name = sink_table_name.clone();
            direct_sink_from_name = None;
            query
        }
    };

    let (sink_database_id, sink_schema_id) =
        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

    let (dependent_relations, dependent_udfs, bound, auto_refresh_schema_from_table) = {
        let mut binder = Binder::new_for_stream(session);
        let auto_refresh_schema_from_table = if let Some((from_name, true)) = &direct_sink_from_name
        {
            let from_relation = binder.bind_relation_by_name(from_name, None, None, true)?;
            if let Relation::BaseTable(table) = from_relation {
                if table.table_catalog.table_type != TableType::Table {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "auto schema change is only supported on TABLE, but got {:?}",
                        table.table_catalog.table_type
                    ))
                    .into());
                }
                if table.table_catalog.database_id != sink_database_id {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "auto schema change sinks cannot be created from a cross-database table".to_owned()
                    )
                    .into());
                }
                for col in &table.table_catalog.columns {
                    if !col.is_hidden() && (col.is_generated() || col.is_rw_sys_column()) {
                        return Err(ErrorCode::InvalidInputSyntax(format!("auto schema change is not supported for tables with a non-hidden generated column or system column, but got {}", col.name())).into());
                    }
                }
                Some(table.table_catalog)
            } else {
                return Err(RwError::from(ErrorCode::NotSupported(
                    "auto schema change is only supported for TABLE".to_owned(),
                    "try recreating the sink from a table".to_owned(),
                )));
            }
        } else {
            None
        };

        let bound = binder.bind_query(&query)?;

        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound,
            auto_refresh_schema_from_table,
        )
    };

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        get_column_names(&bound, stmt.columns)?
    };

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let format_desc = match stmt.sink_schema {
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                f
            }),
            None => None,
        },
    };

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = if is_iceberg_engine_internal {
        Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
    } else {
        Planner::new_for_stream(context.into()).plan_query(bound)?
    };
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    }

    let without_backfill = match resolved_with_options.remove(SINK_SNAPSHOT_OPTION) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink_from_name.is_some() || is_iceberg_engine_internal {
                true
            } else {
                return Err(ErrorCode::BindError(
                    "`snapshot = false` is only supported for `CREATE SINK FROM MV or TABLE`".to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };

    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name).map(|t| t.0))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Rows written by the sink that violate these constraints will be silently ignored.",
                target_table_catalog.name(),
            ));
        }
    }

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
        auto_refresh_schema_from_table,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into();

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }
    tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());

    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .map(|id| id.as_raw_id() as ObjectId)
            .chain(
                dependent_udfs
                    .into_iter()
                    .map(|id| id.function_id() as ObjectId),
            )
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        sink_schema_id,
        sink_database_id,
        UserId::new(session.user_id()),
        connector_conn_ref.map(ConnectionId::from),
    );

    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
                    "cannot derive generated columns or the system column `_rw_timestamp` in a sink's catalog, but met one"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    }

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

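/// Inspects the sink's WITH options and, for connectors that need it, derives how
/// partition values should be computed before rows reach the sink. Currently only
/// the iceberg connector returns such info; every other connector yields `None`.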
pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

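/// Loads the target iceberg table and, if its default partition spec uses a "sparse"
/// transform (identity, truncate, or bucket), returns the partition-field metadata
/// needed to pre-compute partition values. Returns `None` for unpartitioned tables,
/// purely time-partitioned tables, or when the table is to be created by the sink
/// itself (`create_table_if_not_exists`).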
#[allow(clippy::unused_async)]
async fn get_partition_compute_info_for_iceberg(
    _iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    if _iceberg_config.create_table_if_not_exists {
        return Ok(None);
    }
    let table = _iceberg_config.load_table().await?;
    let partition_spec = table.metadata().default_partition_spec();
    if partition_spec.is_unpartitioned() {
        return Ok(None);
    }

    let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
        Transform::Year
        | Transform::Month
        | Transform::Day
        | Transform::Hour
        | Transform::Void
        | Transform::Unknown => false,
    });
    if !has_sparse_partition {
        return Ok(None);
    }

    let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
        table.metadata().default_partition_type().clone(),
    ))
    .map_err(|_| {
        RwError::from(ErrorCode::SinkError(
            "Fail to convert iceberg partition type to arrow type".into(),
        ))
    })?;
    let ArrowDataType::Struct(struct_fields) = arrow_type else {
        return Err(RwError::from(ErrorCode::SinkError(
            "Partition type of iceberg should be a struct type".into(),
        )));
    };

    let schema = table.metadata().current_schema();
    let partition_fields = partition_spec
        .fields()
        .iter()
        .map(|f| {
            let source_f =
                schema
                    .field_by_id(f.source_id)
                    .ok_or(RwError::from(ErrorCode::SinkError(
                        "Fail to look up iceberg partition field".into(),
                    )))?;
            Ok((source_f.name.clone(), f.transform))
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
        partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
        partition_fields,
    })))
}

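/// Handler for `CREATE SINK`: plans the sink, builds the streaming graph, and
/// registers the new sink catalog through the catalog writer.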
pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
    is_iceberg_engine_internal: bool,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    let if_not_exists = stmt.if_not_exists;
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let graph = build_graph(plan, Some(GraphJobType::Sink))?;

        (sink, graph, target_table_catalog, dependencies)
    };

    if let Some(table_catalog) = target_table_catalog {
        sink.original_target_columns = table_catalog.columns_without_rw_timestamp();
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id,
                sink.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_sink(sink.to_proto(), graph, dependencies, if_not_exists)
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

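/// Collects the catalogs of all sinks that write into the given table
/// (i.e. its incoming sink-into-table sinks).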
pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    table: &TableCatalog,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let schema = reader.get_schema_by_id(table.database_id, table.schema_id)?;
    let Some(incoming_sinks) = schema.table_incoming_sinks(table.id) else {
        return Ok(vec![]);
    };
    let mut sinks = vec![];
    for sink_id in incoming_sinks {
        sinks.push(
            schema
                .get_sink_by_id(sink_id)
                .expect("should exist")
                .clone(),
        );
    }
    Ok(sinks)
}

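/// Builds an `InputRef` expression that forwards sink column `idx` into the target
/// table, failing if the sink column's type does not match `target_type`.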
fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if target_type != input_type {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

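/// Derives the projection that maps a sink's output onto the target table's columns
/// for sink-into-table. Columns that cannot be written via DML are skipped; table
/// columns not covered by the sink fall back to their default expressions (with
/// `now()` rewritten to `proctime()`). When the user listed columns explicitly,
/// sink columns are matched to table columns by name, otherwise by position among
/// the visible columns.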
pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        #[allow(clippy::collapsible_else_if)]
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            };
        }
    }
    Ok(exprs)
}

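/// Binds the `FORMAT ... ENCODE ...` clause of `CREATE SINK` into a [`SinkFormatDesc`],
/// resolving any schema-registry connection and secret references in its options and
/// defaulting the timestamptz handling mode.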
fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}

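/// Sink connectors that accept the `FORMAT ... ENCODE ...` syntax, mapped to the
/// format/encode combinations each of them supports.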
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::S3Sink;
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
            GooglePubSubSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
            ),
            KafkaSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Debezium => vec![Encode::Json],
            ),
            FileSink::<S3Sink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<GcsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<FsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            KinesisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            MqttSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf],
            ),
            PulsarSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            RedisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Template],
                Format::Upsert => vec![Encode::Json, Encode::Template],
            ),
        ))
    });

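/// Checks that the requested format/encode combination (including the optional key
/// encode) is supported by the given sink connector.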
pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
        return Err(ErrorCode::BindError(format!(
            "key encode bytes only works with kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
                    WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                    '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                    mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let (sink, _) = catalog_reader
            .get_created_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}