use std::assert_matches::assert_matches;
use std::sync::Arc;

use iceberg::spec::Transform;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, CreateType, FieldLike};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::match_sink_name_str;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::iceberg::ICEBERG_SINK;
use risingwave_connector::sink::trivial::TABLE_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::stream_plan::SinkLogStoreType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::derive::{derive_columns, derive_pk};
use super::stream::prelude::*;
use super::utils::{
    Distill, IndicesDisplay, childless_record, infer_kv_log_store_table_catalog_inner,
};
use super::{
    ExprRewritable, PlanBase, StreamExchange, StreamNode, StreamPlanRef as PlanRef, StreamProject,
    StreamSyncLogStore, generic,
};
use crate::TableCatalog;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::StreamOptimizedLogicalPlanRoot;
use crate::optimizer::plan_node::PlanTreeNodeUnary;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::property::{Distribution, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::WithOptionsSecResolved;

const DOWNSTREAM_PK_KEY: &str = "primary_key";
const CREATE_TABLE_IF_NOT_EXISTS: &str = "create_table_if_not_exists";

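/// Extra partition-compute metadata for sinks whose writers expect partitioned
/// input. Currently only Iceberg sinks carry such information.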
pub enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
}

impl PartitionComputeInfo {
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        match self {
            PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns),
        }
    }
}

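/// Iceberg partition-spec information: the struct type of the computed partition
/// value and the `(source column name, transform)` pair for each partition field.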
pub struct IcebergPartitionInfo {
    pub partition_type: StructType,
    pub partition_fields: Vec<(String, Transform)>,
}

impl IcebergPartitionInfo {
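    /// Builds the expression evaluating one partition field: an `InputRef` for
    /// `Identity`, a typed null literal for `Void`, and an `IcebergTransform`
    /// function call for all other transforms.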
    #[inline]
    fn transform_to_expression(
        transform: &Transform,
        col_id: usize,
        columns: &[ColumnCatalog],
        result_type: DataType,
    ) -> Result<ExprImpl> {
        match transform {
            Transform::Identity => {
                if columns[col_id].column_desc.data_type != result_type {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "The partition source column {} has type {}, but the partition field expects type {}",
                        columns[col_id].column_desc.name,
                        columns[col_id].column_desc.data_type,
                        result_type
                    ))
                    .into());
                }
                Ok(ExprImpl::InputRef(
                    InputRef::new(col_id, result_type).into(),
                ))
            }
            Transform::Void => Ok(ExprImpl::literal_null(result_type)),
            _ => Ok(ExprImpl::FunctionCall(
                FunctionCall::new_unchecked(
                    Type::IcebergTransform,
                    vec![
                        ExprImpl::literal_varchar(transform.to_string()),
                        ExprImpl::InputRef(
                            InputRef::new(col_id, columns[col_id].column_desc.data_type.clone())
                                .into(),
                        ),
                    ],
                    result_type,
                )
                .into(),
            )),
        }
    }

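    /// Combines the per-field expressions into a single `ROW(...)` expression whose
    /// return type is `Struct(partition_type)`.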
    pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result<ExprImpl> {
        let child_exprs = self
            .partition_fields
            .into_iter()
            .zip_eq_debug(self.partition_type.iter())
            .map(|((field_name, transform), (_, result_type))| {
                let col_id = find_column_idx_by_name(columns, &field_name)?;
                Self::transform_to_expression(&transform, col_id, columns, result_type.clone())
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(ExprImpl::FunctionCall(
            FunctionCall::new_unchecked(
                Type::Row,
                child_exprs,
                DataType::Struct(self.partition_type),
            )
            .into(),
        ))
    }
}

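/// Resolves a column name to its index in `columns`, failing with a sink
/// primary-key error when the name is absent.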
#[inline]
fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> {
    columns
        .iter()
        .position(|col| col.column_desc.name == col_name)
        .ok_or_else(|| {
            ErrorCode::InvalidInputSyntax(format!(
                "Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.",
                col_name
            ))
            .into()
        })
}

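/// [`StreamSink`] represents a table/connector sink at the very end of the
/// streaming plan.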
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSink {
    pub base: PlanBase<Stream>,
    input: PlanRef,
    sink_desc: SinkDesc,
    log_store_type: SinkLogStoreType,
}

impl StreamSink {
    #[must_use]
    pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
        let base = input.plan_base().clone_with_new_plan_id();

        if let SinkType::AppendOnly = sink_desc.sink_type {
            let kind = input.stream_kind();
            assert_matches!(
                kind,
                StreamKind::AppendOnly,
                "{kind} stream cannot be used as input of append-only sink",
            );
        }

        Self {
            base,
            input,
            sink_desc,
            log_store_type,
        }
    }

    pub fn sink_desc(&self) -> &SinkDesc {
        &self.sink_desc
    }

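    /// Derives the required distribution for an Iceberg sink. When partition info is
    /// present, appends a `StreamProject` that computes the partition value and shards
    /// by that extra column; otherwise shards by the stream key.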
    fn derive_iceberg_sink_distribution(
        input: PlanRef,
        partition_info: Option<PartitionComputeInfo>,
        columns: &[ColumnCatalog],
    ) -> Result<(RequiredDist, PlanRef, Option<usize>)> {
        if let Some(partition_info) = partition_info {
            let input_fields = input.schema().fields();

            let mut exprs: Vec<_> = input_fields
                .iter()
                .enumerate()
                .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
                .collect();

            exprs.push(partition_info.convert_to_expression(columns)?);
            let partition_col_idx = exprs.len() - 1;
            let project = StreamProject::new(generic::Project::new(exprs, input));
            Ok((
                RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]),
                project.into(),
                Some(partition_col_idx),
            ))
        } else {
            Ok((
                RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()),
                input,
                None,
            ))
        }
    }

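    /// Builds a `StreamSink` from an optimized plan root: derives the sink type,
    /// columns, and downstream primary key, enforces the required distribution, and
    /// decides whether the sink runs with a decoupled (KV) log store.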
    #[allow(clippy::too_many_arguments)]
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: mut input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        db_name: String,
        sink_from_table_name: String,
        target_table: Option<Arc<TableCatalog>>,
        target_table_mapping: Option<Vec<Option<usize>>>,
        definition: String,
        properties: WithOptionsSecResolved,
        format_desc: Option<SinkFormatDesc>,
        partition_info: Option<PartitionComputeInfo>,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<Self> {
        let sink_type =
            Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?;

        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
        let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
        let mut downstream_pk = {
            let downstream_pk =
                Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?;
            if let Some(t) = &target_table {
                let user_defined_primary_key_table = t.row_id_index.is_none();
                let sink_is_append_only =
                    sink_type == SinkType::AppendOnly || sink_type == SinkType::ForceAppendOnly;

                if !user_defined_primary_key_table && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to a table without primary keys. Please add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if t.append_only && !sink_is_append_only {
                    return Err(RwError::from(ErrorCode::BindError(
                        "Only append-only sinks can sink to an append-only table. Please add type = 'append-only' in the WITH options, e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_owned(),
                    )));
                }

                if sink_type != SinkType::Upsert {
                    vec![]
                } else {
                    let target_table_mapping = target_table_mapping.unwrap();

                    t.pk()
                        .iter()
                        .map(|c| {
                            target_table_mapping[c.column_index].ok_or_else(
                                || ErrorCode::InvalidInputSyntax("When sinking into a table with a non-append-only sink, the table's primary key columns must be included in the sink result.".to_owned()).into())
                        })
                        .try_collect::<_, _, RwError>()?
                }
            } else if properties.get(CREATE_TABLE_IF_NOT_EXISTS) == Some(&"true".to_owned())
                && sink_type == SinkType::Upsert
                && downstream_pk.is_empty()
            {
                pk.iter().map(|k| k.column_index).collect_vec()
            } else {
                downstream_pk
            }
        };
        if let Some(upstream_table) = &auto_refresh_schema_from_table
            && !downstream_pk.is_empty()
        {
            let upstream_table_pk_col_names = upstream_table
                .pk
                .iter()
                .map(|order| {
                    upstream_table.columns[order.column_index]
                        .column_desc
                        .name()
                })
                .collect_vec();
            let sink_pk_col_names = downstream_pk
                .iter()
                .map(|&column_index| columns[column_index].name())
                .collect_vec();
            if upstream_table_pk_col_names != sink_pk_col_names {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "sink with auto schema change should have the same primary key as the upstream table {:?}, but got {:?}",
                    upstream_table_pk_col_names, sink_pk_col_names
                ))
                .into());
            }
        }
        let mut extra_partition_col_idx = None;

        let required_dist = match input.distribution() {
            Distribution::Single => RequiredDist::single(),
            _ => {
                match properties.get(CONNECTOR_TYPE_KEY) {
                    Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => {
                        if downstream_pk.is_empty() {
                            return Err(ErrorCode::InvalidInputSyntax(format!(
                                "Primary key must be defined for upsert JDBC sink. Please specify the \"{key}='pk1,pk2,...'\" in WITH options.",
                                key = DOWNSTREAM_PK_KEY
                            ))
                            .into());
                        }
                        RequiredDist::hash_shard(downstream_pk.as_slice())
                    }
                    Some(s) if s == ICEBERG_SINK => {
                        if sink_type.is_upsert() && downstream_pk.is_empty() {
                            downstream_pk = pk.iter().map(|k| k.column_index).collect_vec();
                        }
                        let (required_dist, new_input, partition_col_idx) =
                            Self::derive_iceberg_sink_distribution(
                                input,
                                partition_info,
                                &columns,
                            )?;
                        input = new_input;
                        extra_partition_col_idx = partition_col_idx;
                        required_dist
                    }
                    _ => {
                        assert_matches!(user_distributed_by, RequiredDist::Any);
                        if downstream_pk.is_empty() {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                input.expect_stream_key(),
                            )
                        } else {
                            RequiredDist::shard_by_key(
                                input.schema().len(),
                                downstream_pk.as_slice(),
                            )
                        }
                    }
                }
            }
        };
        let input = required_dist.streaming_enforce_if_not_satisfies(input)?;
        let input = if input.ctx().session_ctx().config().streaming_separate_sink()
            && input.as_stream_exchange().is_none()
        {
            StreamExchange::new_no_shuffle(input).into()
        } else {
            input
        };

        let distribution_key = input.distribution().dist_column_indices().to_vec();
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };
        let (properties, secret_refs) = properties.into_parts();
        let is_exactly_once = properties
            .get("is_exactly_once")
            .is_some_and(|v| v.to_lowercase() == "true");
        let mut sink_desc = SinkDesc {
            id: SinkId::placeholder(),
            name,
            db_name,
            sink_from_name: sink_from_table_name,
            definition,
            columns,
            plan_pk: pk,
            downstream_pk,
            distribution_key,
            properties,
            secret_refs,
            sink_type,
            format_desc,
            target_table: target_table.as_ref().map(|catalog| catalog.id()),
            extra_partition_col_idx,
            create_type,
            is_exactly_once,
            auto_refresh_schema_from_table: auto_refresh_schema_from_table
                .as_ref()
                .map(|table| table.id),
        };

        let unsupported_sink = |sink: &str| -> Result<_> {
            Err(ErrorCode::InvalidInputSyntax(format!("unsupported sink type {}", sink)).into())
        };

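        // Resolve the connector name and decide whether the sink runs decoupled from
        // the upstream fragment; `match_sink_name_str!` dispatches to the concrete
        // sink type for the per-connector checks below.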
        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
                            unsupported_sink(TABLE_SINK)
                        } else {
                            SinkType::set_default_commit_checkpoint_interval(
                                &mut sink_desc,
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )?;
                            let support_schema_change = SinkType::support_schema_change();
                            if !support_schema_change && auto_refresh_schema_from_table.is_some() {
                                return Err(ErrorCode::InvalidInputSyntax(format!(
                                    "{} sink does not support schema change",
                                    connector_type
                                ))
                                .into());
                            }
                            SinkType::is_sink_decouple(
                                &input.ctx().session_ctx().config().sink_decouple(),
                            )
                            .map_err(Into::into)
                        }
                    },
                    |other: &str| unsupported_sink(other)
                )?
            }
            None => {
                return Err(ErrorCode::InvalidInputSyntax(
                    "connector not specified when creating sink".to_owned(),
                )
                .into());
            }
        };
        let hint_string =
            |expected: bool| format!("Please run `set sink_decouple = {}` first.", expected);
        if !sink_decouple {
            if sink_desc.is_file_sink() {
                return Err(ErrorCode::NotSupported(
                    "File sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }
            if sink_desc.is_exactly_once {
                return Err(ErrorCode::NotSupported(
                    "Exactly once sink can only be created with sink_decouple enabled.".to_owned(),
                    hint_string(true),
                )
                .into());
            }
        }
        if sink_decouple && auto_refresh_schema_from_table.is_some() {
            return Err(ErrorCode::NotSupported(
                "Sink with auto schema refresh can only be created with sink_decouple disabled."
                    .to_owned(),
                hint_string(false),
            )
            .into());
        }
        let log_store_type = if sink_decouple {
            SinkLogStoreType::KvLogStore
        } else {
            SinkLogStoreType::InMemoryLogStore
        };

        let input = if sink_decouple && target_table.is_some() {
            StreamSyncLogStore::new(input).into()
        } else {
            input
        };

        Ok(Self::new(input, sink_desc, log_store_type))
    }

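    /// Reads the legacy `type` option from the WITH clause, mapping `append-only` to
    /// `AppendOnly` and `debezium`/`upsert` to `Upsert`.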
    fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
        if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
            if sink_type == SINK_TYPE_APPEND_ONLY {
                return Ok(Some(SinkType::AppendOnly));
            } else if sink_type == SINK_TYPE_DEBEZIUM || sink_type == SINK_TYPE_UPSERT {
                return Ok(Some(SinkType::Upsert));
            } else {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "`{}` must be {}, {}, or {}",
                    SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_UPSERT
                ))
                .into());
            }
        }
        Ok(None)
    }

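    /// Validates and reads the boolean `force_append_only` option, rejecting values
    /// other than `true`/`false` (case-insensitive).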
    fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
        if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION)
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true")
            && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false")
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "`{}` must be true or false",
                SINK_USER_FORCE_APPEND_ONLY_OPTION
            ))
            .into());
        }
        Ok(properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"))
    }

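    /// Derives the final sink type from the input plan's append-only property, the
    /// WITH options, and the `FORMAT ... ENCODE ...` clause, validating the
    /// `force_append_only` flag along the way.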
    fn derive_sink_type(
        input_append_only: bool,
        properties: &WithOptionsSecResolved,
        format_desc: Option<&SinkFormatDesc>,
    ) -> Result<SinkType> {
        let frontend_derived_append_only = input_append_only;
        let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc {
            Some(f) => (
                Some(match f.format {
                    SinkFormat::AppendOnly => SinkType::AppendOnly,
                    SinkFormat::Upsert | SinkFormat::Debezium => SinkType::Upsert,
                }),
                Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets(
                    f.options.clone(),
                ))?,
                false,
            ),
            None => (
                Self::sink_type_in_prop(properties)?,
                Self::is_user_force_append_only(properties)?,
                true,
            ),
        };

        if user_force_append_only
            && user_defined_sink_type.is_some()
            && user_defined_sink_type != Some(SinkType::AppendOnly)
        {
            return Err(ErrorCode::InvalidInputSyntax(
                "`force_append_only` can only be used with type = 'append-only'".to_owned(),
            )
            .into());
        }

        let user_force_append_only = if user_force_append_only && frontend_derived_append_only {
            false
        } else {
            user_force_append_only
        };

        if user_force_append_only && user_defined_sink_type != Some(SinkType::AppendOnly) {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "Cannot force the sink to be append-only without \"{}\".",
                if syntax_legacy {
                    "type='append-only'"
                } else {
                    "FORMAT PLAIN"
                }
            ))
            .into());
        }

        if let Some(user_defined_sink_type) = user_defined_sink_type {
            if user_defined_sink_type == SinkType::AppendOnly {
                if user_force_append_only {
                    return Ok(SinkType::ForceAppendOnly);
                }
                if !frontend_derived_append_only {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
                         Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
                        if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
                    ))
                    .into());
                } else {
                    return Ok(SinkType::AppendOnly);
                }
            }

            Ok(user_defined_sink_type)
        } else {
            match frontend_derived_append_only {
                true => Ok(SinkType::AppendOnly),
                false => Ok(SinkType::Upsert),
            }
        }
    }

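    /// Parses the comma-separated `primary_key` option into column indices over the
    /// sink columns. Returns an empty vector when the option is absent.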
    fn parse_downstream_pk(
        columns: &[ColumnCatalog],
        downstream_pk_str: Option<&String>,
    ) -> Result<Vec<usize>> {
        match downstream_pk_str {
            Some(downstream_pk_str) => {
                let downstream_pk = downstream_pk_str.split(',').collect_vec();
                let mut downstream_pk_indices = Vec::with_capacity(downstream_pk.len());
                for key in downstream_pk {
                    let trimmed_key = key.trim();
                    if trimmed_key.is_empty() {
                        continue;
                    }
                    downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?);
                }
                Ok(downstream_pk_indices)
            }
            None => Ok(Vec::new()),
        }
    }

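    /// Infers the catalog of the internal table backing this sink's KV log store.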
    fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
        infer_kv_log_store_table_catalog_inner(&self.input, &self.sink_desc().columns)
    }
}

impl PlanTreeNodeUnary<Stream> for StreamSink {
    fn input(&self) -> PlanRef {
        self.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        Self::new(input, self.sink_desc.clone(), self.log_store_type)
    }
}

impl_plan_tree_node_for_unary! { Stream, StreamSink }

impl Distill for StreamSink {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let sink_type = if self.sink_desc.sink_type.is_append_only() {
            "append-only"
        } else {
            "upsert"
        };
        let column_names = self
            .sink_desc
            .columns
            .iter()
            .map(|col| col.name_with_hidden().to_string())
            .map(Pretty::from)
            .collect();
        let column_names = Pretty::Array(column_names);
        let mut vec = Vec::with_capacity(3);
        vec.push(("type", Pretty::from(sink_type)));
        vec.push(("columns", column_names));
        if self.sink_desc.sink_type.is_upsert() {
            let sink_pk = IndicesDisplay {
                indices: &self.sink_desc.downstream_pk,
                schema: self.base.schema(),
            };
            vec.push(("downstream_pk", sink_pk.distill()));
        }
        childless_record("StreamSink", vec)
    }
}

impl StreamNode for StreamSink {
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        let table = self
            .infer_kv_log_store_table_catalog()
            .with_id(state.gen_table_id_wrapped());

        PbNodeBody::Sink(Box::new(SinkNode {
            sink_desc: Some(self.sink_desc.to_proto()),
            table: Some(table.to_internal_table_prost()),
            log_store_type: self.log_store_type as i32,
            rate_limit: self.base.ctx().overwrite_options().sink_rate_limit,
        }))
    }
}

impl ExprRewritable<Stream> for StreamSink {}

impl ExprVisitable for StreamSink {}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::types::{DataType, StructType};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_pb::expr::expr_node::Type;

    use super::{IcebergPartitionInfo, *};
    use crate::expr::{Expr, ExprImpl};

    fn create_column_catalog() -> Vec<ColumnCatalog> {
        vec![
            ColumnCatalog {
                column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp),
                is_hidden: false,
            },
        ]
    }

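    /// Verifies that `convert_to_expression` builds a `ROW` with one child per
    /// partition field: `Identity` becomes an input ref, `Void` a null literal, and
    /// every other transform an `IcebergTransform` call carrying the transform string.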
    #[test]
    fn test_iceberg_convert_to_expression() {
        let partition_type = StructType::new(vec![
            ("f1", DataType::Int32),
            ("f2", DataType::Int32),
            ("f3", DataType::Int32),
            ("f4", DataType::Int32),
            ("f5", DataType::Int32),
            ("f6", DataType::Int32),
            ("f7", DataType::Int32),
            ("f8", DataType::Int32),
            ("f9", DataType::Int32),
        ]);
        let partition_fields = vec![
            ("v1".into(), Transform::Identity),
            ("v1".into(), Transform::Bucket(10)),
            ("v1".into(), Transform::Truncate(3)),
            ("v2".into(), Transform::Year),
            ("v2".into(), Transform::Month),
            ("v3".into(), Transform::Day),
            ("v3".into(), Transform::Hour),
            ("v1".into(), Transform::Void),
            ("v3".into(), Transform::Void),
        ];
        let partition_info = IcebergPartitionInfo {
            partition_type: partition_type.clone(),
            partition_fields: partition_fields.clone(),
        };
        let catalog = create_column_catalog();
        let actual_expr = partition_info.convert_to_expression(&catalog).unwrap();
        let actual_expr = actual_expr.as_function_call().unwrap();

        assert_eq!(
            actual_expr.return_type(),
            DataType::Struct(partition_type.clone())
        );
        assert_eq!(actual_expr.inputs().len(), partition_fields.len());
        assert_eq!(actual_expr.func_type(), Type::Row);

        for ((expr, (_, transform)), (_, expect_type)) in actual_expr
            .inputs()
            .iter()
            .zip_eq_debug(partition_fields.iter())
            .zip_eq_debug(partition_type.iter())
        {
            match transform {
                Transform::Identity => {
                    assert!(expr.is_input_ref());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                Transform::Void => {
                    assert!(expr.is_literal());
                    assert_eq!(expr.return_type(), *expect_type);
                }
                _ => {
                    let expr = expr.as_function_call().unwrap();
                    assert_eq!(expr.func_type(), Type::IcebergTransform);
                    assert_eq!(expr.inputs().len(), 2);
                    assert_eq!(
                        expr.inputs()[0],
                        ExprImpl::literal_varchar(transform.to_string())
                    );
                }
            }
        }
    }
}