use std::assert_matches::assert_matches;
use std::sync::Arc;

use iceberg::spec::Transform;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, CreateType, FieldLike};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::iceberg::ICEBERG_SINK;
use risingwave_connector::sink::trivial::TABLE_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_RETRACT, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::stream_plan::SinkLogStoreType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::{derive_columns, derive_pk};
use super::stream::prelude::*;
use super::utils::{
    Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
};
use super::{
    ExprRewritable, PlanBase, StreamExchange, StreamNode, StreamPlanRef as PlanRef, StreamProject,
    StreamSyncLogStore, generic,
};
use crate::TableCatalog;
use crate::error::{ErrorCode, Result, RwError, bail_bind_error, bail_invalid_input_syntax};
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::PlanTreeNodeUnary;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::property::{Distribution, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::WithOptionsSecResolved;

const DOWNSTREAM_PK_KEY: &str = "primary_key";
const CREATE_TABLE_IF_NOT_EXISTS: &str = "create_table_if_not_exists";
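
/// Connector-specific information needed to compute an extra partition column for the
/// sink. Currently only Iceberg sinks with a partition spec provide this.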
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}

impl PartitionComputeInfo {
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        match self {
            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
        }
    }
}
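
/// The Iceberg partition spec as seen by the optimizer: the struct type of the computed
/// partition value, plus a `(source column name, transform)` pair for each field.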
pub struct IcebergPartitionInfo {
    pub partition_type: StructType,
    pub partition_fields: Vec<(String, Transform)>,
}

impl IcebergPartitionInfo {
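    /// Lower a single Iceberg transform on column `col_id` into an expression:
    /// `Identity` becomes a plain `InputRef`, `Void` a typed NULL literal, and any
    /// other transform a call to the `IcebergTransform` function with the transform
    /// name as its first argument.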
    #[inline]
    fn transform_to_expression(
        transform: &Transform,
        col_id: usize,
        columns: &[ColumnCatalog],
        result_type: DataType,
    ) -> Result<ExprImpl> {
        match transform {
            Transform::Identity => {
                if columns[col_id].column_desc.data_type != result_type {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "The partition source column {} has type {}, but the partition field requires type {}",
                        columns[col_id].column_desc.name,
                        columns[col_id].column_desc.data_type,
                        result_type
                    ))
                    .into());
                }
                Ok(ExprImpl::InputRef(
                    InputRef::new(col_id, result_type).into(),
                ))
            }
            Transform::Void => Ok(ExprImpl::literal_null(result_type)),
            _ => Ok(ExprImpl::FunctionCall(
                FunctionCall::new_unchecked(
                    Type::IcebergTransform,
                    vec![
                        ExprImpl::literal_varchar(transform.to_string()),
                        ExprImpl::InputRef(
                            InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
                                .into(),
                        ),
                    ],
                    result_type,
                )
                .into(),
            )),
        }
    }
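
    /// Combine all partition fields into a single `ROW(...)` expression whose return
    /// type is `partition_type`, resolving each field's source column by name.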
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        let child_exprs = self
            .partition_fields
            .into_iter()
            .zip_eq_debug(self.partition_type.iter())
            .map(|((field_name, transform), (_, result_type))| {
                let col_id = find_column_idx_by_name(columns, &field_name)?;
                Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(ExprImpl::FunctionCall(
            FunctionCall::new_unchecked(
                Type::Row,
                child_exprs,
                DataType::Struct(self.partition_type),
            )
            .into(),
        ))
    }
}
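
/// Find the index of `col_name` in `columns`. The error message is phrased for the
/// `primary_key` WITH option, which is this helper's main call site.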
#[inline]
fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
    columns
        .iter()
        .position(|col| col.column_desc.name == col_name)
        .ok_or_else(|| {
            ErrorCode::InvalidInputSyntax(format!(
                "Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.",
                col_name
            ))
            .into()
        })
}
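
/// [`StreamSink`] is the plan node at the very end of a streaming plan that emits the
/// query result to an external connector, or back into a table for sink-into-table.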
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    input: PlanRef,
    sink_desc: SinkDesc,
    log_store_type: SinkLogStoreType,
}

impl StreamSink {
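    /// Construct a `StreamSink` over a compatible input, asserting that the input's
    /// stream kind is valid for the sink type: an append-only sink requires an
    /// append-only input, and a retract sink rejects an upsert input.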
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        let input_kind = input.stream_kind();
        let kind = match sink_desc.sink_type {
            SinkType::AppendOnly => {
                assert_eq!(
                    input_kind,
                    StreamKind::AppendOnly,
                    "{input_kind} stream cannot be used as input of append-only sink",
                );
                StreamKind::AppendOnly
            }
            SinkType::ForceAppendOnly => StreamKind::AppendOnly,
            SinkType::Upsert => StreamKind::Upsert,
            SinkType::Retract => {
                assert_ne!(
                    input_kind,
                    StreamKind::Upsert,
                    "upsert stream cannot be used as input of retract sink",
                );
                StreamKind::Retract
            }
        };

        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            input.stream_key().map(|v| v.to_vec()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            kind,
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }

    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }
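
    /// For Iceberg sinks with a partition spec, append a computed partition column via
    /// a `StreamProject` and shard by it; otherwise shard by the input's stream key.
    /// Returns the required distribution, the (possibly projected) input, and the
    /// index of the extra partition column, if one was added.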
    fn derive_iceberg_sink_distribution(
        input: PlanRef,
        partition_info: Option<PartitionComputeInfo>,
        columns: &[ColumnCatalog],
    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
        if let Some(partition_info) = partition_info {
            let input_fields = input.schema().fields();

            let mut exprs: Vec<_> = input_fields
                .iter()
                .enumerate()
                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
                .collect();

            exprs.push(partition_info.convert_to_expression(columns)?);
            let partition_col_idx = exprs.len() - 1;
            let project = StreamProject::new(generic::Project::new(exprs.clone(), input));
            Ok((
                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
                project.into(),
                Some(partition_col_idx),
            ))
        } else {
            Ok((
                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
                input,
                None,
            ))
        }
    }
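
    /// Build a `StreamSink` from an optimized plan root and the user's sink options:
    /// derive the sink type and the downstream primary key, enforce the required
    /// distribution, and decide sink decoupling and the backing log store type.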
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: mut input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<Self> {
        let sink_type =
            Self::derive_sink_type(input.stream_kind(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(
            input.clone(),
            user_distributed_by.clone(),
            user_order_by,
            &columns,
        );
        let derived_pk = pk.iter().map(|k| k.column_index).collect_vec();
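
        // Resolve the downstream primary key: prefer the user-specified `primary_key`
        // option; when sinking into a table, map the target table's pk through the
        // column mapping; for upsert Iceberg sinks (or `create_table_if_not_exists`)
        // without an explicit pk, fall back to the pk derived from the stream.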
        let downstream_pk = {
            let downstream_pk = properties
                .get(DOWNSTREAM_PK_KEY)
                .map(|v| Self::parse_downstream_pk(v, &columns))
                .transpose()?;

            if let Some(t) = &target_table {
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only = sink_type.is_append_only();

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink into a table without primary keys. Please add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink into an append-only table. Please add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_is_append_only {
                    None
                } else {
                    let target_table_mapping = target_table_mapping.unwrap();
                    Some(
                        t.pk()
                            .iter()
                            .map(|c| {
                                target_table_mapping[c.column_index].ok_or_else(|| {
                                    ErrorCode::InvalidInputSyntax(
                                        "When using a non-append-only sink into a table, the primary key of the table must be included in the sink result.".to_owned(),
                                    )
                                    .into()
                                })
                            })
                            .try_collect::<_, _, RwError>()?,
                    )
                }
            } else if properties.get(CREATE_TABLE_IF_NOT_EXISTS) == Some(&"true".to_owned())
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                Some(derived_pk.clone())
            } else if properties.is_iceberg_connector()
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                Some(derived_pk.clone())
            } else {
                downstream_pk
            }
        };
        if let Some(pk) = &downstream_pk
            && pk.is_empty()
        {
            bail_invalid_input_syntax!(
                "Empty primary key is not supported. \
                Please specify the primary key in WITH options."
            )
        }

        if let StreamKind::Upsert = input.stream_kind()
            && let Some(downstream_pk) = &downstream_pk
            && !downstream_pk.iter().all(|i| derived_pk.contains(i))
        {
            bail_bind_error!(
                "When sinking from an upsert stream, \
                the downstream primary key must be the same as or a subset of the one derived from the stream."
            )
        }

        if let Some(upstream_table) = &auto_refresh_schema_from_table
            && let Some(downstream_pk) = &downstream_pk
        {
            let upstream_table_pk_col_names = upstream_table
                .pk
                .iter()
                .map(|order| {
                    upstream_table.columns[order.column_index]
                        .column_desc
                        .name()
                })
                .collect_vec();
            let sink_pk_col_names = downstream_pk
                .iter()
                .map(|&column_index| columns[column_index].name())
                .collect_vec();
            if upstream_table_pk_col_names != sink_pk_col_names {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "sink with auto schema change must have the same primary key as the upstream table {:?}, but got {:?}",
                    upstream_table_pk_col_names, sink_pk_col_names
                ))
                .into());
            }
        }
        let mut extra_partition_col_idx = None;
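
        // Derive the required distribution: upsert JDBC sinks must be hash-sharded by
        // the downstream pk, Iceberg sinks by the computed partition column, and all
        // other sinks by the downstream pk if present, falling back to the stream key.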
        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get("connector") {
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        let Some(downstream_pk) = &downstream_pk else {
                            return Err(ErrorCode::InvalidInputSyntax(format!(
                                "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
                                key = DOWNSTREAM_PK_KEY
                            ))
                            .into());
                        };
                        RequiredDist::hash_shard(downstream_pk)
                    }
                    Some(s) if s == ICEBERG_SINK => {
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if let Some(downstream_pk) = &downstream_pk {
                            RequiredDist::shard_by_key(input.schema().len(), downstream_pk)
                        } else {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
        let input = if input.ctx().session_ctx().config().streaming_separate_sink()
            && input.as_stream_exchange().is_none()
        {
            StreamExchange::new_no_shuffle(input).into()
        } else {
            input
        };

        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        let is_exactly_once = properties
            .get("is_exactly_once")
            .map(|v| v.to_lowercase() == "true");

        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
            auto_refresh_schema_from_table: auto_refresh_schema_from_table
                .as_ref()
                .map(|table| table.id),
        };

        let unsupported_sink = |sink: &str| -> Result<_> {
            Err(ErrorCode::InvalidInputSyntax(format!("unsupported sink type {}", sink)).into())
        };
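
        // Resolve sink decoupling per connector: `match_sink_name_str!` dispatches on
        // the connector name and binds `SinkType` to the matched sink implementation,
        // which sets the default commit checkpoint interval and decides whether
        // decoupling applies.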
        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            let support_schema_change = SinkType::support_schema_change();
                            if !support_schema_change && auto_refresh_schema_from_table.is_some() {
                                return Err(ErrorCode::InvalidInputSyntax(format!(
                                    "{} sink does not support schema change",
                                    connector_type
                                ))
                                .into());
                            }
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                            .map_err(Into::into)
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(ErrorCode::InvalidInputSyntax(
                    "connector not specified when creating sink".to_owned(),
                )
                .into());
            }
        };
        let hint_string =
            |expected: bool| format!("Please run `set sink_decouple = {}` first.", expected);
        if !sink_decouple {
            if sink_desc.is_file_sink() {
                return Err(ErrorCode::NotSupported(
                    "File sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }

            let is_exactly_once = match sink_desc.is_exactly_once {
                Some(v) => v,
                None => {
                    if let Some(connector) = sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
                        let connector_type = connector.to_lowercase();
                        if connector_type == ICEBERG_SINK {
                            sink_desc
                                .properties
                                .insert("is_exactly_once".to_owned(), "false".to_owned());
                        }
                    }
                    false
                }
            };

            if is_exactly_once {
                return Err(ErrorCode::NotSupported(
                    "Exactly-once sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }
        }
        if sink_decouple && auto_refresh_schema_from_table.is_some() {
            return Err(ErrorCode::NotSupported(
                "Sink with auto schema refresh can only be created with sink_decouple disabled."
                    .to_owned(),
                hint_string(false),
            )
            .into());
        }
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

        let input = if sink_decouple && target_table.is_some() {
            StreamSyncLogStore::new(input).into()
        } else {
            input
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }
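
    /// Read the legacy `type` option from the WITH clause, if present. Note that for
    /// Iceberg sinks, `type = 'upsert'` maps to the retract sink type.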
    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
            let sink_type = match sink_type.as_str() {
                SINK_TYPE_APPEND_ONLY => SinkType::AppendOnly,
                SINK_TYPE_UPSERT => {
                    if properties.is_iceberg_connector() {
                        SinkType::Retract
                    } else {
                        SinkType::Upsert
                    }
                }
                SINK_TYPE_RETRACT | SINK_TYPE_DEBEZIUM => SinkType::Retract,
                _ => {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "`{}` must be {}, {}, {}, or {}",
                        SINK_TYPE_OPTION,
                        SINK_TYPE_APPEND_ONLY,
                        SINK_TYPE_RETRACT,
                        SINK_TYPE_UPSERT,
                        SINK_TYPE_DEBEZIUM,
                    ))
                    .into());
                }
            };
            return Ok(Some(sink_type));
        }
        Ok(None)
    }
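
    /// Validate and read the `force_append_only` option, rejecting any value other
    /// than a case-insensitive "true" or "false".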
    fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
        if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION)
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true")
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false")
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "`{}` must be true or false",
                SINK_USER_FORCE_APPEND_ONLY_OPTION
            ))
            .into());
        }
        Ok(properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"))
    }
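
    /// Derive the sink type from the user-declared `FORMAT`, or from the legacy `type`
    /// WITH option, and validate it against the stream kind derived from the plan and
    /// the `force_append_only` flag.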
    fn derive_sink_type(
        derived_stream_kind: StreamKind,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<SinkType> {
        let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert => SinkType::Upsert,
                    SinkFormat::Debezium => SinkType::Retract,
                }),
                Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_force_append_only(properties)?,
                true,
            ),
        };

        if user_force_append_only
            && user_defined_sink_type.is_some()
            && user_defined_sink_type != Some(SinkType::AppendOnly)
        {
            return Err(ErrorCode::InvalidInputSyntax(
                "`force_append_only` can only be used with type = 'append-only'".to_owned(),
            )
            .into());
        }

        let user_force_append_only =
            if user_force_append_only && derived_stream_kind.is_append_only() {
                false
            } else {
                user_force_append_only
            };

        if user_force_append_only && user_defined_sink_type != Some(SinkType::AppendOnly) {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "Cannot force the sink to be append-only without \"{}\".",
                if syntax_legacy {
                    "type='append-only'"
                } else {
                    "FORMAT PLAIN"
                }
            ))
            .into());
        }

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            match user_defined_sink_type {
                SinkType::AppendOnly => {
                    if user_force_append_only {
                        return Ok(SinkType::ForceAppendOnly);
                    }
                    if derived_stream_kind != StreamKind::AppendOnly {
                        return Err(ErrorCode::InvalidInputSyntax(format!(
                            "The sink of a {} stream cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                            Note that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                            derived_stream_kind,
                            if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                        ))
                        .into());
                    }
                }
                SinkType::ForceAppendOnly => unreachable!(),
                SinkType::Upsert => {}
                SinkType::Retract => {
                    if derived_stream_kind == StreamKind::Upsert {
                        bail_invalid_input_syntax!(
                            "The sink of an upsert stream cannot be retract. \
                            Please create a materialized view or sink-into-table with this query before sinking it.",
                        );
                    }
                }
            }
            Ok(user_defined_sink_type)
        } else {
            Ok(match derived_stream_kind {
                StreamKind::Retract | StreamKind::Upsert => SinkType::Upsert,
                StreamKind::AppendOnly => SinkType::AppendOnly,
            })
        }
    }
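
    /// Parse the comma-separated `primary_key` option into column indices over the
    /// sink schema, skipping empty segments and rejecting a key that ends up empty.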
    fn parse_downstream_pk(
        downstream_pk_str: &str,
        columns: &[ColumnCatalog],
    ) -> Result<Vec<usize>> {
        let downstream_pk = downstream_pk_str.split(',').collect_vec();
        let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
        for key in downstream_pk {
            let trimmed_key = key.trim();
            if trimmed_key.is_empty() {
                continue;
            }
            downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
        }
        if downstream_pk_indices.is_empty() {
            bail_invalid_input_syntax!(
                "Specified primary key should not be empty. \
                To use the derived primary key, remove {DOWNSTREAM_PK_KEY} from the WITH options instead."
            );
        }
        Ok(downstream_pk_indices)
    }
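
    /// Infer the catalog of the internal table that backs the KV log store when sink
    /// decoupling is enabled.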
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }
}

impl PlanTreeNodeUnary<Stream> for StreamSink {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(input, self.sink_desc.clone(), self.log_store_type)
    }
}

impl_plan_tree_node_for_unary! { Stream, StreamSink }
impl Distill for StreamSink {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let sink_type = if self.sink_desc.sink_type.is_append_only() {
            "append-only"
        } else {
            "upsert"
        };
        let column_names = self
            .sink_desc
            .columns
            .iter()
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();
        let column_names = Pretty::Array(column_names);
        let mut vec = Vec::with_capacity(3);
        vec.push(("type", Pretty::from(sink_type)));
        vec.push(("columns", column_names));
        if let Some(pk) = &self.sink_desc.downstream_pk {
            let sink_pk = IndicesDisplay {
                indices: pk,
                schema: self.base.schema(),
            };
            vec.push(("downstream_pk", sink_pk.distill()));
        }
        childless_record("StreamSink", vec)
    }
}

impl StreamNode for StreamSink {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        let table = self
            .infer_kv_log_store_table_catalog()
            .with_id(state.gen_table_id_wrapped());

        PbNodeBody::Sink(Box::new(SinkNode {
            sink_desc: Some(self.sink_desc.to_proto()),
            table: Some(table.to_internal_table_prost()),
            log_store_type: self.log_store_type as i32,
            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
        }))
    }
}

impl ExprRewritable<Stream> for StreamSink {}

impl ExprVisitable for StreamSink {}
#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }
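
    // Covers every transform kind: identity should lower to an `InputRef`, void to a
    // typed NULL literal, and all others to an `IcebergTransform` function call.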
    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
}