risingwave_connector/source/iceberg/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod parquet_file_handler;
16
17pub mod metrics;
18use std::collections::{BinaryHeap, HashMap, HashSet};
19use std::sync::Arc;
20
21use anyhow::anyhow;
22use async_trait::async_trait;
23use futures::StreamExt;
24use futures_async_stream::{for_await, try_stream};
25use iceberg::Catalog;
26use iceberg::expr::{BoundPredicate, Predicate as IcebergPredicate};
27use iceberg::scan::FileScanTask;
28use iceberg::spec::FormatVersion;
29use iceberg::table::Table;
30pub use parquet_file_handler::*;
31use phf::{Set, phf_set};
32use risingwave_common::array::arrow::IcebergArrowConvert;
33use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
34use risingwave_common::bail;
35use risingwave_common::types::JsonbVal;
36use risingwave_common_estimate_size::EstimateSize;
37use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
38use serde::{Deserialize, Serialize};
39
40pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
41use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
42use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
43use crate::error::{ConnectorError, ConnectorResult};
44use crate::parser::ParserConfig;
45use crate::source::{
46    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
47    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
48};
/// Connector name of the iceberg source; exposed as `SourceProperties::SOURCE_NAME`
/// for [`IcebergProperties`].
pub const ICEBERG_CONNECTOR: &str = "iceberg";
50
51#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
52pub struct IcebergProperties {
53    #[serde(flatten)]
54    pub common: IcebergCommon,
55
56    #[serde(flatten)]
57    pub table: IcebergTableIdentifier,
58
59    // For jdbc catalog
60    #[serde(rename = "catalog.jdbc.user")]
61    pub jdbc_user: Option<String>,
62    #[serde(rename = "catalog.jdbc.password")]
63    pub jdbc_password: Option<String>,
64
65    #[serde(flatten)]
66    pub unknown_fields: HashMap<String, String>,
67}
68
69impl EnforceSecret for IcebergProperties {
70    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
71        "catalog.jdbc.password",
72    };
73
74    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
75        for prop in prop_iter {
76            IcebergCommon::enforce_one(prop)?;
77            if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
78                return Err(EnforceSecretError {
79                    key: prop.to_owned(),
80                }
81                .into());
82            }
83        }
84        Ok(())
85    }
86}
87
88impl IcebergProperties {
89    pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
90        let mut java_catalog_props = HashMap::new();
91        if let Some(jdbc_user) = self.jdbc_user.clone() {
92            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
93        }
94        if let Some(jdbc_password) = self.jdbc_password.clone() {
95            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
96        }
97        // TODO: support path_style_access and java_catalog_props for iceberg source
98        self.common.create_catalog(&java_catalog_props).await
99    }
100
101    pub async fn load_table(&self) -> ConnectorResult<Table> {
102        let mut java_catalog_props = HashMap::new();
103        if let Some(jdbc_user) = self.jdbc_user.clone() {
104            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
105        }
106        if let Some(jdbc_password) = self.jdbc_password.clone() {
107            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
108        }
109        // TODO: support java_catalog_props for iceberg source
110        self.common
111            .load_table(&self.table, &java_catalog_props)
112            .await
113    }
114}
115
116impl SourceProperties for IcebergProperties {
117    type Split = IcebergSplit;
118    type SplitEnumerator = IcebergSplitEnumerator;
119    type SplitReader = IcebergFileReader;
120
121    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
122}
123
124impl UnknownFields for IcebergProperties {
125    fn unknown_fields(&self) -> HashMap<String, String> {
126        self.unknown_fields.clone()
127    }
128}
129
130#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
131pub struct IcebergFileScanTaskJsonStr(String);
132
133impl IcebergFileScanTaskJsonStr {
134    pub fn deserialize(&self) -> FileScanTask {
135        serde_json::from_str(&self.0).unwrap()
136    }
137
138    pub fn serialize(task: &FileScanTask) -> Self {
139        Self(serde_json::to_string(task).unwrap())
140    }
141}
142
143#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
144pub enum IcebergFileScanTask {
145    Data(Vec<FileScanTask>),
146    EqualityDelete(Vec<FileScanTask>),
147    PositionDelete(Vec<FileScanTask>),
148}
149
150impl IcebergFileScanTask {
151    pub fn tasks(&self) -> &[FileScanTask] {
152        match self {
153            IcebergFileScanTask::Data(file_scan_tasks)
154            | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
155            | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks,
156        }
157    }
158
159    pub fn is_empty(&self) -> bool {
160        self.tasks().is_empty()
161    }
162
163    pub fn files(&self) -> Vec<String> {
164        self.tasks()
165            .iter()
166            .map(|task| task.data_file_path.clone())
167            .collect()
168    }
169
170    pub fn predicate(&self) -> Option<&BoundPredicate> {
171        let first_task = self.tasks().first()?;
172        first_task.predicate.as_ref()
173    }
174}
175
176#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
177pub struct IcebergSplit {
178    pub split_id: i64,
179    pub task: IcebergFileScanTask,
180    #[serde(default, skip_serializing_if = "Option::is_none")]
181    pub limit: Option<u64>,
182}
183
184impl IcebergSplit {
185    pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
186        let task = match iceberg_scan_type {
187            IcebergScanType::DataScan => IcebergFileScanTask::Data(vec![]),
188            IcebergScanType::EqualityDeleteScan => IcebergFileScanTask::EqualityDelete(vec![]),
189            IcebergScanType::PositionDeleteScan => IcebergFileScanTask::PositionDelete(vec![]),
190            _ => unimplemented!(),
191        };
192        Self {
193            split_id: 0,
194            task,
195            limit: None,
196        }
197    }
198}
199
200impl SplitMetaData for IcebergSplit {
201    fn id(&self) -> SplitId {
202        self.split_id.to_string().into()
203    }
204
205    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
206        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
207    }
208
209    fn encode_to_json(&self) -> JsonbVal {
210        serde_json::to_value(self.clone()).unwrap().into()
211    }
212
213    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
214        unimplemented!()
215    }
216}
217
218#[derive(Debug, Clone)]
219pub struct IcebergSplitEnumerator {
220    config: IcebergProperties,
221}
222
/// Summary of delete-related information for an iceberg scan.
// NOTE(review): nothing in this file constructs or reads this type; the field notes below
// are inferred from the field names and should be confirmed against the callers.
#[derive(Debug, Clone)]
pub struct IcebergDeleteParameters {
    /// Presumably the column names referenced by equality delete files.
    pub equality_delete_columns: Vec<String>,
    /// Presumably whether any position delete file is involved.
    pub has_position_delete: bool,
    /// Presumably the snapshot the information was derived from, if any.
    pub snapshot_id: Option<i64>,
}
229
230#[async_trait]
231impl SplitEnumerator for IcebergSplitEnumerator {
232    type Properties = IcebergProperties;
233    type Split = IcebergSplit;
234
235    async fn new(
236        properties: Self::Properties,
237        context: SourceEnumeratorContextRef,
238    ) -> ConnectorResult<Self> {
239        Ok(Self::new_inner(properties, context))
240    }
241
242    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
243        // Like file source, iceberg streaming source has a List Executor and a Fetch Executor,
244        // instead of relying on SplitEnumerator on meta.
245        // TODO: add some validation logic here.
246        Ok(vec![])
247    }
248}
249impl IcebergSplitEnumerator {
250    pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
251        Self { config: properties }
252    }
253}
254
/// Selects which snapshot of a table a scan should read.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IcebergTimeTravelInfo {
    /// A snapshot id; `get_snapshot_id` looks it up with `snapshot_by_id`.
    Version(i64),
    /// A Unix timestamp in milliseconds; `get_snapshot_id` picks the latest snapshot whose
    /// `timestamp_ms` is not greater than this value.
    TimestampMs(i64),
}
260
261#[derive(Debug, Clone)]
262pub struct IcebergListResult {
263    pub data_files: Vec<FileScanTask>,
264    pub equality_delete_files: Vec<FileScanTask>,
265    pub position_delete_files: Vec<FileScanTask>,
266    pub equality_delete_columns: Vec<String>,
267    pub format_version: FormatVersion,
268    pub schema: std::sync::Arc<iceberg::spec::Schema>,
269}
270
271impl IcebergSplitEnumerator {
272    pub fn get_snapshot_id(
273        table: &Table,
274        time_travel_info: Option<IcebergTimeTravelInfo>,
275    ) -> ConnectorResult<Option<i64>> {
276        let current_snapshot = table.metadata().current_snapshot();
277        let Some(current_snapshot) = current_snapshot else {
278            return Ok(None);
279        };
280
281        let snapshot_id = match time_travel_info {
282            Some(IcebergTimeTravelInfo::Version(version)) => {
283                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
284                    bail!("Cannot find the snapshot id in the iceberg table.");
285                };
286                snapshot.snapshot_id()
287            }
288            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
289                let snapshot = table
290                    .metadata()
291                    .snapshots()
292                    .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
293                    .max_by_key(|snapshot| snapshot.timestamp_ms());
294                match snapshot {
295                    Some(snapshot) => snapshot.snapshot_id(),
296                    None => {
297                        // convert unix time to human-readable time
298                        let time = chrono::DateTime::from_timestamp_millis(timestamp);
299                        if let Some(time) = time {
300                            tracing::warn!("Cannot find a snapshot older than {}", time);
301                        } else {
302                            tracing::warn!("Cannot find a snapshot");
303                        }
304                        return Ok(None);
305                    }
306                }
307            }
308            None => current_snapshot.snapshot_id(),
309        };
310        Ok(Some(snapshot_id))
311    }
312
313    pub async fn list_scan_tasks(
314        &self,
315        time_travel_info: Option<IcebergTimeTravelInfo>,
316        predicate: IcebergPredicate,
317    ) -> ConnectorResult<Option<IcebergListResult>> {
318        let table = self.config.load_table().await?;
319        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
320
321        let Some(snapshot_id) = snapshot_id else {
322            return Ok(None);
323        };
324        let res = self
325            .list_scan_tasks_inner(&table, snapshot_id, predicate)
326            .await?;
327        Ok(Some(res))
328    }
329
330    async fn list_scan_tasks_inner(
331        &self,
332        table: &Table,
333        snapshot_id: i64,
334        predicate: IcebergPredicate,
335    ) -> ConnectorResult<IcebergListResult> {
336        let format_version = table.metadata().format_version();
337        let table_schema = table.metadata().current_schema();
338        tracing::debug!("iceberg_table_schema: {:?}", table_schema);
339
340        let mut position_delete_files = vec![];
341        let mut position_delete_files_set = HashSet::new();
342        let mut data_files = vec![];
343        let mut equality_delete_files = vec![];
344        let mut equality_delete_files_set = HashSet::new();
345        let mut equality_delete_ids = None;
346        let mut scan_builder = table.scan().snapshot_id(snapshot_id).select_all();
347        if predicate != IcebergPredicate::AlwaysTrue {
348            scan_builder = scan_builder.with_filter(predicate.clone());
349        }
350        let scan = scan_builder.build()?;
351        let file_scan_stream = scan.plan_files().await?;
352
353        #[for_await]
354        for task in file_scan_stream {
355            let task: FileScanTask = task?;
356
357            // Collect delete files for separate scan types, but keep task.deletes intact
358            for delete_file in &task.deletes {
359                let delete_file = delete_file.as_ref().clone();
360                match delete_file.data_file_content {
361                    iceberg::spec::DataContentType::Data => {
362                        bail!("Data file should not in task deletes");
363                    }
364                    iceberg::spec::DataContentType::EqualityDeletes => {
365                        if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
366                            if equality_delete_ids.is_none() {
367                                equality_delete_ids = delete_file.equality_ids.clone();
368                            } else if equality_delete_ids != delete_file.equality_ids {
369                                bail!(
370                                    "The schema of iceberg equality delete file must be consistent"
371                                );
372                            }
373                            equality_delete_files.push(delete_file);
374                        }
375                    }
376                    iceberg::spec::DataContentType::PositionDeletes => {
377                        if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
378                            position_delete_files.push(delete_file);
379                        }
380                    }
381                }
382            }
383
384            match task.data_file_content {
385                iceberg::spec::DataContentType::Data => {
386                    // Keep the original task with its deletes field intact
387                    data_files.push(task);
388                }
389                iceberg::spec::DataContentType::EqualityDeletes => {
390                    bail!("Equality delete files should not be in the data files");
391                }
392                iceberg::spec::DataContentType::PositionDeletes => {
393                    bail!("Position delete files should not be in the data files");
394                }
395            }
396        }
397        let schema = table_schema.clone();
398        let equality_delete_columns = equality_delete_ids
399            .unwrap_or_default()
400            .into_iter()
401            .map(|id| match schema.name_by_field_id(id) {
402                Some(name) => Ok::<std::string::String, ConnectorError>(name.to_owned()),
403                None => bail!("Delete field id {} not found in schema", id),
404            })
405            .collect::<ConnectorResult<Vec<_>>>()?;
406
407        Ok(IcebergListResult {
408            data_files,
409            equality_delete_files,
410            position_delete_files,
411            equality_delete_columns,
412            format_version,
413            schema,
414        })
415    }
416
417    /// Uniformly distribute scan tasks to compute nodes.
418    /// It's deterministic so that it can best utilize the data locality.
419    ///
420    /// # Arguments
421    /// * `file_scan_tasks`: The file scan tasks to be split.
422    /// * `split_num`: The number of splits to be created.
423    ///
424    /// This algorithm is based on a min-heap. It will push all groups into the heap, and then pop the smallest group and add the file scan task to it.
425    /// Ensure that the total length of each group is as balanced as possible.
426    /// The time complexity is O(n log k), where n is the number of file scan tasks and k is the number of splits.
427    /// The space complexity is O(k), where k is the number of splits.
428    /// The algorithm is stable, so the order of the file scan tasks will be preserved.
429    pub fn split_n_vecs(
430        file_scan_tasks: Vec<FileScanTask>,
431        split_num: usize,
432    ) -> Vec<Vec<FileScanTask>> {
433        use std::cmp::{Ordering, Reverse};
434
435        #[derive(Default)]
436        struct FileScanTaskGroup {
437            idx: usize,
438            tasks: Vec<FileScanTask>,
439            total_length: u64,
440        }
441
442        impl Ord for FileScanTaskGroup {
443            fn cmp(&self, other: &Self) -> Ordering {
444                // when total_length is the same, we will sort by index
445                if self.total_length == other.total_length {
446                    self.idx.cmp(&other.idx)
447                } else {
448                    self.total_length.cmp(&other.total_length)
449                }
450            }
451        }
452
453        impl PartialOrd for FileScanTaskGroup {
454            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
455                Some(self.cmp(other))
456            }
457        }
458
459        impl Eq for FileScanTaskGroup {}
460
461        impl PartialEq for FileScanTaskGroup {
462            fn eq(&self, other: &Self) -> bool {
463                self.total_length == other.total_length
464            }
465        }
466
467        let mut heap = BinaryHeap::new();
468        // push all groups into heap
469        for idx in 0..split_num {
470            heap.push(Reverse(FileScanTaskGroup {
471                idx,
472                tasks: vec![],
473                total_length: 0,
474            }));
475        }
476
477        for file_task in file_scan_tasks {
478            let mut group = heap.peek_mut().unwrap();
479            group.0.total_length += file_task.length;
480            group.0.tasks.push(file_task);
481        }
482
483        // convert heap into vec and extract tasks
484        heap.into_vec()
485            .into_iter()
486            .map(|reverse_group| reverse_group.0.tasks)
487            .collect()
488    }
489}
490
/// Options for [`scan_task_to_chunk_with_deletes`].
#[derive(Debug, Clone, Copy)]
pub struct IcebergScanOpts {
    /// Batch size passed to the reader.
    pub chunk_size: usize,
    /// Append an `Int64` column holding the data file's sequence number to every chunk.
    pub need_seq_num: bool,
    /// Append a `Utf8` column with the data file path and an `Int64` column with the row
    /// position to every chunk.
    pub need_file_path_and_pos: bool,
    /// When `false`, the task's delete files are cleared before reading, so no deletes
    /// are applied by the reader.
    pub handle_delete_files: bool,
}
497
498/// Scan a data file. Delete files are handled by the iceberg-rust `reader.read` implementation.
499#[try_stream(ok = DataChunk, error = ConnectorError)]
500pub async fn scan_task_to_chunk_with_deletes(
501    table: Table,
502    mut data_file_scan_task: FileScanTask,
503    IcebergScanOpts {
504        chunk_size,
505        need_seq_num,
506        need_file_path_and_pos,
507        handle_delete_files,
508    }: IcebergScanOpts,
509    metrics: Option<Arc<IcebergScanMetrics>>,
510) {
511    let table_name = table.identifier().name().to_owned();
512
513    let num_delete_files = data_file_scan_task.deletes.len();
514    let expected_record_count = data_file_scan_task.record_count;
515    let file_start = std::time::Instant::now();
516
517    let mut read_bytes = scopeguard::guard(0u64, |read_bytes| {
518        if let Some(metrics) = metrics.clone() {
519            metrics
520                .iceberg_read_bytes
521                .with_guarded_label_values(&[&table_name])
522                .inc_by(read_bytes as _);
523        }
524    });
525
526    let data_file_path = data_file_scan_task.data_file_path.clone();
527    let data_sequence_number = data_file_scan_task.sequence_number;
528
529    tracing::debug!(
530        "scan_task_to_chunk_with_deletes: data_file={}, handle_delete_files={}, total_delete_files={}",
531        data_file_path,
532        handle_delete_files,
533        data_file_scan_task.deletes.len()
534    );
535
536    if !handle_delete_files {
537        // Keep the delete files from being applied when the caller opts out.
538        data_file_scan_task.deletes.clear();
539    }
540
541    // Read the data file; delete application is delegated to the reader.
542    let reader = table
543        .reader_builder()
544        .with_batch_size(chunk_size)
545        .with_row_group_filtering_enabled(true)
546        .build();
547    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));
548
549    let record_batch_stream: iceberg::scan::ArrowRecordBatchStream =
550        reader.read(Box::pin(file_scan_stream))?;
551    let mut record_batch_stream = record_batch_stream.enumerate();
552
553    let mut total_rows_read: u64 = 0;
554
555    // Process each record batch. Delete application is handled by the SDK.
556    while let Some((batch_index, record_batch)) = record_batch_stream.next().await {
557        let record_batch = record_batch?;
558        let batch_start_pos = (batch_index * chunk_size) as i64;
559
560        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
561        let row_count = chunk.capacity();
562        total_rows_read += row_count as u64;
563
564        // Add metadata columns if requested
565        if need_seq_num {
566            let (mut columns, visibility) = chunk.into_parts();
567            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
568                std::iter::repeat_n(data_sequence_number, row_count),
569            ))));
570            chunk = DataChunk::from_parts(columns.into(), visibility);
571        }
572
573        if need_file_path_and_pos {
574            let (mut columns, visibility) = chunk.into_parts();
575            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
576                std::iter::repeat_n(data_file_path.as_str(), row_count),
577            ))));
578
579            // Generate position values for each row in the batch
580            let positions: Vec<i64> =
581                (batch_start_pos..(batch_start_pos + row_count as i64)).collect();
582            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(positions))));
583
584            chunk = DataChunk::from_parts(columns.into(), visibility);
585        }
586
587        *read_bytes += chunk.estimated_heap_size() as u64;
588        yield chunk;
589    }
590
591    // Record per-file metrics after reading all batches.
592    if let Some(ref metrics) = metrics {
593        let label_values = [table_name.as_str()];
594
595        // File read duration.
596        metrics
597            .iceberg_source_file_read_duration_seconds
598            .with_guarded_label_values(&label_values)
599            .observe(file_start.elapsed().as_secs_f64());
600
601        // Rows read.
602        if total_rows_read > 0 {
603            metrics
604                .iceberg_source_rows_read_total
605                .with_guarded_label_values(&label_values)
606                .inc_by(total_rows_read);
607        }
608
609        // File read count.
610        metrics
611            .iceberg_source_files_read_total
612            .with_guarded_label_values(&[table_name.as_str(), "data"])
613            .inc();
614
615        // APPROXIMATE: Estimate delete rows applied. The delta between expected_record_count
616        // and actual rows read may also include predicate pushdown / row-group pruning effects,
617        // so this metric can overcount. It is still useful as an approximate signal for
618        // detecting whether delete files cause significant row filtering.
619        if handle_delete_files
620            && num_delete_files > 0
621            && let Some(expected) = expected_record_count
622        {
623            let deleted = expected.saturating_sub(total_rows_read);
624            if deleted > 0 {
625                metrics
626                    .iceberg_source_delete_rows_applied_total
627                    .with_guarded_label_values(&[table_name.as_str(), "sdk_applied_approx"])
628                    .inc_by(deleted);
629            }
630        }
631    }
632}
633
634#[derive(Debug)]
635pub struct IcebergFileReader {}
636
637#[async_trait]
638impl SplitReader for IcebergFileReader {
639    type Properties = IcebergProperties;
640    type Split = IcebergSplit;
641
642    async fn new(
643        _props: IcebergProperties,
644        _splits: Vec<IcebergSplit>,
645        _parser_config: ParserConfig,
646        _source_ctx: SourceContextRef,
647        _columns: Option<Vec<Column>>,
648    ) -> ConnectorResult<Self> {
649        unimplemented!()
650    }
651
652    fn into_stream(self) -> BoxSourceChunkStream {
653        unimplemented!()
654    }
655}
656
657#[cfg(test)]
658mod tests {
659    use std::sync::Arc;
660
661    use iceberg::scan::FileScanTask;
662    use iceberg::spec::{DataContentType, Schema};
663
664    use super::*;
665
666    fn create_file_scan_task(length: u64, id: u64) -> FileScanTask {
667        FileScanTask {
668            length,
669            start: 0,
670            record_count: Some(0),
671            data_file_path: format!("test_{}.parquet", id),
672            referenced_data_file: None,
673            data_file_content: DataContentType::Data,
674            data_file_format: iceberg::spec::DataFileFormat::Parquet,
675            schema: Arc::new(Schema::builder().build().unwrap()),
676            project_field_ids: vec![],
677            predicate: None,
678            deletes: vec![],
679            sequence_number: 0,
680            equality_ids: None,
681            file_size_in_bytes: 0,
682            partition: None,
683            partition_spec: None,
684            name_mapping: None,
685            case_sensitive: true,
686        }
687    }
688
689    #[test]
690    fn test_split_n_vecs_basic() {
691        let file_scan_tasks = (1..=12)
692            .map(|i| create_file_scan_task(i + 100, i))
693            .collect::<Vec<_>>(); // Ensure the correct function is called
694
695        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
696
697        assert_eq!(groups.len(), 3);
698
699        let group_lengths: Vec<u64> = groups
700            .iter()
701            .map(|group| group.iter().map(|task| task.length).sum())
702            .collect();
703
704        let max_length = *group_lengths.iter().max().unwrap();
705        let min_length = *group_lengths.iter().min().unwrap();
706        assert!(max_length - min_length <= 10, "Groups should be balanced");
707
708        let total_tasks: usize = groups.iter().map(|group| group.len()).sum();
709        assert_eq!(total_tasks, 12);
710    }
711
712    #[test]
713    fn test_split_n_vecs_empty() {
714        let file_scan_tasks = Vec::new();
715        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
716        assert_eq!(groups.len(), 3);
717        assert!(groups.iter().all(|group| group.is_empty()));
718    }
719
720    #[test]
721    fn test_split_n_vecs_single_task() {
722        let file_scan_tasks = vec![create_file_scan_task(100, 1)];
723        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
724        assert_eq!(groups.len(), 3);
725        assert_eq!(groups.iter().filter(|group| !group.is_empty()).count(), 1);
726    }
727
728    #[test]
729    fn test_split_n_vecs_uneven_distribution() {
730        let file_scan_tasks = vec![
731            create_file_scan_task(1000, 1),
732            create_file_scan_task(100, 2),
733            create_file_scan_task(100, 3),
734            create_file_scan_task(100, 4),
735            create_file_scan_task(100, 5),
736        ];
737
738        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 2);
739        assert_eq!(groups.len(), 2);
740
741        let group_with_large_task = groups
742            .iter()
743            .find(|group| group.iter().any(|task| task.length == 1000))
744            .unwrap();
745        assert_eq!(group_with_large_task.len(), 1);
746    }
747
748    #[test]
749    fn test_split_n_vecs_same_files_distribution() {
750        let file_scan_tasks = vec![
751            create_file_scan_task(100, 1),
752            create_file_scan_task(100, 2),
753            create_file_scan_task(100, 3),
754            create_file_scan_task(100, 4),
755            create_file_scan_task(100, 5),
756            create_file_scan_task(100, 6),
757            create_file_scan_task(100, 7),
758            create_file_scan_task(100, 8),
759        ];
760
761        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
762            .iter()
763            .map(|g| {
764                g.iter()
765                    .map(|task| task.data_file_path.clone())
766                    .collect::<Vec<_>>()
767            })
768            .collect::<Vec<_>>();
769
770        for _ in 0..10000 {
771            let groups_2 = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
772                .iter()
773                .map(|g| {
774                    g.iter()
775                        .map(|task| task.data_file_path.clone())
776                        .collect::<Vec<_>>()
777                })
778                .collect::<Vec<_>>();
779
780            assert_eq!(groups, groups_2);
781        }
782    }
783}