risingwave_connector/source/iceberg/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod parquet_file_handler;

mod metrics;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::{for_await, try_stream};
use iceberg::Catalog;
use iceberg::expr::Predicate as IcebergPredicate;
use iceberg::scan::FileScanTask;
use iceberg::spec::DataContentType;
use iceberg::table::Table;
use itertools::Itertools;
pub use parquet_file_handler::*;
use phf::{Set, phf_set};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::bail;
use risingwave_common::catalog::{
    ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
    Schema,
};
use risingwave_common::types::JsonbVal;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
use serde::{Deserialize, Serialize};

pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
use crate::connector_common::IcebergCommon;
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
};

pub const ICEBERG_CONNECTOR: &str = "iceberg";

#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct IcebergProperties {
    #[serde(flatten)]
    pub common: IcebergCommon,

    // For jdbc catalog
    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl EnforceSecret for IcebergProperties {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "catalog.jdbc.password",
    };

    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
            if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
                return Err(EnforceSecretError {
                    key: prop.to_owned(),
                }
                .into());
            }
        }
        Ok(())
    }
}

impl IcebergProperties {
    pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        // TODO: support path_style_access and java_catalog_props for iceberg source
        self.common.create_catalog(&java_catalog_props).await
    }

    pub async fn load_table(&self) -> ConnectorResult<Table> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        // TODO: support java_catalog_props for iceberg source
        self.common.load_table(&java_catalog_props).await
    }
}

impl SourceProperties for IcebergProperties {
    type Split = IcebergSplit;
    type SplitEnumerator = IcebergSplitEnumerator;
    type SplitReader = IcebergFileReader;

    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}

impl UnknownFields for IcebergProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

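/// A `FileScanTask` serialized as a JSON string, so that a scan task can be passed around as a
/// plain string and deserialized again when the scan is executed.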
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergFileScanTaskJsonStr(String);

impl IcebergFileScanTaskJsonStr {
    pub fn deserialize(&self) -> FileScanTask {
        serde_json::from_str(&self.0).unwrap()
    }

    pub fn serialize(task: &FileScanTask) -> Self {
        Self(serde_json::to_string(task).unwrap())
    }
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
    Data(Vec<FileScanTask>),
    EqualityDelete(Vec<FileScanTask>),
    PositionDelete(Vec<FileScanTask>),
    CountStar(u64),
}

impl IcebergFileScanTask {
    pub fn new_count_star(count_sum: u64) -> Self {
        IcebergFileScanTask::CountStar(count_sum)
    }

    pub fn new_scan_with_scan_type(
        iceberg_scan_type: IcebergScanType,
        data_files: Vec<FileScanTask>,
        equality_delete_files: Vec<FileScanTask>,
        position_delete_files: Vec<FileScanTask>,
    ) -> Self {
        match iceberg_scan_type {
            IcebergScanType::EqualityDeleteScan => {
                IcebergFileScanTask::EqualityDelete(equality_delete_files)
            }
            IcebergScanType::DataScan => IcebergFileScanTask::Data(data_files),
            IcebergScanType::PositionDeleteScan => {
                IcebergFileScanTask::PositionDelete(position_delete_files)
            }
            IcebergScanType::Unspecified | IcebergScanType::CountStar => {
                unreachable!("count(*) or unspecified scan type should not reach here")
            }
        }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            IcebergFileScanTask::Data(data_files) => data_files.is_empty(),
            IcebergFileScanTask::EqualityDelete(equality_delete_files) => {
                equality_delete_files.is_empty()
            }
            IcebergFileScanTask::PositionDelete(position_delete_files) => {
                position_delete_files.is_empty()
            }
            IcebergFileScanTask::CountStar(_) => false,
        }
    }

    pub fn files(&self) -> Vec<String> {
        match self {
            IcebergFileScanTask::Data(file_scan_tasks)
            | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
            | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks
                .iter()
                .map(|task| task.data_file_path.clone())
                .collect(),
            IcebergFileScanTask::CountStar(_) => vec![],
        }
    }
}

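/// One split of an iceberg batch scan: the `IcebergFileScanTask` assigned to a single reader.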
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IcebergSplit {
    pub split_id: i64,
    // TODO: remove this field; it appears to be unused.
    pub snapshot_id: i64,
    pub task: IcebergFileScanTask,
}

impl IcebergSplit {
    pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
        if let IcebergScanType::CountStar = iceberg_scan_type {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_count_star(0),
            }
        } else {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_scan_with_scan_type(
                    iceberg_scan_type,
                    vec![],
                    vec![],
                    vec![],
                ),
            }
        }
    }
}

impl SplitMetaData for IcebergSplit {
    fn id(&self) -> SplitId {
        self.split_id.to_string().into()
    }

    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
    }

    fn encode_to_json(&self) -> JsonbVal {
        serde_json::to_value(self.clone()).unwrap().into()
    }

    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
        unimplemented!()
    }
}

#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {
    config: IcebergProperties,
}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<Self> {
        Ok(Self::new_inner(properties, context))
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
        // Like the file source, the iceberg streaming source uses a List executor and a Fetch
        // executor instead of relying on the SplitEnumerator on the meta node.
        // TODO: add some validation logic here.
        Ok(vec![])
    }
}

impl IcebergSplitEnumerator {
    pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
        Self { config: properties }
    }
}

pub enum IcebergTimeTravelInfo {
    Version(i64),
    TimestampMs(i64),
}

impl IcebergSplitEnumerator {
    pub fn get_snapshot_id(
        table: &Table,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<Option<i64>> {
        let current_snapshot = table.metadata().current_snapshot();
        if current_snapshot.is_none() {
            return Ok(None);
        }

        let snapshot_id = match time_travel_info {
            Some(IcebergTimeTravelInfo::Version(version)) => {
                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
                    bail!("Cannot find the snapshot id in the iceberg table.");
                };
                snapshot.snapshot_id()
            }
            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
                let snapshot = table
                    .metadata()
                    .snapshots()
                    .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
                    .max_by_key(|snapshot| snapshot.timestamp_ms());
                match snapshot {
                    Some(snapshot) => snapshot.snapshot_id(),
                    None => {
                        // convert unix time to human-readable time
                        match chrono::DateTime::from_timestamp_millis(timestamp) {
                            Some(time) => bail!("Cannot find a snapshot older than {}", time),
                            None => bail!("Cannot find a snapshot"),
                        }
                    }
                }
            }
            None => {
                assert!(current_snapshot.is_some());
                current_snapshot.unwrap().snapshot_id()
            }
        };
        Ok(Some(snapshot_id))
    }

    pub async fn list_splits_batch(
        &self,
        schema: Schema,
        time_travel_info: Option<IcebergTimeTravelInfo>,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        if batch_parallelism == 0 {
            bail!("Batch parallelism is 0. Cannot split the iceberg files.");
        }
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        if snapshot_id.is_none() {
            // If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        if let IcebergScanType::CountStar = iceberg_scan_type {
            self.list_splits_batch_count_star(&table, snapshot_id.unwrap())
                .await
        } else {
            self.list_splits_batch_scan(
                &table,
                snapshot_id.unwrap(),
                schema,
                batch_parallelism,
                iceberg_scan_type,
                predicate,
            )
            .await
        }
    }

    async fn list_splits_batch_scan(
        &self,
        table: &Table,
        snapshot_id: i64,
        schema: Schema,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let schema_names = schema.names();
        let require_names = schema_names
            .iter()
            .filter(|name| {
                name.ne(&ICEBERG_SEQUENCE_NUM_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_PATH_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_POS_COLUMN_NAME)
            })
            .cloned()
            .collect_vec();

        let table_schema = table.metadata().current_schema();
        tracing::debug!("iceberg_table_schema: {:?}", table_schema);

        let mut position_delete_files = vec![];
        let mut position_delete_files_set = HashSet::new();
        let mut data_files = vec![];
        let mut equality_delete_files = vec![];
        let mut equality_delete_files_set = HashSet::new();
        let scan = table
            .scan()
            .with_filter(predicate)
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .select(require_names)
            .build()
            .map_err(|e| anyhow!(e))?;

        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let mut task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for mut delete_file in task.deletes.drain(..) {
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {
                        bail!("Data files should not be in task deletes");
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            equality_delete_files.push(delete_file);
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            delete_file.project_field_ids = Vec::default();
                            position_delete_files.push(delete_file);
                        }
                    }
                }
            }
            match task.data_file_content {
                iceberg::spec::DataContentType::Data => {
                    data_files.push(task);
                }
                iceberg::spec::DataContentType::EqualityDeletes => {
                    bail!("Equality delete files should not be in the data files");
                }
                iceberg::spec::DataContentType::PositionDeletes => {
                    bail!("Position delete files should not be in the data files");
                }
            }
        }
        // evenly split the files into splits based on the parallelism.
        let data_files = Self::split_n_vecs(data_files, batch_parallelism);
        let equality_delete_files = Self::split_n_vecs(equality_delete_files, batch_parallelism);
        let position_delete_files = Self::split_n_vecs(position_delete_files, batch_parallelism);

        let splits = data_files
            .into_iter()
            .zip_eq_fast(equality_delete_files.into_iter())
            .zip_eq_fast(position_delete_files.into_iter())
            .enumerate()
            .map(
                |(index, ((data_file, equality_delete_file), position_delete_file))| IcebergSplit {
                    split_id: index as i64,
                    snapshot_id,
                    task: IcebergFileScanTask::new_scan_with_scan_type(
                        iceberg_scan_type,
                        data_file,
                        equality_delete_file,
                        position_delete_file,
                    ),
                },
            )
            .filter(|split| !split.task.is_empty())
            .collect_vec();

        if splits.is_empty() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        Ok(splits)
    }

    pub async fn list_splits_batch_count_star(
        &self,
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let mut record_counts = 0;
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            assert_eq!(task.data_file_content, DataContentType::Data);
            assert!(task.deletes.is_empty());
            record_counts += task.record_count.expect("must have");
        }
        let split = IcebergSplit {
            split_id: 0,
            snapshot_id,
            task: IcebergFileScanTask::new_count_star(record_counts),
        };
        Ok(vec![split])
    }

    /// List all files in the snapshot to check if there are deletes.
    pub async fn all_delete_parameters(
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
        let schema = scan.snapshot().schema(table.metadata())?;
        let mut equality_ids = vec![];
        let mut have_position_delete = false;
        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for delete_file in task.deletes {
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {}
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_ids.is_empty() {
                            equality_ids = delete_file.equality_ids;
                        } else if equality_ids != delete_file.equality_ids {
                            bail!("The schema of iceberg equality delete file must be consistent");
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        have_position_delete = true;
                    }
                }
            }
        }
        let delete_columns = equality_ids
            .into_iter()
            .map(|id| match schema.name_by_field_id(id) {
                Some(name) => Ok::<std::string::String, ConnectorError>(name.to_owned()),
                None => bail!("Delete field id {} not found in schema", id),
            })
            .collect::<ConnectorResult<Vec<_>>>()?;

        Ok((delete_columns, have_position_delete))
    }

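    /// Returns the equality delete column names and whether the snapshot contains position
    /// delete files. Returns `(vec![], false)` if the table has no snapshot.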
    pub async fn get_delete_parameters(
        &self,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        if snapshot_id.is_none() {
            return Ok((vec![], false));
        }
        let snapshot_id = snapshot_id.unwrap();
        Self::all_delete_parameters(&table, snapshot_id).await
    }

    /// Uniformly distributes scan tasks to compute nodes.
    /// It is deterministic, so it can make the best use of data locality.
    ///
    /// # Arguments
    /// * `file_scan_tasks`: The file scan tasks to be split.
    /// * `split_num`: The number of splits to be created.
    ///
    /// The algorithm is based on a min-heap: all groups are pushed into the heap, and each file
    /// scan task is added to the group that currently has the smallest total length, keeping the
    /// total length of each group as balanced as possible.
    /// The time complexity is O(n log k), where n is the number of file scan tasks and k is the number of splits.
    /// The space complexity is O(k), where k is the number of splits.
    /// The algorithm is stable, so the order of the file scan tasks is preserved.
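    ///
    /// For example, with tasks of lengths `[1000, 100, 100, 100, 100]` and `split_num = 2`, the
    /// groups end up with total lengths `1000` and `400`, i.e. the large task is isolated in its
    /// own group (see `test_split_n_vecs_uneven_distribution` below).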
    fn split_n_vecs(
        file_scan_tasks: Vec<FileScanTask>,
        split_num: usize,
    ) -> Vec<Vec<FileScanTask>> {
        use std::cmp::{Ordering, Reverse};

        #[derive(Default)]
        struct FileScanTaskGroup {
            idx: usize,
            tasks: Vec<FileScanTask>,
            total_length: u64,
        }

        impl Ord for FileScanTaskGroup {
            fn cmp(&self, other: &Self) -> Ordering {
                // when total_length is the same, we will sort by index
                if self.total_length == other.total_length {
                    self.idx.cmp(&other.idx)
                } else {
                    self.total_length.cmp(&other.total_length)
                }
            }
        }

        impl PartialOrd for FileScanTaskGroup {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }

        impl Eq for FileScanTaskGroup {}

        impl PartialEq for FileScanTaskGroup {
            fn eq(&self, other: &Self) -> bool {
                self.total_length == other.total_length
            }
        }

        let mut heap = BinaryHeap::new();
        // push all groups into heap
        for idx in 0..split_num {
            heap.push(Reverse(FileScanTaskGroup {
                idx,
                tasks: vec![],
                total_length: 0,
            }));
        }

        for file_task in file_scan_tasks {
            let mut group = heap.peek_mut().unwrap();
            group.0.total_length += file_task.length;
            group.0.tasks.push(file_task);
        }

        // convert heap into vec and extract tasks
        heap.into_vec()
            .into_iter()
            .map(|reverse_group| reverse_group.0.tasks)
            .collect()
    }
}

pub struct IcebergScanOpts {
    pub chunk_size: usize,
    pub need_seq_num: bool,
    pub need_file_path_and_pos: bool,
}

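// A minimal usage sketch (assuming `table` is a loaded `Table` and `task` is a data
// `FileScanTask`); the stream yields `DataChunk`s of at most `chunk_size` rows:
//
// #[for_await]
// for chunk in scan_task_to_chunk(
//     table,
//     task,
//     IcebergScanOpts { chunk_size: 1024, need_seq_num: false, need_file_path_and_pos: false },
//     None,
// ) {
//     let chunk = chunk?;
//     // ... consume the chunk
// }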
#[try_stream(ok = DataChunk, error = ConnectorError)]
pub async fn scan_task_to_chunk(
    table: Table,
    data_file_scan_task: FileScanTask,
    IcebergScanOpts {
        chunk_size,
        need_seq_num,
        need_file_path_and_pos,
    }: IcebergScanOpts,
    metrics: Option<Arc<IcebergScanMetrics>>,
) {
    let table_name = table.identifier().name().to_owned();

    let mut read_bytes = scopeguard::guard(0, |read_bytes| {
        if let Some(metrics) = metrics {
            metrics
                .iceberg_read_bytes
                .with_guarded_label_values(&[&table_name])
                .inc_by(read_bytes as _);
        }
    });

    let data_file_path = data_file_scan_task.data_file_path.clone();
    let data_sequence_number = data_file_scan_task.sequence_number;

    let reader = table.reader_builder().with_batch_size(chunk_size).build();
    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

    // FIXME: what if the start position is not 0? The logic for the row index below seems incorrect.
    let mut record_batch_stream = reader.read(Box::pin(file_scan_stream)).await?.enumerate();

    while let Some((index, record_batch)) = record_batch_stream.next().await {
        let record_batch = record_batch?;

        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
        if need_seq_num {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                vec![data_sequence_number; visibility.len()],
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility)
        };
        if need_file_path_and_pos {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
                vec![data_file_path.as_str(); visibility.len()],
            ))));
            let index_start = (index * chunk_size) as i64;
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                (index_start..(index_start + visibility.len() as i64)).collect::<Vec<i64>>(),
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility)
        }
        *read_bytes += chunk.estimated_heap_size() as u64;
        yield chunk;
    }
}

#[derive(Debug)]
pub struct IcebergFileReader {}

#[async_trait]
impl SplitReader for IcebergFileReader {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        _props: IcebergProperties,
        _splits: Vec<IcebergSplit>,
        _parser_config: ParserConfig,
        _source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        unimplemented!()
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        unimplemented!()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, Schema};

    use super::*;

    fn create_file_scan_task(length: u64, id: u64) -> FileScanTask {
        FileScanTask {
            length,
            start: 0,
            record_count: Some(0),
            data_file_path: format!("test_{}.parquet", id),
            data_file_content: DataContentType::Data,
            data_file_format: iceberg::spec::DataFileFormat::Parquet,
            schema: Arc::new(Schema::builder().build().unwrap()),
            project_field_ids: vec![],
            predicate: None,
            deletes: vec![],
            sequence_number: 0,
            equality_ids: vec![],
        }
    }

    #[test]
    fn test_split_n_vecs_basic() {
        let file_scan_tasks = (1..=12)
            .map(|i| create_file_scan_task(i + 100, i))
            .collect::<Vec<_>>();

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);

        assert_eq!(groups.len(), 3);

        let group_lengths: Vec<u64> = groups
            .iter()
            .map(|group| group.iter().map(|task| task.length).sum())
            .collect();

        let max_length = *group_lengths.iter().max().unwrap();
        let min_length = *group_lengths.iter().min().unwrap();
        assert!(max_length - min_length <= 10, "Groups should be balanced");

        let total_tasks: usize = groups.iter().map(|group| group.len()).sum();
        assert_eq!(total_tasks, 12);
    }

    #[test]
    fn test_split_n_vecs_empty() {
        let file_scan_tasks = Vec::new();
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert!(groups.iter().all(|group| group.is_empty()));
    }

    #[test]
    fn test_split_n_vecs_single_task() {
        let file_scan_tasks = vec![create_file_scan_task(100, 1)];
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert_eq!(groups.iter().filter(|group| !group.is_empty()).count(), 1);
    }

    #[test]
    fn test_split_n_vecs_uneven_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(1000, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 2);
        assert_eq!(groups.len(), 2);

        let group_with_large_task = groups
            .iter()
            .find(|group| group.iter().any(|task| task.length == 1000))
            .unwrap();
        assert_eq!(group_with_large_task.len(), 1);
    }

    #[test]
    fn test_split_n_vecs_same_files_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(100, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
            create_file_scan_task(100, 6),
            create_file_scan_task(100, 7),
            create_file_scan_task(100, 8),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
            .iter()
            .map(|g| {
                g.iter()
                    .map(|task| task.data_file_path.clone())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        for _ in 0..10000 {
            let groups_2 = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
                .iter()
                .map(|g| {
                    g.iter()
                        .map(|task| task.data_file_path.clone())
                        .collect::<Vec<_>>()
                })
                .collect::<Vec<_>>();

            assert_eq!(groups, groups_2);
        }
    }
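
    // A minimal sketch of how `new_scan_with_scan_type` routes files by scan type and how an
    // empty split is represented (reusing the `create_file_scan_task` helper above).
    #[test]
    fn test_file_scan_task_helpers() {
        let task = create_file_scan_task(100, 1);

        // For a data scan, only the data files are kept.
        let data_task = IcebergFileScanTask::new_scan_with_scan_type(
            IcebergScanType::DataScan,
            vec![task.clone()],
            vec![],
            vec![],
        );
        assert!(!data_task.is_empty());
        assert_eq!(data_task.files(), vec!["test_1.parquet".to_owned()]);

        // For an equality delete scan, only the equality delete files are kept, so passing the
        // task as a data file yields an empty scan task.
        let eq_delete_task = IcebergFileScanTask::new_scan_with_scan_type(
            IcebergScanType::EqualityDeleteScan,
            vec![task],
            vec![],
            vec![],
        );
        assert!(eq_delete_task.is_empty());

        // An empty split carries no files and uses split id 0.
        let empty_split = IcebergSplit::empty(IcebergScanType::DataScan);
        assert_eq!(empty_split.split_id, 0);
        assert!(empty_split.task.is_empty());
    }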
}