1use std::assert_matches::assert_matches;
16use std::sync::Arc;
17
18use iceberg::spec::Transform;
19use itertools::Itertools;
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::catalog::{
22 ColumnCatalog, CreateType, FieldLike, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
23};
24use risingwave_common::types::{DataType, StructType};
25use risingwave_common::util::iter_util::ZipEqDebug;
26use risingwave_connector::sink::catalog::desc::SinkDesc;
27use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
28use risingwave_connector::sink::file_sink::fs::FsSink;
29use risingwave_connector::sink::iceberg::ICEBERG_SINK;
30use risingwave_connector::sink::trivial::TABLE_SINK;
31use risingwave_connector::sink::{
32 CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
33 SINK_TYPE_RETRACT, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
34 SINK_USER_IGNORE_DELETE_OPTION,
35};
36use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
37use risingwave_pb::expr::expr_node::Type;
38use risingwave_pb::stream_plan::SinkLogStoreType;
39use risingwave_pb::stream_plan::stream_node::PbNodeBody;
40
41use super::derive::{derive_columns, derive_pk};
42use super::stream::prelude::*;
43use super::utils::{
44 Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
45};
46use super::{
47 ExprRewritable, PlanBase, StreamExchange, StreamNode, StreamPlanRef as PlanRef, StreamProject,
48 StreamSyncLogStore, generic,
49};
50use crate::TableCatalog;
51use crate::error::{ErrorCode, Result, RwError, bail_bind_error, bail_invalid_input_syntax};
52use crate::expr::{ExprImpl, FunctionCall, InputRef};
53use crate::optimizer::StreamOptimizedLogicalPlanRoot;
54use crate::optimizer::plan_node::PlanTreeNodeUnary;
55use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
56use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
57use crate::optimizer::property::{Distribution, RequiredDist};
58use crate::stream_fragmenter::BuildFragmentGraphState;
59use crate::utils::WithOptionsSecResolved;
60
/// WITH-option key for a user-specified downstream primary key
/// (comma-separated column names; see `parse_downstream_pk`).
const DOWNSTREAM_PK_KEY: &str = "primary_key";
/// WITH-option key; when set to "true" together with an upsert sink and no explicit
/// primary key, the derived stream pk is used as the downstream pk (see `create`).
const CREATE_TABLE_IF_NOT_EXISTS: &str = "create_table_if_not_exists";
63
/// Connector-specific information needed to compute the sink's partition value
/// expression. Currently only Iceberg sinks carry partition info.
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}
79
80impl PartitionComputeInfo {
81 pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
82 match self {
83 PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
84 }
85 }
86}
87
/// Iceberg partition spec data needed to derive the partition expression.
pub struct IcebergPartitionInfo {
    // Struct type of the computed partition value, one field per partition field.
    pub partition_type: StructType,
    // Source column name and Iceberg transform for each partition field,
    // in the same order as `partition_type`'s fields.
    pub partition_fields: Vec<(String, Transform)>,
}
93
94impl IcebergPartitionInfo {
95 #[inline]
96 fn transform_to_expression(
97 transform: &Transform,
98 col_id: usize,
99 columns: &[ColumnCatalog],
100 result_type: DataType,
101 ) -> Result<ExprImpl> {
102 match transform {
103 Transform::Identity => {
104 if columns[col_id].column_desc.data_type != result_type {
105 return Err(ErrorCode::InvalidInputSyntax(format!(
106 "The partition field {} has type {}, but the partition field is {}",
107 columns[col_id].column_desc.name,
108 columns[col_id].column_desc.data_type,
109 result_type
110 ))
111 .into());
112 }
113 Ok(ExprImpl::InputRef(
114 InputRef::new(col_id, result_type).into(),
115 ))
116 }
117 Transform::Void => Ok(ExprImpl::literal_null(result_type)),
118 _ => Ok(ExprImpl::FunctionCall(
119 FunctionCall::new_unchecked(
120 Type::IcebergTransform,
121 vec![
122 ExprImpl::literal_varchar(transform.to_string()),
123 ExprImpl::InputRef(
124 InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
125 .into(),
126 ),
127 ],
128 result_type,
129 )
130 .into(),
131 )),
132 }
133 }
134
135 pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
136 let child_exprs = self
137 .partition_fields
138 .into_iter()
139 .zip_eq_debug(self.partition_type.iter())
140 .map(|((field_name, transform), (_, result_type))| {
141 let col_id = find_column_idx_by_name(columns, &field_name)?;
142 Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
143 })
144 .collect::<Result<Vec<_>>>()?;
145
146 Ok(ExprImpl::FunctionCall(
147 FunctionCall::new_unchecked(
148 Type::Row,
149 child_exprs,
150 DataType::Struct(self.partition_type),
151 )
152 .into(),
153 ))
154 }
155}
156
157#[inline]
158fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
159 columns
160 .iter()
161 .position(|col| col.column_desc.name == col_name)
162 .ok_or_else(|| {
163 ErrorCode::InvalidInputSyntax(format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name))
164 .into()
165 })
166}
167
/// Stream plan node that writes the stream into an external sink (or a target table,
/// for sink-into-table).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    // Upstream plan whose output is written to the sink.
    input: PlanRef,
    // Fully-resolved sink descriptor (columns, pk, properties, type, ...).
    sink_desc: SinkDesc,
    // Whether the sink buffers via a kv log store or an in-memory log store.
    log_store_type: SinkLogStoreType,
}
176
177impl StreamSink {
    /// Construct a `StreamSink`, deriving the node's stream kind from the sink type
    /// and asserting it is compatible with the input's stream kind.
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        let input_kind = input.stream_kind();
        let kind = match sink_desc.sink_type {
            SinkType::AppendOnly => {
                // Unless DELETEs are explicitly ignored, only an append-only input
                // may feed an append-only sink.
                if !sink_desc.ignore_delete {
                    assert_eq!(
                        input_kind,
                        StreamKind::AppendOnly,
                        "{input_kind} stream cannot be used as input of append-only sink",
                    );
                }
                StreamKind::AppendOnly
            }
            SinkType::Upsert => StreamKind::Upsert,
            SinkType::Retract => {
                // `derive_sink_type` rejects this combination with a user-facing
                // error; reaching it here is a planner bug.
                assert_ne!(
                    input_kind,
                    StreamKind::Upsert,
                    "upsert stream cannot be used as input of retract sink",
                );
                StreamKind::Retract
            }
        };

        // The sink inherits all plan properties from its input, except stream kind.
        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            input.stream_key().map(|v| v.to_vec()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            kind,
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }
232
    /// Returns the resolved sink descriptor carried by this plan node.
    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }
236
237 fn derive_iceberg_sink_distribution(
238 input: PlanRef,
239 partition_info: Option<PartitionComputeInfo>,
240 columns: &[ColumnCatalog],
241 ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
242 if let Some(partition_info) = partition_info {
244 let input_fields = input.schema().fields();
245
246 let mut exprs: Vec<_> = input_fields
247 .iter()
248 .enumerate()
249 .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
250 .collect();
251
252 exprs.push(partition_info.convert_to_expression(columns)?);
254 let partition_col_idx = exprs.len() - 1;
255 let project = StreamProject::new(generic::Project::new(exprs.clone(), input));
256 Ok((
257 RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
258 project.into(),
259 Some(partition_col_idx),
260 ))
261 } else {
262 Ok((
263 RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
264 input,
265 None,
266 ))
267 }
268 }
269
    /// Planner entry point: build a `StreamSink` from the optimized stream plan root
    /// plus all user-specified sink options.
    ///
    /// Derives, in order: the sink type and delete handling; output columns and the
    /// stream-derived pk; the downstream pk; the required distribution (with special
    /// handling for JDBC and Iceberg); whether to decouple the sink via a log store;
    /// and the create type (foreground/background).
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: mut input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<Self> {
        // Sink type (append-only/upsert/retract) and whether DELETEs are dropped.
        let (sink_type, ignore_delete) =
            Self::derive_sink_type(input.stream_kind(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(
            input.clone(),
            user_distributed_by.clone(),
            user_order_by,
            &columns,
        );
        // Primary key derived from the stream itself, as indices into `columns`.
        let derived_pk = pk.iter().map(|k| k.column_index).collect_vec();

        let downstream_pk = {
            // User-specified pk from the `primary_key` WITH option, if any.
            let downstream_pk = properties
                .get(DOWNSTREAM_PK_KEY)
                .map(|v| Self::parse_downstream_pk(v, &columns))
                .transpose()?;

            if let Some(t) = &target_table {
                // Sink-into-table: the target table's own pk governs.
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only = sink_type.is_append_only();

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_is_append_only {
                    None
                } else {
                    // Map the target table's pk columns through the sink's output
                    // mapping; every pk column must appear in the sink result.
                    let target_table_mapping = target_table_mapping.unwrap();
                    Some(t.pk()
                        .iter()
                        .map(|c| {
                            target_table_mapping[c.column_index].ok_or_else(
                                || ErrorCode::InvalidInputSyntax("When using non append only sink into table, the primary key of the table must be included in the sink result.".to_owned()).into())
                        })
                        .try_collect::<_, _, RwError>()?)
                }
            } else if properties.get(CREATE_TABLE_IF_NOT_EXISTS) == Some(&"true".to_owned())
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                // Auto-created downstream table: default its pk to the derived one.
                Some(derived_pk.clone())
            } else if properties.is_iceberg_connector()
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                // Upsert Iceberg sinks require a pk; default to the derived one.
                Some(derived_pk.clone())
            } else {
                downstream_pk
            }
        };

        if let Some(pk) = &downstream_pk
            && pk.is_empty()
        {
            bail_invalid_input_syntax!(
                "Empty primary key is not supported. \
                Please specify the primary key in WITH options."
            )
        }

        // An upsert stream only guarantees per-key ordering on its own stream key,
        // so a broader downstream pk would be incorrect.
        if let StreamKind::Upsert = input.stream_kind()
            && let Some(downstream_pk) = &downstream_pk
            && !downstream_pk.iter().all(|i| derived_pk.contains(i))
        {
            bail_bind_error!(
                "When sinking from an upsert stream, \
                the downstream primary key must be the same as or a subset of the one derived from the stream."
            )
        }

        // Auto schema change requires matching pk between upstream table and sink,
        // with a special allowance for the iceberg row-id alias.
        if let Some(upstream_table) = &auto_refresh_schema_from_table
            && let Some(downstream_pk) = &downstream_pk
        {
            let upstream_table_pk_col_names = upstream_table
                .pk
                .iter()
                .map(|order| {
                    upstream_table.columns[order.column_index]
                        .column_desc
                        .name()
                })
                .collect_vec();
            let sink_pk_col_names = downstream_pk
                .iter()
                .map(|&column_index| columns[column_index].name())
                .collect_vec();
            if upstream_table_pk_col_names != sink_pk_col_names {
                let is_iceberg_row_id_alias = properties.is_iceberg_connector()
                    && upstream_table_pk_col_names.len() == 1
                    && upstream_table_pk_col_names[0] == ROW_ID_COLUMN_NAME
                    && sink_pk_col_names.len() == 1
                    && sink_pk_col_names[0] == RISINGWAVE_ICEBERG_ROW_ID;
                if !is_iceberg_row_id_alias {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "sink with auto schema change should have same pk as upstream table {:?}, but got {:?}",
                        upstream_table_pk_col_names, sink_pk_col_names
                    ))
                    .into());
                }
            }
        }
        // Set when an Iceberg partition column is appended to the input schema.
        let mut extra_partition_col_idx = None;

        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get("connector") {
                    // Upsert JDBC sinks must be sharded by the downstream pk.
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        let Some(downstream_pk) = &downstream_pk else {
                            return Err(ErrorCode::InvalidInputSyntax(format!(
                                "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
                                key = DOWNSTREAM_PK_KEY
                            )).into());
                        };
                        RequiredDist::hash_shard(downstream_pk)
                    }
                    // Iceberg sinks may shard by a computed partition column; this
                    // can replace `input` with a projected plan.
                    Some(s) if s == ICEBERG_SINK => {
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if let Some(downstream_pk) = &downstream_pk {
                            RequiredDist::shard_by_key(input.schema().len(), downstream_pk)
                        } else {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
        // Optionally isolate the sink into its own fragment via a no-shuffle exchange.
        let input = if input.ctx().session_ctx().config().streaming_separate_sink()
            && input.as_stream_exchange().is_none()
        {
            StreamExchange::new_no_shuffle(input).into()
        } else {
            input
        };

        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        // None when the option is absent; Some(bool) otherwise.
        let is_exactly_once = properties
            .get("is_exactly_once")
            .map(|v| v.to_lowercase() == "true");

        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            ignore_delete,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
            auto_refresh_schema_from_table: auto_refresh_schema_from_table
                .as_ref()
                .map(|table| table.id),
        };

        let unsupported_sink = |sink: &str| -> Result<_> {
            Err(ErrorCode::InvalidInputSyntax(format!("unsupported sink type {}", sink)).into())
        };

        // Resolve the connector and ask it whether to decouple (buffer via log store).
        // NOTE: inside the macro body, `SinkType` is rebound by `match_sink_name_str!`
        // to the matched connector's sink implementation type, not the enum above.
        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            let support_schema_change = SinkType::support_schema_change();
                            if !support_schema_change && auto_refresh_schema_from_table.is_some() {
                                return Err(ErrorCode::InvalidInputSyntax(format!(
                                    "{} sink does not support schema change",
                                    connector_type
                                ))
                                .into());
                            }
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                            .map_err(Into::into)
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(ErrorCode::InvalidInputSyntax(
                    "connector not specified when create sink".to_owned(),
                )
                .into());
            }
        };
        let hint_string =
            |expected: bool| format!("Please run `set sink_decouple = {}` first.", expected);
        if !sink_decouple {
            // File sinks rely on the log store and cannot run undecoupled.
            if sink_desc.is_file_sink() {
                return Err(ErrorCode::NotSupported(
                    "File sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }

            // Undecoupled iceberg sinks default `is_exactly_once` to false when the
            // user did not set it explicitly.
            if sink_desc.is_exactly_once.is_none()
                && let Some(connector) = sink_desc.properties.get(CONNECTOR_TYPE_KEY)
            {
                let connector_type = connector.to_lowercase();
                if connector_type == ICEBERG_SINK {
                    sink_desc
                        .properties
                        .insert("is_exactly_once".to_owned(), "false".to_owned());
                }
            }
        }
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

        // Decoupled sink-into-table syncs through the log store explicitly.
        let input = if sink_decouple && target_table.is_some() {
            StreamSyncLogStore::new(input).into()
        } else {
            input
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }
585
586 fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
587 if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
588 let sink_type = match sink_type.as_str() {
589 SINK_TYPE_APPEND_ONLY => SinkType::AppendOnly,
590 SINK_TYPE_UPSERT => {
591 if properties.is_iceberg_connector() {
592 SinkType::Retract
594 } else {
595 SinkType::Upsert
596 }
597 }
598 SINK_TYPE_RETRACT | SINK_TYPE_DEBEZIUM => SinkType::Retract,
599 _ => {
600 return Err(ErrorCode::InvalidInputSyntax(format!(
601 "`{}` must be {}, {}, {}, or {}",
602 SINK_TYPE_OPTION,
603 SINK_TYPE_APPEND_ONLY,
604 SINK_TYPE_RETRACT,
605 SINK_TYPE_UPSERT,
606 SINK_TYPE_DEBEZIUM,
607 ))
608 .into());
609 }
610 };
611 return Ok(Some(sink_type));
612 }
613 Ok(None)
614 }
615
616 fn is_user_ignore_delete(properties: &WithOptionsSecResolved) -> Result<bool> {
618 let has_ignore_delete = properties.contains_key(SINK_USER_IGNORE_DELETE_OPTION);
619 let has_force_append_only = properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION);
620
621 if has_ignore_delete && has_force_append_only {
622 return Err(ErrorCode::InvalidInputSyntax(format!(
623 "`{}` is an alias of `{}`, only one of them can be specified.",
624 SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_USER_IGNORE_DELETE_OPTION
625 ))
626 .into());
627 }
628
629 let key = if has_ignore_delete {
630 SINK_USER_IGNORE_DELETE_OPTION
631 } else if has_force_append_only {
632 SINK_USER_FORCE_APPEND_ONLY_OPTION
633 } else {
634 return Ok(false);
635 };
636
637 if properties.value_eq_ignore_case(key, "true") {
638 Ok(true)
639 } else if properties.value_eq_ignore_case(key, "false") {
640 Ok(false)
641 } else {
642 Err(ErrorCode::InvalidInputSyntax(format!("`{key}` must be true or false")).into())
643 }
644 }
645
    /// Derive `(sink_type, ignore_delete)` from the stream's derived kind and the
    /// user's options — either the legacy WITH options or a `FORMAT ... ENCODE ...`
    /// clause — validating that the combination is expressible.
    fn derive_sink_type(
        derived_stream_kind: StreamKind,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<(SinkType, bool)> {
        // `syntax_legacy` only affects the wording of error hints (WITH vs FORMAT ENCODE).
        let (user_defined_sink_type, user_ignore_delete, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert => SinkType::Upsert,
                    SinkFormat::Debezium => SinkType::Retract,
                }),
                Self::is_user_ignore_delete(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_ignore_delete(properties)?,
                true,
            ),
        };

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            match user_defined_sink_type {
                SinkType::AppendOnly => {
                    // A non-append-only stream may only be sunk as append-only when
                    // the user explicitly opts into dropping DELETE/UPDATE messages.
                    if derived_stream_kind != StreamKind::AppendOnly && !user_ignore_delete {
                        return Err(ErrorCode::InvalidInputSyntax(format!(
                            "The sink of {} stream cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                            Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                            derived_stream_kind,
                            if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                        ))
                        .into());
                    }
                }
                // Any stream kind can feed an upsert sink.
                SinkType::Upsert => { }
                SinkType::Retract => {
                    if user_ignore_delete {
                        bail_invalid_input_syntax!(
                            "Retract sink type does not support `ignore_delete`. \
                            Please use `type = 'append-only'` or `type = 'upsert'` instead.",
                        );
                    }
                    if derived_stream_kind == StreamKind::Upsert {
                        bail_invalid_input_syntax!(
                            "The sink of upsert stream cannot be retract. \
                            Please create a materialized view or sink-into-table with this query before sinking it.",
                        );
                    }
                }
            }
            Ok((user_defined_sink_type, user_ignore_delete))
        } else {
            // No explicit type: default to the strongest type the stream supports.
            let sink_type = match derived_stream_kind {
                StreamKind::Retract | StreamKind::Upsert => SinkType::Upsert,
                StreamKind::AppendOnly => SinkType::AppendOnly,
            };
            Ok((sink_type, user_ignore_delete))
        }
    }
720
721 fn parse_downstream_pk(
727 downstream_pk_str: &str,
728 columns: &[ColumnCatalog],
729 ) -> Result<Vec<usize>> {
730 let downstream_pk = downstream_pk_str.split(',').collect_vec();
732 let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
733 for key in downstream_pk {
734 let trimmed_key = key.trim();
735 if trimmed_key.is_empty() {
736 continue;
737 }
738 downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
739 }
740 if downstream_pk_indices.is_empty() {
741 bail_invalid_input_syntax!(
742 "Specified primary key should not be empty. \
743 To use derived primary key, remove {DOWNSTREAM_PK_KEY} from WITH options instead."
744 );
745 }
746 Ok(downstream_pk_indices)
747 }
748
    /// Infer the internal table catalog backing this sink's kv log store.
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }
754}
755
impl PlanTreeNodeUnary<Stream> for StreamSink {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    // Rebuild this node on a new input via `new`, re-deriving plan base properties.
    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(input, self.sink_desc.clone(), self.log_store_type)
    }
}

impl_plan_tree_node_for_unary! { Stream, StreamSink }
768
769impl Distill for StreamSink {
770 fn distill<'a>(&self) -> XmlNode<'a> {
771 let sink_type = if self.sink_desc.sink_type.is_append_only() {
772 "append-only"
773 } else {
774 "upsert"
775 };
776 let column_names = self
777 .sink_desc
778 .columns
779 .iter()
780 .map(|col| col.name_with_hidden().to_string())
781 .map(Pretty::from)
782 .collect();
783 let column_names = Pretty::Array(column_names);
784 let mut vec = Vec::with_capacity(3);
785 vec.push(("type", Pretty::from(sink_type)));
786 vec.push(("columns", column_names));
787 if let Some(pk) = &self.sink_desc.downstream_pk {
788 let sink_pk = IndicesDisplay {
789 indices: pk,
790 schema: self.base.schema(),
791 };
792 vec.push(("downstream_pk", sink_pk.distill()));
793 }
794 childless_record("StreamSink", vec)
795 }
796}
797
impl StreamNode for StreamSink {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        // Internal table for the (kv) log store, assigned a fresh table id.
        let table = self
            .infer_kv_log_store_table_catalog()
            .with_id(state.gen_table_id_wrapped());

        PbNodeBody::Sink(Box::new(SinkNode {
            sink_desc: Some(self.sink_desc.to_proto()),
            table: Some(table.to_internal_table_prost()),
            log_store_type: self.log_store_type as i32,
            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
        }))
    }
}
815
// A sink holds no rewritable expressions of its own; default (no-op) impls suffice.
impl ExprRewritable<Stream> for StreamSink {}

impl ExprVisitable for StreamSink {}
819
#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    /// Three columns covering the source types exercised by the transforms below.
    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v3", ColumnId::new(2), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        // One entry per supported transform, reusing source columns v1/v2/v3.
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        // Top level must be a ROW(...) over one child per partition field.
        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                // Identity becomes a plain column reference.
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                // Void becomes a NULL literal of the target type.
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                // Everything else becomes IcebergTransform(name, column).
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
}