risingwave_connector/source/iceberg/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod parquet_file_handler;
16
17mod metrics;
18use std::collections::{BinaryHeap, HashMap, HashSet};
19use std::sync::Arc;
20
21use anyhow::anyhow;
22use async_trait::async_trait;
23use futures::StreamExt;
24use futures_async_stream::{for_await, try_stream};
25use iceberg::Catalog;
26use iceberg::expr::Predicate as IcebergPredicate;
27use iceberg::scan::FileScanTask;
28use iceberg::spec::DataContentType;
29use iceberg::table::Table;
30use itertools::Itertools;
31pub use parquet_file_handler::*;
32use phf::{Set, phf_set};
33use risingwave_common::array::arrow::IcebergArrowConvert;
34use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
35use risingwave_common::bail;
36use risingwave_common::catalog::{
37    ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
38    Schema,
39};
40use risingwave_common::types::JsonbVal;
41use risingwave_common::util::iter_util::ZipEqFast;
42use risingwave_common_estimate_size::EstimateSize;
43use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
44use serde::{Deserialize, Serialize};
45
46pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
47use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
48use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
49use crate::error::{ConnectorError, ConnectorResult};
50use crate::parser::ParserConfig;
51use crate::source::{
52    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
53    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
54};
/// Name of the Iceberg connector; used as `SourceProperties::SOURCE_NAME` for `IcebergProperties`.
pub const ICEBERG_CONNECTOR: &str = "iceberg";
56
/// Properties of the Iceberg source connector.
///
/// The struct is deserialized with serde; `common`, `table` and `unknown_fields` are flattened
/// into the same key space as the explicitly renamed JDBC keys.
#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct IcebergProperties {
    // Shared Iceberg options; used below to create the catalog and to load the table.
    #[serde(flatten)]
    pub common: IcebergCommon,

    // Identifier of the table handed to `IcebergCommon::load_table`.
    #[serde(flatten)]
    pub table: IcebergTableIdentifier,

    // For jdbc catalog
    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,

    // Flattened catch-all map; returned verbatim by `UnknownFields::unknown_fields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}
74
75impl EnforceSecret for IcebergProperties {
76    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
77        "catalog.jdbc.password",
78    };
79
80    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
81        for prop in prop_iter {
82            IcebergCommon::enforce_one(prop)?;
83            if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
84                return Err(EnforceSecretError {
85                    key: prop.to_owned(),
86                }
87                .into());
88            }
89        }
90        Ok(())
91    }
92}
93
94impl IcebergProperties {
95    pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
96        let mut java_catalog_props = HashMap::new();
97        if let Some(jdbc_user) = self.jdbc_user.clone() {
98            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
99        }
100        if let Some(jdbc_password) = self.jdbc_password.clone() {
101            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
102        }
103        // TODO: support path_style_access and java_catalog_props for iceberg source
104        self.common.create_catalog(&java_catalog_props).await
105    }
106
107    pub async fn load_table(&self) -> ConnectorResult<Table> {
108        let mut java_catalog_props = HashMap::new();
109        if let Some(jdbc_user) = self.jdbc_user.clone() {
110            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
111        }
112        if let Some(jdbc_password) = self.jdbc_password.clone() {
113            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
114        }
115        // TODO: support java_catalog_props for iceberg source
116        self.common
117            .load_table(&self.table, &java_catalog_props)
118            .await
119    }
120}
121
// Wires the Iceberg split, enumerator and reader types to `IcebergProperties` and names the
// source after `ICEBERG_CONNECTOR`.
impl SourceProperties for IcebergProperties {
    type Split = IcebergSplit;
    type SplitEnumerator = IcebergSplitEnumerator;
    type SplitReader = IcebergFileReader;

    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}
129
impl UnknownFields for IcebergProperties {
    /// Returns a copy of the flattened catch-all `unknown_fields` map.
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}
135
136#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
137pub struct IcebergFileScanTaskJsonStr(String);
138
139impl IcebergFileScanTaskJsonStr {
140    pub fn deserialize(&self) -> FileScanTask {
141        serde_json::from_str(&self.0).unwrap()
142    }
143
144    pub fn serialize(task: &FileScanTask) -> Self {
145        Self(serde_json::to_string(task).unwrap())
146    }
147}
148
/// The work carried by one [`IcebergSplit`]: a list of file scan tasks of a single kind, or a
/// precomputed row count.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
    /// Scan tasks for data files.
    Data(Vec<FileScanTask>),
    /// Scan tasks for equality-delete files.
    EqualityDelete(Vec<FileScanTask>),
    /// Scan tasks for position-delete files.
    PositionDelete(Vec<FileScanTask>),
    /// A row count; built from the summed `record_count` of the planned files.
    CountStar(u64),
}
156
157impl IcebergFileScanTask {
158    pub fn new_count_star(count_sum: u64) -> Self {
159        IcebergFileScanTask::CountStar(count_sum)
160    }
161
162    pub fn new_scan_with_scan_type(
163        iceberg_scan_type: IcebergScanType,
164        data_files: Vec<FileScanTask>,
165        equality_delete_files: Vec<FileScanTask>,
166        position_delete_files: Vec<FileScanTask>,
167    ) -> Self {
168        match iceberg_scan_type {
169            IcebergScanType::EqualityDeleteScan => {
170                IcebergFileScanTask::EqualityDelete(equality_delete_files)
171            }
172            IcebergScanType::DataScan => IcebergFileScanTask::Data(data_files),
173            IcebergScanType::PositionDeleteScan => {
174                IcebergFileScanTask::PositionDelete(position_delete_files)
175            }
176            IcebergScanType::Unspecified | IcebergScanType::CountStar => {
177                unreachable!("Unspecified iceberg scan type")
178            }
179        }
180    }
181
182    pub fn is_empty(&self) -> bool {
183        match self {
184            IcebergFileScanTask::Data(data_files) => data_files.is_empty(),
185            IcebergFileScanTask::EqualityDelete(equality_delete_files) => {
186                equality_delete_files.is_empty()
187            }
188            IcebergFileScanTask::PositionDelete(position_delete_files) => {
189                position_delete_files.is_empty()
190            }
191            IcebergFileScanTask::CountStar(_) => false,
192        }
193    }
194
195    pub fn files(&self) -> Vec<String> {
196        match self {
197            IcebergFileScanTask::Data(file_scan_tasks)
198            | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
199            | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks
200                .iter()
201                .map(|task| task.data_file_path.clone())
202                .collect(),
203            IcebergFileScanTask::CountStar(_) => vec![],
204        }
205    }
206}
207
/// One unit of Iceberg scan work.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IcebergSplit {
    /// Numeric id of the split; its decimal string form is the `SplitId`.
    pub split_id: i64,
    // TODO: remove this field. It seems not used.
    pub snapshot_id: i64,
    /// The files to scan (or the precomputed count) for this split.
    pub task: IcebergFileScanTask,
}
215
216impl IcebergSplit {
217    pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
218        if let IcebergScanType::CountStar = iceberg_scan_type {
219            Self {
220                split_id: 0,
221                snapshot_id: 0,
222                task: IcebergFileScanTask::new_count_star(0),
223            }
224        } else {
225            Self {
226                split_id: 0,
227                snapshot_id: 0,
228                task: IcebergFileScanTask::new_scan_with_scan_type(
229                    iceberg_scan_type,
230                    vec![],
231                    vec![],
232                    vec![],
233                ),
234            }
235        }
236    }
237}
238
239impl SplitMetaData for IcebergSplit {
240    fn id(&self) -> SplitId {
241        self.split_id.to_string().into()
242    }
243
244    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
245        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
246    }
247
248    fn encode_to_json(&self) -> JsonbVal {
249        serde_json::to_value(self.clone()).unwrap().into()
250    }
251
252    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
253        unimplemented!()
254    }
255}
256
/// Plans Iceberg splits for batch queries; holds the connector properties used to load the table.
#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {
    // Properties used by the planning methods to load the table.
    config: IcebergProperties,
}
261
#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    /// Creates the enumerator by delegating to `new_inner`; never fails.
    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<Self> {
        Ok(Self::new_inner(properties, context))
    }

    /// Always returns an empty list; see the comment below for why.
    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
        // Like file source, iceberg streaming source has a List Executor and a Fetch Executor,
        // instead of relying on SplitEnumerator on meta.
        // TODO: add some validation logic here.
        Ok(vec![])
    }
}
impl IcebergSplitEnumerator {
    /// Synchronous constructor: stores `properties` as the enumerator config. The context is
    /// accepted but not used.
    pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
        Self { config: properties }
    }
}
286
/// Selects which snapshot of the table to read instead of the current one.
pub enum IcebergTimeTravelInfo {
    /// A snapshot id; looked up with `snapshot_by_id`.
    Version(i64),
    /// A timestamp compared against each snapshot's `timestamp_ms()`; the latest snapshot that
    /// is not newer than it is chosen.
    TimestampMs(i64),
}
291
292impl IcebergSplitEnumerator {
293    pub fn get_snapshot_id(
294        table: &Table,
295        time_travel_info: Option<IcebergTimeTravelInfo>,
296    ) -> ConnectorResult<Option<i64>> {
297        let current_snapshot = table.metadata().current_snapshot();
298        if current_snapshot.is_none() {
299            return Ok(None);
300        }
301
302        let snapshot_id = match time_travel_info {
303            Some(IcebergTimeTravelInfo::Version(version)) => {
304                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
305                    bail!("Cannot find the snapshot id in the iceberg table.");
306                };
307                snapshot.snapshot_id()
308            }
309            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
310                let snapshot = table
311                    .metadata()
312                    .snapshots()
313                    .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
314                    .max_by_key(|snapshot| snapshot.timestamp_ms());
315                match snapshot {
316                    Some(snapshot) => snapshot.snapshot_id(),
317                    None => {
318                        // convert unix time to human-readable time
319                        let time = chrono::DateTime::from_timestamp_millis(timestamp);
320                        if let Some(time) = time {
321                            bail!("Cannot find a snapshot older than {}", time);
322                        } else {
323                            bail!("Cannot find a snapshot");
324                        }
325                    }
326                }
327            }
328            None => {
329                assert!(current_snapshot.is_some());
330                current_snapshot.unwrap().snapshot_id()
331            }
332        };
333        Ok(Some(snapshot_id))
334    }
335
336    pub async fn list_splits_batch(
337        &self,
338        schema: Schema,
339        time_traval_info: Option<IcebergTimeTravelInfo>,
340        batch_parallelism: usize,
341        iceberg_scan_type: IcebergScanType,
342        predicate: IcebergPredicate,
343    ) -> ConnectorResult<Vec<IcebergSplit>> {
344        if batch_parallelism == 0 {
345            bail!("Batch parallelism is 0. Cannot split the iceberg files.");
346        }
347        let table = self.config.load_table().await?;
348        let snapshot_id = Self::get_snapshot_id(&table, time_traval_info)?;
349        if snapshot_id.is_none() {
350            // If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
351            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
352        }
353        if let IcebergScanType::CountStar = iceberg_scan_type {
354            self.list_splits_batch_count_star(&table, snapshot_id.unwrap())
355                .await
356        } else {
357            self.list_splits_batch_scan(
358                &table,
359                snapshot_id.unwrap(),
360                schema,
361                batch_parallelism,
362                iceberg_scan_type,
363                predicate,
364            )
365            .await
366        }
367    }
368
369    async fn list_splits_batch_scan(
370        &self,
371        table: &Table,
372        snapshot_id: i64,
373        schema: Schema,
374        batch_parallelism: usize,
375        iceberg_scan_type: IcebergScanType,
376        predicate: IcebergPredicate,
377    ) -> ConnectorResult<Vec<IcebergSplit>> {
378        let schema_names = schema.names();
379        let require_names = schema_names
380            .iter()
381            .filter(|name| {
382                name.ne(&ICEBERG_SEQUENCE_NUM_COLUMN_NAME)
383                    && name.ne(&ICEBERG_FILE_PATH_COLUMN_NAME)
384                    && name.ne(&ICEBERG_FILE_POS_COLUMN_NAME)
385            })
386            .cloned()
387            .collect_vec();
388
389        let table_schema = table.metadata().current_schema();
390        tracing::debug!("iceberg_table_schema: {:?}", table_schema);
391
392        let mut position_delete_files = vec![];
393        let mut position_delete_files_set = HashSet::new();
394        let mut data_files = vec![];
395        let mut equality_delete_files = vec![];
396        let mut equality_delete_files_set = HashSet::new();
397        let scan = table
398            .scan()
399            .with_filter(predicate)
400            .snapshot_id(snapshot_id)
401            .with_delete_file_processing_enabled(true)
402            .select(require_names)
403            .build()
404            .map_err(|e| anyhow!(e))?;
405
406        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
407
408        #[for_await]
409        for task in file_scan_stream {
410            let mut task: FileScanTask = task.map_err(|e| anyhow!(e))?;
411            for delete_file in task.deletes.drain(..) {
412                let mut delete_file = delete_file.as_ref().clone();
413                match delete_file.data_file_content {
414                    iceberg::spec::DataContentType::Data => {
415                        bail!("Data file should not in task deletes");
416                    }
417                    iceberg::spec::DataContentType::EqualityDeletes => {
418                        if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
419                            equality_delete_files.push(delete_file);
420                        }
421                    }
422                    iceberg::spec::DataContentType::PositionDeletes => {
423                        if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
424                            delete_file.project_field_ids = Vec::default();
425                            position_delete_files.push(delete_file);
426                        }
427                    }
428                }
429            }
430            match task.data_file_content {
431                iceberg::spec::DataContentType::Data => {
432                    data_files.push(task);
433                }
434                iceberg::spec::DataContentType::EqualityDeletes => {
435                    bail!("Equality delete files should not be in the data files");
436                }
437                iceberg::spec::DataContentType::PositionDeletes => {
438                    bail!("Position delete files should not be in the data files");
439                }
440            }
441        }
442        // evenly split the files into splits based on the parallelism.
443        let data_files = Self::split_n_vecs(data_files, batch_parallelism);
444        let equality_delete_files = Self::split_n_vecs(equality_delete_files, batch_parallelism);
445        let position_delete_files = Self::split_n_vecs(position_delete_files, batch_parallelism);
446
447        let splits = data_files
448            .into_iter()
449            .zip_eq_fast(equality_delete_files.into_iter())
450            .zip_eq_fast(position_delete_files.into_iter())
451            .enumerate()
452            .map(
453                |(index, ((data_file, equality_delete_file), position_delete_file))| IcebergSplit {
454                    split_id: index as i64,
455                    snapshot_id,
456                    task: IcebergFileScanTask::new_scan_with_scan_type(
457                        iceberg_scan_type,
458                        data_file,
459                        equality_delete_file,
460                        position_delete_file,
461                    ),
462                },
463            )
464            .filter(|split| !split.task.is_empty())
465            .collect_vec();
466
467        if splits.is_empty() {
468            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
469        }
470        Ok(splits)
471    }
472
473    pub async fn list_splits_batch_count_star(
474        &self,
475        table: &Table,
476        snapshot_id: i64,
477    ) -> ConnectorResult<Vec<IcebergSplit>> {
478        let mut record_counts = 0;
479        let scan = table
480            .scan()
481            .snapshot_id(snapshot_id)
482            .with_delete_file_processing_enabled(true)
483            .build()
484            .map_err(|e| anyhow!(e))?;
485        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
486
487        #[for_await]
488        for task in file_scan_stream {
489            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
490            assert_eq!(task.data_file_content, DataContentType::Data);
491            assert!(task.deletes.is_empty());
492            record_counts += task.record_count.expect("must have");
493        }
494        let split = IcebergSplit {
495            split_id: 0,
496            snapshot_id,
497            task: IcebergFileScanTask::new_count_star(record_counts),
498        };
499        Ok(vec![split])
500    }
501
502    /// List all files in the snapshot to check if there are deletes.
503    pub async fn all_delete_parameters(
504        table: &Table,
505        snapshot_id: i64,
506    ) -> ConnectorResult<(Vec<String>, bool)> {
507        let scan = table
508            .scan()
509            .snapshot_id(snapshot_id)
510            .with_delete_file_processing_enabled(true)
511            .build()
512            .map_err(|e| anyhow!(e))?;
513        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
514        let schema = scan.snapshot().schema(table.metadata())?;
515        let mut equality_ids = vec![];
516        let mut have_position_delete = false;
517        #[for_await]
518        for task in file_scan_stream {
519            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
520            for delete_file in task.deletes {
521                match delete_file.data_file_content {
522                    iceberg::spec::DataContentType::Data => {}
523                    iceberg::spec::DataContentType::EqualityDeletes => {
524                        if equality_ids.is_empty() {
525                            equality_ids = delete_file.equality_ids.clone();
526                        } else if equality_ids != delete_file.equality_ids {
527                            bail!("The schema of iceberg equality delete file must be consistent");
528                        }
529                    }
530                    iceberg::spec::DataContentType::PositionDeletes => {
531                        have_position_delete = true;
532                    }
533                }
534            }
535        }
536        let delete_columns = equality_ids
537            .into_iter()
538            .map(|id| match schema.name_by_field_id(id) {
539                Some(name) => Ok::<std::string::String, ConnectorError>(name.to_owned()),
540                None => bail!("Delete field id {} not found in schema", id),
541            })
542            .collect::<ConnectorResult<Vec<_>>>()?;
543
544        Ok((delete_columns, have_position_delete))
545    }
546
547    pub async fn get_delete_parameters(
548        &self,
549        time_travel_info: Option<IcebergTimeTravelInfo>,
550    ) -> ConnectorResult<(Vec<String>, bool)> {
551        let table = self.config.load_table().await?;
552        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
553        if snapshot_id.is_none() {
554            return Ok((vec![], false));
555        }
556        let snapshot_id = snapshot_id.unwrap();
557        Self::all_delete_parameters(&table, snapshot_id).await
558    }
559
560    /// Uniformly distribute scan tasks to compute nodes.
561    /// It's deterministic so that it can best utilize the data locality.
562    ///
563    /// # Arguments
564    /// * `file_scan_tasks`: The file scan tasks to be split.
565    /// * `split_num`: The number of splits to be created.
566    ///
567    /// This algorithm is based on a min-heap. It will push all groups into the heap, and then pop the smallest group and add the file scan task to it.
568    /// Ensure that the total length of each group is as balanced as possible.
569    /// The time complexity is O(n log k), where n is the number of file scan tasks and k is the number of splits.
570    /// The space complexity is O(k), where k is the number of splits.
571    /// The algorithm is stable, so the order of the file scan tasks will be preserved.
572    fn split_n_vecs(
573        file_scan_tasks: Vec<FileScanTask>,
574        split_num: usize,
575    ) -> Vec<Vec<FileScanTask>> {
576        use std::cmp::{Ordering, Reverse};
577
578        #[derive(Default)]
579        struct FileScanTaskGroup {
580            idx: usize,
581            tasks: Vec<FileScanTask>,
582            total_length: u64,
583        }
584
585        impl Ord for FileScanTaskGroup {
586            fn cmp(&self, other: &Self) -> Ordering {
587                // when total_length is the same, we will sort by index
588                if self.total_length == other.total_length {
589                    self.idx.cmp(&other.idx)
590                } else {
591                    self.total_length.cmp(&other.total_length)
592                }
593            }
594        }
595
596        impl PartialOrd for FileScanTaskGroup {
597            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
598                Some(self.cmp(other))
599            }
600        }
601
602        impl Eq for FileScanTaskGroup {}
603
604        impl PartialEq for FileScanTaskGroup {
605            fn eq(&self, other: &Self) -> bool {
606                self.total_length == other.total_length
607            }
608        }
609
610        let mut heap = BinaryHeap::new();
611        // push all groups into heap
612        for idx in 0..split_num {
613            heap.push(Reverse(FileScanTaskGroup {
614                idx,
615                tasks: vec![],
616                total_length: 0,
617            }));
618        }
619
620        for file_task in file_scan_tasks {
621            let mut group = heap.peek_mut().unwrap();
622            group.0.total_length += file_task.length;
623            group.0.tasks.push(file_task);
624        }
625
626        // convert heap into vec and extract tasks
627        heap.into_vec()
628            .into_iter()
629            .map(|reverse_group| reverse_group.0.tasks)
630            .collect()
631    }
632}
633
/// Options for `scan_task_to_chunk`.
pub struct IcebergScanOpts {
    /// Batch size passed to the Iceberg reader.
    pub chunk_size: usize,
    /// Append an `Int64` column holding the scan task's sequence number to every chunk.
    pub need_seq_num: bool,
    /// Append a `Utf8` file-path column and an `Int64` row-position column to every chunk.
    pub need_file_path_and_pos: bool,
}
639
640#[try_stream(ok = DataChunk, error = ConnectorError)]
641pub async fn scan_task_to_chunk(
642    table: Table,
643    data_file_scan_task: FileScanTask,
644    IcebergScanOpts {
645        chunk_size,
646        need_seq_num,
647        need_file_path_and_pos,
648    }: IcebergScanOpts,
649    metrics: Option<Arc<IcebergScanMetrics>>,
650) {
651    let table_name = table.identifier().name().to_owned();
652
653    let mut read_bytes = scopeguard::guard(0, |read_bytes| {
654        if let Some(metrics) = metrics {
655            metrics
656                .iceberg_read_bytes
657                .with_guarded_label_values(&[&table_name])
658                .inc_by(read_bytes as _);
659        }
660    });
661
662    let data_file_path = data_file_scan_task.data_file_path.clone();
663    let data_sequence_number = data_file_scan_task.sequence_number;
664
665    let reader = table.reader_builder().with_batch_size(chunk_size).build();
666    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));
667
668    // FIXME: what if the start position is not 0? The logic for index seems not correct.
669    let mut record_batch_stream = reader.read(Box::pin(file_scan_stream)).await?.enumerate();
670
671    while let Some((index, record_batch)) = record_batch_stream.next().await {
672        let record_batch = record_batch?;
673
674        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
675        if need_seq_num {
676            let (mut columns, visibility) = chunk.into_parts();
677            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
678                vec![data_sequence_number; visibility.len()],
679            ))));
680            chunk = DataChunk::from_parts(columns.into(), visibility)
681        };
682        if need_file_path_and_pos {
683            let (mut columns, visibility) = chunk.into_parts();
684            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
685                vec![data_file_path.as_str(); visibility.len()],
686            ))));
687            let index_start = (index * chunk_size) as i64;
688            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
689                (index_start..(index_start + visibility.len() as i64)).collect::<Vec<i64>>(),
690            ))));
691            chunk = DataChunk::from_parts(columns.into(), visibility)
692        }
693        *read_bytes += chunk.estimated_heap_size() as u64;
694        yield chunk;
695    }
696}
697
/// Placeholder `SplitReader` for the Iceberg source; both trait methods are unimplemented.
#[derive(Debug)]
pub struct IcebergFileReader {}
700
#[async_trait]
impl SplitReader for IcebergFileReader {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    /// Not implemented.
    ///
    /// # Panics
    /// Always panics.
    async fn new(
        _props: IcebergProperties,
        _splits: Vec<IcebergSplit>,
        _parser_config: ParserConfig,
        _source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        unimplemented!()
    }

    /// Not implemented.
    ///
    /// # Panics
    /// Always panics.
    fn into_stream(self) -> BoxSourceChunkStream {
        unimplemented!()
    }
}
720
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, Schema};

    use super::*;

    /// Builds a minimal data-file scan task with the given `length` and a path derived from `id`.
    fn create_file_scan_task(length: u64, id: u64) -> FileScanTask {
        FileScanTask {
            length,
            start: 0,
            record_count: Some(0),
            data_file_path: format!("test_{}.parquet", id),
            data_file_content: DataContentType::Data,
            data_file_format: iceberg::spec::DataFileFormat::Parquet,
            schema: Arc::new(Schema::builder().build().unwrap()),
            project_field_ids: vec![],
            predicate: None,
            deletes: vec![],
            sequence_number: 0,
            equality_ids: vec![],
            file_size_in_bytes: 0,
        }
    }

    /// Splits a copy of `tasks` into `split_num` groups and returns the file paths per group.
    fn split_into_paths(tasks: &[FileScanTask], split_num: usize) -> Vec<Vec<String>> {
        IcebergSplitEnumerator::split_n_vecs(tasks.to_vec(), split_num)
            .iter()
            .map(|group| {
                group
                    .iter()
                    .map(|task| task.data_file_path.clone())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>()
    }

    /// Twelve tasks of similar length are spread over three balanced groups without loss.
    #[test]
    fn test_split_n_vecs_basic() {
        let mut file_scan_tasks = Vec::new();
        for i in 1..=12 {
            file_scan_tasks.push(create_file_scan_task(i + 100, i));
        }

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);

        assert_eq!(groups.len(), 3);

        let group_lengths: Vec<u64> = groups
            .iter()
            .map(|group| group.iter().map(|task| task.length).sum())
            .collect();

        let max_length = *group_lengths.iter().max().unwrap();
        let min_length = *group_lengths.iter().min().unwrap();
        assert!(max_length - min_length <= 10, "Groups should be balanced");

        let total_tasks: usize = groups.iter().map(|group| group.len()).sum();
        assert_eq!(total_tasks, 12);
    }

    /// No tasks still yields the requested number of (empty) groups.
    #[test]
    fn test_split_n_vecs_empty() {
        let groups = IcebergSplitEnumerator::split_n_vecs(Vec::new(), 3);
        assert_eq!(groups.len(), 3);
        assert!(groups.iter().all(|group| group.is_empty()));
    }

    /// A single task ends up in exactly one of the groups.
    #[test]
    fn test_split_n_vecs_single_task() {
        let groups =
            IcebergSplitEnumerator::split_n_vecs(vec![create_file_scan_task(100, 1)], 3);
        assert_eq!(groups.len(), 3);
        let non_empty_groups = groups.iter().filter(|group| !group.is_empty()).count();
        assert_eq!(non_empty_groups, 1);
    }

    /// One large task gets a group of its own while the small ones share the other group.
    #[test]
    fn test_split_n_vecs_uneven_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(1000, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 2);
        assert_eq!(groups.len(), 2);

        let group_with_large_task = groups
            .iter()
            .find(|group| group.iter().any(|task| task.length == 1000))
            .unwrap();
        assert_eq!(group_with_large_task.len(), 1);
    }

    /// Repeated runs over equally sized files always produce the same grouping.
    #[test]
    fn test_split_n_vecs_same_files_distribution() {
        let file_scan_tasks = (1..=8)
            .map(|id| create_file_scan_task(100, id))
            .collect::<Vec<_>>();

        let groups = split_into_paths(&file_scan_tasks, 4);

        for _ in 0..10000 {
            let groups_2 = split_into_paths(&file_scan_tasks, 4);
            assert_eq!(groups, groups_2);
        }
    }
}