use std::assert_matches::assert_matches;
use std::sync::Arc;

use iceberg::spec::Transform;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, CreateType, FieldLike};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::iceberg::ICEBERG_SINK;
use risingwave_connector::sink::trivial::TABLE_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::stream_plan::SinkLogStoreType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::{derive_columns, derive_pk};
use super::stream::prelude::*;
use super::utils::{
    Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
};
use super::{
    ExprRewritable, PlanBase, StreamExchange, StreamNode, StreamPlanRef as PlanRef, StreamProject,
    StreamSyncLogStore, generic,
};
use crate::TableCatalog;
use crate::error::{ErrorCode, Result, RwError, bail_bind_error, bail_invalid_input_syntax};
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::PlanTreeNodeUnary;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::property::{Distribution, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::WithOptionsSecResolved;

const DOWNSTREAM_PK_KEY: &str = "primary_key";
const CREATE_TABLE_IF_NOT_EXISTS: &str = "create_table_if_not_exists";

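/// Connector-specific information needed to compute the partition value that a
/// sink shards by. Currently only Iceberg sinks carry such information.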
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}

impl PartitionComputeInfo {
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        match self {
            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
        }
    }
}

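/// Partition spec of an Iceberg table: the partition struct type plus the
/// `(source column name, transform)` pair for each partition field.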
pub struct IcebergPartitionInfo {
    pub partition_type: StructType,
    pub partition_fields: Vec<(String, Transform)>,
}

impl IcebergPartitionInfo {
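    /// Builds the expression computing a single partition field from its source
    /// column: `Identity` becomes an `InputRef` (after a type check), `Void`
    /// becomes a NULL literal, and any other transform becomes an
    /// `IcebergTransform` function call.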
    #[inline]
    fn transform_to_expression(
        transform: &Transform,
        col_id: usize,
        columns: &[ColumnCatalog],
        result_type: DataType,
    ) -> Result<ExprImpl> {
        match transform {
            Transform::Identity => {
                if columns[col_id].column_desc.data_type != result_type {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "The partition source column {} has type {}, but the partition field requires type {}",
                        columns[col_id].column_desc.name,
                        columns[col_id].column_desc.data_type,
                        result_type
                    ))
                    .into());
                }
                Ok(ExprImpl::InputRef(
                    InputRef::new(col_id, result_type).into(),
                ))
            }
            Transform::Void => Ok(ExprImpl::literal_null(result_type)),
            _ => Ok(ExprImpl::FunctionCall(
                FunctionCall::new_unchecked(
                    Type::IcebergTransform,
                    vec![
                        ExprImpl::literal_varchar(transform.to_string()),
                        ExprImpl::InputRef(
                            InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
                                .into(),
                        ),
                    ],
                    result_type,
                )
                .into(),
            )),
        }
    }

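    /// Combines the per-field partition expressions into a single `ROW`
    /// expression typed as the partition struct.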
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        let child_exprs = self
            .partition_fields
            .into_iter()
            .zip_eq_debug(self.partition_type.iter())
            .map(|((field_name, transform), (_, result_type))| {
                let col_id = find_column_idx_by_name(columns, &field_name)?;
                Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(ExprImpl::FunctionCall(
            FunctionCall::new_unchecked(
                Type::Row,
                child_exprs,
                DataType::Struct(self.partition_type),
            )
            .into(),
        ))
    }
}

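/// Resolves a column name to its index in `columns`. The error message assumes
/// the caller is resolving user-specified primary key columns.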
#[inline]
fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
    columns
        .iter()
        .position(|col| col.column_desc.name == col_name)
        .ok_or_else(|| {
            ErrorCode::InvalidInputSyntax(format!(
                "Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.",
                col_name
            ))
            .into()
        })
}

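/// [`StreamSink`] represents a table/connector sink at the very end of the graph.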
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    input: PlanRef,
    sink_desc: SinkDesc,
    log_store_type: SinkLogStoreType,
}

impl StreamSink {
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        let base = input.plan_base().clone_with_new_plan_id();

        if let SinkType::AppendOnly = sink_desc.sink_type {
            let kind = input.stream_kind();
            assert_matches!(
                kind,
                StreamKind::AppendOnly,
                "{kind} stream cannot be used as input of append-only sink",
            );
        }

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }

    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }

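    /// Decides the distribution for an Iceberg sink. With partition info, the
    /// input is wrapped in a projection that appends the computed partition
    /// value, and the distribution shards by it; otherwise the stream key is
    /// used. Returns the required distribution, the (possibly wrapped) input,
    /// and the index of the extra partition column, if any.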
    fn derive_iceberg_sink_distribution(
        input: PlanRef,
        partition_info: Option<PartitionComputeInfo>,
        columns: &[ColumnCatalog],
    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
        if let Some(partition_info) = partition_info {
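            // Compute the partition value in an appended projection and shard by
            // that extra column.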
            let input_fields = input.schema().fields();

            let mut exprs: Vec<_> = input_fields
                .iter()
                .enumerate()
                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
                .collect();

            exprs.push(partition_info.convert_to_expression(columns)?);
            let partition_col_idx = exprs.len() - 1;
            let project = StreamProject::new(generic::Project::new(exprs, input));
            Ok((
                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
                project.into(),
                Some(partition_col_idx),
            ))
        } else {
            Ok((
                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
                input,
                None,
            ))
        }
    }

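    /// Builds a `StreamSink` from the optimized plan root: derives the sink
    /// type, columns, and downstream primary key, enforces the required
    /// distribution, and decides sink decoupling and the log store type.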
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: mut input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<Self> {
        let sink_type =
            Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
        let derived_pk = pk.iter().map(|k| k.column_index).collect_vec();

        let downstream_pk = {
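            // Resolve the downstream primary key: prefer the user-specified
            // `primary_key` option; for upsert sink-into-table, map the target
            // table's pk through `target_table_mapping`; for upsert Iceberg sinks
            // or auto-created tables without an explicit pk, fall back to the pk
            // derived from the stream.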
            let downstream_pk = properties
                .get(DOWNSTREAM_PK_KEY)
                .map(|v| Self::parse_downstream_pk(v, &columns))
                .transpose()?;

            if let Some(t) = &target_table {
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only =
                    sink_type == SinkType::AppendOnly || sink_type == SinkType::ForceAppendOnly;

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to a table without primary keys. Please add type = 'append-only' to the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to an append-only table. Please add type = 'append-only' to the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_type != SinkType::Upsert {
                    None
                } else {
                    let target_table_mapping = target_table_mapping.unwrap();

                    Some(
                        t.pk()
                            .iter()
                            .map(|c| {
                                target_table_mapping[c.column_index].ok_or_else(|| {
                                    ErrorCode::InvalidInputSyntax(
                                        "When using a non-append-only sink into a table, the primary key of the table must be included in the sink result.".to_owned(),
                                    )
                                    .into()
                                })
                            })
                            .try_collect::<_, _, RwError>()?,
                    )
                }
            } else if properties.get(CREATE_TABLE_IF_NOT_EXISTS) == Some(&"true".to_owned())
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                Some(derived_pk.clone())
            } else if properties.is_iceberg_connector()
                && sink_type == SinkType::Upsert
                && downstream_pk.is_none()
            {
                Some(derived_pk.clone())
            } else {
                downstream_pk
            }
        };

        if let Some(pk) = &downstream_pk
            && pk.is_empty()
        {
            bail_invalid_input_syntax!(
                "Empty primary key is not supported. \
                 Please specify the primary key in WITH options."
            )
        }

        if let StreamKind::Upsert = input.stream_kind()
            && let Some(downstream_pk) = &downstream_pk
            && !downstream_pk.iter().all(|i| derived_pk.contains(i))
        {
            bail_bind_error!(
                "When sinking from an upsert stream, \
                 the downstream primary key must be the same as or a subset of the one derived from the stream."
            )
        }

        if let Some(upstream_table) = &auto_refresh_schema_from_table
            && let Some(downstream_pk) = &downstream_pk
        {
            let upstream_table_pk_col_names = upstream_table
                .pk
                .iter()
                .map(|order| {
                    upstream_table.columns[order.column_index]
                        .column_desc
                        .name()
                })
                .collect_vec();
            let sink_pk_col_names = downstream_pk
                .iter()
                .map(|&column_index| columns[column_index].name())
                .collect_vec();
            if upstream_table_pk_col_names != sink_pk_col_names {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "sink with auto schema change should have the same pk as the upstream table {:?}, but got {:?}",
                    upstream_table_pk_col_names, sink_pk_col_names
                ))
                .into());
            }
        }

        let mut extra_partition_col_idx = None;

        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get("connector") {
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        let Some(downstream_pk) = &downstream_pk else {
                            return Err(ErrorCode::InvalidInputSyntax(format!(
                                "Primary key must be defined for upsert JDBC sink. Please specify \"{key}='pk1,pk2,...'\" in the WITH options.",
                                key = DOWNSTREAM_PK_KEY
                            ))
                            .into());
                        };
                        RequiredDist::hash_shard(downstream_pk)
                    }
                    Some(s) if s == ICEBERG_SINK => {
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if let Some(downstream_pk) = &downstream_pk {
                            RequiredDist::shard_by_key(input.schema().len(), downstream_pk)
                        } else {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
        let input = if input.ctx().session_ctx().config().streaming_separate_sink()
            && input.as_stream_exchange().is_none()
        {
            StreamExchange::new_no_shuffle(input).into()
        } else {
            input
        };

        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        let is_exactly_once = properties
            .get("is_exactly_once")
            .map(|v| v.to_lowercase() == "true");

        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
            auto_refresh_schema_from_table: auto_refresh_schema_from_table
                .as_ref()
                .map(|table| table.id),
        };

        let unsupported_sink = |sink: &str| -> Result<_> {
            Err(ErrorCode::InvalidInputSyntax(format!("unsupported sink type {}", sink)).into())
        };

        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
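                            // A standalone `table` sink only makes sense as part of
                            // sink-into-table, i.e. with a target table present.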
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            let support_schema_change = SinkType::support_schema_change();
                            if !support_schema_change && auto_refresh_schema_from_table.is_some() {
                                return Err(ErrorCode::InvalidInputSyntax(format!(
                                    "{} sink does not support schema change",
                                    connector_type
                                ))
                                .into());
                            }
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                            .map_err(Into::into)
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(ErrorCode::InvalidInputSyntax(
                    "connector not specified when creating sink".to_owned(),
                )
                .into());
            }
        };
        let hint_string =
            |expected: bool| format!("Please run `set sink_decouple = {}` first.", expected);
        if !sink_decouple {
            if sink_desc.is_file_sink() {
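                // File sinks are rejected unless sink decoupling is enabled.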
                return Err(ErrorCode::NotSupported(
                    "File sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }

            let is_exactly_once = match sink_desc.is_exactly_once {
                Some(v) => v,
                None => {
                    if let Some(connector) = sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
                        let connector_type = connector.to_lowercase();
                        if connector_type == ICEBERG_SINK {
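                            // Without decoupling, Iceberg sinks default
                            // `is_exactly_once` to false.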
                            sink_desc
                                .properties
                                .insert("is_exactly_once".to_owned(), "false".to_owned());
                        }
                    }
                    false
                }
            };

            if is_exactly_once {
                return Err(ErrorCode::NotSupported(
                    "Exactly-once sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }
        }

        if sink_decouple && auto_refresh_schema_from_table.is_some() {
            return Err(ErrorCode::NotSupported(
                "sink with auto schema refresh can only be created with sink_decouple disabled."
                    .to_owned(),
                hint_string(false),
            )
            .into());
        }
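
        // Decoupled sinks consume changes from a persistent kv log store;
        // non-decoupled sinks use an in-memory log store.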
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

        let input = if sink_decouple && target_table.is_some() {
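            // With decoupling enabled, sink-into-table additionally goes through
            // a `StreamSyncLogStore` node.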
            StreamSyncLogStore::new(input).into()
        } else {
            input
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }

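    /// Reads the legacy `type` option from the WITH clause: `append-only` maps
    /// to [`SinkType::AppendOnly`], while `debezium` and `upsert` map to
    /// [`SinkType::Upsert`]; any other value is rejected.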
    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
            if sink_type == SINK_TYPE_APPEND_ONLY {
                return Ok(Some(SinkType::AppendOnly));
            } else if sink_type == SINK_TYPE_DEBEZIUM || sink_type == SINK_TYPE_UPSERT {
                return Ok(Some(SinkType::Upsert));
            } else {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "`{}` must be {}, {}, or {}",
                    SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_UPSERT
                ))
                .into());
            }
        }
        Ok(None)
    }

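    /// Returns whether `force_append_only = 'true'` is set, rejecting any value
    /// other than a case-insensitive `true` or `false`.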
    fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
        if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION)
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true")
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false")
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "`{}` must be true or false",
                SINK_USER_FORCE_APPEND_ONLY_OPTION
            ))
            .into());
        }
        Ok(properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"))
    }

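    /// Derives the effective sink type from the input's append-only property
    /// and the user-declared type (`FORMAT ...` clause, or the legacy `type`
    /// option). `force_append_only` is only meaningful together with an
    /// append-only declaration and is dropped when the input is already
    /// append-only.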
    fn derive_sink_type(
        input_append_only: bool,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<SinkType> {
        let frontend_derived_append_only = input_append_only;
        let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert | SinkFormat::Debezium => SinkType::Upsert,
                }),
                Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_force_append_only(properties)?,
                true,
            ),
        };

        if user_force_append_only
            && user_defined_sink_type.is_some()
            && user_defined_sink_type != Some(SinkType::AppendOnly)
        {
            return Err(ErrorCode::InvalidInputSyntax(
                "`force_append_only` can only be used with type = 'append-only'".to_owned(),
            )
            .into());
        }

        let user_force_append_only = if user_force_append_only && frontend_derived_append_only {
            false
        } else {
            user_force_append_only
        };

        if user_force_append_only && user_defined_sink_type != Some(SinkType::AppendOnly) {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "Cannot force the sink to be append-only without \"{}\".",
                if syntax_legacy {
                    "type='append-only'"
                } else {
                    "FORMAT PLAIN"
                }
            ))
            .into());
        }

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            if user_defined_sink_type == SinkType::AppendOnly {
                if user_force_append_only {
                    return Ok(SinkType::ForceAppendOnly);
                }
                if !frontend_derived_append_only {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                         Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                        if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                    ))
                    .into());
                } else {
                    return Ok(SinkType::AppendOnly);
                }
            }

            Ok(user_defined_sink_type)
        } else {
            match frontend_derived_append_only {
                true => Ok(SinkType::AppendOnly),
                false => Ok(SinkType::Upsert),
            }
        }
    }

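    /// Parses the comma-separated `primary_key` option into column indices,
    /// skipping empty segments and rejecting a key that resolves to nothing.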
    fn parse_downstream_pk(
        downstream_pk_str: &str,
        columns: &[ColumnCatalog],
    ) -> Result<Vec<usize>> {
        let downstream_pk = downstream_pk_str.split(',').collect_vec();
        let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
        for key in downstream_pk {
            let trimmed_key = key.trim();
            if trimmed_key.is_empty() {
                continue;
            }
            downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
        }
        if downstream_pk_indices.is_empty() {
            bail_invalid_input_syntax!(
                "Specified primary key should not be empty. \
                 To use the derived primary key, remove {DOWNSTREAM_PK_KEY} from the WITH options instead."
            );
        }
        Ok(downstream_pk_indices)
    }

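    /// Infers the catalog of the internal table backing the kv log store from
    /// the input plan and the sink columns.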
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }
}

impl PlanTreeNodeUnary<Stream> for StreamSink {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(input, self.sink_desc.clone(), self.log_store_type)
    }
}

impl_plan_tree_node_for_unary! { Stream, StreamSink }

impl Distill for StreamSink {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let sink_type = if self.sink_desc.sink_type.is_append_only() {
            "append-only"
        } else {
            "upsert"
        };
        let column_names = self
            .sink_desc
            .columns
            .iter()
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();
        let column_names = Pretty::Array(column_names);
        let mut vec = Vec::with_capacity(3);
        vec.push(("type", Pretty::from(sink_type)));
        vec.push(("columns", column_names));
        if let Some(pk) = &self.sink_desc.downstream_pk {
            let sink_pk = IndicesDisplay {
                indices: pk,
                schema: self.base.schema(),
            };
            vec.push(("downstream_pk", sink_pk.distill()));
        }
        childless_record("StreamSink", vec)
    }
}

impl StreamNode for StreamSink {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        let table = self
            .infer_kv_log_store_table_catalog()
            .with_id(state.gen_table_id_wrapped());

        PbNodeBody::Sink(Box::new(SinkNode {
            sink_desc: Some(self.sink_desc.to_proto()),
            table: Some(table.to_internal_table_prost()),
            log_store_type: self.log_store_type as i32,
            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
        }))
    }
}

impl ExprRewritable<Stream> for StreamSink {}

impl ExprVisitable for StreamSink {}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
}
855}