1use std::assert_matches;
16use std::sync::Arc;
17
18use iceberg::spec::Transform;
19use itertools::Itertools;
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::catalog::{
22 ColumnCatalog, ConflictBehavior, CreateType, FieldLike, RISINGWAVE_ICEBERG_ROW_ID,
23 ROW_ID_COLUMN_NAME,
24};
25use risingwave_common::types::{DataType, StructType};
26use risingwave_common::util::iter_util::ZipEqDebug;
27use risingwave_connector::sink::catalog::desc::SinkDesc;
28use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
29use risingwave_connector::sink::file_sink::fs::FsSink;
30use risingwave_connector::sink::iceberg::{ENABLE_PK_INDEX, ICEBERG_SINK};
31use risingwave_connector::sink::trivial::TABLE_SINK;
32use risingwave_connector::sink::{
33 CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
34 SINK_TYPE_RETRACT, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
35 SINK_USER_IGNORE_DELETE_OPTION, SINK_USER_PRESERVE_ROW_LEVEL_CHANGES,
36};
37use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
38use risingwave_pb::expr::expr_node::Type;
39use risingwave_pb::stream_plan::SinkLogStoreType;
40use risingwave_pb::stream_plan::stream_node::PbNodeBody;
41
42use super::derive::{derive_columns, derive_pk};
43use super::stream::prelude::*;
44use super::utils::{
45 Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
46};
47use super::{
48 ExprRewritable, PlanBase, StreamExchange, StreamNode, StreamPlanRef as PlanRef, StreamProject,
49 StreamSyncLogStore, generic,
50};
51use crate::TableCatalog;
52use crate::error::{ErrorCode, Result, RwError, bail_bind_error, bail_invalid_input_syntax};
53use crate::expr::{ExprImpl, FunctionCall, InputRef};
54use crate::optimizer::StreamOptimizedLogicalPlanRoot;
55use crate::optimizer::plan_node::PlanTreeNodeUnary;
56use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
57use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
58use crate::optimizer::property::{Distribution, RequiredDist};
59use crate::stream_fragmenter::BuildFragmentGraphState;
60use crate::utils::WithOptionsSecResolved;
61
/// WITH-option key holding the downstream primary key as a comma-separated list of sink
/// column names (see `StreamSink::parse_downstream_pk`).
const DOWNSTREAM_PK_KEY: &str = "primary_key";
/// WITH-option key; when it is exactly `"true"` on an upsert sink that has no explicit
/// `primary_key`, the plan-derived primary key is used as the downstream primary key.
const CREATE_TABLE_IF_NOT_EXISTS: &str = "create_table_if_not_exists";
64
65fn target_table_requires_row_level_conflict_handling(target_table: &TableCatalog) -> bool {
66 !target_table.version_column_indices.is_empty()
67 || matches!(
68 target_table.conflict_behavior(),
69 ConflictBehavior::DoUpdateIfNotNull | ConflictBehavior::IgnoreConflict
70 )
71}
72
73pub enum PartitionComputeInfo {
86 Iceberg(IcebergPartitionInfo),
87}
88
89impl PartitionComputeInfo {
90 pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
91 match self {
92 PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
93 }
94 }
95}
96
97pub struct IcebergPartitionInfo {
98 pub partition_type: StructType,
99 pub partition_fields: Vec<(String, Transform)>,
101}
102
103impl IcebergPartitionInfo {
104 #[inline]
105 fn transform_to_expression(
106 transform: &Transform,
107 col_id: usize,
108 columns: &[ColumnCatalog],
109 result_type: DataType,
110 ) -> Result<ExprImpl> {
111 match transform {
112 Transform::Identity => {
113 if columns[col_id].column_desc.data_type != result_type {
114 return Err(ErrorCode::InvalidInputSyntax(format!(
115 "The partition field {} has type {}, but the partition field is {}",
116 columns[col_id].column_desc.name,
117 columns[col_id].column_desc.data_type,
118 result_type
119 ))
120 .into());
121 }
122 Ok(ExprImpl::InputRef(
123 InputRef::new(col_id, result_type).into(),
124 ))
125 }
126 Transform::Void => Ok(ExprImpl::literal_null(result_type)),
127 _ => Ok(ExprImpl::FunctionCall(
128 FunctionCall::new_unchecked(
129 Type::IcebergTransform,
130 vec![
131 ExprImpl::literal_varchar(transform.to_string()),
132 ExprImpl::InputRef(
133 InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
134 .into(),
135 ),
136 ],
137 result_type,
138 )
139 .into(),
140 )),
141 }
142 }
143
144 pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
145 let child_exprs = self
146 .partition_fields
147 .into_iter()
148 .zip_eq_debug(self.partition_type.iter())
149 .map(|((field_name, transform), (_, result_type))| {
150 let col_id = find_column_idx_by_name(columns, &field_name)?;
151 Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
152 })
153 .collect::<Result<Vec<_>>>()?;
154
155 Ok(ExprImpl::FunctionCall(
156 FunctionCall::new_unchecked(
157 Type::Row,
158 child_exprs,
159 DataType::Struct(self.partition_type),
160 )
161 .into(),
162 ))
163 }
164}
165
166#[inline]
167fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
168 columns
169 .iter()
170 .position(|col| col.column_desc.name == col_name)
171 .ok_or_else(|| {
172 ErrorCode::InvalidInputSyntax(format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name))
173 .into()
174 })
175}
176
177#[derive(Debug, Clone, PartialEq, Eq, Hash)]
179pub struct StreamSink {
180 pub base: PlanBase<Stream>,
181 input: PlanRef,
182 sink_desc: SinkDesc,
183 log_store_type: SinkLogStoreType,
184}
185
186impl StreamSink {
187 #[must_use]
188 pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
189 let input_kind = input.stream_kind();
194 let kind = match sink_desc.sink_type {
195 SinkType::AppendOnly => {
196 if !sink_desc.ignore_delete {
197 assert_eq!(
198 input_kind,
199 StreamKind::AppendOnly,
200 "{input_kind} stream cannot be used as input of append-only sink",
201 );
202 }
203 StreamKind::AppendOnly
204 }
205 SinkType::Upsert => StreamKind::Upsert,
206 SinkType::Retract => {
207 assert_ne!(
208 input_kind,
209 StreamKind::Upsert,
210 "upsert stream cannot be used as input of retract sink",
211 );
212 StreamKind::Retract
213 }
214 };
215
216 let base = PlanBase::new_stream(
217 input.ctx(),
218 input.schema().clone(),
219 input.stream_key().map(|v| v.to_vec()),
226 input.functional_dependency().clone(),
227 input.distribution().clone(),
228 kind,
229 input.emit_on_window_close(),
230 input.watermark_columns().clone(),
231 input.columns_monotonicity().clone(),
232 );
233
234 Self {
235 base,
236 input,
237 sink_desc,
238 log_store_type,
239 }
240 }
241
242 pub fn sink_desc(&self) -> &SinkDesc {
243 &self.sink_desc
244 }
245
246 fn derive_iceberg_sink_distribution(
247 input: PlanRef,
248 partition_info: Option<PartitionComputeInfo>,
249 columns: &[ColumnCatalog],
250 ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
251 if let Some(partition_info) = partition_info {
253 let input_fields = input.schema().fields();
254
255 let mut exprs: Vec<_> = input_fields
256 .iter()
257 .enumerate()
258 .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
259 .collect();
260
261 exprs.push(partition_info.convert_to_expression(columns)?);
263 let partition_col_idx = exprs.len() - 1;
264 let project = StreamProject::new(generic::Project::new(exprs.clone(), input));
265 Ok((
266 RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
267 project.into(),
268 Some(partition_col_idx),
269 ))
270 } else {
271 Ok((
272 RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
273 input,
274 None,
275 ))
276 }
277 }
278
279 #[expect(clippy::too_many_arguments)]
280 pub fn create(
281 StreamOptimizedLogicalPlanRoot {
282 plan: mut input,
283 required_dist: user_distributed_by,
284 required_order: user_order_by,
285 out_fields: user_cols,
286 out_names,
287 ..
288 }: StreamOptimizedLogicalPlanRoot,
289 name: String,
290 db_name: String,
291 sink_from_table_name: String,
292 target_table: Option<Arc<TableCatalog>>,
293 target_table_mapping: Option<Vec<Option<usize>>>,
294 definition: String,
295 properties: WithOptionsSecResolved,
296 format_desc: Option<SinkFormatDesc>,
297 partition_info: Option<PartitionComputeInfo>,
298 auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
299 ) -> Result<Self> {
300 let (sink_type, ignore_delete) =
301 Self::derive_sink_type(input.stream_kind(), &properties, format_desc.as_ref())?;
302
303 let columns = derive_columns(input.schema(), out_names, &user_cols)?;
304 let (pk, _) = derive_pk(
305 input.clone(),
306 user_distributed_by.clone(),
307 user_order_by,
308 &columns,
309 );
310 let derived_pk = pk.iter().map(|k| k.column_index).collect_vec();
311
312 let downstream_pk = {
314 let downstream_pk = properties
315 .get(DOWNSTREAM_PK_KEY)
316 .map(|v| Self::parse_downstream_pk(v, &columns))
317 .transpose()?;
318
319 if let Some(t) = &target_table {
320 let user_defined_primary_key_table = t.row_id_index.is_none();
321 let sink_is_append_only = sink_type.is_append_only();
322
323 if !user_defined_primary_key_table && !sink_is_append_only {
324 return Err(RwError::from(ErrorCode::BindError(
325 "Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
326 )));
327 }
328
329 if t.append_only && !sink_is_append_only {
330 return Err(RwError::from(ErrorCode::BindError(
331 "Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
332 )));
333 }
334
335 if sink_is_append_only {
336 None
337 } else {
338 let target_table_mapping = target_table_mapping.unwrap();
339 Some(t.pk()
340 .iter()
341 .map(|c| {
342 target_table_mapping[c.column_index].ok_or_else(
343 || ErrorCode::InvalidInputSyntax("When using non append only sink into table, the primary key of the table must be included in the sink result.".to_owned()).into())
344 })
345 .try_collect::<_, _, RwError>()?)
346 }
347 } else if properties.get(CREATE_TABLE_IF_NOT_EXISTS) == Some(&"true".to_owned())
348 && sink_type == SinkType::Upsert
349 && downstream_pk.is_none()
350 {
351 Some(derived_pk.clone())
352 } else if properties.is_iceberg_connector()
353 && sink_type == SinkType::Upsert
354 && downstream_pk.is_none()
355 {
356 Some(derived_pk.clone())
358 } else {
359 downstream_pk
360 }
361 };
362
363 if let Some(pk) = &downstream_pk
369 && pk.is_empty()
370 {
371 bail_invalid_input_syntax!(
372 "Empty primary key is not supported. \
373 Please specify the primary key in WITH options."
374 )
375 }
376
377 if let StreamKind::Upsert = input.stream_kind()
380 && let Some(downstream_pk) = &downstream_pk
381 && !downstream_pk.iter().all(|i| derived_pk.contains(i))
382 {
383 bail_bind_error!(
384 "When sinking from an upsert stream, \
385 the downstream primary key must be the same as or a subset of the one derived from the stream."
386 )
387 }
388
389 if let Some(upstream_table) = &auto_refresh_schema_from_table
390 && let Some(downstream_pk) = &downstream_pk
391 {
392 let upstream_table_pk_col_names = upstream_table
393 .pk
394 .iter()
395 .map(|order| {
396 upstream_table.columns[order.column_index]
397 .column_desc
398 .name()
399 })
400 .collect_vec();
401 let sink_pk_col_names = downstream_pk
402 .iter()
403 .map(|&column_index| columns[column_index].name())
404 .collect_vec();
405 if upstream_table_pk_col_names != sink_pk_col_names {
406 let is_iceberg_row_id_alias = properties.is_iceberg_connector()
407 && upstream_table_pk_col_names.len() == 1
408 && upstream_table_pk_col_names[0] == ROW_ID_COLUMN_NAME
409 && sink_pk_col_names.len() == 1
410 && sink_pk_col_names[0] == RISINGWAVE_ICEBERG_ROW_ID;
411 if !is_iceberg_row_id_alias {
412 return Err(ErrorCode::InvalidInputSyntax(format!(
413 "sink with auto schema change should have same pk as upstream table {:?}, but got {:?}",
414 upstream_table_pk_col_names, sink_pk_col_names
415 ))
416 .into());
417 }
418 }
419 }
420 let mut extra_partition_col_idx = None;
421
422 let required_dist = match input.distribution() {
423 Distribution::Single => RequiredDist::single(),
424 _ => {
425 match properties.get("connector") {
426 Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
427 let Some(downstream_pk) = &downstream_pk else {
428 return Err(ErrorCode::InvalidInputSyntax(format!(
429 "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
430 key = DOWNSTREAM_PK_KEY
431 )).into());
432 };
433 RequiredDist::hash_shard(downstream_pk)
436 }
437 Some(s) if s == ICEBERG_SINK => {
438 let (required_dist, new_input, partition_col_idx) =
439 Self::derive_iceberg_sink_distribution(
440 input,
441 partition_info,
442 &columns,
443 )?;
444 input = new_input;
445 extra_partition_col_idx = partition_col_idx;
446 required_dist
447 }
448 _ => {
449 assert_matches!(user_distributed_by, RequiredDist::Any);
450 if let Some(downstream_pk) = &downstream_pk {
451 RequiredDist::shard_by_key(input.schema().len(), downstream_pk)
454 } else {
455 RequiredDist::shard_by_key(
456 input.schema().len(),
457 input.expect_stream_key(),
458 )
459 }
460 }
461 }
462 }
463 };
464 let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
465 let input = if input.ctx().session_ctx().config().streaming_separate_sink()
466 && input.as_stream_exchange().is_none()
467 {
468 StreamExchange::new_no_shuffle(input).into()
469 } else {
470 input
471 };
472
473 let distribution_key = input.distribution().dist_column_indices().to_vec();
474 let create_type = if input.ctx().session_ctx().config().background_ddl()
475 && plan_can_use_background_ddl(&input)
476 {
477 CreateType::Background
478 } else {
479 CreateType::Foreground
480 };
481 let (mut properties, secret_refs) = properties.into_parts();
482 if let Some(target_table) = &target_table
483 && target_table_requires_row_level_conflict_handling(target_table)
484 {
485 properties.insert(
486 SINK_USER_PRESERVE_ROW_LEVEL_CHANGES.to_owned(),
487 "true".to_owned(),
488 );
489 }
490 let is_exactly_once = properties
491 .get("is_exactly_once")
492 .map(|v| v.to_lowercase() == "true");
493
494 let mut sink_desc = SinkDesc {
495 id: SinkId::placeholder(),
496 name,
497 db_name,
498 sink_from_name: sink_from_table_name,
499 definition,
500 columns,
501 plan_pk: pk,
502 downstream_pk,
503 distribution_key,
504 properties,
505 secret_refs,
506 sink_type,
507 ignore_delete,
508 format_desc,
509 target_table: target_table.as_ref().map(|catalog| catalog.id()),
510 extra_partition_col_idx,
511 create_type,
512 is_exactly_once,
513 auto_refresh_schema_from_table: auto_refresh_schema_from_table
514 .as_ref()
515 .map(|table| table.id),
516 };
517
518 let unsupported_sink = |sink: &str| -> Result<_> {
519 Err(ErrorCode::InvalidInputSyntax(format!("unsupported sink type {}", sink)).into())
520 };
521
522 let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
524 Some(connector) => {
525 let connector_type = connector.to_lowercase();
526 match_sink_name_str!(
527 connector_type.as_str(),
528 SinkType,
529 {
530 if connector == TABLE_SINK && sink_desc.target_table.is_none() {
532 unsupported_sink(TABLE_SINK)
533 } else {
534 SinkType::set_default_commit_checkpoint_interval(
535 &mut sink_desc,
536 &input.ctx().session_ctx().config().sink_decouple(),
537 )?;
538 let support_schema_change = SinkType::support_schema_change();
539 if !support_schema_change && auto_refresh_schema_from_table.is_some() {
540 return Err(ErrorCode::InvalidInputSyntax(format!(
541 "{} sink does not support schema change",
542 connector_type
543 ))
544 .into());
545 }
546 SinkType::is_sink_decouple(
547 &input.ctx().session_ctx().config().sink_decouple(),
548 )
549 .map_err(Into::into)
550 }
551 },
552 |other: &str| unsupported_sink(other)
553 )?
554 }
555 None => {
556 return Err(ErrorCode::InvalidInputSyntax(
557 "connector not specified when create sink".to_owned(),
558 )
559 .into());
560 }
561 };
562 let hint_string =
563 |expected: bool| format!("Please run `set sink_decouple = {}` first.", expected);
564 if !sink_decouple {
565 if sink_desc.is_file_sink() {
567 return Err(ErrorCode::NotSupported(
568 "File sink can only be created with sink_decouple enabled.".to_owned(),
569 hint_string(true),
570 )
571 .into());
572 }
573
574 if sink_desc.is_exactly_once.is_none()
575 && let Some(connector) = sink_desc.properties.get(CONNECTOR_TYPE_KEY)
576 {
577 let connector_type = connector.to_lowercase();
578 if connector_type == ICEBERG_SINK {
579 sink_desc
582 .properties
583 .insert("is_exactly_once".to_owned(), "false".to_owned());
584 }
585 }
586 }
587 let log_store_type = if sink_decouple {
588 SinkLogStoreType::KvLogStore
589 } else {
590 SinkLogStoreType::InMemoryLogStore
591 };
592
593 let input = if sink_decouple && target_table.is_some() {
595 StreamSyncLogStore::new(input).into()
596 } else {
597 input
598 };
599
600 Ok(Self::new(input, sink_desc, log_store_type))
601 }
602
603 fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
604 if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
605 let sink_type = match sink_type.as_str() {
606 SINK_TYPE_APPEND_ONLY => SinkType::AppendOnly,
607 SINK_TYPE_UPSERT => {
608 if properties.is_iceberg_connector() {
609 SinkType::Retract
611 } else {
612 SinkType::Upsert
613 }
614 }
615 SINK_TYPE_RETRACT | SINK_TYPE_DEBEZIUM => SinkType::Retract,
616 _ => {
617 return Err(ErrorCode::InvalidInputSyntax(format!(
618 "`{}` must be {}, {}, {}, or {}",
619 SINK_TYPE_OPTION,
620 SINK_TYPE_APPEND_ONLY,
621 SINK_TYPE_RETRACT,
622 SINK_TYPE_UPSERT,
623 SINK_TYPE_DEBEZIUM,
624 ))
625 .into());
626 }
627 };
628 return Ok(Some(sink_type));
629 }
630 Ok(None)
631 }
632
633 fn is_user_ignore_delete(properties: &WithOptionsSecResolved) -> Result<bool> {
635 let has_ignore_delete = properties.contains_key(SINK_USER_IGNORE_DELETE_OPTION);
636 let has_force_append_only = properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION);
637
638 if has_ignore_delete && has_force_append_only {
639 return Err(ErrorCode::InvalidInputSyntax(format!(
640 "`{}` is an alias of `{}`, only one of them can be specified.",
641 SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_USER_IGNORE_DELETE_OPTION
642 ))
643 .into());
644 }
645
646 let key = if has_ignore_delete {
647 SINK_USER_IGNORE_DELETE_OPTION
648 } else if has_force_append_only {
649 SINK_USER_FORCE_APPEND_ONLY_OPTION
650 } else {
651 return Ok(false);
652 };
653
654 if properties.value_eq_ignore_case(key, "true") {
655 Ok(true)
656 } else if properties.value_eq_ignore_case(key, "false") {
657 Ok(false)
658 } else {
659 Err(ErrorCode::InvalidInputSyntax(format!("`{key}` must be true or false")).into())
660 }
661 }
662
663 fn derive_sink_type(
672 derived_stream_kind: StreamKind,
673 properties: &WithOptionsSecResolved,
674 format_desc: Option<&SinkFormatDesc>,
675 ) -> Result<(SinkType, bool)> {
676 let (user_defined_sink_type, user_ignore_delete, syntax_legacy) = match format_desc {
677 Some(f) => (
678 Some(match f.format {
679 SinkFormat::AppendOnly => SinkType::AppendOnly,
680 SinkFormat::Upsert => SinkType::Upsert,
681 SinkFormat::Debezium => SinkType::Retract,
682 }),
683 Self::is_user_ignore_delete(&WithOptionsSecResolved::without_secrets(
684 f.options.clone(),
685 ))?,
686 false,
687 ),
688 None => (
689 Self::sink_type_in_prop(properties)?,
690 Self::is_user_ignore_delete(properties)?,
691 true,
692 ),
693 };
694
695 if let Some(user_defined_sink_type) = user_defined_sink_type {
696 match user_defined_sink_type {
697 SinkType::AppendOnly => {
698 if derived_stream_kind != StreamKind::AppendOnly && !user_ignore_delete {
699 return Err(ErrorCode::InvalidInputSyntax(format!(
700 "The sink of {} stream cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
701 Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
702 derived_stream_kind,
703 if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
704 ))
705 .into());
706 }
707 }
708 SinkType::Upsert => { }
709 SinkType::Retract => {
710 if user_ignore_delete {
711 bail_invalid_input_syntax!(
712 "Retract sink type does not support `ignore_delete`. \
713 Please use `type = 'append-only'` or `type = 'upsert'` instead.",
714 );
715 }
716 if derived_stream_kind == StreamKind::Upsert {
717 bail_invalid_input_syntax!(
718 "The sink of upsert stream cannot be retract. \
719 Please create a materialized view or sink-into-table with this query before sinking it.",
720 );
721 }
722 }
723 }
724 Ok((user_defined_sink_type, user_ignore_delete))
725 } else {
726 let sink_type = match derived_stream_kind {
729 StreamKind::Retract | StreamKind::Upsert => SinkType::Upsert,
732 StreamKind::AppendOnly => SinkType::AppendOnly,
733 };
734 Ok((sink_type, user_ignore_delete))
735 }
736 }
737
738 fn parse_downstream_pk(
744 downstream_pk_str: &str,
745 columns: &[ColumnCatalog],
746 ) -> Result<Vec<usize>> {
747 let downstream_pk = downstream_pk_str.split(',').collect_vec();
749 let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
750 for key in downstream_pk {
751 let trimmed_key = key.trim();
752 if trimmed_key.is_empty() {
753 continue;
754 }
755 downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
756 }
757 if downstream_pk_indices.is_empty() {
758 bail_invalid_input_syntax!(
759 "Specified primary key should not be empty. \
760 To use derived primary key, remove {DOWNSTREAM_PK_KEY} from WITH options instead."
761 );
762 }
763 Ok(downstream_pk_indices)
764 }
765
766 fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
769 infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
770 }
771
772 pub fn into_stream_plan(self) -> Result<PlanRef> {
778 use super::{StreamIcebergWithPkIndexDvMerger, StreamIcebergWithPkIndexWriter};
779
780 if !is_iceberg_with_pk_index_sink(&self.sink_desc)? {
781 return Ok(self.into());
782 }
783
784 let writer: PlanRef = StreamIcebergWithPkIndexWriter::from_stream_sink(&self)?.into();
785 let dv_merger: PlanRef =
786 StreamIcebergWithPkIndexDvMerger::new(writer, self.sink_desc).into();
787 Ok(dv_merger)
788 }
789}
790
791pub fn is_iceberg_with_pk_index_sink(sink_desc: &SinkDesc) -> Result<bool> {
792 if !sink_desc
793 .properties
794 .get(CONNECTOR_TYPE_KEY)
795 .is_some_and(|connector| connector.eq_ignore_ascii_case(ICEBERG_SINK))
796 {
797 return Ok(false);
798 }
799
800 let res = sink_desc
801 .properties
802 .get(ENABLE_PK_INDEX)
803 .is_some_and(|v| v.eq_ignore_ascii_case("true"));
804 Ok(res)
805}
806
807impl PlanTreeNodeUnary<Stream> for StreamSink {
808 fn input(&self) -> PlanRef {
809 self.input.clone()
810 }
811
812 fn clone_with_input(&self, input: PlanRef) -> Self {
813 Self::new(input, self.sink_desc.clone(), self.log_store_type)
814 }
816}
817
818impl_plan_tree_node_for_unary! { Stream, StreamSink }
819
820impl Distill for StreamSink {
821 fn distill<'a>(&self) -> XmlNode<'a> {
822 let sink_type = if self.sink_desc.sink_type.is_append_only() {
823 "append-only"
824 } else {
825 "upsert"
826 };
827 let column_names = self
828 .sink_desc
829 .columns
830 .iter()
831 .map(|col| col.name_with_hidden().to_string())
832 .map(Pretty::from)
833 .collect();
834 let column_names = Pretty::Array(column_names);
835 let mut vec = Vec::with_capacity(3);
836 vec.push(("type", Pretty::from(sink_type)));
837 vec.push(("columns", column_names));
838 if let Some(pk) = &self.sink_desc.downstream_pk {
839 let sink_pk = IndicesDisplay {
840 indices: pk,
841 schema: self.base.schema(),
842 };
843 vec.push(("downstream_pk", sink_pk.distill()));
844 }
845 childless_record("StreamSink", vec)
846 }
847}
848
849impl StreamNode for StreamSink {
850 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
851 use risingwave_pb::stream_plan::*;
852
853 let table = self
855 .infer_kv_log_store_table_catalog()
856 .with_id(state.gen_table_id_wrapped());
857
858 PbNodeBody::Sink(Box::new(SinkNode {
859 sink_desc: Some(self.sink_desc.to_proto()),
860 table: Some(table.to_internal_table_prost()),
861 log_store_type: self.log_store_type as i32,
862 rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
863 }))
864 }
865}
866
867impl ExprRewritable<Stream> for StreamSink {}
868
869impl ExprVisitable for StreamSink {}
870
#[cfg(test)]
mod test {
    use fixedbitset::FixedBitSet;
    use risingwave_common::catalog::{
        ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Field,
    };
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::catalog::table_catalog::TableType;
    use crate::expr::{Expr, ExprImpl};
    use crate::optimizer::plan_node::utils::TableCatalogBuilder;

    /// Sink columns `v1: int`, `v2: timestamptz`, `v3: timestamp`, each with a distinct id.
    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    /// A single-column target table with the given conflict behavior and version columns.
    fn test_target_table(
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<usize>,
    ) -> TableCatalog {
        let mut builder = TableCatalogBuilder::default();
        let col_idx = builder.add_column(&Field::with_name(DataType::Int32, "v1"));
        let mut table = builder.build(vec![], 0);
        table.table_type = TableType::Table;
        table.columns = vec![ColumnCatalog {
            column_desc: ColumnDesc::named("v1", ColumnId::new(col_idx as i32), DataType::Int32),
            is_hidden: false,
        }];
        table.conflict_behavior = conflict_behavior;
        table.version_column_indices = version_column_indices;
        table.watermark_columns = FixedBitSet::with_capacity(table.columns.len());
        table
    }

    #[test]
    fn test_target_table_requires_row_level_conflict_handling() {
        assert!(target_table_requires_row_level_conflict_handling(
            &test_target_table(ConflictBehavior::DoUpdateIfNotNull, vec![])
        ));
        assert!(target_table_requires_row_level_conflict_handling(
            &test_target_table(ConflictBehavior::IgnoreConflict, vec![])
        ));
        assert!(target_table_requires_row_level_conflict_handling(
            &test_target_table(ConflictBehavior::Overwrite, vec![0])
        ));
        assert!(!target_table_requires_row_level_conflict_handling(
            &test_target_table(ConflictBehavior::Overwrite, vec![])
        ));
    }

    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                    // The second argument references the source column.
                    assert!(expr.inputs()[1].is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
            }
        }
    }

    #[test]
    fn test_iceberg_convert_to_expression_rejects_invalid_fields() {
        let catalog = create_column_catalog();

        // The partition source column does not exist among the sink columns.
        let unknown_column = IcebergPartitionInfo {
            partition_type: StructType::new(vec![("f1", DataType::Int32)]),
            partition_fields: vec![("missing".into(), Transform::Identity)],
        };
        assert!(unknown_column.convert_to_expression(&catalog).is_err());

        // An identity partition field whose type differs from its source column.
        let mismatched_identity = IcebergPartitionInfo {
            partition_type: StructType::new(vec![("f1", DataType::Varchar)]),
            partition_fields: vec![("v1".into(), Transform::Identity)],
        };
        assert!(mismatched_identity.convert_to_expression(&catalog).is_err());
    }

    #[test]
    fn test_parse_downstream_pk() {
        let catalog = create_column_catalog();

        assert_eq!(
            StreamSink::parse_downstream_pk("v1", &catalog).unwrap(),
            vec![0]
        );
        // Entries are trimmed, blank entries skipped, and the user's order is kept.
        assert_eq!(
            StreamSink::parse_downstream_pk(" v3 , v1 ,", &catalog).unwrap(),
            vec![2, 0]
        );
        // Only blank entries.
        assert!(StreamSink::parse_downstream_pk(" , ", &catalog).is_err());
        // Unknown column.
        assert!(StreamSink::parse_downstream_pk("v1,v4", &catalog).is_err());
    }
}