pub mod parquet_file_handler;

mod metrics;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::{for_await, try_stream};
use iceberg::Catalog;
use iceberg::expr::Predicate as IcebergPredicate;
use iceberg::scan::FileScanTask;
use iceberg::spec::DataContentType;
use iceberg::table::Table;
use itertools::Itertools;
pub use parquet_file_handler::*;
use phf::{Set, phf_set};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::bail;
use risingwave_common::catalog::{
    ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
    Schema,
};
use risingwave_common::types::JsonbVal;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
use serde::{Deserialize, Serialize};

pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
use crate::connector_common::IcebergCommon;
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
};

pub const ICEBERG_CONNECTOR: &str = "iceberg";

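/// Configuration for the Iceberg source connector: the shared [`IcebergCommon`] connection
/// options plus optional JDBC catalog credentials. Unrecognized options are collected into
/// `unknown_fields`.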
#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct IcebergProperties {
    #[serde(flatten)]
    pub common: IcebergCommon,

    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl EnforceSecret for IcebergProperties {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "catalog.jdbc.password",
    };

    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
            if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
                return Err(EnforceSecretError {
                    key: prop.to_owned(),
                }
                .into());
            }
        }
        Ok(())
    }
}

impl IcebergProperties {
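    /// Builds an Iceberg catalog from the connector options, forwarding the optional JDBC
    /// credentials as Java catalog properties.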
    pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        self.common.create_catalog(&java_catalog_props).await
    }

    pub async fn load_table(&self) -> ConnectorResult<Table> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        self.common.load_table(&java_catalog_props).await
    }
}

impl SourceProperties for IcebergProperties {
    type Split = IcebergSplit;
    type SplitEnumerator = IcebergSplitEnumerator;
    type SplitReader = IcebergFileReader;

    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}

impl UnknownFields for IcebergProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

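/// A [`FileScanTask`] serialized as a JSON string.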
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergFileScanTaskJsonStr(String);

impl IcebergFileScanTaskJsonStr {
    pub fn deserialize(&self) -> FileScanTask {
        serde_json::from_str(&self.0).unwrap()
    }

    pub fn serialize(task: &FileScanTask) -> Self {
        Self(serde_json::to_string(task).unwrap())
    }
}

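/// The files assigned to one split, grouped by scan type: data files, equality delete files,
/// position delete files, or a pre-computed `COUNT(*)` value.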
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
    Data(Vec<FileScanTask>),
    EqualityDelete(Vec<FileScanTask>),
    PositionDelete(Vec<FileScanTask>),
    CountStar(u64),
}

impl IcebergFileScanTask {
    pub fn new_count_star(count_sum: u64) -> Self {
        IcebergFileScanTask::CountStar(count_sum)
    }

    pub fn new_scan_with_scan_type(
        iceberg_scan_type: IcebergScanType,
        data_files: Vec<FileScanTask>,
        equality_delete_files: Vec<FileScanTask>,
        position_delete_files: Vec<FileScanTask>,
    ) -> Self {
        match iceberg_scan_type {
            IcebergScanType::EqualityDeleteScan => {
                IcebergFileScanTask::EqualityDelete(equality_delete_files)
            }
            IcebergScanType::DataScan => IcebergFileScanTask::Data(data_files),
            IcebergScanType::PositionDeleteScan => {
                IcebergFileScanTask::PositionDelete(position_delete_files)
            }
            IcebergScanType::Unspecified | IcebergScanType::CountStar => {
                unreachable!("unexpected iceberg scan type: {:?}", iceberg_scan_type)
            }
        }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            IcebergFileScanTask::Data(data_files) => data_files.is_empty(),
            IcebergFileScanTask::EqualityDelete(equality_delete_files) => {
                equality_delete_files.is_empty()
            }
            IcebergFileScanTask::PositionDelete(position_delete_files) => {
                position_delete_files.is_empty()
            }
            IcebergFileScanTask::CountStar(_) => false,
        }
    }

    pub fn files(&self) -> Vec<String> {
        match self {
            IcebergFileScanTask::Data(file_scan_tasks)
            | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
            | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks
                .iter()
                .map(|task| task.data_file_path.clone())
                .collect(),
            IcebergFileScanTask::CountStar(_) => vec![],
        }
    }
}

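/// A unit of parallel work for an Iceberg batch scan: the snapshot to read and the file scan
/// task assigned to this split.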
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IcebergSplit {
    pub split_id: i64,
    pub snapshot_id: i64,
    pub task: IcebergFileScanTask,
}

impl IcebergSplit {
    pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
        if let IcebergScanType::CountStar = iceberg_scan_type {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_count_star(0),
            }
        } else {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_scan_with_scan_type(
                    iceberg_scan_type,
                    vec![],
                    vec![],
                    vec![],
                ),
            }
        }
    }
}

impl SplitMetaData for IcebergSplit {
    fn id(&self) -> SplitId {
        self.split_id.to_string().into()
    }

    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
    }

    fn encode_to_json(&self) -> JsonbVal {
        serde_json::to_value(self.clone()).unwrap().into()
    }

    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
        unimplemented!()
    }
}

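/// Split enumerator for the Iceberg source. The streaming `SplitEnumerator::list_splits`
/// implementation returns no splits; batch scans use
/// [`IcebergSplitEnumerator::list_splits_batch`] instead.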
#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {
    config: IcebergProperties,
}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<Self> {
        Ok(Self::new_inner(properties, context))
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
        Ok(vec![])
    }
}

impl IcebergSplitEnumerator {
    pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
        Self { config: properties }
    }
}

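/// Time-travel specification for an Iceberg scan: either an explicit snapshot id (version) or
/// a timestamp in milliseconds.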
pub enum IcebergTimeTravelInfo {
    Version(i64),
    TimestampMs(i64),
}

impl IcebergSplitEnumerator {
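    /// Resolves the snapshot to scan. Returns `Ok(None)` if the table has no current snapshot;
    /// otherwise returns the requested snapshot id, the newest snapshot at or before the
    /// requested timestamp, or the current snapshot when no time-travel info is given.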
    pub fn get_snapshot_id(
        table: &Table,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<Option<i64>> {
        let current_snapshot = table.metadata().current_snapshot();
        if current_snapshot.is_none() {
            return Ok(None);
        }

        let snapshot_id = match time_travel_info {
            Some(IcebergTimeTravelInfo::Version(version)) => {
                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
                    bail!("Cannot find the snapshot id in the iceberg table.");
                };
                snapshot.snapshot_id()
            }
            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
                let snapshot = table
                    .metadata()
                    .snapshots()
                    .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
                    .max_by_key(|snapshot| snapshot.timestamp_ms());
                match snapshot {
                    Some(snapshot) => snapshot.snapshot_id(),
                    None => {
                        if let Some(time) = chrono::DateTime::from_timestamp_millis(timestamp) {
                            bail!("Cannot find a snapshot older than {}", time);
                        } else {
                            bail!("Cannot find a snapshot");
                        }
                    }
                }
            }
            None => {
                assert!(current_snapshot.is_some());
                current_snapshot.unwrap().snapshot_id()
            }
        };
        Ok(Some(snapshot_id))
    }

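    /// Plans a batch scan against the resolved snapshot: either a single `COUNT(*)` split or a
    /// set of scan splits distributed across `batch_parallelism` workers.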
    pub async fn list_splits_batch(
        &self,
        schema: Schema,
        time_travel_info: Option<IcebergTimeTravelInfo>,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        if batch_parallelism == 0 {
            bail!("Batch parallelism is 0. Cannot split the iceberg files.");
        }
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        if snapshot_id.is_none() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        if let IcebergScanType::CountStar = iceberg_scan_type {
            self.list_splits_batch_count_star(&table, snapshot_id.unwrap())
                .await
        } else {
            self.list_splits_batch_scan(
                &table,
                snapshot_id.unwrap(),
                schema,
                batch_parallelism,
                iceberg_scan_type,
                predicate,
            )
            .await
        }
    }

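    /// Plans the snapshot's data files together with their equality and position delete files
    /// (deduplicated by file path), then packs them into `batch_parallelism` balanced splits.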
    async fn list_splits_batch_scan(
        &self,
        table: &Table,
        snapshot_id: i64,
        schema: Schema,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let schema_names = schema.names();
        let require_names = schema_names
            .iter()
            .filter(|name| {
                name.ne(&ICEBERG_SEQUENCE_NUM_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_PATH_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_POS_COLUMN_NAME)
            })
            .cloned()
            .collect_vec();

        let table_schema = table.metadata().current_schema();
        tracing::debug!("iceberg_table_schema: {:?}", table_schema);

        let mut position_delete_files = vec![];
        let mut position_delete_files_set = HashSet::new();
        let mut data_files = vec![];
        let mut equality_delete_files = vec![];
        let mut equality_delete_files_set = HashSet::new();
        let scan = table
            .scan()
            .with_filter(predicate)
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .select(require_names)
            .build()
            .map_err(|e| anyhow!(e))?;

        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let mut task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for delete_file in task.deletes.drain(..) {
                let mut delete_file = delete_file.as_ref().clone();
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {
                        bail!("Data file should not be in task deletes");
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            equality_delete_files.push(delete_file);
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            delete_file.project_field_ids = Vec::default();
                            position_delete_files.push(delete_file);
                        }
                    }
                }
            }
            match task.data_file_content {
                iceberg::spec::DataContentType::Data => {
                    data_files.push(task);
                }
                iceberg::spec::DataContentType::EqualityDeletes => {
                    bail!("Equality delete files should not be in the data files");
                }
                iceberg::spec::DataContentType::PositionDeletes => {
                    bail!("Position delete files should not be in the data files");
                }
            }
        }
        let data_files = Self::split_n_vecs(data_files, batch_parallelism);
        let equality_delete_files = Self::split_n_vecs(equality_delete_files, batch_parallelism);
        let position_delete_files = Self::split_n_vecs(position_delete_files, batch_parallelism);

        let splits = data_files
            .into_iter()
            .zip_eq_fast(equality_delete_files.into_iter())
            .zip_eq_fast(position_delete_files.into_iter())
            .enumerate()
            .map(
                |(index, ((data_file, equality_delete_file), position_delete_file))| IcebergSplit {
                    split_id: index as i64,
                    snapshot_id,
                    task: IcebergFileScanTask::new_scan_with_scan_type(
                        iceberg_scan_type,
                        data_file,
                        equality_delete_file,
                        position_delete_file,
                    ),
                },
            )
            .filter(|split| !split.task.is_empty())
            .collect_vec();

        if splits.is_empty() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        Ok(splits)
    }

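    /// Computes `COUNT(*)` at planning time by summing the record counts of the planned data
    /// files, returning a single [`IcebergFileScanTask::CountStar`] split.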
    pub async fn list_splits_batch_count_star(
        &self,
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let mut record_counts = 0;
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            assert_eq!(task.data_file_content, DataContentType::Data);
            assert!(task.deletes.is_empty());
            record_counts += task.record_count.expect("data file should have a record count");
        }
        let split = IcebergSplit {
            split_id: 0,
            snapshot_id,
            task: IcebergFileScanTask::new_count_star(record_counts),
        };
        Ok(vec![split])
    }

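    /// Collects the equality-delete column names (which must be consistent across all equality
    /// delete files in the snapshot) and whether any position delete files are present.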
    pub async fn all_delete_parameters(
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
        let schema = scan.snapshot().schema(table.metadata())?;
        let mut equality_ids = vec![];
        let mut have_position_delete = false;
        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for delete_file in task.deletes {
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {}
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_ids.is_empty() {
                            equality_ids = delete_file.equality_ids.clone();
                        } else if equality_ids != delete_file.equality_ids {
                            bail!("The schema of iceberg equality delete file must be consistent");
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        have_position_delete = true;
                    }
                }
            }
        }
        let delete_columns = equality_ids
            .into_iter()
            .map(|id| match schema.name_by_field_id(id) {
                Some(name) => Ok::<String, ConnectorError>(name.to_owned()),
                None => bail!("Delete field id {} not found in schema", id),
            })
            .collect::<ConnectorResult<Vec<_>>>()?;

        Ok((delete_columns, have_position_delete))
    }

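    /// Like [`Self::all_delete_parameters`], but loads the table and resolves the snapshot
    /// first; returns empty parameters if the table has no snapshot.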
    pub async fn get_delete_parameters(
        &self,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        if snapshot_id.is_none() {
            return Ok((vec![], false));
        }
        let snapshot_id = snapshot_id.unwrap();
        Self::all_delete_parameters(&table, snapshot_id).await
    }

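    /// Distributes file scan tasks into `split_num` groups, greedily assigning each task to
    /// the group with the smallest total byte length. Ties are broken by group index, so the
    /// result is deterministic for a given input order.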
    fn split_n_vecs(
        file_scan_tasks: Vec<FileScanTask>,
        split_num: usize,
    ) -> Vec<Vec<FileScanTask>> {
        use std::cmp::{Ordering, Reverse};

        #[derive(Default)]
        struct FileScanTaskGroup {
            idx: usize,
            tasks: Vec<FileScanTask>,
            total_length: u64,
        }

        impl Ord for FileScanTaskGroup {
            fn cmp(&self, other: &Self) -> Ordering {
                if self.total_length == other.total_length {
                    self.idx.cmp(&other.idx)
                } else {
                    self.total_length.cmp(&other.total_length)
                }
            }
        }

        impl PartialOrd for FileScanTaskGroup {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }

        impl Eq for FileScanTaskGroup {}

        impl PartialEq for FileScanTaskGroup {
            fn eq(&self, other: &Self) -> bool {
                self.total_length == other.total_length
            }
        }

        let mut heap = BinaryHeap::new();
        for idx in 0..split_num {
            heap.push(Reverse(FileScanTaskGroup {
                idx,
                tasks: vec![],
                total_length: 0,
            }));
        }

        for file_task in file_scan_tasks {
            let mut group = heap.peek_mut().unwrap();
            group.0.total_length += file_task.length;
            group.0.tasks.push(file_task);
        }

        heap.into_vec()
            .into_iter()
            .map(|reverse_group| reverse_group.0.tasks)
            .collect()
    }
}

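/// Options controlling how a file scan task is converted into chunks: the chunk size and
/// whether to append the sequence-number and file-path/position metadata columns.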
pub struct IcebergScanOpts {
    pub chunk_size: usize,
    pub need_seq_num: bool,
    pub need_file_path_and_pos: bool,
}

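/// Reads a single data-file scan task and yields its contents as [`DataChunk`]s, optionally
/// appending the Iceberg sequence-number column and the file-path / file-position columns,
/// and reporting the bytes read to the scan metrics on completion.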
#[try_stream(ok = DataChunk, error = ConnectorError)]
pub async fn scan_task_to_chunk(
    table: Table,
    data_file_scan_task: FileScanTask,
    IcebergScanOpts {
        chunk_size,
        need_seq_num,
        need_file_path_and_pos,
    }: IcebergScanOpts,
    metrics: Option<Arc<IcebergScanMetrics>>,
) {
    let table_name = table.identifier().name().to_owned();

    let mut read_bytes = scopeguard::guard(0, |read_bytes| {
        if let Some(metrics) = metrics {
            metrics
                .iceberg_read_bytes
                .with_guarded_label_values(&[&table_name])
                .inc_by(read_bytes as _);
        }
    });

    let data_file_path = data_file_scan_task.data_file_path.clone();
    let data_sequence_number = data_file_scan_task.sequence_number;

    let reader = table.reader_builder().with_batch_size(chunk_size).build();
    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

    let mut record_batch_stream = reader.read(Box::pin(file_scan_stream)).await?.enumerate();

    while let Some((index, record_batch)) = record_batch_stream.next().await {
        let record_batch = record_batch?;

        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
        if need_seq_num {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                vec![data_sequence_number; visibility.len()],
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility)
        };
        if need_file_path_and_pos {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
                vec![data_file_path.as_str(); visibility.len()],
            ))));
            let index_start = (index * chunk_size) as i64;
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                (index_start..(index_start + visibility.len() as i64)).collect::<Vec<i64>>(),
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility)
        }
        *read_bytes += chunk.estimated_heap_size() as u64;
        yield chunk;
    }
}

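/// Placeholder [`SplitReader`] required by [`SourceProperties`]; both of its methods are
/// currently unimplemented.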
#[derive(Debug)]
pub struct IcebergFileReader {}

#[async_trait]
impl SplitReader for IcebergFileReader {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        _props: IcebergProperties,
        _splits: Vec<IcebergSplit>,
        _parser_config: ParserConfig,
        _source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        unimplemented!()
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        unimplemented!()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, Schema};

    use super::*;

    fn create_file_scan_task(length: u64, id: u64) -> FileScanTask {
        FileScanTask {
            length,
            start: 0,
            record_count: Some(0),
            data_file_path: format!("test_{}.parquet", id),
            data_file_content: DataContentType::Data,
            data_file_format: iceberg::spec::DataFileFormat::Parquet,
            schema: Arc::new(Schema::builder().build().unwrap()),
            project_field_ids: vec![],
            predicate: None,
            deletes: vec![],
            sequence_number: 0,
            equality_ids: vec![],
            file_size_in_bytes: 0,
        }
    }

    #[test]
    fn test_split_n_vecs_basic() {
        let file_scan_tasks = (1..=12)
            .map(|i| create_file_scan_task(i + 100, i))
            .collect::<Vec<_>>();

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);

        assert_eq!(groups.len(), 3);

        let group_lengths: Vec<u64> = groups
            .iter()
            .map(|group| group.iter().map(|task| task.length).sum())
            .collect();

        let max_length = *group_lengths.iter().max().unwrap();
        let min_length = *group_lengths.iter().min().unwrap();
        assert!(max_length - min_length <= 10, "Groups should be balanced");

        let total_tasks: usize = groups.iter().map(|group| group.len()).sum();
        assert_eq!(total_tasks, 12);
    }

    #[test]
    fn test_split_n_vecs_empty() {
        let file_scan_tasks = Vec::new();
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert!(groups.iter().all(|group| group.is_empty()));
    }

    #[test]
    fn test_split_n_vecs_single_task() {
        let file_scan_tasks = vec![create_file_scan_task(100, 1)];
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert_eq!(groups.iter().filter(|group| !group.is_empty()).count(), 1);
    }

    #[test]
    fn test_split_n_vecs_uneven_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(1000, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 2);
        assert_eq!(groups.len(), 2);

        let group_with_large_task = groups
            .iter()
            .find(|group| group.iter().any(|task| task.length == 1000))
            .unwrap();
        assert_eq!(group_with_large_task.len(), 1);
    }

    #[test]
    fn test_split_n_vecs_same_files_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(100, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
            create_file_scan_task(100, 6),
            create_file_scan_task(100, 7),
            create_file_scan_task(100, 8),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
            .iter()
            .map(|g| {
                g.iter()
                    .map(|task| task.data_file_path.clone())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        for _ in 0..10000 {
            let groups_2 = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
                .iter()
                .map(|g| {
                    g.iter()
                        .map(|task| task.data_file_path.clone())
                        .collect::<Vec<_>>()
                })
                .collect::<Vec<_>>();

            assert_eq!(groups, groups_2);
        }
    }
}