1use std::assert_matches::assert_matches;
16use std::sync::Arc;
17
18use iceberg::spec::Transform;
19use itertools::Itertools;
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::catalog::{
22 ColumnCatalog, CreateType, FieldLike, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
23};
24use risingwave_common::types::{DataType, StructType};
25use risingwave_common::util::iter_util::ZipEqDebug;
26use risingwave_connector::sink::catalog::desc::SinkDesc;
27use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
28use risingwave_connector::sink::file_sink::fs::FsSink;
29use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
30use risingwave_connector::sink::trivial::TABLE_SINK;
31use risingwave_connector::sink::{
32 CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
33 SINK_TYPE_RETRACT, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
34 SINK_USER_IGNORE_DELETE_OPTION,
35};
36use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
37use risingwave_pb::expr::expr_node::Type;
38use risingwave_pb::stream_plan::SinkLogStoreType;
39use risingwave_pb::stream_plan::stream_node::PbNodeBody;
40
41use super::derive::{derive_columns, derive_pk};
42use super::stream::prelude::*;
43use super::utils::{
44 Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
45};
46use super::{
47 ExprRewritable, PlanBase, StreamExchange, StreamNode, StreamPlanRef as PlanRef, StreamProject,
48 StreamSyncLogStore, generic,
49};
50use crate::TableCatalog;
51use crate::error::{ErrorCode, Result, RwError, bail_bind_error, bail_invalid_input_syntax};
52use crate::expr::{ExprImpl, FunctionCall, InputRef};
53use crate::optimizer::StreamOptimizedLogicalPlanRoot;
54use crate::optimizer::plan_node::PlanTreeNodeUnary;
55use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
56use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
57use crate::optimizer::property::{Distribution, RequiredDist};
58use crate::stream_fragmenter::BuildFragmentGraphState;
59use crate::utils::WithOptionsSecResolved;
60
/// WITH-option key naming the downstream primary key columns, as a
/// comma-separated list of column names.
const DOWNSTREAM_PK_KEY: &str = "primary_key";
/// WITH-option key; when set to "true" (and the sink is upsert without an
/// explicit `primary_key`), the derived stream pk is used as the downstream pk.
const CREATE_TABLE_IF_NOT_EXISTS: &str = "create_table_if_not_exists";
63
/// Per-connector information needed to compute the sink's partition value.
/// Currently only Iceberg sinks carry partition metadata.
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}
79
impl PartitionComputeInfo {
    /// Converts the partition metadata into an expression over the sink's
    /// `columns`, delegating to the connector-specific implementation.
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        match self {
            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
        }
    }
}
87
/// Partition spec of an Iceberg table.
pub struct IcebergPartitionInfo {
    /// The struct type of the computed partition value (one field per
    /// partition field).
    pub partition_type: StructType,
    /// For each partition field: the source column name and the Iceberg
    /// transform applied to it.
    pub partition_fields: Vec<(String, Transform)>,
}
93
94impl IcebergPartitionInfo {
95 #[inline]
96 fn transform_to_expression(
97 transform: &Transform,
98 col_id: usize,
99 columns: &[ColumnCatalog],
100 result_type: DataType,
101 ) -> Result<ExprImpl> {
102 match transform {
103 Transform::Identity => {
104 if columns[col_id].column_desc.data_type != result_type {
105 return Err(ErrorCode::InvalidInputSyntax(format!(
106 "The partition field {} has type {}, but the partition field is {}",
107 columns[col_id].column_desc.name,
108 columns[col_id].column_desc.data_type,
109 result_type
110 ))
111 .into());
112 }
113 Ok(ExprImpl::InputRef(
114 InputRef::new(col_id, result_type).into(),
115 ))
116 }
117 Transform::Void => Ok(ExprImpl::literal_null(result_type)),
118 _ => Ok(ExprImpl::FunctionCall(
119 FunctionCall::new_unchecked(
120 Type::IcebergTransform,
121 vec![
122 ExprImpl::literal_varchar(transform.to_string()),
123 ExprImpl::InputRef(
124 InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
125 .into(),
126 ),
127 ],
128 result_type,
129 )
130 .into(),
131 )),
132 }
133 }
134
135 pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
136 let child_exprs = self
137 .partition_fields
138 .into_iter()
139 .zip_eq_debug(self.partition_type.iter())
140 .map(|((field_name, transform), (_, result_type))| {
141 let col_id = find_column_idx_by_name(columns, &field_name)?;
142 Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
143 })
144 .collect::<Result<Vec<_>>>()?;
145
146 Ok(ExprImpl::FunctionCall(
147 FunctionCall::new_unchecked(
148 Type::Row,
149 child_exprs,
150 DataType::Struct(self.partition_type),
151 )
152 .into(),
153 ))
154 }
155}
156
157#[inline]
158fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
159 columns
160 .iter()
161 .position(|col| col.column_desc.name == col_name)
162 .ok_or_else(|| {
163 ErrorCode::InvalidInputSyntax(format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name))
164 .into()
165 })
166}
167
/// Stream plan node that writes its input stream into an external sink (or
/// into a table, for sink-into-table).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    /// The upstream plan providing the rows to be sunk.
    input: PlanRef,
    /// Catalog-level description of the sink (columns, pk, properties, ...).
    sink_desc: SinkDesc,
    /// Which log store decouples the sink from the upstream fragment.
    log_store_type: SinkLogStoreType,
}
176
impl StreamSink {
    /// Builds a `StreamSink` on top of `input`, asserting that the input's
    /// stream kind is compatible with the requested sink type.
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        let input_kind = input.stream_kind();
        let kind = match sink_desc.sink_type {
            SinkType::AppendOnly => {
                // With `ignore_delete` set, any input kind is accepted: the
                // non-insert messages are handled by the sink executor.
                if !sink_desc.ignore_delete {
                    assert_eq!(
                        input_kind,
                        StreamKind::AppendOnly,
                        "{input_kind} stream cannot be used as input of append-only sink",
                    );
                }
                StreamKind::AppendOnly
            }
            SinkType::Upsert => StreamKind::Upsert,
            SinkType::Retract => {
                assert_ne!(
                    input_kind,
                    StreamKind::Upsert,
                    "upsert stream cannot be used as input of retract sink",
                );
                StreamKind::Retract
            }
        };

        // The sink node inherits schema, distribution and the other stream
        // properties from its input unchanged.
        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            input.stream_key().map(|v| v.to_vec()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            kind,
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }

    /// The catalog-level description of this sink.
    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }

    /// Derives the required distribution for an Iceberg sink.
    ///
    /// When partition metadata is available, appends the computed partition
    /// value as an extra column via a `StreamProject` and shards by it;
    /// otherwise shards by the input's stream key. Returns the required
    /// distribution, the (possibly wrapped) input, and the index of the extra
    /// partition column if one was added.
    fn derive_iceberg_sink_distribution(
        input: PlanRef,
        partition_info: Option<PartitionComputeInfo>,
        columns: &[ColumnCatalog],
    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
        if let Some(partition_info) = partition_info {
            let input_fields = input.schema().fields();

            // Pass all input columns through unchanged...
            let mut exprs: Vec<_> = input_fields
                .iter()
                .enumerate()
                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
                .collect();

            // ...and append the computed partition value as the last column.
            exprs.push(partition_info.convert_to_expression(columns)?);
            let partition_col_idx = exprs.len() - 1;
            let project = StreamProject::new(generic::Project::new(exprs.clone(), input));
            Ok((
                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
                project.into(),
                Some(partition_col_idx),
            ))
        } else {
            Ok((
                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
                input,
                None,
            ))
        }
    }

    /// Plans a `CREATE SINK` statement: derives the sink type, columns and
    /// primary keys, validates them against the target table / WITH options,
    /// enforces the required distribution, and assembles the final
    /// [`SinkDesc`].
    #[expect(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: mut input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<Self> {
        let (sink_type, ignore_delete) =
            Self::derive_sink_type(input.stream_kind(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(
            input.clone(),
            user_distributed_by.clone(),
            user_order_by,
            &columns,
        );
        // Primary key derived from the stream itself (column indices).
        let derived_pk = pk.iter().map(|k| k.column_index).collect_vec();

        // Resolve the downstream (external) primary key:
        // - sink-into-table: taken from the target table's pk (mapped through
        //   `target_table_mapping`), or `None` for append-only sinks;
        // - otherwise: the user-specified `primary_key` option, falling back
        //   to the derived pk for upsert Iceberg / create-table-if-not-exists
        //   sinks.
        let downstream_pk = {
            let downstream_pk = properties
                .get(DOWNSTREAM_PK_KEY)
                .map(|v| Self::parse_downstream_pk(v, &columns))
                .transpose()?;

            if let Some(t) = &target_table {
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only = sink_type.is_append_only();

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_is_append_only {
                    None
                } else {
                    // Every pk column of the target table must be present in
                    // the sink's output, per `target_table_mapping`.
                    let target_table_mapping = target_table_mapping.unwrap();
                    Some(t.pk()
                        .iter()
                        .map(|c| {
                            target_table_mapping[c.column_index].ok_or_else(
                                || ErrorCode::InvalidInputSyntax("When using non append only sink into table, the primary key of the table must be included in the sink result.".to_owned()).into())
                        })
                        .try_collect::<_, _, RwError>()?)
                }
            } else if properties.get(CREATE_TABLE_IF_NOT_EXISTS) == Some(&"true".to_owned())
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                Some(derived_pk.clone())
            } else if properties.is_iceberg_connector()
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                Some(derived_pk.clone())
            } else {
                downstream_pk
            }
        };

        // An explicitly-specified but empty pk is rejected.
        if let Some(pk) = &downstream_pk
            && pk.is_empty()
        {
            bail_invalid_input_syntax!(
                "Empty primary key is not supported. \
                Please specify the primary key in WITH options."
            )
        }

        // For an upsert input stream, correctness requires the downstream pk
        // to be (a subset of) the derived stream pk.
        if let StreamKind::Upsert = input.stream_kind()
            && let Some(downstream_pk) = &downstream_pk
            && !downstream_pk.iter().all(|i| derived_pk.contains(i))
        {
            bail_bind_error!(
                "When sinking from an upsert stream, \
                the downstream primary key must be the same as or a subset of the one derived from the stream."
            )
        }

        // Auto-schema-change sinks must keep the upstream table's pk, except
        // for the Iceberg row-id alias of a row-id table.
        if let Some(upstream_table) = &auto_refresh_schema_from_table
            && let Some(downstream_pk) = &downstream_pk
        {
            let upstream_table_pk_col_names = upstream_table
                .pk
                .iter()
                .map(|order| {
                    upstream_table.columns[order.column_index]
                        .column_desc
                        .name()
                })
                .collect_vec();
            let sink_pk_col_names = downstream_pk
                .iter()
                .map(|&column_index| columns[column_index].name())
                .collect_vec();
            if upstream_table_pk_col_names != sink_pk_col_names {
                let is_iceberg_row_id_alias = properties.is_iceberg_connector()
                    && upstream_table_pk_col_names.len() == 1
                    && upstream_table_pk_col_names[0] == ROW_ID_COLUMN_NAME
                    && sink_pk_col_names.len() == 1
                    && sink_pk_col_names[0] == RISINGWAVE_ICEBERG_ROW_ID;
                if !is_iceberg_row_id_alias {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "sink with auto schema change should have same pk as upstream table {:?}, but got {:?}",
                        upstream_table_pk_col_names, sink_pk_col_names
                    ))
                    .into());
                }
            }
        }
        let mut extra_partition_col_idx = None;

        // Choose the required distribution per connector. The Iceberg branch
        // may rebind `input` to a projection that appends the partition value.
        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get("connector") {
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        let Some(downstream_pk) = &downstream_pk else {
                            return Err(ErrorCode::InvalidInputSyntax(format!(
                                "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
                                key = DOWNSTREAM_PK_KEY
                            )).into());
                        };
                        RequiredDist::hash_shard(downstream_pk)
                    }
                    Some(s) if s == ICEBERG_SINK => {
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if let Some(downstream_pk) = &downstream_pk {
                            RequiredDist::shard_by_key(input.schema().len(), downstream_pk)
                        } else {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
        // With `streaming_separate_sink`, force an exchange boundary so the
        // sink lives in its own fragment.
        let input = if input.ctx().session_ctx().config().streaming_separate_sink()
            && input.as_stream_exchange().is_none()
        {
            StreamExchange::new_no_shuffle(input).into()
        } else {
            input
        };

        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        // `None` when the option is absent; otherwise whether it equals
        // "true" (case-insensitive).
        let is_exactly_once = properties
            .get("is_exactly_once")
            .map(|v| v.to_lowercase() == "true");

        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            ignore_delete,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
            auto_refresh_schema_from_table: auto_refresh_schema_from_table
                .as_ref()
                .map(|table| table.id),
        };

        let unsupported_sink = |sink: &str| -> Result<_> {
            Err(ErrorCode::InvalidInputSyntax(format!("unsupported sink type {}", sink)).into())
        };

        // Dispatch on the connector name. NOTE: inside `match_sink_name_str!`,
        // `SinkType` is rebound to the matched connector's sink
        // implementation type, shadowing the imported enum.
        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            let support_schema_change = SinkType::support_schema_change();
                            if !support_schema_change && auto_refresh_schema_from_table.is_some() {
                                return Err(ErrorCode::InvalidInputSyntax(format!(
                                    "{} sink does not support schema change",
                                    connector_type
                                ))
                                .into());
                            }
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                            .map_err(Into::into)
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(ErrorCode::InvalidInputSyntax(
                    "connector not specified when create sink".to_owned(),
                )
                .into());
            }
        };
        let hint_string =
            |expected: bool| format!("Please run `set sink_decouple = {}` first.", expected);
        if !sink_decouple {
            if sink_desc.is_file_sink() {
                return Err(ErrorCode::NotSupported(
                    "File sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }

            // Without decoupling, default Iceberg's `is_exactly_once` to
            // "false" when the user didn't set it.
            if sink_desc.is_exactly_once.is_none()
                && let Some(connector) = sink_desc.properties.get(CONNECTOR_TYPE_KEY)
            {
                let connector_type = connector.to_lowercase();
                if connector_type == ICEBERG_SINK {
                    sink_desc
                        .properties
                        .insert("is_exactly_once".to_owned(), "false".to_owned());
                }
            }
        }
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

        // Decoupled sink-into-table additionally synchronizes via a log store.
        let input = if sink_decouple && target_table.is_some() {
            StreamSyncLogStore::new(input).into()
        } else {
            input
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }

    /// Parses the `type` WITH option into a [`SinkType`], or `Ok(None)` when
    /// it is absent.
    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
            let sink_type = match sink_type.as_str() {
                SINK_TYPE_APPEND_ONLY => SinkType::AppendOnly,
                SINK_TYPE_UPSERT => {
                    // For Iceberg, user-facing `upsert` is mapped to a retract
                    // sink internally.
                    if properties.is_iceberg_connector() {
                        SinkType::Retract
                    } else {
                        SinkType::Upsert
                    }
                }
                SINK_TYPE_RETRACT | SINK_TYPE_DEBEZIUM => SinkType::Retract,
                _ => {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "`{}` must be {}, {}, {}, or {}",
                        SINK_TYPE_OPTION,
                        SINK_TYPE_APPEND_ONLY,
                        SINK_TYPE_RETRACT,
                        SINK_TYPE_UPSERT,
                        SINK_TYPE_DEBEZIUM,
                    ))
                    .into());
                }
            };
            return Ok(Some(sink_type));
        }
        Ok(None)
    }

    /// Reads the user's ignore-delete flag from the options.
    ///
    /// `force_append_only` is an alias of `ignore_delete`; specifying both is
    /// an error, and the value must be exactly "true" or "false"
    /// (case-insensitive). Absence of both means `false`.
    fn is_user_ignore_delete(properties: &WithOptionsSecResolved) -> Result<bool> {
        let has_ignore_delete = properties.contains_key(SINK_USER_IGNORE_DELETE_OPTION);
        let has_force_append_only = properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION);

        if has_ignore_delete && has_force_append_only {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "`{}` is an alias of `{}`, only one of them can be specified.",
                SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_USER_IGNORE_DELETE_OPTION
            ))
            .into());
        }

        let key = if has_ignore_delete {
            SINK_USER_IGNORE_DELETE_OPTION
        } else if has_force_append_only {
            SINK_USER_FORCE_APPEND_ONLY_OPTION
        } else {
            return Ok(false);
        };

        if properties.value_eq_ignore_case(key, "true") {
            Ok(true)
        } else if properties.value_eq_ignore_case(key, "false") {
            Ok(false)
        } else {
            Err(ErrorCode::InvalidInputSyntax(format!("`{key}` must be true or false")).into())
        }
    }

    /// Determines the sink type and the effective ignore-delete flag.
    ///
    /// The user's choice comes from `FORMAT ... ENCODE ...` when present,
    /// otherwise from the legacy `type` WITH option. If the user specified
    /// nothing, the type is inferred from the derived stream kind
    /// (append-only stream -> append-only sink, otherwise upsert).
    fn derive_sink_type(
        derived_stream_kind: StreamKind,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<(SinkType, bool)> {
        let (user_defined_sink_type, user_ignore_delete, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert => SinkType::Upsert,
                    SinkFormat::Debezium => SinkType::Retract,
                }),
                Self::is_user_ignore_delete(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_ignore_delete(properties)?,
                true,
            ),
        };

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            match user_defined_sink_type {
                SinkType::AppendOnly => {
                    // Non-append-only input requires the explicit
                    // force-append-only escape hatch.
                    if derived_stream_kind != StreamKind::AppendOnly && !user_ignore_delete {
                        return Err(ErrorCode::InvalidInputSyntax(format!(
                            "The sink of {} stream cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                            Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                            derived_stream_kind,
                            if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                        ))
                        .into());
                    }
                }
                SinkType::Upsert => { }
                SinkType::Retract => {
                    if user_ignore_delete {
                        bail_invalid_input_syntax!(
                            "Retract sink type does not support `ignore_delete`. \
                            Please use `type = 'append-only'` or `type = 'upsert'` instead.",
                        );
                    }
                    if derived_stream_kind == StreamKind::Upsert {
                        bail_invalid_input_syntax!(
                            "The sink of upsert stream cannot be retract. \
                            Please create a materialized view or sink-into-table with this query before sinking it.",
                        );
                    }
                }
            }
            Ok((user_defined_sink_type, user_ignore_delete))
        } else {
            let sink_type = match derived_stream_kind {
                StreamKind::Retract | StreamKind::Upsert => SinkType::Upsert,
                StreamKind::AppendOnly => SinkType::AppendOnly,
            };
            Ok((sink_type, user_ignore_delete))
        }
    }

    /// Parses the comma-separated `primary_key` option into column indices.
    ///
    /// Blank entries are skipped; an effectively-empty list is an error.
    fn parse_downstream_pk(
        downstream_pk_str: &str,
        columns: &[ColumnCatalog],
    ) -> Result<Vec<usize>> {
        let downstream_pk = downstream_pk_str.split(',').collect_vec();
        let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
        for key in downstream_pk {
            let trimmed_key = key.trim();
            if trimmed_key.is_empty() {
                continue;
            }
            downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
        }
        if downstream_pk_indices.is_empty() {
            bail_invalid_input_syntax!(
                "Specified primary key should not be empty. \
                To use derived primary key, remove {DOWNSTREAM_PK_KEY} from WITH options instead."
            );
        }
        Ok(downstream_pk_indices)
    }

    /// Infers the internal table catalog backing the kv log store.
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }

    /// Finalizes this sink into a stream plan.
    ///
    /// For Iceberg sinks with `enable_pk_index`, wraps the sink in the
    /// pk-index writer + deletion-vector merger pair; otherwise returns the
    /// sink node itself.
    pub fn into_stream_plan(self) -> Result<PlanRef> {
        use super::{StreamIcebergWithPkIndexDvMerger, StreamIcebergWithPkIndexWriter};

        if !is_iceberg_with_pk_index_sink(&self.sink_desc)? {
            return Ok(self.into());
        }

        let writer: PlanRef = StreamIcebergWithPkIndexWriter::from_stream_sink(&self)?.into();
        let dv_merger: PlanRef =
            StreamIcebergWithPkIndexDvMerger::new(writer, self.sink_desc).into();
        Ok(dv_merger)
    }
}
773
774pub fn is_iceberg_with_pk_index_sink(sink_desc: &SinkDesc) -> Result<bool> {
775 if !sink_desc
776 .properties
777 .get(CONNECTOR_TYPE_KEY)
778 .is_some_and(|connector| connector.eq_ignore_ascii_case(ICEBERG_SINK))
779 {
780 return Ok(false);
781 }
782
783 let iceberg_config = IcebergConfig::from_btreemap(sink_desc.properties.clone())?;
784 Ok(iceberg_config.enable_pk_index)
785}
786
impl PlanTreeNodeUnary<Stream> for StreamSink {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    /// Rebuilds the node on a new input via `Self::new`, re-deriving the plan
    /// base while keeping the sink description and log store type.
    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(input, self.sink_desc.clone(), self.log_store_type)
    }
}
797
798impl_plan_tree_node_for_unary! { Stream, StreamSink }
799
impl Distill for StreamSink {
    /// Renders the node for EXPLAIN output: sink type, column names, and the
    /// downstream pk when one is set.
    fn distill<'a>(&self) -> XmlNode<'a> {
        // NOTE(review): only append-only vs. everything-else is distinguished
        // here, so a retract sink is also displayed as "upsert" — confirm
        // this is intended.
        let sink_type = if self.sink_desc.sink_type.is_append_only() {
            "append-only"
        } else {
            "upsert"
        };
        let column_names = self
            .sink_desc
            .columns
            .iter()
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();
        let column_names = Pretty::Array(column_names);
        let mut vec = Vec::with_capacity(3);
        vec.push(("type", Pretty::from(sink_type)));
        vec.push(("columns", column_names));
        if let Some(pk) = &self.sink_desc.downstream_pk {
            let sink_pk = IndicesDisplay {
                indices: pk,
                schema: self.base.schema(),
            };
            vec.push(("downstream_pk", sink_pk.distill()));
        }
        childless_record("StreamSink", vec)
    }
}
828
impl StreamNode for StreamSink {
    /// Serializes the node into its protobuf representation, materializing
    /// the kv log store's internal table with a freshly allocated table id.
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        let table = self
            .infer_kv_log_store_table_catalog()
            .with_id(state.gen_table_id_wrapped());

        PbNodeBody::Sink(Box::new(SinkNode {
            sink_desc: Some(self.sink_desc.to_proto()),
            table: Some(table.to_internal_table_prost()),
            log_store_type: self.log_store_type as i32,
            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
        }))
    }
}
846
847impl ExprRewritable<Stream> for StreamSink {}
848
849impl ExprVisitable for StreamSink {}
850
#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    /// Builds a three-column fixture (`v1: int32`, `v2: timestamptz`,
    /// `v3: timestamp`) for the partition-expression test.
    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                // Fix: `v3` previously reused ColumnId 2 (copy-paste slip);
                // each column now carries a distinct id.
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    /// Checks that `IcebergPartitionInfo::convert_to_expression` produces a
    /// `ROW(...)` of the partition struct type, with each child expression
    /// matching its transform: input ref for identity, NULL literal for void,
    /// and an `IcebergTransform` call (transform string as first argument)
    /// for everything else.
    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
}