risingwave_connector/source/iceberg/mod.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod parquet_file_handler;

mod metrics;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::{for_await, try_stream};
use iceberg::Catalog;
use iceberg::expr::{BoundPredicate, Predicate as IcebergPredicate};
use iceberg::scan::FileScanTask;
use iceberg::spec::FormatVersion;
use iceberg::table::Table;
pub use parquet_file_handler::*;
use phf::{Set, phf_set};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::bail;
use risingwave_common::types::JsonbVal;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
use serde::{Deserialize, Serialize};

pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
};

pub const ICEBERG_CONNECTOR: &str = "iceberg";

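/// Properties of the Iceberg source, deserialized from the flattened `WITH` options of the
/// source definition. The `catalog.jdbc.*` keys below configure a JDBC catalog; the remaining
/// options are handled by `IcebergCommon` / `IcebergTableIdentifier`, and unrecognized keys are
/// collected into `unknown_fields`.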
#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct IcebergProperties {
    #[serde(flatten)]
    pub common: IcebergCommon,

    #[serde(flatten)]
    pub table: IcebergTableIdentifier,

    // For jdbc catalog
    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl EnforceSecret for IcebergProperties {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "catalog.jdbc.password",
    };

    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
            if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
                return Err(EnforceSecretError {
                    key: prop.to_owned(),
                }
                .into());
            }
        }
        Ok(())
    }
}

impl IcebergProperties {
    pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        // TODO: support path_style_access and java_catalog_props for iceberg source
        self.common.create_catalog(&java_catalog_props).await
    }

    pub async fn load_table(&self) -> ConnectorResult<Table> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        // TODO: support java_catalog_props for iceberg source
        self.common
            .load_table(&self.table, &java_catalog_props)
            .await
    }
}

impl SourceProperties for IcebergProperties {
    type Split = IcebergSplit;
    type SplitEnumerator = IcebergSplitEnumerator;
    type SplitReader = IcebergFileReader;

    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}

impl UnknownFields for IcebergProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergFileScanTaskJsonStr(String);

impl IcebergFileScanTaskJsonStr {
    pub fn deserialize(&self) -> FileScanTask {
        serde_json::from_str(&self.0).unwrap()
    }

    pub fn serialize(task: &FileScanTask) -> Self {
        Self(serde_json::to_string(task).unwrap())
    }
}

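/// The file scan tasks carried by an [`IcebergSplit`], grouped by scan type: data files,
/// equality delete files, or position delete files (mirroring [`IcebergScanType`]).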
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
    Data(Vec<FileScanTask>),
    EqualityDelete(Vec<FileScanTask>),
    PositionDelete(Vec<FileScanTask>),
}

impl IcebergFileScanTask {
    pub fn tasks(&self) -> &[FileScanTask] {
        match self {
            IcebergFileScanTask::Data(file_scan_tasks)
            | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
            | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.tasks().is_empty()
    }

    pub fn files(&self) -> Vec<String> {
        self.tasks()
            .iter()
            .map(|task| task.data_file_path.clone())
            .collect()
    }

    pub fn predicate(&self) -> Option<&BoundPredicate> {
        let first_task = self.tasks().first()?;
        first_task.predicate.as_ref()
    }
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IcebergSplit {
    pub split_id: i64,
    pub task: IcebergFileScanTask,
}

impl IcebergSplit {
    pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
        let task = match iceberg_scan_type {
            IcebergScanType::DataScan => IcebergFileScanTask::Data(vec![]),
            IcebergScanType::EqualityDeleteScan => IcebergFileScanTask::EqualityDelete(vec![]),
            IcebergScanType::PositionDeleteScan => IcebergFileScanTask::PositionDelete(vec![]),
            _ => unimplemented!(),
        };
        Self { split_id: 0, task }
    }
}

impl SplitMetaData for IcebergSplit {
    fn id(&self) -> SplitId {
        self.split_id.to_string().into()
    }

    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
    }

    fn encode_to_json(&self) -> JsonbVal {
        serde_json::to_value(self.clone()).unwrap().into()
    }

    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
        unimplemented!()
    }
}

#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {
    config: IcebergProperties,
}

#[derive(Debug, Clone)]
pub struct IcebergDeleteParameters {
    pub equality_delete_columns: Vec<String>,
    pub has_position_delete: bool,
    pub snapshot_id: Option<i64>,
}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<Self> {
        Ok(Self::new_inner(properties, context))
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
        // Like the file source, the Iceberg streaming source uses a List Executor and a Fetch
        // Executor instead of relying on the SplitEnumerator on the meta node.
        // TODO: add some validation logic here.
        Ok(vec![])
    }
}

impl IcebergSplitEnumerator {
    pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
        Self { config: properties }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IcebergTimeTravelInfo {
    Version(i64),
    TimestampMs(i64),
}

#[derive(Debug, Clone)]
pub struct IcebergListResult {
    pub data_files: Vec<FileScanTask>,
    pub equality_delete_files: Vec<FileScanTask>,
    pub position_delete_files: Vec<FileScanTask>,
    pub equality_delete_columns: Vec<String>,
    pub format_version: FormatVersion,
}

impl IcebergSplitEnumerator {
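    /// Resolve the snapshot to scan: the snapshot with the given id for `Version`, the newest
    /// snapshot at or before the timestamp for `TimestampMs`, or the current snapshot when no
    /// time-travel info is given. Returns `Ok(None)` if the table has no current snapshot or no
    /// snapshot matches the timestamp.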
    pub fn get_snapshot_id(
        table: &Table,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<Option<i64>> {
        let current_snapshot = table.metadata().current_snapshot();
        let Some(current_snapshot) = current_snapshot else {
            return Ok(None);
        };

        let snapshot_id = match time_travel_info {
            Some(IcebergTimeTravelInfo::Version(version)) => {
                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
                    bail!("Cannot find the snapshot id in the iceberg table.");
                };
                snapshot.snapshot_id()
            }
            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
                let snapshot = table
                    .metadata()
                    .snapshots()
                    .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
                    .max_by_key(|snapshot| snapshot.timestamp_ms());
                match snapshot {
                    Some(snapshot) => snapshot.snapshot_id(),
                    None => {
                        // convert unix time to human-readable time
                        let time = chrono::DateTime::from_timestamp_millis(timestamp);
                        if let Some(time) = time {
                            tracing::warn!("Cannot find a snapshot older than {}", time);
                        } else {
                            tracing::warn!("Cannot find a snapshot");
                        }
                        return Ok(None);
                    }
                }
            }
            None => current_snapshot.snapshot_id(),
        };
        Ok(Some(snapshot_id))
    }

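    /// Plan a scan of the resolved snapshot and return the data files together with their
    /// equality/position delete files.
    ///
    /// A minimal usage sketch (illustrative only; error handling and split assembly are elided,
    /// and `props`, `context`, and `parallelism` are assumed to come from the caller):
    ///
    /// ```ignore
    /// let enumerator = IcebergSplitEnumerator::new_inner(props, context);
    /// if let Some(result) = enumerator
    ///     .list_scan_tasks(None, IcebergPredicate::AlwaysTrue)
    ///     .await?
    /// {
    ///     let groups = IcebergSplitEnumerator::split_n_vecs(result.data_files, parallelism);
    ///     // ... build one `IcebergSplit` per group ...
    /// }
    /// ```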
    pub async fn list_scan_tasks(
        &self,
        time_travel_info: Option<IcebergTimeTravelInfo>,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Option<IcebergListResult>> {
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;

        let Some(snapshot_id) = snapshot_id else {
            return Ok(None);
        };
        let res = self
            .list_scan_tasks_inner(&table, snapshot_id, predicate)
            .await?;
        Ok(Some(res))
    }

    async fn list_scan_tasks_inner(
        &self,
        table: &Table,
        snapshot_id: i64,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<IcebergListResult> {
        let format_version = table.metadata().format_version();
        let table_schema = table.metadata().current_schema();
        tracing::debug!("iceberg_table_schema: {:?}", table_schema);

        let mut position_delete_files = vec![];
        let mut position_delete_files_set = HashSet::new();
        let mut data_files = vec![];
        let mut equality_delete_files = vec![];
        let mut equality_delete_files_set = HashSet::new();
        let mut equality_delete_ids = None;
        let mut scan_builder = table.scan().snapshot_id(snapshot_id).select_all();
        if predicate != IcebergPredicate::AlwaysTrue {
            scan_builder = scan_builder.with_filter(predicate.clone());
        }
        let scan = scan_builder.build()?;
        let file_scan_stream = scan.plan_files().await?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task?;

            // Collect delete files for separate scan types, but keep `task.deletes` intact.
            for delete_file in &task.deletes {
                let delete_file = delete_file.as_ref().clone();
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {
                        bail!("Data file should not be in task deletes");
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            if equality_delete_ids.is_none() {
                                equality_delete_ids = delete_file.equality_ids.clone();
                            } else if equality_delete_ids != delete_file.equality_ids {
                                bail!(
                                    "The schema of iceberg equality delete file must be consistent"
                                );
                            }
                            equality_delete_files.push(delete_file);
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            position_delete_files.push(delete_file);
                        }
                    }
                }
            }

            match task.data_file_content {
                iceberg::spec::DataContentType::Data => {
                    // Keep the original task with its deletes field intact
                    data_files.push(task);
                }
                iceberg::spec::DataContentType::EqualityDeletes => {
                    bail!("Equality delete files should not be in the data files");
                }
                iceberg::spec::DataContentType::PositionDeletes => {
                    bail!("Position delete files should not be in the data files");
                }
            }
        }
        let schema = scan
            .snapshot()
            .context("snapshot not found")?
            .schema(table.metadata())?;
        let equality_delete_columns = equality_delete_ids
            .unwrap_or_default()
            .into_iter()
            .map(|id| match schema.name_by_field_id(id) {
                Some(name) => Ok::<std::string::String, ConnectorError>(name.to_owned()),
                None => bail!("Delete field id {} not found in schema", id),
            })
            .collect::<ConnectorResult<Vec<_>>>()?;

        Ok(IcebergListResult {
            data_files,
            equality_delete_files,
            position_delete_files,
            equality_delete_columns,
            format_version,
        })
    }

    /// Uniformly distribute scan tasks to compute nodes.
    /// The assignment is deterministic, so repeated planning of the same task list produces the
    /// same grouping, which helps utilize data locality.
    ///
    /// # Arguments
    /// * `file_scan_tasks`: The file scan tasks to be split.
    /// * `split_num`: The number of splits to be created.
    ///
    /// The algorithm is based on a min-heap of groups keyed by total length: every group is pushed
    /// into the heap, and each file scan task is appended to the currently smallest group, keeping
    /// the total length of the groups as balanced as possible.
    /// The time complexity is O(n log k), where n is the number of file scan tasks and k is the number of splits.
    /// The space complexity is O(k), where k is the number of splits.
    /// The algorithm is stable, so the relative order of the file scan tasks within each group is preserved.
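    ///
    /// For example (illustrative): splitting tasks with lengths `[8, 7, 3, 2]` into 2 groups
    /// assigns 8 to group 0, 7 to group 1, 3 to group 1, and 2 to group 0, so both groups end up
    /// with a total length of 10.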
    pub fn split_n_vecs(
        file_scan_tasks: Vec<FileScanTask>,
        split_num: usize,
    ) -> Vec<Vec<FileScanTask>> {
        use std::cmp::{Ordering, Reverse};

        #[derive(Default)]
        struct FileScanTaskGroup {
            idx: usize,
            tasks: Vec<FileScanTask>,
            total_length: u64,
        }

        impl Ord for FileScanTaskGroup {
            fn cmp(&self, other: &Self) -> Ordering {
                // when total_length is the same, we will sort by index
                if self.total_length == other.total_length {
                    self.idx.cmp(&other.idx)
                } else {
                    self.total_length.cmp(&other.total_length)
                }
            }
        }

        impl PartialOrd for FileScanTaskGroup {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }

        impl Eq for FileScanTaskGroup {}

        impl PartialEq for FileScanTaskGroup {
            fn eq(&self, other: &Self) -> bool {
                self.total_length == other.total_length
            }
        }

        let mut heap = BinaryHeap::new();
        // push all groups into heap
        for idx in 0..split_num {
            heap.push(Reverse(FileScanTaskGroup {
                idx,
                tasks: vec![],
                total_length: 0,
            }));
        }

        for file_task in file_scan_tasks {
            let mut group = heap.peek_mut().unwrap();
            group.0.total_length += file_task.length;
            group.0.tasks.push(file_task);
        }

        // convert heap into vec and extract tasks
        heap.into_vec()
            .into_iter()
            .map(|reverse_group| reverse_group.0.tasks)
            .collect()
    }
}

/// Options controlling how a data file scan task is converted into `DataChunk`s.
pub struct IcebergScanOpts {
    /// Maximum number of rows in each yielded chunk (also the reader batch size).
    pub chunk_size: usize,
    /// Append the data file's sequence number as an extra `i64` column.
    pub need_seq_num: bool,
    /// Append the data file path and the row position within it as extra columns.
    pub need_file_path_and_pos: bool,
    /// Whether delete files attached to the task should be applied while reading.
    pub handle_delete_files: bool,
}

/// Scan a data file. Delete files are handled by the iceberg-rust `reader.read` implementation.
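///
/// A minimal consumption sketch (illustrative only; `table` and `task` are assumed to be a loaded
/// [`Table`] and a planned [`FileScanTask`] obtained elsewhere):
///
/// ```ignore
/// let opts = IcebergScanOpts {
///     chunk_size: 1024,
///     need_seq_num: false,
///     need_file_path_and_pos: false,
///     handle_delete_files: true,
/// };
/// #[for_await]
/// for chunk in scan_task_to_chunk_with_deletes(table, task, opts, None) {
///     let chunk: DataChunk = chunk?;
///     // ... hand the chunk to the downstream executor ...
/// }
/// ```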
#[try_stream(ok = DataChunk, error = ConnectorError)]
pub async fn scan_task_to_chunk_with_deletes(
    table: Table,
    mut data_file_scan_task: FileScanTask,
    IcebergScanOpts {
        chunk_size,
        need_seq_num,
        need_file_path_and_pos,
        handle_delete_files,
    }: IcebergScanOpts,
    metrics: Option<Arc<IcebergScanMetrics>>,
) {
    let table_name = table.identifier().name().to_owned();

    let mut read_bytes = scopeguard::guard(0, |read_bytes| {
        if let Some(metrics) = metrics.clone() {
            metrics
                .iceberg_read_bytes
                .with_guarded_label_values(&[&table_name])
                .inc_by(read_bytes as _);
        }
    });

    let data_file_path = data_file_scan_task.data_file_path.clone();
    let data_sequence_number = data_file_scan_task.sequence_number;

    tracing::debug!(
        "scan_task_to_chunk_with_deletes: data_file={}, handle_delete_files={}, total_delete_files={}",
        data_file_path,
        handle_delete_files,
        data_file_scan_task.deletes.len()
    );

    if !handle_delete_files {
        // Keep the delete files from being applied when the caller opts out.
        data_file_scan_task.deletes.clear();
    }

    // Read the data file; delete application is delegated to the reader.
    let reader = table
        .reader_builder()
        .with_batch_size(chunk_size)
        .with_row_group_filtering_enabled(true)
        .with_row_selection_enabled(true)
        .build();
    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

    let record_batch_stream: iceberg::scan::ArrowRecordBatchStream =
        reader.read(Box::pin(file_scan_stream))?;
    let mut record_batch_stream = record_batch_stream.enumerate();

    // Process each record batch. Delete application is handled by the SDK.
    while let Some((batch_index, record_batch)) = record_batch_stream.next().await {
        let record_batch = record_batch?;
        let batch_start_pos = (batch_index * chunk_size) as i64;

        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
        let row_count = chunk.capacity();

        // Add metadata columns if requested
        if need_seq_num {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                std::iter::repeat_n(data_sequence_number, row_count),
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        if need_file_path_and_pos {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
                std::iter::repeat_n(data_file_path.as_str(), row_count),
            ))));

            // Generate position values for each row in the batch
            let positions: Vec<i64> =
                (batch_start_pos..(batch_start_pos + row_count as i64)).collect();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(positions))));

            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        *read_bytes += chunk.estimated_heap_size() as u64;
        yield chunk;
    }
}

#[derive(Debug)]
pub struct IcebergFileReader {}

#[async_trait]
impl SplitReader for IcebergFileReader {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        _props: IcebergProperties,
        _splits: Vec<IcebergSplit>,
        _parser_config: ParserConfig,
        _source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        unimplemented!()
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        unimplemented!()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, Schema};

    use super::*;

    fn create_file_scan_task(length: u64, id: u64) -> FileScanTask {
        FileScanTask {
            length,
            start: 0,
            record_count: Some(0),
            data_file_path: format!("test_{}.parquet", id),
            data_file_content: DataContentType::Data,
            data_file_format: iceberg::spec::DataFileFormat::Parquet,
            schema: Arc::new(Schema::builder().build().unwrap()),
            project_field_ids: vec![],
            predicate: None,
            deletes: vec![],
            sequence_number: 0,
            equality_ids: None,
            file_size_in_bytes: 0,
            partition: None,
            partition_spec: None,
            name_mapping: None,
        }
    }

    #[test]
    fn test_split_n_vecs_basic() {
        let file_scan_tasks = (1..=12)
            .map(|i| create_file_scan_task(i + 100, i))
            .collect::<Vec<_>>();

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);

        assert_eq!(groups.len(), 3);

        let group_lengths: Vec<u64> = groups
            .iter()
            .map(|group| group.iter().map(|task| task.length).sum())
            .collect();

        let max_length = *group_lengths.iter().max().unwrap();
        let min_length = *group_lengths.iter().min().unwrap();
        assert!(max_length - min_length <= 10, "Groups should be balanced");

        let total_tasks: usize = groups.iter().map(|group| group.len()).sum();
        assert_eq!(total_tasks, 12);
    }

    #[test]
    fn test_split_n_vecs_empty() {
        let file_scan_tasks = Vec::new();
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert!(groups.iter().all(|group| group.is_empty()));
    }

    #[test]
    fn test_split_n_vecs_single_task() {
        let file_scan_tasks = vec![create_file_scan_task(100, 1)];
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert_eq!(groups.iter().filter(|group| !group.is_empty()).count(), 1);
    }

    #[test]
    fn test_split_n_vecs_uneven_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(1000, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 2);
        assert_eq!(groups.len(), 2);

        let group_with_large_task = groups
            .iter()
            .find(|group| group.iter().any(|task| task.length == 1000))
            .unwrap();
        assert_eq!(group_with_large_task.len(), 1);
    }

    #[test]
    fn test_split_n_vecs_same_files_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(100, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
            create_file_scan_task(100, 6),
            create_file_scan_task(100, 7),
            create_file_scan_task(100, 8),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
            .iter()
            .map(|g| {
                g.iter()
                    .map(|task| task.data_file_path.clone())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        for _ in 0..10000 {
            let groups_2 = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
                .iter()
                .map(|g| {
                    g.iter()
                        .map(|task| task.data_file_path.clone())
                        .collect::<Vec<_>>()
                })
                .collect::<Vec<_>>();

            assert_eq!(groups, groups_2);
        }
    }
}