use std::assert_matches::assert_matches;
use std::sync::Arc;

use iceberg::spec::Transform;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, CreateType, FieldLike};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::match_sink_name_str;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::iceberg::ICEBERG_SINK;
use risingwave_connector::sink::trivial::TABLE_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::stream_plan::SinkLogStoreType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::{derive_columns, derive_pk};
use super::stream::prelude::*;
use super::utils::{
    Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
};
use super::{
    ExprRewritable, PlanBase, StreamExchange, StreamNode, StreamPlanRef as PlanRef, StreamProject,
    StreamSyncLogStore, generic,
};
use crate::TableCatalog;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::PlanTreeNodeUnary;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::property::{Distribution, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::WithOptionsSecResolved;

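/// WITH-option key for declaring the sink's downstream primary key,
/// e.g. `primary_key = 'id,ts'`; parsed by `StreamSink::parse_downstream_pk`.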
const DOWNSTREAM_PK_KEY: &str = "primary_key";

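/// Partition-computation metadata attached to a sink. Each variant knows how to
/// lower its partition spec into an expression over the sink's output columns
/// (see `convert_to_expression`); Iceberg is currently the only partitioned sink.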
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}

impl PartitionComputeInfo {
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        match self {
            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
        }
    }
}

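/// An Iceberg partition spec in RisingWave terms: the struct type of the
/// computed partition value, plus the `(source column name, transform)` pair
/// that produces each of its fields.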
pub struct IcebergPartitionInfo {
    pub partition_type: StructType,
    pub partition_fields: Vec<(String, Transform)>,
}

impl IcebergPartitionInfo {
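    /// Lowers a single Iceberg transform on one source column into an expression:
    /// `Identity` becomes a plain `InputRef` (after checking that the column type
    /// matches), `Void` becomes a typed NULL literal, and every other transform
    /// becomes an `IcebergTransform` call carrying the transform's string form.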
    #[inline]
    fn transform_to_expression(
        transform: &Transform,
        col_id: usize,
        columns: &[ColumnCatalog],
        result_type: DataType,
    ) -> Result<ExprImpl> {
        match transform {
            Transform::Identity => {
                if columns[col_id].column_desc.data_type != result_type {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "The partition field {} has type {}, but the partition spec expects {}",
                        columns[col_id].column_desc.name,
                        columns[col_id].column_desc.data_type,
                        result_type
                    ))
                    .into());
                }
                Ok(ExprImpl::InputRef(
                    InputRef::new(col_id, result_type).into(),
                ))
            }
            Transform::Void => Ok(ExprImpl::literal_null(result_type)),
            _ => Ok(ExprImpl::FunctionCall(
                FunctionCall::new_unchecked(
                    Type::IcebergTransform,
                    vec![
                        ExprImpl::literal_varchar(transform.to_string()),
                        ExprImpl::InputRef(
                            InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
                                .into(),
                        ),
                    ],
                    result_type,
                )
                .into(),
            )),
        }
    }

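    /// Builds a single `ROW(...)` expression whose children are the lowered
    /// partition transforms, zipping `partition_fields` against the fields of
    /// `partition_type` (the two must have equal length). For a spec like
    /// `(v1 identity, v2 bucket[10])` this yields, roughly,
    /// `ROW($v1, IcebergTransform('bucket[10]', $v2))`.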
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        let child_exprs = self
            .partition_fields
            .into_iter()
            .zip_eq_debug(self.partition_type.iter())
            .map(|((field_name, transform), (_, result_type))| {
                let col_id = find_column_idx_by_name(columns, &field_name)?;
                Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(ExprImpl::FunctionCall(
            FunctionCall::new_unchecked(
                Type::Row,
                child_exprs,
                DataType::Struct(self.partition_type),
            )
            .into(),
        ))
    }
}

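/// Resolves a column name to its index in `columns`. Shared by partition-field
/// lookup and downstream-pk parsing, hence the pk-oriented error message.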
#[inline]
fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
    columns
        .iter()
        .position(|col| col.column_desc.name == col_name)
        .ok_or_else(|| {
            ErrorCode::InvalidInputSyntax(format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name))
                .into()
        })
}

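/// [`StreamSink`] represents a table/connector sink at the very end of the graph.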
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    input: PlanRef,
    sink_desc: SinkDesc,
    log_store_type: SinkLogStoreType,
}

impl StreamSink {
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        let base = input.plan_base().clone_with_new_plan_id();

        if let SinkType::AppendOnly = sink_desc.sink_type {
            let kind = input.stream_kind();
            assert_matches!(
                kind,
                StreamKind::AppendOnly,
                "{kind} stream cannot be used as input of append-only sink",
            );
        }

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }

    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }

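    /// Decides the required distribution for an Iceberg sink. With a partition
    /// spec, a `StreamProject` computing the partition value as an extra trailing
    /// column is appended, and the stream is shuffled by that column so that each
    /// writer handles whole partitions; otherwise the stream is shuffled by its
    /// stream key. Returns `(required_dist, new_input, partition_column_index)`.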
    fn derive_iceberg_sink_distribution(
        input: PlanRef,
        partition_info: Option<PartitionComputeInfo>,
        columns: &[ColumnCatalog],
    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
        if let Some(partition_info) = partition_info {
            let input_fields = input.schema().fields();

            let mut exprs: Vec<_> = input_fields
                .iter()
                .enumerate()
                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
                .collect();

            exprs.push(partition_info.convert_to_expression(columns)?);
            let partition_col_idx = exprs.len() - 1;
            let project = StreamProject::new(generic::Project::new(exprs, input));
            Ok((
                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
                project.into(),
                Some(partition_col_idx),
            ))
        } else {
            Ok((
                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
                input,
                None,
            ))
        }
    }

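    /// Assembles the final `StreamSink` plan node from an optimized plan root and
    /// the user's CREATE SINK options: derives the sink type and output columns,
    /// resolves the downstream primary key (from the `primary_key` option or the
    /// target table), enforces a connector-specific required distribution (JDBC
    /// upsert and Iceberg get special handling), and decides whether the sink is
    /// decoupled behind a KV log store.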
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: mut input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<Self> {
        let sink_type =
            Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
        let mut downstream_pk = {
            let from_properties =
                Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?;
            if let Some(t) = &target_table {
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only =
                    sink_type == SinkType::AppendOnly || sink_type == SinkType::ForceAppendOnly;

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink into a table without a primary key. Please add type = 'append-only' to the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink into an append-only table. Please add type = 'append-only' to the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_type != SinkType::Upsert {
                    vec![]
                } else {
                    let target_table_mapping = target_table_mapping.unwrap();

                    t.pk()
                        .iter()
                        .map(|c| {
                            target_table_mapping[c.column_index].ok_or_else(
                                || ErrorCode::InvalidInputSyntax("When using a non-append-only sink into a table, the table's primary key columns must be included in the sink's output.".to_owned()).into())
                        })
                        .try_collect::<_, _, RwError>()?
                }
            } else {
                from_properties
            }
        };
        if let Some(upstream_table) = &auto_refresh_schema_from_table
            && !downstream_pk.is_empty()
        {
            let upstream_table_pk_col_names = upstream_table
                .pk
                .iter()
                .map(|order| {
                    upstream_table.columns[order.column_index]
                        .column_desc
                        .name()
                })
                .collect_vec();
            let sink_pk_col_names = downstream_pk
                .iter()
                .map(|&column_index| columns[column_index].name())
                .collect_vec();
            if upstream_table_pk_col_names != sink_pk_col_names {
                return Err(ErrorCode::InvalidInputSyntax(format!("A sink with auto schema change must have the same primary key as the upstream table {:?}, but got {:?}", upstream_table_pk_col_names, sink_pk_col_names)).into());
            }
        }
        let mut extra_partition_col_idx = None;

        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get("connector") {
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        if downstream_pk.is_empty() {
                            return Err(ErrorCode::InvalidInputSyntax(format!(
                                "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
                                key = DOWNSTREAM_PK_KEY
                            )).into());
                        }
                        RequiredDist::hash_shard(downstream_pk.as_slice())
                    }
                    Some(s) if s == ICEBERG_SINK => {
                        if sink_type.is_upsert() && downstream_pk.is_empty() {
                            downstream_pk = pk.iter().map(|k| k.column_index).collect_vec();
                        }
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if downstream_pk.is_empty() {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        } else {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                downstream_pk.as_slice(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
        let input = if input.ctx().session_ctx().config().streaming_separate_sink()
            && input.as_stream_exchange().is_none()
        {
            StreamExchange::new_no_shuffle(input).into()
        } else {
            input
        };

        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        let is_exactly_once = properties
            .get("is_exactly_once")
            .is_some_and(|v| v.to_lowercase() == "true");
        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
            auto_refresh_schema_from_table: auto_refresh_schema_from_table
                .as_ref()
                .map(|table| table.id),
        };

        let unsupported_sink = |sink: &str| -> Result<_> {
            Err(ErrorCode::InvalidInputSyntax(format!("unsupported sink type {}", sink)).into())
        };

        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            let support_schema_change = SinkType::support_schema_change();
                            if !support_schema_change && auto_refresh_schema_from_table.is_some() {
                                return Err(ErrorCode::InvalidInputSyntax(format!(
                                    "{} sink does not support schema change",
                                    connector_type
                                ))
                                .into());
                            }
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                            .map_err(Into::into)
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(ErrorCode::InvalidInputSyntax(
                    "connector not specified when creating sink".to_owned(),
                )
                .into());
            }
        };
        let hint_string =
            |expected: bool| format!("Please run `set sink_decouple = {}` first.", expected);
        if !sink_decouple {
            if sink_desc.is_file_sink() {
                return Err(ErrorCode::NotSupported(
                    "File sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }
            if sink_desc.is_exactly_once {
                return Err(ErrorCode::NotSupported(
                    "Exactly-once sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }
        }
        if sink_decouple && auto_refresh_schema_from_table.is_some() {
            return Err(ErrorCode::NotSupported(
                "Sink with auto schema refresh can only be created with sink_decouple disabled."
                    .to_owned(),
                hint_string(false),
            )
            .into());
        }
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

        let input = if sink_decouple && target_table.is_some() {
            StreamSyncLogStore::new(input).into()
        } else {
            input
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }

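    /// Reads the legacy `type` WITH option: `append-only` maps to
    /// `SinkType::AppendOnly`, while `debezium` and `upsert` both map to
    /// `SinkType::Upsert`. Returns `None` when the option is absent.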
    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
            if sink_type == SINK_TYPE_APPEND_ONLY {
                return Ok(Some(SinkType::AppendOnly));
            } else if sink_type == SINK_TYPE_DEBEZIUM || sink_type == SINK_TYPE_UPSERT {
                return Ok(Some(SinkType::Upsert));
            } else {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "`{}` must be {}, {}, or {}",
                    SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_UPSERT
                ))
                .into());
            }
        }
        Ok(None)
    }

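    /// Returns whether the user set `force_append_only = 'true'`, rejecting any
    /// value other than a case-insensitive `true` or `false`.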
    fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
        if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION)
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true")
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false")
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "`{}` must be true or false",
                SINK_USER_FORCE_APPEND_ONLY_OPTION
            ))
            .into());
        }
        Ok(properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"))
    }

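    /// Derives the final `SinkType` from three inputs: whether the plan is
    /// append-only, the user-declared type (legacy `type` option or FORMAT
    /// clause), and `force_append_only`. Forcing is only meaningful when the
    /// declared type is append-only and the plan itself is not; on an already
    /// append-only plan the flag is silently dropped.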
    fn derive_sink_type(
        input_append_only: bool,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<SinkType> {
        let frontend_derived_append_only = input_append_only;
        let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert | SinkFormat::Debezium => SinkType::Upsert,
                }),
                Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_force_append_only(properties)?,
                true,
            ),
        };

        if user_force_append_only
            && user_defined_sink_type.is_some()
            && user_defined_sink_type != Some(SinkType::AppendOnly)
        {
            return Err(ErrorCode::InvalidInputSyntax(
                "`force_append_only` can only be used with type = 'append-only'".to_owned(),
            )
            .into());
        }

        let user_force_append_only = if user_force_append_only && frontend_derived_append_only {
            false
        } else {
            user_force_append_only
        };

        if user_force_append_only && user_defined_sink_type != Some(SinkType::AppendOnly) {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "Cannot force the sink to be append-only without \"{}\".",
                if syntax_legacy {
                    "type='append-only'"
                } else {
                    "FORMAT PLAIN"
                }
            ))
            .into());
        }

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            if user_defined_sink_type == SinkType::AppendOnly {
                if user_force_append_only {
                    return Ok(SinkType::ForceAppendOnly);
                }
                if !frontend_derived_append_only {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                        Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                        if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                    ))
                    .into());
                } else {
                    return Ok(SinkType::AppendOnly);
                }
            }

            Ok(user_defined_sink_type)
        } else {
            match frontend_derived_append_only {
                true => Ok(SinkType::AppendOnly),
                false => Ok(SinkType::Upsert),
            }
        }
    }

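    /// Parses the user-defined downstream primary key from the `primary_key`
    /// WITH option into indices over the sink's columns. The expected format is
    /// a comma-separated column list, e.g. `primary_key = 'col1,col2'`; an
    /// absent option yields an empty vector.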
    fn parse_downstream_pk(
        columns: &[ColumnCatalog],
        downstream_pk_str: Option<&String>,
    ) -> Result<Vec<usize>> {
        match downstream_pk_str {
            Some(downstream_pk_str) => {
                let downstream_pk = downstream_pk_str.split(',').collect_vec();
                let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
                for key in downstream_pk {
                    let trimmed_key = key.trim();
                    if trimmed_key.is_empty() {
                        continue;
                    }
                    downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
                }
                Ok(downstream_pk_indices)
            }
            None => Ok(Vec::new()),
        }
    }

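    /// Infers the catalog of the internal table backing the KV log store used
    /// for sink decoupling, derived from the input plan and the sink's columns.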
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }
}

impl PlanTreeNodeUnary<Stream> for StreamSink {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(input, self.sink_desc.clone(), self.log_store_type)
    }
}

impl_plan_tree_node_for_unary! { Stream, StreamSink }

impl Distill for StreamSink {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let sink_type = if self.sink_desc.sink_type.is_append_only() {
            "append-only"
        } else {
            "upsert"
        };
        let column_names = self
            .sink_desc
            .columns
            .iter()
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();
        let column_names = Pretty::Array(column_names);
        let mut vec = Vec::with_capacity(3);
        vec.push(("type", Pretty::from(sink_type)));
        vec.push(("columns", column_names));
        if self.sink_desc.sink_type.is_upsert() {
            let sink_pk = IndicesDisplay {
                indices: &self.sink_desc.downstream_pk,
                schema: self.base.schema(),
            };
            vec.push(("downstream_pk", sink_pk.distill()));
        }
        childless_record("StreamSink", vec)
    }
}

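// Protobuf serialization: the sink node carries the sink descriptor, the
// inferred KV log-store table (assigned a fresh id during fragment building),
// the chosen log-store type, and any sink rate-limit override.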
impl StreamNode for StreamSink {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        let table = self
            .infer_kv_log_store_table_catalog()
            .with_id(state.gen_table_id_wrapped());

        PbNodeBody::Sink(Box::new(SinkNode {
            sink_desc: Some(self.sink_desc.to_proto()),
            table: Some(table.to_internal_table_prost()),
            log_store_type: self.log_store_type as i32,
            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
        }))
    }
}

impl ExprRewritable<Stream> for StreamSink {}

impl ExprVisitable for StreamSink {}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
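
    // Supplementary check (an illustrative addition, not part of the original
    // test suite): `find_column_idx_by_name` resolves names against the catalog
    // and reports an error for unknown columns.
    #[test]
    fn test_find_column_idx_by_name() {
        let catalog = create_column_catalog();
        assert_eq!(find_column_idx_by_name(&catalog, "v1").unwrap(), 0);
        assert_eq!(find_column_idx_by_name(&catalog, "v3").unwrap(), 2);
        assert!(find_column_idx_by_name(&catalog, "v4").is_err());
    }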
}