use std::assert_matches::assert_matches;
use std::sync::Arc;

use iceberg::spec::Transform;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, CreateType, FieldLike};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::iceberg::ICEBERG_SINK;
use risingwave_connector::sink::trivial::TABLE_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_RETRACT, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::stream_plan::SinkLogStoreType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::{derive_columns, derive_pk};
use super::stream::prelude::*;
use super::utils::{
    Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
};
use super::{
    ExprRewritable, PlanBase, StreamExchange, StreamNode, StreamPlanRef as PlanRef, StreamProject,
    StreamSyncLogStore, generic,
};
use crate::TableCatalog;
use crate::error::{ErrorCode, Result, RwError, bail_bind_error, bail_invalid_input_syntax};
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::PlanTreeNodeUnary;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::property::{Distribution, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::WithOptionsSecResolved;

const DOWNSTREAM_PK_KEY: &str = "primary_key";
const CREATE_TABLE_IF_NOT_EXISTS: &str = "create_table_if_not_exists";

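/// Information needed to compute a sink's partition value from its input columns.
/// It is converted into an expression (see [`PartitionComputeInfo::convert_to_expression`])
/// whose result is appended as an extra column, so that rows belonging to the same
/// partition can be shuffled to the same writer. Currently only the Iceberg sink
/// provides this.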
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}

impl PartitionComputeInfo {
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        match self {
            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
        }
    }
}

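/// An Iceberg partition spec, described as the struct type of the computed partition
/// value plus the `(source column name, transform)` pair that produces each
/// partition field.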
pub struct IcebergPartitionInfo {
    pub partition_type: StructType,
    pub partition_fields: Vec<(String, Transform)>,
}

impl IcebergPartitionInfo {
    #[inline]
    fn transform_to_expression(
        transform: &Transform,
        col_id: usize,
        columns: &[ColumnCatalog],
        result_type: DataType,
    ) -> Result<ExprImpl> {
        match transform {
            // An identity transform simply forwards the source column, so its type
            // must match the partition field type exactly.
            Transform::Identity => {
                if columns[col_id].column_desc.data_type != result_type {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "The partition source column {} has type {}, but the partition field expects {}",
                        columns[col_id].column_desc.name,
                        columns[col_id].column_desc.data_type,
                        result_type
                    ))
                    .into());
                }
                Ok(ExprImpl::InputRef(
                    InputRef::new(col_id, result_type).into(),
                ))
            }
            // A void transform always produces NULL.
            Transform::Void => Ok(ExprImpl::literal_null(result_type)),
            // All other transforms (bucket, truncate, year, ...) are evaluated by the
            // `IcebergTransform` function, which takes the transform name and the
            // source column as arguments.
            _ => Ok(ExprImpl::FunctionCall(
                FunctionCall::new_unchecked(
                    Type::IcebergTransform,
                    vec![
                        ExprImpl::literal_varchar(transform.to_string()),
                        ExprImpl::InputRef(
                            InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
                                .into(),
                        ),
                    ],
                    result_type,
                )
                .into(),
            )),
        }
    }

    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        let child_exprs = self
            .partition_fields
            .into_iter()
            .zip_eq_debug(self.partition_type.iter())
            .map(|((field_name, transform), (_, result_type))| {
                let col_id = find_column_idx_by_name(columns, &field_name)?;
                Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
            })
            .collect::<Result<Vec<_>>>()?;

        // Pack the per-field expressions into a single struct-typed row expression.
        Ok(ExprImpl::FunctionCall(
            FunctionCall::new_unchecked(
                Type::Row,
                child_exprs,
                DataType::Struct(self.partition_type),
            )
            .into(),
        ))
    }
}

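/// Find the index of a column by name. This is used both for resolving user-specified
/// `primary_key` columns and for locating Iceberg partition source columns; the error
/// message assumes the primary-key case, which is the common one.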
#[inline]
fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
    columns
        .iter()
        .position(|col| col.column_desc.name == col_name)
        .ok_or_else(|| {
            ErrorCode::InvalidInputSyntax(format!(
                "Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.",
                col_name
            ))
            .into()
        })
}

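/// [`StreamSink`] represents a table/connector sink at the very end of the graph.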
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    input: PlanRef,
    sink_desc: SinkDesc,
    log_store_type: SinkLogStoreType,
}

impl StreamSink {
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        let input_kind = input.stream_kind();
        // Derive the stream kind of the sink from its type, validating it against the
        // kind of the input stream.
        let kind = match sink_desc.sink_type {
            SinkType::AppendOnly => {
                assert_eq!(
                    input_kind,
                    StreamKind::AppendOnly,
                    "{input_kind} stream cannot be used as input of append-only sink",
                );
                StreamKind::AppendOnly
            }
            SinkType::ForceAppendOnly => StreamKind::AppendOnly,
            SinkType::Upsert => StreamKind::Upsert,
            SinkType::Retract => {
                assert_ne!(
                    input_kind,
                    StreamKind::Upsert,
                    "upsert stream cannot be used as input of retract sink",
                );
                StreamKind::Retract
            }
        };

        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            input.stream_key().map(|v| v.to_vec()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            kind,
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }

    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }

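    /// Derive the required distribution for an Iceberg sink. If partition info is
    /// available, append the computed partition value as an extra column via a
    /// `StreamProject` and shard by it, so that rows of the same Iceberg partition
    /// are routed to the same writer; otherwise fall back to sharding by stream key.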
    fn derive_iceberg_sink_distribution(
        input: PlanRef,
        partition_info: Option<PartitionComputeInfo>,
        columns: &[ColumnCatalog],
    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
        if let Some(partition_info) = partition_info {
            let input_fields = input.schema().fields();

            // Forward all input columns unchanged...
            let mut exprs: Vec<_> = input_fields
                .iter()
                .enumerate()
                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
                .collect();

            // ...and append the computed partition value as an extra column.
            exprs.push(partition_info.convert_to_expression(columns)?);
            let partition_col_idx = exprs.len() - 1;
            let project = StreamProject::new(generic::Project::new(exprs, input));
            Ok((
                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
                project.into(),
                Some(partition_col_idx),
            ))
        } else {
            Ok((
                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
                input,
                None,
            ))
        }
    }

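    /// Build a `StreamSink` plan node from the optimized logical plan root and the
    /// user-provided `CREATE SINK` options, deriving the sink type, downstream
    /// primary key, and required distribution along the way.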
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: mut input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<Self> {
        let sink_type =
            Self::derive_sink_type(input.stream_kind(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(
            input.clone(),
            user_distributed_by.clone(),
            user_order_by,
            &columns,
        );
        let derived_pk = pk.iter().map(|k| k.column_index).collect_vec();

        // Resolve the downstream primary key, with the following precedence: an
        // explicit `primary_key` option, the target table's pk (for sink-into-table),
        // or the pk derived from the plan (for upsert sinks that auto-create the
        // table and for upsert Iceberg sinks).
        let downstream_pk = {
            let downstream_pk = properties
                .get(DOWNSTREAM_PK_KEY)
                .map(|v| Self::parse_downstream_pk(v, &columns))
                .transpose()?;

            if let Some(t) = &target_table {
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only = sink_type.is_append_only();

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink into a table without primary keys. Please add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink into an append-only table. Please add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_is_append_only {
                    None
                } else {
                    let target_table_mapping = target_table_mapping.unwrap();
                    Some(t.pk()
                        .iter()
                        .map(|c| {
                            target_table_mapping[c.column_index].ok_or_else(
                                || ErrorCode::InvalidInputSyntax("When using a non-append-only sink into a table, the primary key of the table must be included in the sink result.".to_owned()).into())
                        })
                        .try_collect::<_, _, RwError>()?)
                }
            } else if properties.get(CREATE_TABLE_IF_NOT_EXISTS) == Some(&"true".to_owned())
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                Some(derived_pk.clone())
            } else if properties.is_iceberg_connector()
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                Some(derived_pk.clone())
            } else {
                downstream_pk
            }
        };

        if let Some(pk) = &downstream_pk
            && pk.is_empty()
        {
            bail_invalid_input_syntax!(
                "Empty primary key is not supported. \
                Please specify the primary key in WITH options."
            )
        }

        if let StreamKind::Upsert = input.stream_kind()
            && let Some(downstream_pk) = &downstream_pk
            && !downstream_pk.iter().all(|i| derived_pk.contains(i))
        {
            bail_bind_error!(
                "When sinking from an upsert stream, \
                the downstream primary key must be the same as or a subset of the one derived from the stream."
            )
        }

        if let Some(upstream_table) = &auto_refresh_schema_from_table
            && let Some(downstream_pk) = &downstream_pk
        {
            let upstream_table_pk_col_names = upstream_table
                .pk
                .iter()
                .map(|order| {
                    upstream_table.columns[order.column_index]
                        .column_desc
                        .name()
                })
                .collect_vec();
            let sink_pk_col_names = downstream_pk
                .iter()
                .map(|&column_index| columns[column_index].name())
                .collect_vec();
            if upstream_table_pk_col_names != sink_pk_col_names {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "sink with auto schema change should have the same primary key as the upstream table {:?}, but got {:?}",
                    upstream_table_pk_col_names, sink_pk_col_names
                ))
                .into());
            }
        }
        let mut extra_partition_col_idx = None;

        // Choose the required distribution for the sink based on the connector.
        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get("connector") {
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        let Some(downstream_pk) = &downstream_pk else {
                            return Err(ErrorCode::InvalidInputSyntax(format!(
                                "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
                                key = DOWNSTREAM_PK_KEY
                            )).into());
                        };
                        RequiredDist::hash_shard(downstream_pk)
                    }
                    Some(s) if s == ICEBERG_SINK => {
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if let Some(downstream_pk) = &downstream_pk {
                            RequiredDist::shard_by_key(input.schema().len(), downstream_pk)
                        } else {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
        // With `streaming_separate_sink`, keep the sink in its own fragment by
        // inserting a no-shuffle exchange if the input is not already an exchange.
        let input = if input.ctx().session_ctx().config().streaming_separate_sink()
            && input.as_stream_exchange().is_none()
        {
            StreamExchange::new_no_shuffle(input).into()
        } else {
            input
        };

        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        let is_exactly_once = properties
            .get("is_exactly_once")
            .map(|v| v.to_lowercase() == "true");

        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
            auto_refresh_schema_from_table: auto_refresh_schema_from_table
                .as_ref()
                .map(|table| table.id),
        };

        let unsupported_sink = |sink: &str| -> Result<_> {
            Err(ErrorCode::InvalidInputSyntax(format!("unsupported sink type {}", sink)).into())
        };

        // Dispatch on the connector name to run per-connector validation and decide
        // whether sink decoupling (log store) is enabled. Within the macro body,
        // `SinkType` refers to the matched connector's sink type, not the enum above.
        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            let support_schema_change = SinkType::support_schema_change();
                            if !support_schema_change && auto_refresh_schema_from_table.is_some() {
                                return Err(ErrorCode::InvalidInputSyntax(format!(
                                    "{} sink does not support schema change",
                                    connector_type
                                ))
                                .into());
                            }
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                            .map_err(Into::into)
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(ErrorCode::InvalidInputSyntax(
                    "connector not specified when creating sink".to_owned(),
                )
                .into());
            }
        };
        let hint_string =
            |expected: bool| format!("Please run `set sink_decouple = {}` first.", expected);
        if !sink_decouple {
            if sink_desc.is_file_sink() {
                return Err(ErrorCode::NotSupported(
                    "File sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }

            let is_exactly_once = match sink_desc.is_exactly_once {
                Some(v) => v,
                None => {
                    // Default to non-exactly-once; for Iceberg, also persist the
                    // effective value back into the properties.
                    if let Some(connector) = sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
                        let connector_type = connector.to_lowercase();
                        if connector_type == ICEBERG_SINK {
                            sink_desc
                                .properties
                                .insert("is_exactly_once".to_owned(), "false".to_owned());
                        }
                    }
                    false
                }
            };

            if is_exactly_once {
                return Err(ErrorCode::NotSupported(
                    "Exactly once sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }
        }
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

        // Decoupled sink-into-table additionally goes through a sync log store.
        let input = if sink_decouple && target_table.is_some() {
            StreamSyncLogStore::new(input).into()
        } else {
            input
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }

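    /// Parse the sink type from the `type` entry in WITH options, if present.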
    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
            let sink_type = match sink_type.as_str() {
                SINK_TYPE_APPEND_ONLY => SinkType::AppendOnly,
                SINK_TYPE_UPSERT => {
                    // The Iceberg connector implements `type = 'upsert'` as a retract sink.
                    if properties.is_iceberg_connector() {
                        SinkType::Retract
                    } else {
                        SinkType::Upsert
                    }
                }
                SINK_TYPE_RETRACT | SINK_TYPE_DEBEZIUM => SinkType::Retract,
                _ => {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "`{}` must be {}, {}, {}, or {}",
                        SINK_TYPE_OPTION,
                        SINK_TYPE_APPEND_ONLY,
                        SINK_TYPE_RETRACT,
                        SINK_TYPE_UPSERT,
                        SINK_TYPE_DEBEZIUM,
                    ))
                    .into());
                }
            };
            return Ok(Some(sink_type));
        }
        Ok(None)
    }

    fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
        if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION)
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true")
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false")
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "`{}` must be true or false",
                SINK_USER_FORCE_APPEND_ONLY_OPTION
            ))
            .into());
        }
        Ok(properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"))
    }

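    /// Derive the sink type from the user-specified format/type options and the
    /// stream kind of the plan: an explicit type wins (validated against the stream
    /// kind, with `force_append_only` handling); otherwise append-only streams
    /// default to append-only sinks and all others default to upsert.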
    fn derive_sink_type(
        derived_stream_kind: StreamKind,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<SinkType> {
        let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert => SinkType::Upsert,
                    SinkFormat::Debezium => SinkType::Retract,
                }),
                Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_force_append_only(properties)?,
                true,
            ),
        };

        if user_force_append_only
            && user_defined_sink_type.is_some()
            && user_defined_sink_type != Some(SinkType::AppendOnly)
        {
            return Err(ErrorCode::InvalidInputSyntax(
                "`force_append_only` can only be used with type = 'append-only'".to_owned(),
            )
            .into());
        }

        // Forcing is a no-op if the stream is already append-only.
        let user_force_append_only =
            if user_force_append_only && derived_stream_kind.is_append_only() {
                false
            } else {
                user_force_append_only
            };

        if user_force_append_only && user_defined_sink_type != Some(SinkType::AppendOnly) {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "Cannot force the sink to be append-only without \"{}\".",
                if syntax_legacy {
                    "type='append-only'"
                } else {
                    "FORMAT PLAIN"
                }
            ))
            .into());
        }

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            match user_defined_sink_type {
                SinkType::AppendOnly => {
                    if user_force_append_only {
                        return Ok(SinkType::ForceAppendOnly);
                    }
                    if derived_stream_kind != StreamKind::AppendOnly {
                        return Err(ErrorCode::InvalidInputSyntax(format!(
                            "The sink of {} stream cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                            Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                            derived_stream_kind,
                            if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                        ))
                        .into());
                    }
                }
                SinkType::ForceAppendOnly => unreachable!(),
                SinkType::Upsert => {}
                SinkType::Retract => {
                    if derived_stream_kind == StreamKind::Upsert {
                        bail_invalid_input_syntax!(
                            "The sink of upsert stream cannot be retract. \
                            Please create a materialized view or sink-into-table with this query before sinking it.",
                        );
                    }
                }
            }
            Ok(user_defined_sink_type)
        } else {
            Ok(match derived_stream_kind {
                StreamKind::Retract | StreamKind::Upsert => SinkType::Upsert,
                StreamKind::AppendOnly => SinkType::AppendOnly,
            })
        }
    }

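    /// Parse a comma-separated `primary_key` option value into column indices
    /// against the sink's output columns.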
    fn parse_downstream_pk(
        downstream_pk_str: &str,
        columns: &[ColumnCatalog],
    ) -> Result<Vec<usize>> {
        let downstream_pk = downstream_pk_str.split(',').collect_vec();
        let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
        for key in downstream_pk {
            let trimmed_key = key.trim();
            if trimmed_key.is_empty() {
                continue;
            }
            downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
        }
        if downstream_pk_indices.is_empty() {
            bail_invalid_input_syntax!(
                "Specified primary key should not be empty. \
                To use the derived primary key, remove {DOWNSTREAM_PK_KEY} from the WITH options instead."
            );
        }
        Ok(downstream_pk_indices)
    }

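    /// Infer the table catalog of the internal kv log store used when sink
    /// decoupling is enabled.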
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }
}

impl PlanTreeNodeUnary<Stream> for StreamSink {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(input, self.sink_desc.clone(), self.log_store_type)
    }
}

impl_plan_tree_node_for_unary! { Stream, StreamSink }

impl Distill for StreamSink {
    fn distill<'a>(&self) -> XmlNode<'a> {
        // For display purposes, all non-append-only sinks are shown as "upsert".
        let sink_type = if self.sink_desc.sink_type.is_append_only() {
            "append-only"
        } else {
            "upsert"
        };
        let column_names = self
            .sink_desc
            .columns
            .iter()
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();
        let column_names = Pretty::Array(column_names);
        let mut vec = Vec::with_capacity(3);
        vec.push(("type", Pretty::from(sink_type)));
        vec.push(("columns", column_names));
        if let Some(pk) = &self.sink_desc.downstream_pk {
            let sink_pk = IndicesDisplay {
                indices: pk,
                schema: self.base.schema(),
            };
            vec.push(("downstream_pk", sink_pk.distill()));
        }
        childless_record("StreamSink", vec)
    }
}

impl StreamNode for StreamSink {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        let table = self
            .infer_kv_log_store_table_catalog()
            .with_id(state.gen_table_id_wrapped());

        PbNodeBody::Sink(Box::new(SinkNode {
            sink_desc: Some(self.sink_desc.to_proto()),
            table: Some(table.to_internal_table_prost()),
            log_store_type: self.log_store_type as i32,
            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
        }))
    }
}

impl ExprRewritable<Stream> for StreamSink {}

impl ExprVisitable for StreamSink {}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
}