1use std::assert_matches::assert_matches;
16use std::io::{Error, ErrorKind};
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use fixedbitset::FixedBitSet;
21use iceberg::spec::Transform;
22use itertools::Itertools;
23use pretty_xmlish::{Pretty, XmlNode};
24use risingwave_common::catalog::{ColumnCatalog, CreateType};
25use risingwave_common::types::{DataType, StructType};
26use risingwave_common::util::iter_util::ZipEqDebug;
27use risingwave_connector::match_sink_name_str;
28use risingwave_connector::sink::catalog::desc::SinkDesc;
29use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
30use risingwave_connector::sink::file_sink::fs::FsSink;
31use risingwave_connector::sink::iceberg::ICEBERG_SINK;
32use risingwave_connector::sink::trivial::TABLE_SINK;
33use risingwave_connector::sink::{
34 CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
35 SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, SinkError,
36};
37use risingwave_pb::expr::expr_node::Type;
38use risingwave_pb::stream_plan::SinkLogStoreType;
39use risingwave_pb::stream_plan::stream_node::PbNodeBody;
40
41use super::derive::{derive_columns, derive_pk};
42use super::stream::prelude::*;
43use super::utils::{
44 Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
45};
46use super::{ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProject, generic};
47use crate::TableCatalog;
48use crate::error::{ErrorCode, Result, RwError};
49use crate::expr::{ExprImpl, FunctionCall, InputRef};
50use crate::optimizer::plan_node::PlanTreeNodeUnary;
51use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
52use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
53use crate::optimizer::property::{Distribution, Order, RequiredDist};
54use crate::stream_fragmenter::BuildFragmentGraphState;
55use crate::utils::WithOptionsSecResolved;
56
/// Key of the sink option from which the downstream primary key is read.
///
/// The option value is parsed by `StreamSink::parse_downstream_pk` as a comma-separated list of
/// column names.
const DOWNSTREAM_PK_KEY: &str = "primary_key";
58
/// Information from which the partition-value expression of a sink can be built
/// (see `convert_to_expression`).
///
/// Only an Iceberg variant is defined here.
pub enum PartitionComputeInfo {
    /// Partition description of an Iceberg sink.
    Iceberg(IcebergPartitionInfo),
}
74
75impl PartitionComputeInfo {
76 pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
77 match self {
78 PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
79 }
80 }
81}
82
/// Partition description used to build the partition-value expression of an Iceberg sink.
pub struct IcebergPartitionInfo {
    /// Struct type of the computed partition value. Its fields are zipped one-to-one with
    /// `partition_fields`: field `i` gives the result type of `partition_fields[i]`.
    pub partition_type: StructType,
    /// `(source column name, transform)` pairs, one per partition field. The name is looked up
    /// among the sink columns when the expression is built.
    pub partition_fields: Vec<(String, Transform)>,
}
88
89impl IcebergPartitionInfo {
90 #[inline]
91 fn transform_to_expression(
92 transform: &Transform,
93 col_id: usize,
94 columns: &[ColumnCatalog],
95 result_type: DataType,
96 ) -> Result<ExprImpl> {
97 match transform {
98 Transform::Identity => {
99 if columns[col_id].column_desc.data_type != result_type {
100 return Err(ErrorCode::SinkError(Box::new(Error::new(
101 ErrorKind::InvalidInput,
102 format!(
103 "The partition field {} has type {}, but the partition field is {}",
104 columns[col_id].column_desc.name,
105 columns[col_id].column_desc.data_type,
106 result_type
107 ),
108 )))
109 .into());
110 }
111 Ok(ExprImpl::InputRef(
112 InputRef::new(col_id, result_type).into(),
113 ))
114 }
115 Transform::Void => Ok(ExprImpl::literal_null(result_type)),
116 _ => Ok(ExprImpl::FunctionCall(
117 FunctionCall::new_unchecked(
118 Type::IcebergTransform,
119 vec![
120 ExprImpl::literal_varchar(transform.to_string()),
121 ExprImpl::InputRef(
122 InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
123 .into(),
124 ),
125 ],
126 result_type,
127 )
128 .into(),
129 )),
130 }
131 }
132
133 pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
134 let child_exprs = self
135 .partition_fields
136 .into_iter()
137 .zip_eq_debug(self.partition_type.iter())
138 .map(|((field_name, transform), (_, result_type))| {
139 let col_id = find_column_idx_by_name(columns, &field_name)?;
140 Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
141 })
142 .collect::<Result<Vec<_>>>()?;
143
144 Ok(ExprImpl::FunctionCall(
145 FunctionCall::new_unchecked(
146 Type::Row,
147 child_exprs,
148 DataType::Struct(self.partition_type),
149 )
150 .into(),
151 ))
152 }
153}
154
155#[inline]
156fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
157 columns
158 .iter()
159 .position(|col| col.column_desc.name == col_name)
160 .ok_or_else(|| {
161 ErrorCode::SinkError(Box::new(Error::new(
162 ErrorKind::InvalidInput,
163 format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name),
164 )))
165 .into()
166 })
167}
168
/// Stream plan node that sends the rows produced by `input` to the sink described by
/// `sink_desc`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    /// Plan base; `new` derives it from the input's stream plan base with a new plan id.
    pub base: PlanBase<Stream>,
    /// Upstream plan whose output is sunk.
    input: PlanRef,
    /// Descriptor of the sink (columns, keys, properties, sink type, ...).
    sink_desc: SinkDesc,
    /// Log store used by the sink. `create` selects the KV log store when sink decoupling is
    /// enabled and the in-memory log store otherwise.
    log_store_type: SinkLogStoreType,
}
177
178impl StreamSink {
179 #[must_use]
180 pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
181 let base = input
182 .plan_base()
183 .into_stream()
184 .expect("input should be stream plan")
185 .clone_with_new_plan_id();
186
187 Self {
188 base,
189 input,
190 sink_desc,
191 log_store_type,
192 }
193 }
194
    /// Returns the descriptor of the sink this node writes to.
    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }
198
199 fn derive_iceberg_sink_distribution(
200 input: PlanRef,
201 partition_info: Option<PartitionComputeInfo>,
202 columns: &[ColumnCatalog],
203 ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
204 if let Some(partition_info) = partition_info {
206 let input_fields = input.schema().fields();
207
208 let mut exprs: Vec<_> = input_fields
209 .iter()
210 .enumerate()
211 .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
212 .collect();
213
214 exprs.push(partition_info.convert_to_expression(columns)?);
216 let partition_col_idx = exprs.len() - 1;
217 let project = StreamProject::new(generic::Project::new(exprs.clone(), input));
218 Ok((
219 RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
220 project.into(),
221 Some(partition_col_idx),
222 ))
223 } else {
224 Ok((
225 RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
226 input,
227 None,
228 ))
229 }
230 }
231
232 #[allow(clippy::too_many_arguments)]
233 pub fn create(
234 mut input: PlanRef,
235 name: String,
236 db_name: String,
237 sink_from_table_name: String,
238 target_table: Option<Arc<TableCatalog>>,
239 target_table_mapping: Option<Vec<Option<usize>>>,
240 user_distributed_by: RequiredDist,
241 user_order_by: Order,
242 user_cols: FixedBitSet,
243 out_names: Vec<String>,
244 definition: String,
245 properties: WithOptionsSecResolved,
246 format_desc: Option<SinkFormatDesc>,
247 partition_info: Option<PartitionComputeInfo>,
248 ) -> Result<Self> {
249 let sink_type =
250 Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?;
251
252 let columns = derive_columns(input.schema(), out_names, &user_cols)?;
253 let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
254 let mut downstream_pk = {
255 let from_properties =
256 Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?;
257 if let Some(t) = &target_table {
258 let user_defined_primary_key_table = t.row_id_index.is_none();
259 let sink_is_append_only =
260 sink_type == SinkType::AppendOnly || sink_type == SinkType::ForceAppendOnly;
261
262 if !user_defined_primary_key_table && !sink_is_append_only {
263 return Err(RwError::from(ErrorCode::BindError(
264 "Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
265 )));
266 }
267
268 if t.append_only && !sink_is_append_only {
269 return Err(RwError::from(ErrorCode::BindError(
270 "Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
271 )));
272 }
273
274 if sink_type != SinkType::Upsert {
275 vec![]
276 } else {
277 let target_table_mapping = target_table_mapping.unwrap();
278
279 t.pk()
280 .iter()
281 .map(|c| {
282 target_table_mapping[c.column_index].ok_or(
283 ErrorCode::SinkError(Box::new(Error::new(ErrorKind::InvalidInput,
284 "When using non append only sink into table, the primary key of the table must be included in the sink result.".to_owned()
285 ))).into())})
286 .try_collect::<_, _, RwError>()?
287 }
288 } else {
289 from_properties
290 }
291 };
292 let mut extra_partition_col_idx = None;
293
294 let required_dist = match input.distribution() {
295 Distribution::Single => RequiredDist::single(),
296 _ => {
297 match properties.get("connector") {
298 Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
299 if sink_type == SinkType::Upsert && downstream_pk.is_empty() {
300 return Err(ErrorCode::SinkError(Box::new(Error::new(
301 ErrorKind::InvalidInput,
302 format!(
303 "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
304 key = DOWNSTREAM_PK_KEY
305 ),
306 )))
307 .into());
308 }
309 RequiredDist::hash_shard(downstream_pk.as_slice())
312 }
313 Some(s) if s == ICEBERG_SINK => {
314 if sink_type.is_upsert() && downstream_pk.is_empty() {
316 downstream_pk = pk.iter().map(|k| k.column_index).collect_vec();
317 }
318 let (required_dist, new_input, partition_col_idx) =
319 Self::derive_iceberg_sink_distribution(
320 input,
321 partition_info,
322 &columns,
323 )?;
324 input = new_input;
325 extra_partition_col_idx = partition_col_idx;
326 required_dist
327 }
328 _ => {
329 assert_matches!(user_distributed_by, RequiredDist::Any);
330 if downstream_pk.is_empty() {
331 RequiredDist::shard_by_key(
332 input.schema().len(),
333 input.expect_stream_key(),
334 )
335 } else {
336 RequiredDist::shard_by_key(
339 input.schema().len(),
340 downstream_pk.as_slice(),
341 )
342 }
343 }
344 }
345 }
346 };
347 let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?;
348 let distribution_key = input.distribution().dist_column_indices().to_vec();
349 let create_type = if input.ctx().session_ctx().config().background_ddl()
350 && plan_can_use_background_ddl(&input)
351 {
352 CreateType::Background
353 } else {
354 CreateType::Foreground
355 };
356 let (properties, secret_refs) = properties.into_parts();
357 let is_exactly_once = properties
358 .get("is_exactly_once")
359 .is_some_and(|v| v.to_lowercase() == "true");
360 let mut sink_desc = SinkDesc {
361 id: SinkId::placeholder(),
362 name,
363 db_name,
364 sink_from_name: sink_from_table_name,
365 definition,
366 columns,
367 plan_pk: pk,
368 downstream_pk,
369 distribution_key,
370 properties,
371 secret_refs,
372 sink_type,
373 format_desc,
374 target_table: target_table.as_ref().map(|catalog| catalog.id()),
375 extra_partition_col_idx,
376 create_type,
377 is_exactly_once,
378 };
379
380 let unsupported_sink =
381 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)));
382
383 let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
385 Some(connector) => {
386 let connector_type = connector.to_lowercase();
387 match_sink_name_str!(
388 connector_type.as_str(),
389 SinkType,
390 {
391 if connector == TABLE_SINK && sink_desc.target_table.is_none() {
393 unsupported_sink(TABLE_SINK)
394 } else {
395 SinkType::set_default_commit_checkpoint_interval(
396 &mut sink_desc,
397 &input.ctx().session_ctx().config().sink_decouple(),
398 )?;
399 SinkType::is_sink_decouple(
400 &input.ctx().session_ctx().config().sink_decouple(),
401 )
402 }
403 },
404 |other: &str| unsupported_sink(other)
405 )?
406 }
407 None => {
408 return Err(
409 SinkError::Config(anyhow!("connector not specified when create sink")).into(),
410 );
411 }
412 };
413 if !sink_decouple && sink_desc.is_file_sink() {
415 return Err(
416 SinkError::Config(anyhow!("File sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(),
417 );
418 }
419 if !sink_decouple && sink_desc.is_exactly_once {
420 return Err(
421 SinkError::Config(anyhow!("Exactly once sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(),
422 );
423 }
424 let log_store_type = if sink_decouple {
425 SinkLogStoreType::KvLogStore
426 } else {
427 SinkLogStoreType::InMemoryLogStore
428 };
429
430 Ok(Self::new(input, sink_desc, log_store_type))
431 }
432
433 fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
434 if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
435 if sink_type == SINK_TYPE_APPEND_ONLY {
436 return Ok(Some(SinkType::AppendOnly));
437 } else if sink_type == SINK_TYPE_DEBEZIUM || sink_type == SINK_TYPE_UPSERT {
438 return Ok(Some(SinkType::Upsert));
439 } else {
440 return Err(ErrorCode::SinkError(Box::new(Error::new(
441 ErrorKind::InvalidInput,
442 format!(
443 "`{}` must be {}, {}, or {}",
444 SINK_TYPE_OPTION,
445 SINK_TYPE_APPEND_ONLY,
446 SINK_TYPE_DEBEZIUM,
447 SINK_TYPE_UPSERT
448 ),
449 )))
450 .into());
451 }
452 }
453 Ok(None)
454 }
455
456 fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
457 if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION)
458 && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true")
459 && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false")
460 {
461 return Err(ErrorCode::SinkError(Box::new(Error::new(
462 ErrorKind::InvalidInput,
463 format!(
464 "`{}` must be true or false",
465 SINK_USER_FORCE_APPEND_ONLY_OPTION
466 ),
467 )))
468 .into());
469 }
470 Ok(properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"))
471 }
472
473 fn derive_sink_type(
474 input_append_only: bool,
475 properties: &WithOptionsSecResolved,
476 format_desc: Option<&SinkFormatDesc>,
477 ) -> Result<SinkType> {
478 let frontend_derived_append_only = input_append_only;
479 let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc {
480 Some(f) => (
481 Some(match f.format {
482 SinkFormat::AppendOnly => SinkType::AppendOnly,
483 SinkFormat::Upsert | SinkFormat::Debezium => SinkType::Upsert,
484 }),
485 Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets(
486 f.options.clone(),
487 ))?,
488 false,
489 ),
490 None => (
491 Self::sink_type_in_prop(properties)?,
492 Self::is_user_force_append_only(properties)?,
493 true,
494 ),
495 };
496
497 if user_force_append_only
498 && user_defined_sink_type.is_some()
499 && user_defined_sink_type != Some(SinkType::AppendOnly)
500 {
501 return Err(ErrorCode::SinkError(Box::new(Error::new(
502 ErrorKind::InvalidInput,
503 "The force_append_only can be only used for type = \'append-only\'".to_owned(),
504 )))
505 .into());
506 }
507
508 let user_force_append_only = if user_force_append_only && frontend_derived_append_only {
509 false
510 } else {
511 user_force_append_only
512 };
513
514 if user_force_append_only && user_defined_sink_type != Some(SinkType::AppendOnly) {
515 return Err(ErrorCode::SinkError(Box::new(Error::new(
516 ErrorKind::InvalidInput,
517 format!(
518 "Cannot force the sink to be append-only without \"{}\".",
519 if syntax_legacy {
520 "type='append-only'"
521 } else {
522 "FORMAT PLAIN"
523 }
524 ),
525 )))
526 .into());
527 }
528
529 if let Some(user_defined_sink_type) = user_defined_sink_type {
530 if user_defined_sink_type == SinkType::AppendOnly {
531 if user_force_append_only {
532 return Ok(SinkType::ForceAppendOnly);
533 }
534 if !frontend_derived_append_only {
535 return Err(ErrorCode::SinkError(Box::new(Error::new(
536 ErrorKind::InvalidInput,
537 format!(
538 "The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
539 Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
540 if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
541 ),
542 )))
543 .into());
544 } else {
545 return Ok(SinkType::AppendOnly);
546 }
547 }
548
549 Ok(user_defined_sink_type)
550 } else {
551 match frontend_derived_append_only {
552 true => Ok(SinkType::AppendOnly),
553 false => Ok(SinkType::Upsert),
554 }
555 }
556 }
557
558 fn parse_downstream_pk(
564 columns: &[ColumnCatalog],
565 downstream_pk_str: Option<&String>,
566 ) -> Result<Vec<usize>> {
567 match downstream_pk_str {
568 Some(downstream_pk_str) => {
569 let downstream_pk = downstream_pk_str.split(',').collect_vec();
571 let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
572 for key in downstream_pk {
573 let trimmed_key = key.trim();
574 if trimmed_key.is_empty() {
575 continue;
576 }
577 downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
578 }
579 Ok(downstream_pk_indices)
580 }
581 None => {
582 Ok(Vec::new())
585 }
586 }
587 }
588
    /// Infers the table catalog of the KV log store for this sink by delegating to
    /// `infer_kv_log_store_table_catalog_inner` with the input plan and the sink columns.
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }
594}
595
596impl PlanTreeNodeUnary for StreamSink {
597 fn input(&self) -> PlanRef {
598 self.input.clone()
599 }
600
601 fn clone_with_input(&self, input: PlanRef) -> Self {
602 Self::new(input, self.sink_desc.clone(), self.log_store_type)
603 }
605}
606
607impl_plan_tree_node_for_unary! { StreamSink }
608
609impl Distill for StreamSink {
610 fn distill<'a>(&self) -> XmlNode<'a> {
611 let sink_type = if self.sink_desc.sink_type.is_append_only() {
612 "append-only"
613 } else {
614 "upsert"
615 };
616 let column_names = self
617 .sink_desc
618 .columns
619 .iter()
620 .map(|col| col.name_with_hidden().to_string())
621 .map(Pretty::from)
622 .collect();
623 let column_names = Pretty::Array(column_names);
624 let mut vec = Vec::with_capacity(3);
625 vec.push(("type", Pretty::from(sink_type)));
626 vec.push(("columns", column_names));
627 if self.sink_desc.sink_type.is_upsert() {
628 let sink_pk = IndicesDisplay {
629 indices: &self.sink_desc.downstream_pk.clone(),
630 schema: self.base.schema(),
631 };
632 vec.push(("downstream_pk", sink_pk.distill()));
633 }
634 childless_record("StreamSink", vec)
635 }
636}
637
638impl StreamNode for StreamSink {
639 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
640 use risingwave_pb::stream_plan::*;
641
642 let table = self
644 .infer_kv_log_store_table_catalog()
645 .with_id(state.gen_table_id_wrapped());
646
647 PbNodeBody::Sink(Box::new(SinkNode {
648 sink_desc: Some(self.sink_desc.to_proto()),
649 table: Some(table.to_internal_table_prost()),
650 log_store_type: self.log_store_type as i32,
651 rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
652 }))
653 }
654}
655
// Nothing is overridden: `StreamSink` relies entirely on the trait's default behavior.
impl ExprRewritable for StreamSink {}
657
// Nothing is overridden: `StreamSink` relies entirely on the trait's default behavior.
impl ExprVisitable for StreamSink {}
659
#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    /// Three visible columns used as partition sources: `v1` (int32), `v2` (timestamptz) and
    /// `v3` (timestamp), each with its own column id.
    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    /// Converts a partition description covering identity, void and function-style transforms
    /// and checks the shape of the resulting `Row` expression and of each child expression.
    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    // The call must produce the declared partition field type as well.
                    assert_eq!(expr.return_type(), *expect_type);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
}