1use std::assert_matches::assert_matches;
16use std::sync::Arc;
17
18use iceberg::spec::Transform;
19use itertools::Itertools;
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::catalog::{ColumnCatalog, CreateType, FieldLike};
22use risingwave_common::types::{DataType, StructType};
23use risingwave_common::util::iter_util::ZipEqDebug;
24use risingwave_connector::sink::catalog::desc::SinkDesc;
25use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
26use risingwave_connector::sink::file_sink::fs::FsSink;
27use risingwave_connector::sink::iceberg::ICEBERG_SINK;
28use risingwave_connector::sink::trivial::TABLE_SINK;
29use risingwave_connector::sink::{
30 CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
31 SINK_TYPE_RETRACT, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
32 SINK_USER_IGNORE_DELETE_OPTION,
33};
34use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
35use risingwave_pb::expr::expr_node::Type;
36use risingwave_pb::stream_plan::SinkLogStoreType;
37use risingwave_pb::stream_plan::stream_node::PbNodeBody;
38
39use super::derive::{derive_columns, derive_pk};
40use super::stream::prelude::*;
41use super::utils::{
42 Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
43};
44use super::{
45 ExprRewritable, PlanBase, StreamExchange, StreamNode, StreamPlanRef as PlanRef, StreamProject,
46 StreamSyncLogStore, generic,
47};
48use crate::TableCatalog;
49use crate::error::{ErrorCode, Result, RwError, bail_bind_error, bail_invalid_input_syntax};
50use crate::expr::{ExprImpl, FunctionCall, InputRef};
51use crate::optimizer::StreamOptimizedLogicalPlanRoot;
52use crate::optimizer::plan_node::PlanTreeNodeUnary;
53use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
54use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
55use crate::optimizer::property::{Distribution, RequiredDist};
56use crate::stream_fragmenter::BuildFragmentGraphState;
57use crate::utils::WithOptionsSecResolved;
58
/// `WITH` option key holding a user-specified, comma-separated list of downstream primary key
/// column names (parsed by `StreamSink::parse_downstream_pk`).
const DOWNSTREAM_PK_KEY: &str = "primary_key";
/// `WITH` option key; when its value is exactly `"true"`, an upsert sink without an explicit
/// `primary_key` falls back to the primary key derived from the plan.
const CREATE_TABLE_IF_NOT_EXISTS: &str = "create_table_if_not_exists";
61
62pub enum PartitionComputeInfo {
75 Iceberg(IcebergPartitionInfo),
76}
77
78impl PartitionComputeInfo {
79 pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
80 match self {
81 PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
82 }
83 }
84}
85
86pub struct IcebergPartitionInfo {
87 pub partition_type: StructType,
88 pub partition_fields: Vec<(String, Transform)>,
90}
91
92impl IcebergPartitionInfo {
93 #[inline]
94 fn transform_to_expression(
95 transform: &Transform,
96 col_id: usize,
97 columns: &[ColumnCatalog],
98 result_type: DataType,
99 ) -> Result<ExprImpl> {
100 match transform {
101 Transform::Identity => {
102 if columns[col_id].column_desc.data_type != result_type {
103 return Err(ErrorCode::InvalidInputSyntax(format!(
104 "The partition field {} has type {}, but the partition field is {}",
105 columns[col_id].column_desc.name,
106 columns[col_id].column_desc.data_type,
107 result_type
108 ))
109 .into());
110 }
111 Ok(ExprImpl::InputRef(
112 InputRef::new(col_id, result_type).into(),
113 ))
114 }
115 Transform::Void => Ok(ExprImpl::literal_null(result_type)),
116 _ => Ok(ExprImpl::FunctionCall(
117 FunctionCall::new_unchecked(
118 Type::IcebergTransform,
119 vec![
120 ExprImpl::literal_varchar(transform.to_string()),
121 ExprImpl::InputRef(
122 InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
123 .into(),
124 ),
125 ],
126 result_type,
127 )
128 .into(),
129 )),
130 }
131 }
132
133 pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
134 let child_exprs = self
135 .partition_fields
136 .into_iter()
137 .zip_eq_debug(self.partition_type.iter())
138 .map(|((field_name, transform), (_, result_type))| {
139 let col_id = find_column_idx_by_name(columns, &field_name)?;
140 Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
141 })
142 .collect::<Result<Vec<_>>>()?;
143
144 Ok(ExprImpl::FunctionCall(
145 FunctionCall::new_unchecked(
146 Type::Row,
147 child_exprs,
148 DataType::Struct(self.partition_type),
149 )
150 .into(),
151 ))
152 }
153}
154
155#[inline]
156fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
157 columns
158 .iter()
159 .position(|col| col.column_desc.name == col_name)
160 .ok_or_else(|| {
161 ErrorCode::InvalidInputSyntax(format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name))
162 .into()
163 })
164}
165
/// Stream plan node that writes its input into a sink described by a [`SinkDesc`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    /// The upstream plan whose rows are written to the sink.
    input: PlanRef,
    /// Description of the sink: columns, keys, properties, format, target table, etc.
    sink_desc: SinkDesc,
    /// Log store placed in front of the sink. `StreamSink::create` picks `KvLogStore` when
    /// sink decoupling is enabled and `InMemoryLogStore` otherwise.
    log_store_type: SinkLogStoreType,
}
174
175impl StreamSink {
176 #[must_use]
177 pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
178 let input_kind = input.stream_kind();
183 let kind = match sink_desc.sink_type {
184 SinkType::AppendOnly => {
185 if !sink_desc.ignore_delete {
186 assert_eq!(
187 input_kind,
188 StreamKind::AppendOnly,
189 "{input_kind} stream cannot be used as input of append-only sink",
190 );
191 }
192 StreamKind::AppendOnly
193 }
194 SinkType::Upsert => StreamKind::Upsert,
195 SinkType::Retract => {
196 assert_ne!(
197 input_kind,
198 StreamKind::Upsert,
199 "upsert stream cannot be used as input of retract sink",
200 );
201 StreamKind::Retract
202 }
203 };
204
205 let base = PlanBase::new_stream(
206 input.ctx(),
207 input.schema().clone(),
208 input.stream_key().map(|v| v.to_vec()),
215 input.functional_dependency().clone(),
216 input.distribution().clone(),
217 kind,
218 input.emit_on_window_close(),
219 input.watermark_columns().clone(),
220 input.columns_monotonicity().clone(),
221 );
222
223 Self {
224 base,
225 input,
226 sink_desc,
227 log_store_type,
228 }
229 }
230
231 pub fn sink_desc(&self) -> &SinkDesc {
232 &self.sink_desc
233 }
234
235 fn derive_iceberg_sink_distribution(
236 input: PlanRef,
237 partition_info: Option<PartitionComputeInfo>,
238 columns: &[ColumnCatalog],
239 ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
240 if let Some(partition_info) = partition_info {
242 let input_fields = input.schema().fields();
243
244 let mut exprs: Vec<_> = input_fields
245 .iter()
246 .enumerate()
247 .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
248 .collect();
249
250 exprs.push(partition_info.convert_to_expression(columns)?);
252 let partition_col_idx = exprs.len() - 1;
253 let project = StreamProject::new(generic::Project::new(exprs.clone(), input));
254 Ok((
255 RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
256 project.into(),
257 Some(partition_col_idx),
258 ))
259 } else {
260 Ok((
261 RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
262 input,
263 None,
264 ))
265 }
266 }
267
268 #[allow(clippy::too_many_arguments)]
269 pub fn create(
270 StreamOptimizedLogicalPlanRoot {
271 plan: mut input,
272 required_dist: user_distributed_by,
273 required_order: user_order_by,
274 out_fields: user_cols,
275 out_names,
276 ..
277 }: StreamOptimizedLogicalPlanRoot,
278 name: String,
279 db_name: String,
280 sink_from_table_name: String,
281 target_table: Option<Arc<TableCatalog>>,
282 target_table_mapping: Option<Vec<Option<usize>>>,
283 definition: String,
284 properties: WithOptionsSecResolved,
285 format_desc: Option<SinkFormatDesc>,
286 partition_info: Option<PartitionComputeInfo>,
287 auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
288 ) -> Result<Self> {
289 let (sink_type, ignore_delete) =
290 Self::derive_sink_type(input.stream_kind(), &properties, format_desc.as_ref())?;
291
292 let columns = derive_columns(input.schema(), out_names, &user_cols)?;
293 let (pk, _) = derive_pk(
294 input.clone(),
295 user_distributed_by.clone(),
296 user_order_by,
297 &columns,
298 );
299 let derived_pk = pk.iter().map(|k| k.column_index).collect_vec();
300
301 let downstream_pk = {
303 let downstream_pk = properties
304 .get(DOWNSTREAM_PK_KEY)
305 .map(|v| Self::parse_downstream_pk(v, &columns))
306 .transpose()?;
307
308 if let Some(t) = &target_table {
309 let user_defined_primary_key_table = t.row_id_index.is_none();
310 let sink_is_append_only = sink_type.is_append_only();
311
312 if !user_defined_primary_key_table && !sink_is_append_only {
313 return Err(RwError::from(ErrorCode::BindError(
314 "Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
315 )));
316 }
317
318 if t.append_only && !sink_is_append_only {
319 return Err(RwError::from(ErrorCode::BindError(
320 "Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
321 )));
322 }
323
324 if sink_is_append_only {
325 None
326 } else {
327 let target_table_mapping = target_table_mapping.unwrap();
328 Some(t.pk()
329 .iter()
330 .map(|c| {
331 target_table_mapping[c.column_index].ok_or_else(
332 || ErrorCode::InvalidInputSyntax("When using non append only sink into table, the primary key of the table must be included in the sink result.".to_owned()).into())
333 })
334 .try_collect::<_, _, RwError>()?)
335 }
336 } else if properties.get(CREATE_TABLE_IF_NOT_EXISTS) == Some(&"true".to_owned())
337 && sink_type == SinkType::Upsert
338 && downstream_pk.is_none()
339 {
340 Some(derived_pk.clone())
341 } else if properties.is_iceberg_connector()
342 && sink_type == SinkType::Upsert
343 && downstream_pk.is_none()
344 {
345 Some(derived_pk.clone())
347 } else {
348 downstream_pk
349 }
350 };
351
352 if let Some(pk) = &downstream_pk
358 && pk.is_empty()
359 {
360 bail_invalid_input_syntax!(
361 "Empty primary key is not supported. \
362 Please specify the primary key in WITH options."
363 )
364 }
365
366 if let StreamKind::Upsert = input.stream_kind()
369 && let Some(downstream_pk) = &downstream_pk
370 && !downstream_pk.iter().all(|i| derived_pk.contains(i))
371 {
372 bail_bind_error!(
373 "When sinking from an upsert stream, \
374 the downstream primary key must be the same as or a subset of the one derived from the stream."
375 )
376 }
377
378 if let Some(upstream_table) = &auto_refresh_schema_from_table
379 && let Some(downstream_pk) = &downstream_pk
380 {
381 let upstream_table_pk_col_names = upstream_table
382 .pk
383 .iter()
384 .map(|order| {
385 upstream_table.columns[order.column_index]
386 .column_desc
387 .name()
388 })
389 .collect_vec();
390 let sink_pk_col_names = downstream_pk
391 .iter()
392 .map(|&column_index| columns[column_index].name())
393 .collect_vec();
394 if upstream_table_pk_col_names != sink_pk_col_names {
395 return Err(ErrorCode::InvalidInputSyntax(format!("sink with auto schema change should have same pk as upstream table {:?}, but got {:?}", upstream_table_pk_col_names, sink_pk_col_names)).into());
396 }
397 }
398 let mut extra_partition_col_idx = None;
399
400 let required_dist = match input.distribution() {
401 Distribution::Single => RequiredDist::single(),
402 _ => {
403 match properties.get("connector") {
404 Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
405 let Some(downstream_pk) = &downstream_pk else {
406 return Err(ErrorCode::InvalidInputSyntax(format!(
407 "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
408 key = DOWNSTREAM_PK_KEY
409 )).into());
410 };
411 RequiredDist::hash_shard(downstream_pk)
414 }
415 Some(s) if s == ICEBERG_SINK => {
416 let (required_dist, new_input, partition_col_idx) =
417 Self::derive_iceberg_sink_distribution(
418 input,
419 partition_info,
420 &columns,
421 )?;
422 input = new_input;
423 extra_partition_col_idx = partition_col_idx;
424 required_dist
425 }
426 _ => {
427 assert_matches!(user_distributed_by, RequiredDist::Any);
428 if let Some(downstream_pk) = &downstream_pk {
429 RequiredDist::shard_by_key(input.schema().len(), downstream_pk)
432 } else {
433 RequiredDist::shard_by_key(
434 input.schema().len(),
435 input.expect_stream_key(),
436 )
437 }
438 }
439 }
440 }
441 };
442 let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
443 let input = if input.ctx().session_ctx().config().streaming_separate_sink()
444 && input.as_stream_exchange().is_none()
445 {
446 StreamExchange::new_no_shuffle(input).into()
447 } else {
448 input
449 };
450
451 let distribution_key = input.distribution().dist_column_indices().to_vec();
452 let create_type = if input.ctx().session_ctx().config().background_ddl()
453 && plan_can_use_background_ddl(&input)
454 {
455 CreateType::Background
456 } else {
457 CreateType::Foreground
458 };
459 let (properties, secret_refs) = properties.into_parts();
460 let is_exactly_once = properties
461 .get("is_exactly_once")
462 .map(|v| v.to_lowercase() == "true");
463
464 let mut sink_desc = SinkDesc {
465 id: SinkId::placeholder(),
466 name,
467 db_name,
468 sink_from_name: sink_from_table_name,
469 definition,
470 columns,
471 plan_pk: pk,
472 downstream_pk,
473 distribution_key,
474 properties,
475 secret_refs,
476 sink_type,
477 ignore_delete,
478 format_desc,
479 target_table: target_table.as_ref().map(|catalog| catalog.id()),
480 extra_partition_col_idx,
481 create_type,
482 is_exactly_once,
483 auto_refresh_schema_from_table: auto_refresh_schema_from_table
484 .as_ref()
485 .map(|table| table.id),
486 };
487
488 let unsupported_sink = |sink: &str| -> Result<_> {
489 Err(ErrorCode::InvalidInputSyntax(format!("unsupported sink type {}", sink)).into())
490 };
491
492 let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
494 Some(connector) => {
495 let connector_type = connector.to_lowercase();
496 match_sink_name_str!(
497 connector_type.as_str(),
498 SinkType,
499 {
500 if connector == TABLE_SINK && sink_desc.target_table.is_none() {
502 unsupported_sink(TABLE_SINK)
503 } else {
504 SinkType::set_default_commit_checkpoint_interval(
505 &mut sink_desc,
506 &input.ctx().session_ctx().config().sink_decouple(),
507 )?;
508 let support_schema_change = SinkType::support_schema_change();
509 if !support_schema_change && auto_refresh_schema_from_table.is_some() {
510 return Err(ErrorCode::InvalidInputSyntax(format!(
511 "{} sink does not support schema change",
512 connector_type
513 ))
514 .into());
515 }
516 SinkType::is_sink_decouple(
517 &input.ctx().session_ctx().config().sink_decouple(),
518 )
519 .map_err(Into::into)
520 }
521 },
522 |other: &str| unsupported_sink(other)
523 )?
524 }
525 None => {
526 return Err(ErrorCode::InvalidInputSyntax(
527 "connector not specified when create sink".to_owned(),
528 )
529 .into());
530 }
531 };
532 let hint_string =
533 |expected: bool| format!("Please run `set sink_decouple = {}` first.", expected);
534 if !sink_decouple {
535 if sink_desc.is_file_sink() {
537 return Err(ErrorCode::NotSupported(
538 "File sink can only be created with sink_decouple enabled.".to_owned(),
539 hint_string(true),
540 )
541 .into());
542 }
543
544 if sink_desc.is_exactly_once.is_none()
545 && let Some(connector) = sink_desc.properties.get(CONNECTOR_TYPE_KEY)
546 {
547 let connector_type = connector.to_lowercase();
548 if connector_type == ICEBERG_SINK {
549 sink_desc
552 .properties
553 .insert("is_exactly_once".to_owned(), "false".to_owned());
554 }
555 }
556 }
557 let log_store_type = if sink_decouple {
558 SinkLogStoreType::KvLogStore
559 } else {
560 SinkLogStoreType::InMemoryLogStore
561 };
562
563 let input = if sink_decouple && target_table.is_some() {
565 StreamSyncLogStore::new(input).into()
566 } else {
567 input
568 };
569
570 Ok(Self::new(input, sink_desc, log_store_type))
571 }
572
573 fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
574 if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
575 let sink_type = match sink_type.as_str() {
576 SINK_TYPE_APPEND_ONLY => SinkType::AppendOnly,
577 SINK_TYPE_UPSERT => {
578 if properties.is_iceberg_connector() {
579 SinkType::Retract
581 } else {
582 SinkType::Upsert
583 }
584 }
585 SINK_TYPE_RETRACT | SINK_TYPE_DEBEZIUM => SinkType::Retract,
586 _ => {
587 return Err(ErrorCode::InvalidInputSyntax(format!(
588 "`{}` must be {}, {}, {}, or {}",
589 SINK_TYPE_OPTION,
590 SINK_TYPE_APPEND_ONLY,
591 SINK_TYPE_RETRACT,
592 SINK_TYPE_UPSERT,
593 SINK_TYPE_DEBEZIUM,
594 ))
595 .into());
596 }
597 };
598 return Ok(Some(sink_type));
599 }
600 Ok(None)
601 }
602
603 fn is_user_ignore_delete(properties: &WithOptionsSecResolved) -> Result<bool> {
605 let has_ignore_delete = properties.contains_key(SINK_USER_IGNORE_DELETE_OPTION);
606 let has_force_append_only = properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION);
607
608 if has_ignore_delete && has_force_append_only {
609 return Err(ErrorCode::InvalidInputSyntax(format!(
610 "`{}` is an alias of `{}`, only one of them can be specified.",
611 SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_USER_IGNORE_DELETE_OPTION
612 ))
613 .into());
614 }
615
616 let key = if has_ignore_delete {
617 SINK_USER_IGNORE_DELETE_OPTION
618 } else if has_force_append_only {
619 SINK_USER_FORCE_APPEND_ONLY_OPTION
620 } else {
621 return Ok(false);
622 };
623
624 if properties.value_eq_ignore_case(key, "true") {
625 Ok(true)
626 } else if properties.value_eq_ignore_case(key, "false") {
627 Ok(false)
628 } else {
629 Err(ErrorCode::InvalidInputSyntax(format!("`{key}` must be true or false")).into())
630 }
631 }
632
633 fn derive_sink_type(
642 derived_stream_kind: StreamKind,
643 properties: &WithOptionsSecResolved,
644 format_desc: Option<&SinkFormatDesc>,
645 ) -> Result<(SinkType, bool)> {
646 let (user_defined_sink_type, user_ignore_delete, syntax_legacy) = match format_desc {
647 Some(f) => (
648 Some(match f.format {
649 SinkFormat::AppendOnly => SinkType::AppendOnly,
650 SinkFormat::Upsert => SinkType::Upsert,
651 SinkFormat::Debezium => SinkType::Retract,
652 }),
653 Self::is_user_ignore_delete(&WithOptionsSecResolved::without_secrets(
654 f.options.clone(),
655 ))?,
656 false,
657 ),
658 None => (
659 Self::sink_type_in_prop(properties)?,
660 Self::is_user_ignore_delete(properties)?,
661 true,
662 ),
663 };
664
665 if let Some(user_defined_sink_type) = user_defined_sink_type {
666 match user_defined_sink_type {
667 SinkType::AppendOnly => {
668 if derived_stream_kind != StreamKind::AppendOnly && !user_ignore_delete {
669 return Err(ErrorCode::InvalidInputSyntax(format!(
670 "The sink of {} stream cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
671 Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
672 derived_stream_kind,
673 if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
674 ))
675 .into());
676 }
677 }
678 SinkType::Upsert => { }
679 SinkType::Retract => {
680 if user_ignore_delete {
681 bail_invalid_input_syntax!(
682 "Retract sink type does not support `ignore_delete`. \
683 Please use `type = 'append-only'` or `type = 'upsert'` instead.",
684 );
685 }
686 if derived_stream_kind == StreamKind::Upsert {
687 bail_invalid_input_syntax!(
688 "The sink of upsert stream cannot be retract. \
689 Please create a materialized view or sink-into-table with this query before sinking it.",
690 );
691 }
692 }
693 }
694 Ok((user_defined_sink_type, user_ignore_delete))
695 } else {
696 let sink_type = match derived_stream_kind {
699 StreamKind::Retract | StreamKind::Upsert => SinkType::Upsert,
702 StreamKind::AppendOnly => SinkType::AppendOnly,
703 };
704 Ok((sink_type, user_ignore_delete))
705 }
706 }
707
708 fn parse_downstream_pk(
714 downstream_pk_str: &str,
715 columns: &[ColumnCatalog],
716 ) -> Result<Vec<usize>> {
717 let downstream_pk = downstream_pk_str.split(',').collect_vec();
719 let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
720 for key in downstream_pk {
721 let trimmed_key = key.trim();
722 if trimmed_key.is_empty() {
723 continue;
724 }
725 downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
726 }
727 if downstream_pk_indices.is_empty() {
728 bail_invalid_input_syntax!(
729 "Specified primary key should not be empty. \
730 To use derived primary key, remove {DOWNSTREAM_PK_KEY} from WITH options instead."
731 );
732 }
733 Ok(downstream_pk_indices)
734 }
735
736 fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
739 infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
740 }
741}
742
743impl PlanTreeNodeUnary<Stream> for StreamSink {
744 fn input(&self) -> PlanRef {
745 self.input.clone()
746 }
747
748 fn clone_with_input(&self, input: PlanRef) -> Self {
749 Self::new(input, self.sink_desc.clone(), self.log_store_type)
750 }
752}
753
754impl_plan_tree_node_for_unary! { Stream, StreamSink }
755
756impl Distill for StreamSink {
757 fn distill<'a>(&self) -> XmlNode<'a> {
758 let sink_type = if self.sink_desc.sink_type.is_append_only() {
759 "append-only"
760 } else {
761 "upsert"
762 };
763 let column_names = self
764 .sink_desc
765 .columns
766 .iter()
767 .map(|col| col.name_with_hidden().to_string())
768 .map(Pretty::from)
769 .collect();
770 let column_names = Pretty::Array(column_names);
771 let mut vec = Vec::with_capacity(3);
772 vec.push(("type", Pretty::from(sink_type)));
773 vec.push(("columns", column_names));
774 if let Some(pk) = &self.sink_desc.downstream_pk {
775 let sink_pk = IndicesDisplay {
776 indices: pk,
777 schema: self.base.schema(),
778 };
779 vec.push(("downstream_pk", sink_pk.distill()));
780 }
781 childless_record("StreamSink", vec)
782 }
783}
784
785impl StreamNode for StreamSink {
786 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
787 use risingwave_pb::stream_plan::*;
788
789 let table = self
791 .infer_kv_log_store_table_catalog()
792 .with_id(state.gen_table_id_wrapped());
793
794 PbNodeBody::Sink(Box::new(SinkNode {
795 sink_desc: Some(self.sink_desc.to_proto()),
796 table: Some(table.to_internal_table_prost()),
797 log_store_type: self.log_store_type as i32,
798 rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
799 }))
800 }
801}
802
// `StreamSink` stores no `ExprImpl` of its own (only base, input, sink_desc and
// log_store_type), so the trait's default methods are used as-is.
impl ExprRewritable<Stream> for StreamSink {}

// Same reasoning as above: nothing expression-bearing to visit on this node.
impl ExprVisitable for StreamSink {}
806
#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::*;
    use crate::expr::{Expr, ExprImpl};

    /// Three visible columns with distinct ids: `v1: int`, `v2: timestamptz`, `v3: timestamp`.
    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                // Column ids must be unique within a catalog.
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }

    #[test]
    fn test_iceberg_convert_to_expression_unknown_column() {
        let partition_info = IcebergPartitionInfo {
            partition_type: StructType::new(vec![("f1", DataType::Int32)]),
            partition_fields: vec![("not_exist".into(), Transform::Identity)],
        };
        let catalog = create_column_catalog();
        assert!(partition_info.convert_to_expression(&catalog).is_err());
    }

    #[test]
    fn test_iceberg_identity_type_mismatch() {
        // `v1` is Int32 but the partition field is declared as Int64.
        let partition_info = IcebergPartitionInfo {
            partition_type: StructType::new(vec![("f1", DataType::Int64)]),
            partition_fields: vec![("v1".into(), Transform::Identity)],
        };
        let catalog = create_column_catalog();
        assert!(partition_info.convert_to_expression(&catalog).is_err());
    }
}