use std::assert_matches::assert_matches;
use std::io::{Error, ErrorKind};
use std::sync::Arc;

use anyhow::anyhow;
use iceberg::spec::Transform;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, CreateType};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::match_sink_name_str;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::iceberg::ICEBERG_SINK;
use risingwave_connector::sink::trivial::TABLE_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, SinkError,
};
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::stream_plan::SinkLogStoreType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::{derive_columns, derive_pk};
use super::stream::prelude::*;
use super::utils::{
    Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
};
use super::{
    ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProject, StreamSyncLogStore, generic,
};
use crate::TableCatalog;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::PlanTreeNodeUnary;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::WithOptionsSecResolved;

const DOWNSTREAM_PK_KEY: &str = "primary_key";

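/// Information needed to compute the partition value of each row before it reaches the sink,
/// so that rows can be shuffled by partition. Currently only Iceberg sinks carry this.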
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}

impl PartitionComputeInfo {
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        match self {
            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
        }
    }
}

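/// Partition spec of the target Iceberg table: the struct type of the computed partition value
/// and the `(source column name, transform)` pair that produces each of its fields.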
pub struct IcebergPartitionInfo {
    pub partition_type: StructType,
    pub partition_fields: Vec<(String, Transform)>,
}

impl IcebergPartitionInfo {
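    /// Lower a single partition transform into an expression over `columns`: `Identity` becomes
    /// a plain column reference (after a type check), `Void` becomes a typed NULL literal, and
    /// every other transform becomes an `IcebergTransform` function call whose first argument is
    /// the transform's string representation.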
    #[inline]
    fn transform_to_expression(
        transform: &Transform,
        col_id: usize,
        columns: &[ColumnCatalog],
        result_type: DataType,
    ) -> Result<ExprImpl> {
        match transform {
            Transform::Identity => {
                if columns[col_id].column_desc.data_type != result_type {
                    return Err(ErrorCode::SinkError(Box::new(Error::new(
                        ErrorKind::InvalidInput,
                        format!(
                            "The partition field {} has type {}, but the partition spec expects {}",
                            columns[col_id].column_desc.name,
                            columns[col_id].column_desc.data_type,
                            result_type
                        ),
                    )))
                    .into());
                }
                Ok(ExprImpl::InputRef(
                    InputRef::new(col_id, result_type).into(),
                ))
            }
            Transform::Void => Ok(ExprImpl::literal_null(result_type)),
            _ => Ok(ExprImpl::FunctionCall(
                FunctionCall::new_unchecked(
                    Type::IcebergTransform,
                    vec![
                        ExprImpl::literal_varchar(transform.to_string()),
                        ExprImpl::InputRef(
                            InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
                                .into(),
                        ),
                    ],
                    result_type,
                )
                .into(),
            )),
        }
    }

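    /// Build a single `ROW(...)` expression that evaluates to the partition value of a row, with
    /// one child expression per partition field and the partition struct as its return type.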
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        let child_exprs = self
            .partition_fields
            .into_iter()
            .zip_eq_debug(self.partition_type.iter())
            .map(|((field_name, transform), (_, result_type))| {
                let col_id = find_column_idx_by_name(columns, &field_name)?;
                Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(ExprImpl::FunctionCall(
            FunctionCall::new_unchecked(
                Type::Row,
                child_exprs,
                DataType::Struct(self.partition_type),
            )
            .into(),
        ))
    }
}

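/// Look up the index of `col_name` in `columns`. The error is phrased for the sink
/// `primary_key` option, its most common caller, but the helper is also used to resolve
/// Iceberg partition fields.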
#[inline]
fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
    columns
        .iter()
        .position(|col| col.column_desc.name == col_name)
        .ok_or_else(|| {
            ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name),
            )))
            .into()
        })
}

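/// [`StreamSink`] represents a table/connector sink at the very end of the stream plan graph.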
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    input: PlanRef,
    sink_desc: SinkDesc,
    log_store_type: SinkLogStoreType,
}

impl StreamSink {
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        let base = input
            .plan_base()
            .into_stream()
            .expect("input should be stream plan")
            .clone_with_new_plan_id();

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }

    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }

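    /// Derive the required distribution for an Iceberg sink. When partition information is
    /// available, append a `StreamProject` that computes the partition value as an extra column
    /// and shard the stream by that column; otherwise shard by the stream key.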
    fn derive_iceberg_sink_distribution(
        input: PlanRef,
        partition_info: Option<PartitionComputeInfo>,
        columns: &[ColumnCatalog],
    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
        if let Some(partition_info) = partition_info {
            let input_fields = input.schema().fields();

            let mut exprs: Vec<_> = input_fields
                .iter()
                .enumerate()
                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
                .collect();

            exprs.push(partition_info.convert_to_expression(columns)?);
            let partition_col_idx = exprs.len() - 1;
            let project = StreamProject::new(generic::Project::new(exprs.clone(), input));
            Ok((
                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
                project.into(),
                Some(partition_col_idx),
            ))
        } else {
            Ok((
                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
                input,
                None,
            ))
        }
    }

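    /// Build a `StreamSink` plan node from the optimized stream plan root and the user-provided
    /// sink definition: derive the sink type, output columns, primary keys, required
    /// distribution, sink decoupling mode, and log store type, enforcing the relevant
    /// constraints along the way.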
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: mut input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
    ) -> Result<Self> {
        let sink_type =
            Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
        let mut downstream_pk = {
            let from_properties =
                Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?;
            if let Some(t) = &target_table {
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only =
                    sink_type == SinkType::AppendOnly || sink_type == SinkType::ForceAppendOnly;

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to a table without primary keys. Please try to add type = 'append-only' in the with option, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to an append-only table. Please try to add type = 'append-only' in the with option, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_type != SinkType::Upsert {
                    vec![]
                } else {
                    let target_table_mapping = target_table_mapping.unwrap();

                    t.pk()
                        .iter()
                        .map(|c| {
                            target_table_mapping[c.column_index].ok_or(
                                ErrorCode::SinkError(Box::new(Error::new(
                                    ErrorKind::InvalidInput,
                                    "When using a non-append-only sink into a table, the primary key of the table must be included in the sink result.".to_owned(),
                                )))
                                .into(),
                            )
                        })
                        .try_collect::<_, _, RwError>()?
                }
            } else {
                from_properties
            }
        };
        let mut extra_partition_col_idx = None;

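        // Choose the required distribution: a single-distribution input stays single; upsert
        // JDBC sinks must be hashed by the downstream primary key; Iceberg sinks are shuffled by
        // the partition value when partition information is available; everything else is
        // sharded by the downstream primary key or, if none is given, by the stream key.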
        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get("connector") {
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        if sink_type == SinkType::Upsert && downstream_pk.is_empty() {
                            return Err(ErrorCode::SinkError(Box::new(Error::new(
                                ErrorKind::InvalidInput,
                                format!(
                                    "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
                                    key = DOWNSTREAM_PK_KEY
                                ),
                            )))
                            .into());
                        }
                        RequiredDist::hash_shard(downstream_pk.as_slice())
                    }
                    Some(s) if s == ICEBERG_SINK => {
                        if sink_type.is_upsert() && downstream_pk.is_empty() {
                            downstream_pk = pk.iter().map(|k| k.column_index).collect_vec();
                        }
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if downstream_pk.is_empty() {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        } else {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                downstream_pk.as_slice(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?;
        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        let is_exactly_once = properties
            .get("is_exactly_once")
            .is_some_and(|v| v.to_lowercase() == "true");
        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
        };

        let unsupported_sink =
            |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)));

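        // Resolve the connector and decide whether the sink runs decoupled (buffered through a
        // log store): table sinks without a target table are rejected, the default commit
        // checkpoint interval is filled in, and the session's `sink_decouple` setting is applied.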
        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(
                    SinkError::Config(anyhow!("connector not specified when creating sink")).into(),
                );
            }
        };
        if !sink_decouple && sink_desc.is_file_sink() {
            return Err(
                SinkError::Config(anyhow!("File sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(),
            );
        }
        if !sink_decouple && sink_desc.is_exactly_once {
            return Err(
                SinkError::Config(anyhow!("Exactly once sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(),
            );
        }
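        // Decoupled sinks buffer data in a persistent KV log store between the upstream job and
        // the sink executor; non-decoupled sinks use an in-memory log store.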
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

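        // When sinking into a table with decoupling enabled, route the input through a
        // `StreamSyncLogStore` node.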
        let input = if sink_decouple && target_table.is_some() {
            StreamSyncLogStore::new(input).into()
        } else {
            input
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }

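    /// Parse the legacy `type = '...'` WITH option: `append-only` maps to
    /// [`SinkType::AppendOnly`], while `upsert` and `debezium` map to [`SinkType::Upsert`];
    /// any other value is rejected.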
    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
            if sink_type == SINK_TYPE_APPEND_ONLY {
                return Ok(Some(SinkType::AppendOnly));
            } else if sink_type == SINK_TYPE_DEBEZIUM || sink_type == SINK_TYPE_UPSERT {
                return Ok(Some(SinkType::Upsert));
            } else {
                return Err(ErrorCode::SinkError(Box::new(Error::new(
                    ErrorKind::InvalidInput,
                    format!(
                        "`{}` must be {}, {}, or {}",
                        SINK_TYPE_OPTION,
                        SINK_TYPE_APPEND_ONLY,
                        SINK_TYPE_DEBEZIUM,
                        SINK_TYPE_UPSERT
                    ),
                )))
                .into());
            }
        }
        Ok(None)
    }

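    /// Read the `force_append_only` option, accepting only `true` or `false`
    /// (case-insensitively) and returning whether it is set to `true`.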
    fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
        if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION)
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true")
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false")
        {
            return Err(ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                format!(
                    "`{}` must be true or false",
                    SINK_USER_FORCE_APPEND_ONLY_OPTION
                ),
            )))
            .into());
        }
        Ok(properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"))
    }

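    /// Decide the sink type: prefer the `FORMAT ... ENCODE ...` clause when present, otherwise
    /// the legacy `type` WITH option, otherwise derive it from whether the input plan is
    /// append-only. `force_append_only='true'` downgrades a non-append-only input to
    /// [`SinkType::ForceAppendOnly`] and is only legal together with an append-only format/type.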
    fn derive_sink_type(
        input_append_only: bool,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<SinkType> {
        let frontend_derived_append_only = input_append_only;
        let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert | SinkFormat::Debezium => SinkType::Upsert,
                }),
                Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_force_append_only(properties)?,
                true,
            ),
        };

        if user_force_append_only
            && user_defined_sink_type.is_some()
            && user_defined_sink_type != Some(SinkType::AppendOnly)
        {
            return Err(ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                "`force_append_only` can only be used with type = 'append-only'".to_owned(),
            )))
            .into());
        }

        let user_force_append_only = if user_force_append_only && frontend_derived_append_only {
            false
        } else {
            user_force_append_only
        };

        if user_force_append_only && user_defined_sink_type != Some(SinkType::AppendOnly) {
            return Err(ErrorCode::SinkError(Box::new(Error::new(
                ErrorKind::InvalidInput,
                format!(
                    "Cannot force the sink to be append-only without \"{}\".",
                    if syntax_legacy {
                        "type='append-only'"
                    } else {
                        "FORMAT PLAIN"
                    }
                ),
            )))
            .into());
        }

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            if user_defined_sink_type == SinkType::AppendOnly {
                if user_force_append_only {
                    return Ok(SinkType::ForceAppendOnly);
                }
                if !frontend_derived_append_only {
                    return Err(ErrorCode::SinkError(Box::new(Error::new(
                        ErrorKind::InvalidInput,
                        format!(
                            "The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                            Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                            if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                        ),
                    )))
                    .into());
                } else {
                    return Ok(SinkType::AppendOnly);
                }
            }

            Ok(user_defined_sink_type)
        } else {
            match frontend_derived_append_only {
                true => Ok(SinkType::AppendOnly),
                false => Ok(SinkType::Upsert),
            }
        }
    }

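    /// Parse the user-specified downstream primary key (e.g. `primary_key = 'id, name'`) into
    /// column indices of the sink schema. Entries are comma-separated and trimmed; empty entries
    /// are skipped, and a missing option yields an empty vector.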
    fn parse_downstream_pk(
        columns: &[ColumnCatalog],
        downstream_pk_str: Option<&String>,
    ) -> Result<Vec<usize>> {
        match downstream_pk_str {
            Some(downstream_pk_str) => {
                let downstream_pk = downstream_pk_str.split(',').collect_vec();
                let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
                for key in downstream_pk {
                    let trimmed_key = key.trim();
                    if trimmed_key.is_empty() {
                        continue;
                    }
                    downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
                }
                Ok(downstream_pk_indices)
            }
            None => {
                Ok(Vec::new())
            }
        }
    }

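    /// Infer the catalog of the internal table backing the KV log store that is used when sink
    /// decoupling is enabled.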
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }
}

impl PlanTreeNodeUnary for StreamSink {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(input, self.sink_desc.clone(), self.log_store_type)
    }
}

impl_plan_tree_node_for_unary! { StreamSink }

impl Distill for StreamSink {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let sink_type = if self.sink_desc.sink_type.is_append_only() {
            "append-only"
        } else {
            "upsert"
        };
        let column_names = self
            .sink_desc
            .columns
            .iter()
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();
        let column_names = Pretty::Array(column_names);
        let mut vec = Vec::with_capacity(3);
        vec.push(("type", Pretty::from(sink_type)));
        vec.push(("columns", column_names));
        if self.sink_desc.sink_type.is_upsert() {
            let sink_pk = IndicesDisplay {
                indices: &self.sink_desc.downstream_pk.clone(),
                schema: self.base.schema(),
            };
            vec.push(("downstream_pk", sink_pk.distill()));
        }
        childless_record("StreamSink", vec)
    }
}

impl StreamNode for StreamSink {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        let table = self
            .infer_kv_log_store_table_catalog()
            .with_id(state.gen_table_id_wrapped());

        PbNodeBody::Sink(Box::new(SinkNode {
            sink_desc: Some(self.sink_desc.to_proto()),
            table: Some(table.to_internal_table_prost()),
            log_store_type: self.log_store_type as i32,
            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
        }))
    }
}

impl ExprRewritable for StreamSink {}

impl ExprVisitable for StreamSink {}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v3", ColumnId::new(2), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

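    // Checks that `IcebergPartitionInfo::convert_to_expression` lowers every partition transform
    // to the expected expression shape: an input ref for identity, a NULL literal for void, and
    // an `IcebergTransform` call carrying the transform string for everything else.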
    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
}