1pub mod parquet_file_handler;
16
17pub mod metrics;
18use std::collections::{BinaryHeap, HashMap, HashSet};
19use std::sync::Arc;
20
21use anyhow::anyhow;
22use async_trait::async_trait;
23use futures::StreamExt;
24use futures_async_stream::{for_await, try_stream};
25use iceberg::Catalog;
26use iceberg::expr::{BoundPredicate, Predicate as IcebergPredicate};
27use iceberg::scan::FileScanTask;
28use iceberg::spec::FormatVersion;
29use iceberg::table::Table;
30pub use parquet_file_handler::*;
31use phf::{Set, phf_set};
32use risingwave_common::array::arrow::IcebergArrowConvert;
33use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
34use risingwave_common::bail;
35use risingwave_common::types::JsonbVal;
36use risingwave_common_estimate_size::EstimateSize;
37use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
38use serde::{Deserialize, Serialize};
39
40pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
41use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
42use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
43use crate::error::{ConnectorError, ConnectorResult};
44use crate::parser::ParserConfig;
45use crate::source::{
46 BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
47 SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
48};
/// Connector name of the Iceberg source; also used as `SourceProperties::SOURCE_NAME`
/// for `IcebergProperties`.
pub const ICEBERG_CONNECTOR: &str = "iceberg";
50
/// Options of the Iceberg source connector, deserialized with serde.
///
/// `common`, `table` and `unknown_fields` are all `#[serde(flatten)]`ed, i.e. they
/// are read from the same flat option map as the explicitly renamed JDBC keys.
#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct IcebergProperties {
    // Shared Iceberg options; also the entry point used here to create the catalog
    // and to load the table.
    #[serde(flatten)]
    pub common: IcebergCommon,

    // Identifies the table that `load_table` loads.
    #[serde(flatten)]
    pub table: IcebergTableIdentifier,

    // JDBC credentials. When present they are forwarded to the catalog layer as
    // `jdbc.user` / `jdbc.password`.
    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,

    // Flattened string map; exposed through the `UnknownFields` impl.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}
68
69impl EnforceSecret for IcebergProperties {
70 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
71 "catalog.jdbc.password",
72 };
73
74 fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
75 for prop in prop_iter {
76 IcebergCommon::enforce_one(prop)?;
77 if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
78 return Err(EnforceSecretError {
79 key: prop.to_owned(),
80 }
81 .into());
82 }
83 }
84 Ok(())
85 }
86}
87
88impl IcebergProperties {
89 pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
90 let mut java_catalog_props = HashMap::new();
91 if let Some(jdbc_user) = self.jdbc_user.clone() {
92 java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
93 }
94 if let Some(jdbc_password) = self.jdbc_password.clone() {
95 java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
96 }
97 self.common.create_catalog(&java_catalog_props).await
99 }
100
101 pub async fn load_table(&self) -> ConnectorResult<Table> {
102 let mut java_catalog_props = HashMap::new();
103 if let Some(jdbc_user) = self.jdbc_user.clone() {
104 java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
105 }
106 if let Some(jdbc_password) = self.jdbc_password.clone() {
107 java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
108 }
109 self.common
111 .load_table(&self.table, &java_catalog_props)
112 .await
113 }
114}
115
/// Registers the Iceberg split, enumerator and reader types for `IcebergProperties`.
impl SourceProperties for IcebergProperties {
    type Split = IcebergSplit;
    type SplitEnumerator = IcebergSplitEnumerator;
    type SplitReader = IcebergFileReader;

    // Same string as the public `ICEBERG_CONNECTOR` constant.
    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}
123
124impl UnknownFields for IcebergProperties {
125 fn unknown_fields(&self) -> HashMap<String, String> {
126 self.unknown_fields.clone()
127 }
128}
129
/// A `FileScanTask` kept in its JSON string form.
///
/// Created with `serialize` and turned back into a task with `deserialize`.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergFileScanTaskJsonStr(String);
132
133impl IcebergFileScanTaskJsonStr {
134 pub fn deserialize(&self) -> FileScanTask {
135 serde_json::from_str(&self.0).unwrap()
136 }
137
138 pub fn serialize(task: &FileScanTask) -> Self {
139 Self(serde_json::to_string(task).unwrap())
140 }
141}
142
/// A batch of file scan tasks, tagged with the kind of file they refer to.
///
/// The variants mirror the `DataScan`, `EqualityDeleteScan` and
/// `PositionDeleteScan` scan types handled by `IcebergSplit::empty`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
    // Tasks over data files.
    Data(Vec<FileScanTask>),
    // Tasks over equality-delete files.
    EqualityDelete(Vec<FileScanTask>),
    // Tasks over position-delete files.
    PositionDelete(Vec<FileScanTask>),
}
149
150impl IcebergFileScanTask {
151 pub fn tasks(&self) -> &[FileScanTask] {
152 match self {
153 IcebergFileScanTask::Data(file_scan_tasks)
154 | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
155 | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks,
156 }
157 }
158
159 pub fn is_empty(&self) -> bool {
160 self.tasks().is_empty()
161 }
162
163 pub fn files(&self) -> Vec<String> {
164 self.tasks()
165 .iter()
166 .map(|task| task.data_file_path.clone())
167 .collect()
168 }
169
170 pub fn predicate(&self) -> Option<&BoundPredicate> {
171 let first_task = self.tasks().first()?;
172 first_task.predicate.as_ref()
173 }
174}
175
/// One split of an Iceberg scan: an id plus the scan tasks assigned to it.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IcebergSplit {
    // Rendered as a decimal string to form the `SplitId`.
    pub split_id: i64,
    // The scan tasks of this split.
    pub task: IcebergFileScanTask,
    // Optional limit. It is omitted from the serialized form when `None` and
    // defaults to `None` when absent on deserialization.
    // NOTE(review): presumably a row limit for the scan — confirm with the consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub limit: Option<u64>,
}
183
184impl IcebergSplit {
185 pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
186 let task = match iceberg_scan_type {
187 IcebergScanType::DataScan => IcebergFileScanTask::Data(vec![]),
188 IcebergScanType::EqualityDeleteScan => IcebergFileScanTask::EqualityDelete(vec![]),
189 IcebergScanType::PositionDeleteScan => IcebergFileScanTask::PositionDelete(vec![]),
190 _ => unimplemented!(),
191 };
192 Self {
193 split_id: 0,
194 task,
195 limit: None,
196 }
197 }
198}
199
200impl SplitMetaData for IcebergSplit {
201 fn id(&self) -> SplitId {
202 self.split_id.to_string().into()
203 }
204
205 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
206 serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
207 }
208
209 fn encode_to_json(&self) -> JsonbVal {
210 serde_json::to_value(self.clone()).unwrap().into()
211 }
212
213 fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
214 unimplemented!()
215 }
216}
217
/// Split enumerator of the Iceberg source.
///
/// Holds the connector properties, which are used to load the table when scan
/// tasks are listed.
#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {
    config: IcebergProperties,
}
222
/// Delete-related parameters of an Iceberg table.
///
/// NOTE(review): this type is not constructed or read anywhere in this file; the
/// field notes below are inferred from the field names only — confirm with callers.
#[derive(Debug, Clone)]
pub struct IcebergDeleteParameters {
    // Presumably the column names used by equality-delete files.
    pub equality_delete_columns: Vec<String>,
    // Presumably whether position-delete files exist.
    pub has_position_delete: bool,
    // Presumably the snapshot the parameters were derived from, if any.
    pub snapshot_id: Option<i64>,
}
229
230#[async_trait]
231impl SplitEnumerator for IcebergSplitEnumerator {
232 type Properties = IcebergProperties;
233 type Split = IcebergSplit;
234
235 async fn new(
236 properties: Self::Properties,
237 context: SourceEnumeratorContextRef,
238 ) -> ConnectorResult<Self> {
239 Ok(Self::new_inner(properties, context))
240 }
241
242 async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
243 Ok(vec![])
247 }
248}
249impl IcebergSplitEnumerator {
250 pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
251 Self { config: properties }
252 }
253}
254
/// Selects which snapshot of the table to read instead of the current one.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IcebergTimeTravelInfo {
    // A snapshot id; resolved with `snapshot_by_id` in `get_snapshot_id`.
    Version(i64),
    // A timestamp in milliseconds; `get_snapshot_id` picks the snapshot with the
    // greatest `timestamp_ms()` that is not later than this value.
    TimestampMs(i64),
}
260
/// Result of planning a table scan, as produced by `list_scan_tasks`.
#[derive(Debug, Clone)]
pub struct IcebergListResult {
    // Scan tasks whose content type is `Data`.
    pub data_files: Vec<FileScanTask>,
    // Equality-delete files referenced by the data tasks, de-duplicated by path.
    pub equality_delete_files: Vec<FileScanTask>,
    // Position-delete files referenced by the data tasks, de-duplicated by path.
    pub position_delete_files: Vec<FileScanTask>,
    // Column names resolved from the equality ids of the equality-delete files;
    // empty when there are none.
    pub equality_delete_columns: Vec<String>,
    // Format version read from the table metadata.
    pub format_version: FormatVersion,
    // The table's current schema.
    pub schema: std::sync::Arc<iceberg::spec::Schema>,
}
270
271impl IcebergSplitEnumerator {
272 pub fn get_snapshot_id(
273 table: &Table,
274 time_travel_info: Option<IcebergTimeTravelInfo>,
275 ) -> ConnectorResult<Option<i64>> {
276 let current_snapshot = table.metadata().current_snapshot();
277 let Some(current_snapshot) = current_snapshot else {
278 return Ok(None);
279 };
280
281 let snapshot_id = match time_travel_info {
282 Some(IcebergTimeTravelInfo::Version(version)) => {
283 let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
284 bail!("Cannot find the snapshot id in the iceberg table.");
285 };
286 snapshot.snapshot_id()
287 }
288 Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
289 let snapshot = table
290 .metadata()
291 .snapshots()
292 .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
293 .max_by_key(|snapshot| snapshot.timestamp_ms());
294 match snapshot {
295 Some(snapshot) => snapshot.snapshot_id(),
296 None => {
297 let time = chrono::DateTime::from_timestamp_millis(timestamp);
299 if let Some(time) = time {
300 tracing::warn!("Cannot find a snapshot older than {}", time);
301 } else {
302 tracing::warn!("Cannot find a snapshot");
303 }
304 return Ok(None);
305 }
306 }
307 }
308 None => current_snapshot.snapshot_id(),
309 };
310 Ok(Some(snapshot_id))
311 }
312
313 pub async fn list_scan_tasks(
314 &self,
315 time_travel_info: Option<IcebergTimeTravelInfo>,
316 predicate: IcebergPredicate,
317 ) -> ConnectorResult<Option<IcebergListResult>> {
318 let table = self.config.load_table().await?;
319 let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
320
321 let Some(snapshot_id) = snapshot_id else {
322 return Ok(None);
323 };
324 let res = self
325 .list_scan_tasks_inner(&table, snapshot_id, predicate)
326 .await?;
327 Ok(Some(res))
328 }
329
330 async fn list_scan_tasks_inner(
331 &self,
332 table: &Table,
333 snapshot_id: i64,
334 predicate: IcebergPredicate,
335 ) -> ConnectorResult<IcebergListResult> {
336 let format_version = table.metadata().format_version();
337 let table_schema = table.metadata().current_schema();
338 tracing::debug!("iceberg_table_schema: {:?}", table_schema);
339
340 let mut position_delete_files = vec![];
341 let mut position_delete_files_set = HashSet::new();
342 let mut data_files = vec![];
343 let mut equality_delete_files = vec![];
344 let mut equality_delete_files_set = HashSet::new();
345 let mut equality_delete_ids = None;
346 let mut scan_builder = table.scan().snapshot_id(snapshot_id).select_all();
347 if predicate != IcebergPredicate::AlwaysTrue {
348 scan_builder = scan_builder.with_filter(predicate.clone());
349 }
350 let scan = scan_builder.build()?;
351 let file_scan_stream = scan.plan_files().await?;
352
353 #[for_await]
354 for task in file_scan_stream {
355 let task: FileScanTask = task?;
356
357 for delete_file in &task.deletes {
359 let delete_file = delete_file.as_ref().clone();
360 match delete_file.data_file_content {
361 iceberg::spec::DataContentType::Data => {
362 bail!("Data file should not in task deletes");
363 }
364 iceberg::spec::DataContentType::EqualityDeletes => {
365 if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
366 if equality_delete_ids.is_none() {
367 equality_delete_ids = delete_file.equality_ids.clone();
368 } else if equality_delete_ids != delete_file.equality_ids {
369 bail!(
370 "The schema of iceberg equality delete file must be consistent"
371 );
372 }
373 equality_delete_files.push(delete_file);
374 }
375 }
376 iceberg::spec::DataContentType::PositionDeletes => {
377 if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
378 position_delete_files.push(delete_file);
379 }
380 }
381 }
382 }
383
384 match task.data_file_content {
385 iceberg::spec::DataContentType::Data => {
386 data_files.push(task);
388 }
389 iceberg::spec::DataContentType::EqualityDeletes => {
390 bail!("Equality delete files should not be in the data files");
391 }
392 iceberg::spec::DataContentType::PositionDeletes => {
393 bail!("Position delete files should not be in the data files");
394 }
395 }
396 }
397 let schema = table_schema.clone();
398 let equality_delete_columns = equality_delete_ids
399 .unwrap_or_default()
400 .into_iter()
401 .map(|id| match schema.name_by_field_id(id) {
402 Some(name) => Ok::<std::string::String, ConnectorError>(name.to_owned()),
403 None => bail!("Delete field id {} not found in schema", id),
404 })
405 .collect::<ConnectorResult<Vec<_>>>()?;
406
407 Ok(IcebergListResult {
408 data_files,
409 equality_delete_files,
410 position_delete_files,
411 equality_delete_columns,
412 format_version,
413 schema,
414 })
415 }
416
417 pub fn split_n_vecs(
430 file_scan_tasks: Vec<FileScanTask>,
431 split_num: usize,
432 ) -> Vec<Vec<FileScanTask>> {
433 use std::cmp::{Ordering, Reverse};
434
435 #[derive(Default)]
436 struct FileScanTaskGroup {
437 idx: usize,
438 tasks: Vec<FileScanTask>,
439 total_length: u64,
440 }
441
442 impl Ord for FileScanTaskGroup {
443 fn cmp(&self, other: &Self) -> Ordering {
444 if self.total_length == other.total_length {
446 self.idx.cmp(&other.idx)
447 } else {
448 self.total_length.cmp(&other.total_length)
449 }
450 }
451 }
452
453 impl PartialOrd for FileScanTaskGroup {
454 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
455 Some(self.cmp(other))
456 }
457 }
458
459 impl Eq for FileScanTaskGroup {}
460
461 impl PartialEq for FileScanTaskGroup {
462 fn eq(&self, other: &Self) -> bool {
463 self.total_length == other.total_length
464 }
465 }
466
467 let mut heap = BinaryHeap::new();
468 for idx in 0..split_num {
470 heap.push(Reverse(FileScanTaskGroup {
471 idx,
472 tasks: vec![],
473 total_length: 0,
474 }));
475 }
476
477 for file_task in file_scan_tasks {
478 let mut group = heap.peek_mut().unwrap();
479 group.0.total_length += file_task.length;
480 group.0.tasks.push(file_task);
481 }
482
483 heap.into_vec()
485 .into_iter()
486 .map(|reverse_group| reverse_group.0.tasks)
487 .collect()
488 }
489}
490
/// Options for `scan_task_to_chunk_with_deletes`.
pub struct IcebergScanOpts {
    // Batch size handed to the Iceberg reader.
    pub chunk_size: usize,
    // Append an Int64 column holding the data file's sequence number.
    pub need_seq_num: bool,
    // Append a Utf8 column with the data file path and an Int64 column with the
    // row position.
    pub need_file_path_and_pos: bool,
    // When `false`, the task's delete files are dropped before the file is read.
    pub handle_delete_files: bool,
}
497
498#[try_stream(ok = DataChunk, error = ConnectorError)]
500pub async fn scan_task_to_chunk_with_deletes(
501 table: Table,
502 mut data_file_scan_task: FileScanTask,
503 IcebergScanOpts {
504 chunk_size,
505 need_seq_num,
506 need_file_path_and_pos,
507 handle_delete_files,
508 }: IcebergScanOpts,
509 metrics: Option<Arc<IcebergScanMetrics>>,
510) {
511 let table_name = table.identifier().name().to_owned();
512
513 let num_delete_files = data_file_scan_task.deletes.len();
514 let expected_record_count = data_file_scan_task.record_count;
515 let file_start = std::time::Instant::now();
516
517 let mut read_bytes = scopeguard::guard(0u64, |read_bytes| {
518 if let Some(metrics) = metrics.clone() {
519 metrics
520 .iceberg_read_bytes
521 .with_guarded_label_values(&[&table_name])
522 .inc_by(read_bytes as _);
523 }
524 });
525
526 let data_file_path = data_file_scan_task.data_file_path.clone();
527 let data_sequence_number = data_file_scan_task.sequence_number;
528
529 tracing::debug!(
530 "scan_task_to_chunk_with_deletes: data_file={}, handle_delete_files={}, total_delete_files={}",
531 data_file_path,
532 handle_delete_files,
533 data_file_scan_task.deletes.len()
534 );
535
536 if !handle_delete_files {
537 data_file_scan_task.deletes.clear();
539 }
540
541 let reader = table
543 .reader_builder()
544 .with_batch_size(chunk_size)
545 .with_row_group_filtering_enabled(true)
546 .build();
547 let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));
548
549 let record_batch_stream: iceberg::scan::ArrowRecordBatchStream =
550 reader.read(Box::pin(file_scan_stream))?;
551 let mut record_batch_stream = record_batch_stream.enumerate();
552
553 let mut total_rows_read: u64 = 0;
554
555 while let Some((batch_index, record_batch)) = record_batch_stream.next().await {
557 let record_batch = record_batch?;
558 let batch_start_pos = (batch_index * chunk_size) as i64;
559
560 let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
561 let row_count = chunk.capacity();
562 total_rows_read += row_count as u64;
563
564 if need_seq_num {
566 let (mut columns, visibility) = chunk.into_parts();
567 columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
568 std::iter::repeat_n(data_sequence_number, row_count),
569 ))));
570 chunk = DataChunk::from_parts(columns.into(), visibility);
571 }
572
573 if need_file_path_and_pos {
574 let (mut columns, visibility) = chunk.into_parts();
575 columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
576 std::iter::repeat_n(data_file_path.as_str(), row_count),
577 ))));
578
579 let positions: Vec<i64> =
581 (batch_start_pos..(batch_start_pos + row_count as i64)).collect();
582 columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(positions))));
583
584 chunk = DataChunk::from_parts(columns.into(), visibility);
585 }
586
587 *read_bytes += chunk.estimated_heap_size() as u64;
588 yield chunk;
589 }
590
591 if let Some(ref metrics) = metrics {
593 let label_values = [table_name.as_str()];
594
595 metrics
597 .iceberg_source_file_read_duration_seconds
598 .with_guarded_label_values(&label_values)
599 .observe(file_start.elapsed().as_secs_f64());
600
601 if total_rows_read > 0 {
603 metrics
604 .iceberg_source_rows_read_total
605 .with_guarded_label_values(&label_values)
606 .inc_by(total_rows_read);
607 }
608
609 metrics
611 .iceberg_source_files_read_total
612 .with_guarded_label_values(&[table_name.as_str(), "data"])
613 .inc();
614
615 if handle_delete_files
620 && num_delete_files > 0
621 && let Some(expected) = expected_record_count
622 {
623 let deleted = expected.saturating_sub(total_rows_read);
624 if deleted > 0 {
625 metrics
626 .iceberg_source_delete_rows_applied_total
627 .with_guarded_label_values(&[table_name.as_str(), "sdk_applied_approx"])
628 .inc_by(deleted);
629 }
630 }
631 }
632}
633
/// Reader type registered as `SourceProperties::SplitReader` for Iceberg.
///
/// It carries no state, and its `SplitReader` methods in this file are
/// `unimplemented!()`.
#[derive(Debug)]
pub struct IcebergFileReader {}
636
#[async_trait]
impl SplitReader for IcebergFileReader {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    /// Not supported.
    ///
    /// # Panics
    /// Always panics (`unimplemented!`).
    async fn new(
        _props: IcebergProperties,
        _splits: Vec<IcebergSplit>,
        _parser_config: ParserConfig,
        _source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        unimplemented!()
    }

    /// Not supported.
    ///
    /// # Panics
    /// Always panics (`unimplemented!`).
    fn into_stream(self) -> BoxSourceChunkStream {
        unimplemented!()
    }
}
656
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, Schema};

    use super::*;

    /// Builds a minimal Parquet data-file scan task of the given `length` whose
    /// path is derived from `id`.
    fn create_file_scan_task(length: u64, id: u64) -> FileScanTask {
        let data_file_path = format!("test_{}.parquet", id);
        let schema = Arc::new(Schema::builder().build().unwrap());
        FileScanTask {
            length,
            start: 0,
            record_count: Some(0),
            data_file_path,
            referenced_data_file: None,
            data_file_content: DataContentType::Data,
            data_file_format: iceberg::spec::DataFileFormat::Parquet,
            schema,
            project_field_ids: vec![],
            predicate: None,
            deletes: vec![],
            sequence_number: 0,
            equality_ids: None,
            file_size_in_bytes: 0,
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: true,
        }
    }

    /// Splits `tasks` into `split_num` groups and returns only the file paths of
    /// each group, preserving group and task order.
    fn split_paths(tasks: Vec<FileScanTask>, split_num: usize) -> Vec<Vec<String>> {
        IcebergSplitEnumerator::split_n_vecs(tasks, split_num)
            .iter()
            .map(|group| {
                group
                    .iter()
                    .map(|task| task.data_file_path.clone())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>()
    }

    #[test]
    fn test_split_n_vecs_basic() {
        let mut tasks = Vec::with_capacity(12);
        for i in 1..=12u64 {
            tasks.push(create_file_scan_task(i + 100, i));
        }

        let groups = IcebergSplitEnumerator::split_n_vecs(tasks, 3);
        assert_eq!(groups.len(), 3);

        let mut lengths: Vec<u64> = Vec::with_capacity(groups.len());
        for group in &groups {
            lengths.push(group.iter().map(|task| task.length).sum());
        }
        let longest = lengths.iter().copied().max().unwrap();
        let shortest = lengths.iter().copied().min().unwrap();
        assert!(longest - shortest <= 10, "Groups should be balanced");

        let task_count = groups.iter().fold(0usize, |count, group| count + group.len());
        assert_eq!(task_count, 12);
    }

    #[test]
    fn test_split_n_vecs_empty() {
        let groups = IcebergSplitEnumerator::split_n_vecs(Vec::new(), 3);
        assert_eq!(groups.len(), 3);
        for group in &groups {
            assert!(group.is_empty());
        }
    }

    #[test]
    fn test_split_n_vecs_single_task() {
        let only_task = create_file_scan_task(100, 1);
        let groups = IcebergSplitEnumerator::split_n_vecs(vec![only_task], 3);
        assert_eq!(groups.len(), 3);

        let mut non_empty = 0;
        for group in &groups {
            if !group.is_empty() {
                non_empty += 1;
            }
        }
        assert_eq!(non_empty, 1);
    }

    #[test]
    fn test_split_n_vecs_uneven_distribution() {
        // One large task followed by four small ones.
        let mut tasks = vec![create_file_scan_task(1000, 1)];
        tasks.extend((2..=5).map(|id| create_file_scan_task(100, id)));

        let groups = IcebergSplitEnumerator::split_n_vecs(tasks, 2);
        assert_eq!(groups.len(), 2);

        // The large task must end up alone in its group.
        let mut large_group = None;
        for group in &groups {
            if group.iter().any(|task| task.length == 1000) {
                large_group = Some(group);
                break;
            }
        }
        assert_eq!(large_group.unwrap().len(), 1);
    }

    #[test]
    fn test_split_n_vecs_same_files_distribution() {
        let tasks: Vec<FileScanTask> = (1..=8)
            .map(|id| create_file_scan_task(100, id))
            .collect();

        // With identical lengths the assignment has to be the same on every run.
        let expected = split_paths(tasks.clone(), 4);
        for _ in 0..10000 {
            let actual = split_paths(tasks.clone(), 4);
            assert_eq!(expected, actual);
        }
    }
}