1use std::assert_matches::assert_matches;
16use std::io::{Error, ErrorKind};
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use fixedbitset::FixedBitSet;
21use iceberg::spec::Transform;
22use itertools::Itertools;
23use pretty_xmlish::{Pretty, XmlNode};
24use risingwave_common::catalog::{ColumnCatalog, CreateType};
25use risingwave_common::types::{DataType, StructType};
26use risingwave_common::util::iter_util::ZipEqDebug;
27use risingwave_connector::match_sink_name_str;
28use risingwave_connector::sink::catalog::desc::SinkDesc;
29use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
30use risingwave_connector::sink::file_sink::fs::FsSink;
31use risingwave_connector::sink::iceberg::ICEBERG_SINK;
32use risingwave_connector::sink::trivial::TABLE_SINK;
33use risingwave_connector::sink::{
34 CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
35 SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, SinkError,
36};
37use risingwave_pb::expr::expr_node::Type;
38use risingwave_pb::stream_plan::SinkLogStoreType;
39use risingwave_pb::stream_plan::stream_node::PbNodeBody;
40
41use super::derive::{derive_columns, derive_pk};
42use super::stream::prelude::*;
43use super::utils::{
44 Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
45};
46use super::{
47 ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProject, StreamSyncLogStore, generic,
48};
49use crate::TableCatalog;
50use crate::error::{ErrorCode, Result, RwError};
51use crate::expr::{ExprImpl, FunctionCall, InputRef};
52use crate::optimizer::plan_node::PlanTreeNodeUnary;
53use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
54use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
55use crate::optimizer::property::{Distribution, Order, RequiredDist};
56use crate::stream_fragmenter::BuildFragmentGraphState;
57use crate::utils::WithOptionsSecResolved;
58
59const DOWNSTREAM_PK_KEY: &str = "primary_key";
60
61pub enum PartitionComputeInfo {
74 Iceberg(IcebergPartitionInfo),
75}
76
77impl PartitionComputeInfo {
78 pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
79 match self {
80 PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
81 }
82 }
83}
84
85pub struct IcebergPartitionInfo {
86 pub partition_type: StructType,
87 pub partition_fields: Vec<(String, Transform)>,
89}
90
91impl IcebergPartitionInfo {
92 #[inline]
93 fn transform_to_expression(
94 transform: &Transform,
95 col_id: usize,
96 columns: &[ColumnCatalog],
97 result_type: DataType,
98 ) -> Result<ExprImpl> {
99 match transform {
100 Transform::Identity => {
101 if columns[col_id].column_desc.data_type != result_type {
102 return Err(ErrorCode::SinkError(Box::new(Error::new(
103 ErrorKind::InvalidInput,
104 format!(
105 "The partition field {} has type {}, but the partition field is {}",
106 columns[col_id].column_desc.name,
107 columns[col_id].column_desc.data_type,
108 result_type
109 ),
110 )))
111 .into());
112 }
113 Ok(ExprImpl::InputRef(
114 InputRef::new(col_id, result_type).into(),
115 ))
116 }
117 Transform::Void => Ok(ExprImpl::literal_null(result_type)),
118 _ => Ok(ExprImpl::FunctionCall(
119 FunctionCall::new_unchecked(
120 Type::IcebergTransform,
121 vec![
122 ExprImpl::literal_varchar(transform.to_string()),
123 ExprImpl::InputRef(
124 InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
125 .into(),
126 ),
127 ],
128 result_type,
129 )
130 .into(),
131 )),
132 }
133 }
134
135 pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
136 let child_exprs = self
137 .partition_fields
138 .into_iter()
139 .zip_eq_debug(self.partition_type.iter())
140 .map(|((field_name, transform), (_, result_type))| {
141 let col_id = find_column_idx_by_name(columns, &field_name)?;
142 Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
143 })
144 .collect::<Result<Vec<_>>>()?;
145
146 Ok(ExprImpl::FunctionCall(
147 FunctionCall::new_unchecked(
148 Type::Row,
149 child_exprs,
150 DataType::Struct(self.partition_type),
151 )
152 .into(),
153 ))
154 }
155}
156
157#[inline]
158fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
159 columns
160 .iter()
161 .position(|col| col.column_desc.name == col_name)
162 .ok_or_else(|| {
163 ErrorCode::SinkError(Box::new(Error::new(
164 ErrorKind::InvalidInput,
165 format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name),
166 )))
167 .into()
168 })
169}
170
171#[derive(Debug, Clone, PartialEq, Eq, Hash)]
173pub struct StreamSink {
174 pub base: PlanBase<Stream>,
175 input: PlanRef,
176 sink_desc: SinkDesc,
177 log_store_type: SinkLogStoreType,
178}
179
180impl StreamSink {
181 #[must_use]
182 pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
183 let base = input
184 .plan_base()
185 .into_stream()
186 .expect("input should be stream plan")
187 .clone_with_new_plan_id();
188
189 Self {
190 base,
191 input,
192 sink_desc,
193 log_store_type,
194 }
195 }
196
197 pub fn sink_desc(&self) -> &SinkDesc {
198 &self.sink_desc
199 }
200
201 fn derive_iceberg_sink_distribution(
202 input: PlanRef,
203 partition_info: Option<PartitionComputeInfo>,
204 columns: &[ColumnCatalog],
205 ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
206 if let Some(partition_info) = partition_info {
208 let input_fields = input.schema().fields();
209
210 let mut exprs: Vec<_> = input_fields
211 .iter()
212 .enumerate()
213 .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
214 .collect();
215
216 exprs.push(partition_info.convert_to_expression(columns)?);
218 let partition_col_idx = exprs.len() - 1;
219 let project = StreamProject::new(generic::Project::new(exprs.clone(), input));
220 Ok((
221 RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
222 project.into(),
223 Some(partition_col_idx),
224 ))
225 } else {
226 Ok((
227 RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
228 input,
229 None,
230 ))
231 }
232 }
233
234 #[allow(clippy::too_many_arguments)]
235 pub fn create(
236 mut input: PlanRef,
237 name: String,
238 db_name: String,
239 sink_from_table_name: String,
240 target_table: Option<Arc<TableCatalog>>,
241 target_table_mapping: Option<Vec<Option<usize>>>,
242 user_distributed_by: RequiredDist,
243 user_order_by: Order,
244 user_cols: FixedBitSet,
245 out_names: Vec<String>,
246 definition: String,
247 properties: WithOptionsSecResolved,
248 format_desc: Option<SinkFormatDesc>,
249 partition_info: Option<PartitionComputeInfo>,
250 ) -> Result<Self> {
251 let sink_type =
252 Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?;
253
254 let columns = derive_columns(input.schema(), out_names, &user_cols)?;
255 let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
256 let mut downstream_pk = {
257 let from_properties =
258 Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?;
259 if let Some(t) = &target_table {
260 let user_defined_primary_key_table = t.row_id_index.is_none();
261 let sink_is_append_only =
262 sink_type == SinkType::AppendOnly || sink_type == SinkType::ForceAppendOnly;
263
264 if !user_defined_primary_key_table && !sink_is_append_only {
265 return Err(RwError::from(ErrorCode::BindError(
266 "Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
267 )));
268 }
269
270 if t.append_only && !sink_is_append_only {
271 return Err(RwError::from(ErrorCode::BindError(
272 "Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
273 )));
274 }
275
276 if sink_type != SinkType::Upsert {
277 vec![]
278 } else {
279 let target_table_mapping = target_table_mapping.unwrap();
280
281 t.pk()
282 .iter()
283 .map(|c| {
284 target_table_mapping[c.column_index].ok_or(
285 ErrorCode::SinkError(Box::new(Error::new(ErrorKind::InvalidInput,
286 "When using non append only sink into table, the primary key of the table must be included in the sink result.".to_owned()
287 ))).into())})
288 .try_collect::<_, _, RwError>()?
289 }
290 } else {
291 from_properties
292 }
293 };
294 let mut extra_partition_col_idx = None;
295
296 let required_dist = match input.distribution() {
297 Distribution::Single => RequiredDist::single(),
298 _ => {
299 match properties.get("connector") {
300 Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
301 if sink_type == SinkType::Upsert && downstream_pk.is_empty() {
302 return Err(ErrorCode::SinkError(Box::new(Error::new(
303 ErrorKind::InvalidInput,
304 format!(
305 "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
306 key = DOWNSTREAM_PK_KEY
307 ),
308 )))
309 .into());
310 }
311 RequiredDist::hash_shard(downstream_pk.as_slice())
314 }
315 Some(s) if s == ICEBERG_SINK => {
316 if sink_type.is_upsert() && downstream_pk.is_empty() {
318 downstream_pk = pk.iter().map(|k| k.column_index).collect_vec();
319 }
320 let (required_dist, new_input, partition_col_idx) =
321 Self::derive_iceberg_sink_distribution(
322 input,
323 partition_info,
324 &columns,
325 )?;
326 input = new_input;
327 extra_partition_col_idx = partition_col_idx;
328 required_dist
329 }
330 _ => {
331 assert_matches!(user_distributed_by, RequiredDist::Any);
332 if downstream_pk.is_empty() {
333 RequiredDist::shard_by_key(
334 input.schema().len(),
335 input.expect_stream_key(),
336 )
337 } else {
338 RequiredDist::shard_by_key(
341 input.schema().len(),
342 downstream_pk.as_slice(),
343 )
344 }
345 }
346 }
347 }
348 };
349 let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?;
350 let distribution_key = input.distribution().dist_column_indices().to_vec();
351 let create_type = if input.ctx().session_ctx().config().background_ddl()
352 && plan_can_use_background_ddl(&input)
353 {
354 CreateType::Background
355 } else {
356 CreateType::Foreground
357 };
358 let (properties, secret_refs) = properties.into_parts();
359 let is_exactly_once = properties
360 .get("is_exactly_once")
361 .is_some_and(|v| v.to_lowercase() == "true");
362 let mut sink_desc = SinkDesc {
363 id: SinkId::placeholder(),
364 name,
365 db_name,
366 sink_from_name: sink_from_table_name,
367 definition,
368 columns,
369 plan_pk: pk,
370 downstream_pk,
371 distribution_key,
372 properties,
373 secret_refs,
374 sink_type,
375 format_desc,
376 target_table: target_table.as_ref().map(|catalog| catalog.id()),
377 extra_partition_col_idx,
378 create_type,
379 is_exactly_once,
380 };
381
382 let unsupported_sink =
383 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)));
384
385 let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
387 Some(connector) => {
388 let connector_type = connector.to_lowercase();
389 match_sink_name_str!(
390 connector_type.as_str(),
391 SinkType,
392 {
393 if connector == TABLE_SINK && sink_desc.target_table.is_none() {
395 unsupported_sink(TABLE_SINK)
396 } else {
397 SinkType::set_default_commit_checkpoint_interval(
398 &mut sink_desc,
399 &input.ctx().session_ctx().config().sink_decouple(),
400 )?;
401 SinkType::is_sink_decouple(
402 &input.ctx().session_ctx().config().sink_decouple(),
403 )
404 }
405 },
406 |other: &str| unsupported_sink(other)
407 )?
408 }
409 None => {
410 return Err(
411 SinkError::Config(anyhow!("connector not specified when create sink")).into(),
412 );
413 }
414 };
415 if !sink_decouple && sink_desc.is_file_sink() {
417 return Err(
418 SinkError::Config(anyhow!("File sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(),
419 );
420 }
421 if !sink_decouple && sink_desc.is_exactly_once {
422 return Err(
423 SinkError::Config(anyhow!("Exactly once sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(),
424 );
425 }
426 let log_store_type = if sink_decouple {
427 SinkLogStoreType::KvLogStore
428 } else {
429 SinkLogStoreType::InMemoryLogStore
430 };
431
432 let input = if sink_decouple && target_table.is_some() {
434 StreamSyncLogStore::new(input).into()
435 } else {
436 input
437 };
438
439 Ok(Self::new(input, sink_desc, log_store_type))
440 }
441
442 fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
443 if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
444 if sink_type == SINK_TYPE_APPEND_ONLY {
445 return Ok(Some(SinkType::AppendOnly));
446 } else if sink_type == SINK_TYPE_DEBEZIUM || sink_type == SINK_TYPE_UPSERT {
447 return Ok(Some(SinkType::Upsert));
448 } else {
449 return Err(ErrorCode::SinkError(Box::new(Error::new(
450 ErrorKind::InvalidInput,
451 format!(
452 "`{}` must be {}, {}, or {}",
453 SINK_TYPE_OPTION,
454 SINK_TYPE_APPEND_ONLY,
455 SINK_TYPE_DEBEZIUM,
456 SINK_TYPE_UPSERT
457 ),
458 )))
459 .into());
460 }
461 }
462 Ok(None)
463 }
464
465 fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
466 if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION)
467 && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true")
468 && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false")
469 {
470 return Err(ErrorCode::SinkError(Box::new(Error::new(
471 ErrorKind::InvalidInput,
472 format!(
473 "`{}` must be true or false",
474 SINK_USER_FORCE_APPEND_ONLY_OPTION
475 ),
476 )))
477 .into());
478 }
479 Ok(properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"))
480 }
481
482 fn derive_sink_type(
483 input_append_only: bool,
484 properties: &WithOptionsSecResolved,
485 format_desc: Option<&SinkFormatDesc>,
486 ) -> Result<SinkType> {
487 let frontend_derived_append_only = input_append_only;
488 let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc {
489 Some(f) => (
490 Some(match f.format {
491 SinkFormat::AppendOnly => SinkType::AppendOnly,
492 SinkFormat::Upsert | SinkFormat::Debezium => SinkType::Upsert,
493 }),
494 Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets(
495 f.options.clone(),
496 ))?,
497 false,
498 ),
499 None => (
500 Self::sink_type_in_prop(properties)?,
501 Self::is_user_force_append_only(properties)?,
502 true,
503 ),
504 };
505
506 if user_force_append_only
507 && user_defined_sink_type.is_some()
508 && user_defined_sink_type != Some(SinkType::AppendOnly)
509 {
510 return Err(ErrorCode::SinkError(Box::new(Error::new(
511 ErrorKind::InvalidInput,
512 "The force_append_only can be only used for type = \'append-only\'".to_owned(),
513 )))
514 .into());
515 }
516
517 let user_force_append_only = if user_force_append_only && frontend_derived_append_only {
518 false
519 } else {
520 user_force_append_only
521 };
522
523 if user_force_append_only && user_defined_sink_type != Some(SinkType::AppendOnly) {
524 return Err(ErrorCode::SinkError(Box::new(Error::new(
525 ErrorKind::InvalidInput,
526 format!(
527 "Cannot force the sink to be append-only without \"{}\".",
528 if syntax_legacy {
529 "type='append-only'"
530 } else {
531 "FORMAT PLAIN"
532 }
533 ),
534 )))
535 .into());
536 }
537
538 if let Some(user_defined_sink_type) = user_defined_sink_type {
539 if user_defined_sink_type == SinkType::AppendOnly {
540 if user_force_append_only {
541 return Ok(SinkType::ForceAppendOnly);
542 }
543 if !frontend_derived_append_only {
544 return Err(ErrorCode::SinkError(Box::new(Error::new(
545 ErrorKind::InvalidInput,
546 format!(
547 "The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
548 Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
549 if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
550 ),
551 )))
552 .into());
553 } else {
554 return Ok(SinkType::AppendOnly);
555 }
556 }
557
558 Ok(user_defined_sink_type)
559 } else {
560 match frontend_derived_append_only {
561 true => Ok(SinkType::AppendOnly),
562 false => Ok(SinkType::Upsert),
563 }
564 }
565 }
566
567 fn parse_downstream_pk(
573 columns: &[ColumnCatalog],
574 downstream_pk_str: Option<&String>,
575 ) -> Result<Vec<usize>> {
576 match downstream_pk_str {
577 Some(downstream_pk_str) => {
578 let downstream_pk = downstream_pk_str.split(',').collect_vec();
580 let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
581 for key in downstream_pk {
582 let trimmed_key = key.trim();
583 if trimmed_key.is_empty() {
584 continue;
585 }
586 downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
587 }
588 Ok(downstream_pk_indices)
589 }
590 None => {
591 Ok(Vec::new())
594 }
595 }
596 }
597
598 fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
601 infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
602 }
603}
604
605impl PlanTreeNodeUnary for StreamSink {
606 fn input(&self) -> PlanRef {
607 self.input.clone()
608 }
609
610 fn clone_with_input(&self, input: PlanRef) -> Self {
611 Self::new(input, self.sink_desc.clone(), self.log_store_type)
612 }
614}
615
616impl_plan_tree_node_for_unary! { StreamSink }
617
618impl Distill for StreamSink {
619 fn distill<'a>(&self) -> XmlNode<'a> {
620 let sink_type = if self.sink_desc.sink_type.is_append_only() {
621 "append-only"
622 } else {
623 "upsert"
624 };
625 let column_names = self
626 .sink_desc
627 .columns
628 .iter()
629 .map(|col| col.name_with_hidden().to_string())
630 .map(Pretty::from)
631 .collect();
632 let column_names = Pretty::Array(column_names);
633 let mut vec = Vec::with_capacity(3);
634 vec.push(("type", Pretty::from(sink_type)));
635 vec.push(("columns", column_names));
636 if self.sink_desc.sink_type.is_upsert() {
637 let sink_pk = IndicesDisplay {
638 indices: &self.sink_desc.downstream_pk.clone(),
639 schema: self.base.schema(),
640 };
641 vec.push(("downstream_pk", sink_pk.distill()));
642 }
643 childless_record("StreamSink", vec)
644 }
645}
646
647impl StreamNode for StreamSink {
648 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
649 use risingwave_pb::stream_plan::*;
650
651 let table = self
653 .infer_kv_log_store_table_catalog()
654 .with_id(state.gen_table_id_wrapped());
655
656 PbNodeBody::Sink(Box::new(SinkNode {
657 sink_desc: Some(self.sink_desc.to_proto()),
658 table: Some(table.to_internal_table_prost()),
659 log_store_type: self.log_store_type as i32,
660 rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
661 }))
662 }
663}
664
665impl ExprRewritable for StreamSink {}
666
667impl ExprVisitable for StreamSink {}
668
669#[cfg(test)]
670mod test {
671 use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
672 use risingwave_common::types::{DataType, StructType};
673 use risingwave_common::util::iter_util::ZipEqDebug;
674 use risingwave_pb::expr::expr_node::Type;
675
676 use super::{IcebergPartitionInfo, *};
677 use crate::expr::{Expr, ExprImpl};
678
679 fn create_column_catalog() -> Vec<ColumnCatalog> {
680 vec![
681 ColumnCatalog {
682 column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
683 is_hidden: false,
684 },
685 ColumnCatalog {
686 column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
687 is_hidden: false,
688 },
689 ColumnCatalog {
690 column_desc: ColumnDesc::named("v3", ColumnId::new(2), DataType::Timestamp),
691 is_hidden: false,
692 },
693 ]
694 }
695
696 #[test]
697 fn test_iceberg_convert_to_expression() {
698 let partition_type = StructType::new(vec![
699 ("f1", DataType::Int32),
700 ("f2", DataType::Int32),
701 ("f3", DataType::Int32),
702 ("f4", DataType::Int32),
703 ("f5", DataType::Int32),
704 ("f6", DataType::Int32),
705 ("f7", DataType::Int32),
706 ("f8", DataType::Int32),
707 ("f9", DataType::Int32),
708 ]);
709 let partition_fields = vec![
710 ("v1".into(), Transform::Identity),
711 ("v1".into(), Transform::Bucket(10)),
712 ("v1".into(), Transform::Truncate(3)),
713 ("v2".into(), Transform::Year),
714 ("v2".into(), Transform::Month),
715 ("v3".into(), Transform::Day),
716 ("v3".into(), Transform::Hour),
717 ("v1".into(), Transform::Void),
718 ("v3".into(), Transform::Void),
719 ];
720 let partition_info = IcebergPartitionInfo {
721 partition_type: partition_type.clone(),
722 partition_fields: partition_fields.clone(),
723 };
724 let catalog = create_column_catalog();
725 let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
726 let actual_expr = actual_expr.as_function_call().unwrap();
727
728 assert_eq!(
729 actual_expr.return_type(),
730 DataType::Struct(partition_type.clone())
731 );
732 assert_eq!(actual_expr.inputs().len(), partition_fields.len());
733 assert_eq!(actual_expr.func_type(), Type::Row);
734
735 for ((expr, (_, transform)), (_, expect_type)) in actual_expr
736 .inputs()
737 .iter()
738 .zip_eq_debug(partition_fields.iter())
739 .zip_eq_debug(partition_type.iter())
740 {
741 match transform {
742 Transform::Identity => {
743 assert!(expr.is_input_ref());
744 assert_eq!(expr.return_type(), *expect_type);
745 }
746 Transform::Void => {
747 assert!(expr.is_literal());
748 assert_eq!(expr.return_type(), *expect_type);
749 }
750 _ => {
751 let expr = expr.as_function_call().unwrap();
752 assert_eq!(expr.func_type(), Type::IcebergTransform);
753 assert_eq!(expr.inputs().len(), 2);
754 assert_eq!(
755 expr.inputs()[0],
756 ExprImpl::literal_varchar(transform.to_string())
757 );
758 }
759 }
760 }
761 }
762}