use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use either::Either;
use iceberg::arrow::type_to_arrow_type;
use iceberg::spec::Transform;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::bail;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, DatabaseId, ObjectId, Schema, SchemaId, UserId,
};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::file_sink::s3::SnowflakeSink;
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::snowflake_redshift::redshift::RedshiftSink;
use risingwave_connector::sink::snowflake_redshift::snowflake::SnowflakeV2Sink;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    Sink, enforce_secret_sink,
};
use risingwave_connector::{
    AUTO_SCHEMA_CHANGE_KEY, SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, SINK_INTERMEDIATE_TABLE_NAME,
    SINK_TARGET_TABLE_NAME, WithPropertiesExt,
};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    ObjectName, Query,
};

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::UPSTREAM_SOURCE_KEY;
use super::util::gen_query_from_table_name;
use crate::binder::{Binder, Relation};
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::util::{check_connector_match_connection_type, ensure_connection_type_allowed};
use crate::optimizer::plan_node::{
    IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamPlanRef as PlanRef,
    StreamProject, generic,
};
use crate::optimizer::{OptimizerContext, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

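/// Connection types that a sink's `connector` may reference through the `connection = ...`
/// option in its WITH clause.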
static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

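/// Connection types that may be referenced from the `FORMAT ... ENCODE ...` options of a sink,
/// i.e. for a schema registry.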
static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

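/// Output of [`gen_sink_plan`]: the bound query, the stream plan, the sink catalog entry, the
/// target table (for `CREATE SINK ... INTO ...`), and the objects the sink depends on.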
pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

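/// Binds and plans a `CREATE SINK` statement.
///
/// `explain_options` is `Some` when planning under `EXPLAIN`; [`handle_create_sink`] passes
/// `None`. `is_iceberg_engine_internal` switches planning to
/// `Planner::new_for_iceberg_table_engine_sink`.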
pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
    is_iceberg_engine_internal: bool,
) -> Result<SinkPlanContext> {
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, &stmt.sink_name)?;

    let mut with_options = handler_args.with_options.clone();

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        enforce_secret_sink(&with_options)?;
    }

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = resolved_with_options.get_connector() else {
            return Err(RwError::from(ErrorCode::ProtocolError(format!(
                "missing field '{}' in WITH clause",
                CONNECTOR_TYPE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    let is_auto_schema_change = resolved_with_options
        .remove(AUTO_SCHEMA_CHANGE_KEY)
        .map(|value| {
            value.parse::<bool>().map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "invalid value {} of '{}' option, expected a boolean",
                    value, AUTO_SCHEMA_CHANGE_KEY
                ))
            })
        })
        .transpose()?
        .unwrap_or(false);

    if is_auto_schema_change {
        Feature::SinkAutoSchemaChange.check_available()?;
    }

    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());
    if sink_into_table_name.is_some() {
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "When sinking into a table, the 'connector' parameter must not be provided.".to_owned(),
            )));
        }
    }
    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

    let sink_from_table_name;
    let direct_sink_from_name: Option<(ObjectName, bool)>;
    let query = match stmt.sink_from {
        CreateSink::From(from_name) => {
            sink_from_table_name = from_name.0.last().unwrap().real_value();
            direct_sink_from_name = Some((from_name.clone(), is_auto_schema_change));
            if is_auto_schema_change && sink_into_table_name.is_some() {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change is not supported for sink-into-table".to_owned(),
                )));
            }
            if resolved_with_options
                .value_eq_ignore_case(SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, "true")
                && (connector == RedshiftSink::SINK_NAME
                    || connector == SnowflakeV2Sink::SINK_NAME)
            {
                if let Some(table_name) = resolved_with_options.get(SINK_TARGET_TABLE_NAME) {
                    if resolved_with_options
                        .get(SINK_INTERMEDIATE_TABLE_NAME)
                        .is_none()
                    {
                        let intermediate_table_name = format!(
                            "rw_{}_{}_{}",
                            sink_table_name,
                            table_name,
                            uuid::Uuid::new_v4()
                        );
                        resolved_with_options.insert(
                            SINK_INTERMEDIATE_TABLE_NAME.to_owned(),
                            intermediate_table_name,
                        );
                    }
                } else {
                    return Err(RwError::from(ErrorCode::BindError(
                        "'table.name' option must be specified.".to_owned(),
                    )));
                }
            }
            Box::new(gen_query_from_table_name(from_name))
        }
        CreateSink::AsQuery(query) => {
            if is_auto_schema_change {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change is not supported for CREATE SINK AS QUERY".to_owned(),
                )));
            }
            sink_from_table_name = sink_table_name.clone();
            direct_sink_from_name = None;
            query
        }
    };

    let (sink_database_id, sink_schema_id) =
        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

    let (dependent_relations, dependent_udfs, bound, auto_refresh_schema_from_table) = {
        let mut binder = Binder::new_for_stream(session);
        let auto_refresh_schema_from_table = if let Some((from_name, true)) = &direct_sink_from_name
        {
            let from_relation = binder.bind_relation_by_name(from_name, None, None, true)?;
            if let Relation::BaseTable(table) = from_relation {
                if table.table_catalog.table_type != TableType::Table {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "auto schema change is only supported on TABLE, but got {:?}",
                        table.table_catalog.table_type
                    ))
                    .into());
                }
                if table.table_catalog.database_id != sink_database_id {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "auto schema change sink cannot be created from a cross-database table".to_owned()
                    )
                    .into());
                }
                for col in &table.table_catalog.columns {
                    if !col.is_hidden() && (col.is_generated() || col.is_rw_sys_column()) {
                        return Err(ErrorCode::InvalidInputSyntax(format!(
                            "auto schema change is not supported for tables with non-hidden generated or system columns, but got {}",
                            col.name()
                        ))
                        .into());
                    }
                }
                Some(table.table_catalog)
            } else {
                return Err(RwError::from(ErrorCode::NotSupported(
                    "auto schema change is only supported for TABLE".to_owned(),
                    "try recreating the sink from a table".to_owned(),
                )));
            }
        } else {
            None
        };

        let bound = binder.bind_query(&query)?;

        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound,
            auto_refresh_schema_from_table,
        )
    };

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        get_column_names(&bound, stmt.columns)?
    };

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let format_desc = match stmt.sink_schema {
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                f
            }),
            None => None,
        },
    };

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = if is_iceberg_engine_internal {
        Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
    } else {
        Planner::new_for_stream(context.into()).plan_query(bound)?
    };
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    };

    let without_backfill = match resolved_with_options.remove(SINK_SNAPSHOT_OPTION) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink_from_name.is_some() || is_iceberg_engine_internal {
                true
            } else {
                return Err(ErrorCode::BindError(
                    "`snapshot = false` is only supported for `CREATE SINK FROM MV or TABLE`".to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };

    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name).map(|t| t.0))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Any sinked rows violating the constraints will be silently ignored.",
                target_table_catalog.name(),
            ));
        }
    }

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
        auto_refresh_schema_from_table,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into();

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }
    tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());

    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .map(|id| id.table_id() as ObjectId)
            .chain(
                dependent_udfs
                    .into_iter()
                    .map(|id| id.function_id() as ObjectId),
            )
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        SchemaId::new(sink_schema_id),
        DatabaseId::new(sink_database_id),
        UserId::new(session.user_id()),
        connector_conn_ref.map(ConnectionId::from),
    );

    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
                    "cannot derive generated columns or the system column `_rw_timestamp` in a sink's catalog, but met one"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    };

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

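/// Returns the partition-compute info for the sink described by `with_options`, or `None` when
/// no partition values need to be pre-computed. Currently only Iceberg sinks may return `Some`.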
pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

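/// Loads the target Iceberg table and derives its partition fields so the plan can compute
/// partition values before writing. Returns `None` when `create_table_if_not_exists` is set,
/// when the table is unpartitioned, or when the partition spec has no sparse transform.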
#[allow(clippy::unused_async)]
async fn get_partition_compute_info_for_iceberg(
    _iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    if _iceberg_config.create_table_if_not_exists {
        return Ok(None);
    }
    let table = _iceberg_config.load_table().await?;
    let partition_spec = table.metadata().default_partition_spec();
    if partition_spec.is_unpartitioned() {
        return Ok(None);
    }

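    // Partition values are only pre-computed when at least one partition field uses a "sparse"
    // transform (identity / truncate / bucket); purely temporal or void transforms are skipped.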
    let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
        Transform::Year
        | Transform::Month
        | Transform::Day
        | Transform::Hour
        | Transform::Void
        | Transform::Unknown => false,
    });
    if !has_sparse_partition {
        return Ok(None);
    }

    let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
        table.metadata().default_partition_type().clone(),
    ))
    .map_err(|_| {
        RwError::from(ErrorCode::SinkError(
            "Failed to convert iceberg partition type to arrow type".into(),
        ))
    })?;
    let ArrowDataType::Struct(struct_fields) = arrow_type else {
        return Err(RwError::from(ErrorCode::SinkError(
            "Partition type of iceberg should be a struct type".into(),
        )));
    };

    let schema = table.metadata().current_schema();
    let partition_fields = partition_spec
        .fields()
        .iter()
        .map(|f| {
            let source_f =
                schema
                    .field_by_id(f.source_id)
                    .ok_or(RwError::from(ErrorCode::SinkError(
                        "Failed to look up iceberg partition field".into(),
                    )))?;
            Ok((source_f.name.clone(), f.transform))
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
        partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
        partition_fields,
    })))
}

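/// Handles a `CREATE SINK` statement: plans it via [`gen_sink_plan`], builds the stream graph,
/// and registers the sink with the catalog, e.g.
///
/// ```sql
/// CREATE SINK snk1 FROM mv1
/// WITH (connector = 'jdbc', ..., type = 'append-only', force_append_only = 'true');
/// ```
///
/// (See `test_create_sink_handler` below for a complete set of options.)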
pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
    is_iceberg_engine_internal: bool,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    let if_not_exists = stmt.if_not_exists;
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let graph = build_graph(plan, Some(GraphJobType::Sink))?;

        (sink, graph, target_table_catalog, dependencies)
    };

    if let Some(table_catalog) = target_table_catalog {
        sink.original_target_columns = table_catalog.columns_without_rw_timestamp();
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id.database_id,
                sink.schema_id.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_sink(sink.to_proto(), graph, dependencies, if_not_exists)
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

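/// Returns the catalogs of all sinks created with `CREATE SINK ... INTO <table>` that write into
/// the given table.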
pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    table: &TableCatalog,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let schema = reader.get_schema_by_id(&table.database_id, &table.schema_id)?;
    let Some(incoming_sinks) = schema.table_incoming_sinks(table.id) else {
        return Ok(vec![]);
    };
    let mut sinks = vec![];
    for sink_id in incoming_sinks {
        sinks.push(
            schema
                .get_sink_by_id(sink_id)
                .expect("should exist")
                .clone(),
        );
    }
    Ok(sinks)
}

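/// Builds an expression that forwards sink output column `idx` into a target-table column,
/// erroring out if the two column types do not match exactly.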
fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if target_type != input_type {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

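/// For sink-into-table, derives one expression per writable target-table column: a reference to
/// the matching sink output column (matched by name when the user listed columns explicitly, by
/// position otherwise), or the column's default expression with `now()` rewritten to `proctime()`
/// when the sink does not provide it.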
pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        #[allow(clippy::collapsible_else_if)]
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            };
        }
    }
    Ok(exprs)
}

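/// Validates the `FORMAT ... ENCODE ... (...)` clause of `CREATE SINK` and converts it into a
/// [`SinkFormatDesc`], resolving connection and secret references used by the encoder (e.g. a
/// schema registry).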
fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}

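/// For each connector that accepts the `FORMAT ... ENCODE ...` syntax, the supported formats and,
/// per format, the allowed encodes.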
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::S3Sink;
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
            GooglePubSubSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
            ),
            KafkaSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Debezium => vec![Encode::Json],
            ),
            FileSink::<S3Sink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<GcsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<FsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            KinesisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            MqttSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf],
            ),
            PulsarSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            RedisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Template],
                Format::Upsert => vec![Encode::Json, Encode::Template],
            ),
        ))
    });

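/// Checks that the requested format/encode combination (including the optional key encode) is
/// supported by the given sink connector.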
pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
        return Err(ErrorCode::BindError(format!(
            "key encode bytes only works with kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
            WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
            FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
            WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#
            .to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let (sink, _) = catalog_reader
            .get_created_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}