risingwave_connector/source/iceberg/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod parquet_file_handler;

mod metrics;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::{for_await, try_stream};
use iceberg::Catalog;
use iceberg::expr::Predicate as IcebergPredicate;
use iceberg::scan::FileScanTask;
use iceberg::spec::DataContentType;
use iceberg::table::Table;
use itertools::Itertools;
pub use parquet_file_handler::*;
use phf::{Set, phf_set};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::bail;
use risingwave_common::catalog::{
    ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
    Schema,
};
use risingwave_common::types::JsonbVal;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
use serde::{Deserialize, Serialize};

pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
};

pub const ICEBERG_CONNECTOR: &str = "iceberg";

#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct IcebergProperties {
    #[serde(flatten)]
    pub common: IcebergCommon,

    #[serde(flatten)]
    pub table: IcebergTableIdentifier,

    // For jdbc catalog
    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl EnforceSecret for IcebergProperties {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "catalog.jdbc.password",
    };

    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
            if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
                return Err(EnforceSecretError {
                    key: prop.to_owned(),
                }
                .into());
            }
        }
        Ok(())
    }
}

impl IcebergProperties {
    pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        // TODO: support path_style_access and java_catalog_props for iceberg source
        self.common.create_catalog(&java_catalog_props).await
    }

    pub async fn load_table(&self) -> ConnectorResult<Table> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        // TODO: support java_catalog_props for iceberg source
        self.common
            .load_table(&self.table, &java_catalog_props)
            .await
    }
}

impl SourceProperties for IcebergProperties {
    type Split = IcebergSplit;
    type SplitEnumerator = IcebergSplitEnumerator;
    type SplitReader = IcebergFileReader;

    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}

impl UnknownFields for IcebergProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergFileScanTaskJsonStr(String);

impl IcebergFileScanTaskJsonStr {
    pub fn deserialize(&self) -> FileScanTask {
        serde_json::from_str(&self.0).unwrap()
    }

    pub fn serialize(task: &FileScanTask) -> Self {
        Self(serde_json::to_string(task).unwrap())
    }
}
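
// Illustrative round-trip of the JSON wrapper above (a sketch only; `task` is a
// hypothetical `FileScanTask` value, not something defined in this module):
//
//     let encoded = IcebergFileScanTaskJsonStr::serialize(&task);
//     let decoded: FileScanTask = encoded.deserialize();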

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
    Data(Vec<FileScanTask>),
    EqualityDelete(Vec<FileScanTask>),
    PositionDelete(Vec<FileScanTask>),
    CountStar(u64),
}

impl IcebergFileScanTask {
    pub fn new_count_star(count_sum: u64) -> Self {
        IcebergFileScanTask::CountStar(count_sum)
    }

    pub fn new_scan_with_scan_type(
        iceberg_scan_type: IcebergScanType,
        data_files: Vec<FileScanTask>,
        equality_delete_files: Vec<FileScanTask>,
        position_delete_files: Vec<FileScanTask>,
    ) -> Self {
        match iceberg_scan_type {
            IcebergScanType::EqualityDeleteScan => {
                IcebergFileScanTask::EqualityDelete(equality_delete_files)
            }
            IcebergScanType::DataScan => IcebergFileScanTask::Data(data_files),
            IcebergScanType::PositionDeleteScan => {
                IcebergFileScanTask::PositionDelete(position_delete_files)
            }
            IcebergScanType::Unspecified | IcebergScanType::CountStar => {
                unreachable!("unexpected iceberg scan type: {:?}", iceberg_scan_type)
            }
        }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            IcebergFileScanTask::Data(data_files) => data_files.is_empty(),
            IcebergFileScanTask::EqualityDelete(equality_delete_files) => {
                equality_delete_files.is_empty()
            }
            IcebergFileScanTask::PositionDelete(position_delete_files) => {
                position_delete_files.is_empty()
            }
            IcebergFileScanTask::CountStar(_) => false,
        }
    }

    pub fn files(&self) -> Vec<String> {
        match self {
            IcebergFileScanTask::Data(file_scan_tasks)
            | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
            | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks
                .iter()
                .map(|task| task.data_file_path.clone())
                .collect(),
            IcebergFileScanTask::CountStar(_) => vec![],
        }
    }
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IcebergSplit {
    pub split_id: i64,
    pub snapshot_id: i64,
    pub task: IcebergFileScanTask,
}

impl IcebergSplit {
    pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
        if let IcebergScanType::CountStar = iceberg_scan_type {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_count_star(0),
            }
        } else {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_scan_with_scan_type(
                    iceberg_scan_type,
                    vec![],
                    vec![],
                    vec![],
                ),
            }
        }
    }
}

impl SplitMetaData for IcebergSplit {
    fn id(&self) -> SplitId {
        self.split_id.to_string().into()
    }

    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
    }

    fn encode_to_json(&self) -> JsonbVal {
        serde_json::to_value(self.clone()).unwrap().into()
    }

    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
        unimplemented!()
    }
}

#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {
    config: IcebergProperties,
}

#[derive(Debug, Clone)]
pub struct IcebergDeleteParameters {
    pub equality_delete_columns: Vec<String>,
    pub has_position_delete: bool,
    pub snapshot_id: Option<i64>,
}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<Self> {
        Ok(Self::new_inner(properties, context))
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
        // Like file source, iceberg streaming source has a List Executor and a Fetch Executor,
        // instead of relying on SplitEnumerator on meta.
        // TODO: add some validation logic here.
        Ok(vec![])
    }
}
impl IcebergSplitEnumerator {
    pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
        Self { config: properties }
    }
}

pub enum IcebergTimeTravelInfo {
    Version(i64),
    TimestampMs(i64),
}

impl IcebergSplitEnumerator {
    pub fn get_snapshot_id(
        table: &Table,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<Option<i64>> {
        let current_snapshot = table.metadata().current_snapshot();
        if current_snapshot.is_none() {
            return Ok(None);
        }

        let snapshot_id = match time_travel_info {
            Some(IcebergTimeTravelInfo::Version(version)) => {
                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
                    bail!("Cannot find the snapshot id in the iceberg table.");
                };
                snapshot.snapshot_id()
            }
            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
                let snapshot = table
                    .metadata()
                    .snapshots()
                    .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
                    .max_by_key(|snapshot| snapshot.timestamp_ms());
                match snapshot {
                    Some(snapshot) => snapshot.snapshot_id(),
                    None => {
                        // convert unix time to human-readable time
                        let time = chrono::DateTime::from_timestamp_millis(timestamp);
                        if let Some(time) = time {
                            bail!("Cannot find a snapshot older than {}", time);
                        } else {
                            bail!("Cannot find a snapshot");
                        }
                    }
                }
            }
            None => {
                assert!(current_snapshot.is_some());
                current_snapshot.unwrap().snapshot_id()
            }
        };
        Ok(Some(snapshot_id))
    }

    pub async fn list_splits_batch(
        &self,
        schema: Schema,
        snapshot_id: Option<i64>,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        if batch_parallelism == 0 {
            bail!("Batch parallelism is 0. Cannot split the iceberg files.");
        }
        let table = self.config.load_table().await?;
        if snapshot_id.is_none() {
            // If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        if let IcebergScanType::CountStar = iceberg_scan_type {
            self.list_splits_batch_count_star(&table, snapshot_id.unwrap())
                .await
        } else {
            self.list_splits_batch_scan(
                &table,
                snapshot_id.unwrap(),
                schema,
                batch_parallelism,
                iceberg_scan_type,
                predicate,
            )
            .await
        }
    }

    async fn list_splits_batch_scan(
        &self,
        table: &Table,
        snapshot_id: i64,
        schema: Schema,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let schema_names = schema.names();
        let require_names = schema_names
            .iter()
            .filter(|name| {
                name.ne(&ICEBERG_SEQUENCE_NUM_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_PATH_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_POS_COLUMN_NAME)
            })
            .cloned()
            .collect_vec();

        let table_schema = table.metadata().current_schema();
        tracing::debug!("iceberg_table_schema: {:?}", table_schema);

        let mut position_delete_files = vec![];
        let mut position_delete_files_set = HashSet::new();
        let mut data_files = vec![];
        let mut equality_delete_files = vec![];
        let mut equality_delete_files_set = HashSet::new();
        let scan = table
            .scan()
            .with_filter(predicate)
            .snapshot_id(snapshot_id)
            .select(require_names)
            .build()
            .map_err(|e| anyhow!(e))?;

        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;

            // Collect delete files for separate scan types, but keep task.deletes intact
            for delete_file in &task.deletes {
                let delete_file = delete_file.as_ref().clone();
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {
                        bail!("Data file should not be in task deletes");
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            equality_delete_files.push(delete_file);
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        let mut delete_file = delete_file;
                        if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            delete_file.project_field_ids = Vec::default();
                            position_delete_files.push(delete_file);
                        }
                    }
                }
            }

            match task.data_file_content {
                iceberg::spec::DataContentType::Data => {
                    // Keep the original task with its deletes field intact
                    data_files.push(task);
                }
                iceberg::spec::DataContentType::EqualityDeletes => {
                    bail!("Equality delete files should not be in the data files");
                }
                iceberg::spec::DataContentType::PositionDeletes => {
                    bail!("Position delete files should not be in the data files");
                }
            }
        }
        // evenly split the files into splits based on the parallelism.
        let data_files = Self::split_n_vecs(data_files, batch_parallelism);
        let equality_delete_files = Self::split_n_vecs(equality_delete_files, batch_parallelism);
        let position_delete_files = Self::split_n_vecs(position_delete_files, batch_parallelism);

        let splits = data_files
            .into_iter()
            .zip_eq_fast(equality_delete_files.into_iter())
            .zip_eq_fast(position_delete_files.into_iter())
            .enumerate()
            .map(
                |(index, ((data_file, equality_delete_file), position_delete_file))| IcebergSplit {
                    split_id: index as i64,
                    snapshot_id,
                    task: IcebergFileScanTask::new_scan_with_scan_type(
                        iceberg_scan_type,
                        data_file,
                        equality_delete_file,
                        position_delete_file,
                    ),
                },
            )
            .filter(|split| !split.task.is_empty())
            .collect_vec();

        if splits.is_empty() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        Ok(splits)
    }

    pub async fn list_splits_batch_count_star(
        &self,
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let mut record_counts = 0;
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            assert_eq!(task.data_file_content, DataContentType::Data);
            assert!(task.deletes.is_empty());
            record_counts += task.record_count.expect("must have");
        }
        let split = IcebergSplit {
            split_id: 0,
            snapshot_id,
            task: IcebergFileScanTask::new_count_star(record_counts),
        };
        Ok(vec![split])
    }

    /// List all files in the snapshot to check if there are deletes.
    pub async fn all_delete_parameters(
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
        let schema = scan
            .snapshot()
            .context("snapshot not found")?
            .schema(table.metadata())?;
        let mut equality_ids: Option<Vec<i32>> = None;
        let mut have_position_delete = false;
        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for delete_file in task.deletes {
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {}
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_ids.is_none() {
                            equality_ids = delete_file.equality_ids.clone();
                        } else if equality_ids != delete_file.equality_ids {
                            bail!("The schema of iceberg equality delete file must be consistent");
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        have_position_delete = true;
                    }
                }
            }
        }
        let delete_columns = equality_ids
            .unwrap_or_default()
            .into_iter()
            .map(|id| match schema.name_by_field_id(id) {
                Some(name) => Ok::<std::string::String, ConnectorError>(name.to_owned()),
                None => bail!("Delete field id {} not found in schema", id),
            })
            .collect::<ConnectorResult<Vec<_>>>()?;

        Ok((delete_columns, have_position_delete))
    }

    pub async fn get_delete_parameters(
        &self,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<IcebergDeleteParameters> {
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        match snapshot_id {
            Some(snapshot_id) => {
                let (delete_columns, have_position_delete) =
                    Self::all_delete_parameters(&table, snapshot_id).await?;
                Ok(IcebergDeleteParameters {
                    equality_delete_columns: delete_columns,
                    has_position_delete: have_position_delete,
                    snapshot_id: Some(snapshot_id),
                })
            }
            None => Ok(IcebergDeleteParameters {
                equality_delete_columns: vec![],
                has_position_delete: false,
                snapshot_id: None,
            }),
        }
    }

    /// Uniformly distribute scan tasks to compute nodes.
    /// It's deterministic, so it can make the best use of data locality.
    ///
    /// # Arguments
    /// * `file_scan_tasks`: The file scan tasks to be split.
    /// * `split_num`: The number of splits to be created.
    ///
    /// The algorithm is based on a min-heap: all (initially empty) groups are pushed into the heap,
    /// and each file scan task is assigned to the group with the smallest total length so far,
    /// keeping the total length of each group as balanced as possible.
    /// The time complexity is O(n log k), where n is the number of file scan tasks and k is the number of splits.
    /// The space complexity is O(k), where k is the number of splits.
    /// The assignment is deterministic and stable: tasks keep their relative order within each group.
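    ///
    /// A minimal illustrative trace (hypothetical task lengths, not taken from a real table):
    ///
    /// ```text
    /// task lengths [10, 8, 3, 2], split_num = 2
    /// task 10 -> group 0 (totals: [10, 0])
    /// task  8 -> group 1 (totals: [10, 8])
    /// task  3 -> group 1 (totals: [10, 11])
    /// task  2 -> group 0 (totals: [12, 11])
    /// => groups: [10, 2] and [8, 3]
    /// ```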
    pub fn split_n_vecs(
        file_scan_tasks: Vec<FileScanTask>,
        split_num: usize,
    ) -> Vec<Vec<FileScanTask>> {
        use std::cmp::{Ordering, Reverse};

        #[derive(Default)]
        struct FileScanTaskGroup {
            idx: usize,
            tasks: Vec<FileScanTask>,
            total_length: u64,
        }

        impl Ord for FileScanTaskGroup {
            fn cmp(&self, other: &Self) -> Ordering {
                // when total_length is the same, we will sort by index
                if self.total_length == other.total_length {
                    self.idx.cmp(&other.idx)
                } else {
                    self.total_length.cmp(&other.total_length)
                }
            }
        }

        impl PartialOrd for FileScanTaskGroup {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }

        impl Eq for FileScanTaskGroup {}

        impl PartialEq for FileScanTaskGroup {
            fn eq(&self, other: &Self) -> bool {
                // Keep `PartialEq` consistent with `Ord`, which also breaks ties by `idx`.
                self.total_length == other.total_length && self.idx == other.idx
            }
        }

        let mut heap = BinaryHeap::new();
        // push all groups into heap
        for idx in 0..split_num {
            heap.push(Reverse(FileScanTaskGroup {
                idx,
                tasks: vec![],
                total_length: 0,
            }));
        }

        for file_task in file_scan_tasks {
            let mut group = heap.peek_mut().unwrap();
            group.0.total_length += file_task.length;
            group.0.tasks.push(file_task);
        }

        // convert heap into vec and extract tasks
        heap.into_vec()
            .into_iter()
            .map(|reverse_group| reverse_group.0.tasks)
            .collect()
    }
}

pub struct IcebergScanOpts {
    pub chunk_size: usize,
    pub need_seq_num: bool,
    pub need_file_path_and_pos: bool,
    pub handle_delete_files: bool,
}

/// Scan a data file. Delete files are handled by the iceberg-rust `reader.read` implementation.
#[try_stream(ok = DataChunk, error = ConnectorError)]
pub async fn scan_task_to_chunk_with_deletes(
    table: Table,
    mut data_file_scan_task: FileScanTask,
    IcebergScanOpts {
        chunk_size,
        need_seq_num,
        need_file_path_and_pos,
        handle_delete_files,
    }: IcebergScanOpts,
    metrics: Option<Arc<IcebergScanMetrics>>,
) {
    let table_name = table.identifier().name().to_owned();

    let mut read_bytes = scopeguard::guard(0, |read_bytes| {
        if let Some(metrics) = metrics.clone() {
            metrics
                .iceberg_read_bytes
                .with_guarded_label_values(&[&table_name])
                .inc_by(read_bytes as _);
        }
    });

    let data_file_path = data_file_scan_task.data_file_path.clone();
    let data_sequence_number = data_file_scan_task.sequence_number;

    tracing::debug!(
        "scan_task_to_chunk_with_deletes: data_file={}, handle_delete_files={}, total_delete_files={}",
        data_file_path,
        handle_delete_files,
        data_file_scan_task.deletes.len()
    );

    if !handle_delete_files {
        // Keep the delete files from being applied when the caller opts out.
        data_file_scan_task.deletes.clear();
    }

    // Read the data file; delete application is delegated to the reader.
    let reader = table.reader_builder().with_batch_size(chunk_size).build();
    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

    let record_batch_stream: iceberg::scan::ArrowRecordBatchStream =
        reader.read(Box::pin(file_scan_stream))?;
    let mut record_batch_stream = record_batch_stream.enumerate();

    // Process each record batch. Delete application is handled by the SDK.
    while let Some((batch_index, record_batch)) = record_batch_stream.next().await {
        let record_batch = record_batch?;
        let batch_start_pos = (batch_index * chunk_size) as i64;

        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
        let row_count = chunk.capacity();

        // Add metadata columns if requested
        if need_seq_num {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                std::iter::repeat_n(data_sequence_number, row_count),
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        if need_file_path_and_pos {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
                std::iter::repeat_n(data_file_path.as_str(), row_count),
            ))));

            // Generate position values for each row in the batch
            let positions: Vec<i64> =
                (batch_start_pos..(batch_start_pos + row_count as i64)).collect();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(positions))));

            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        *read_bytes += chunk.estimated_heap_size() as u64;
        yield chunk;
    }
}
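
// A minimal sketch of how the stream above might be consumed (hypothetical caller;
// `table`, `task`, and `opts` are placeholders rather than values defined here):
//
//     #[for_await]
//     for chunk in scan_task_to_chunk_with_deletes(table, task, opts, None) {
//         let chunk: DataChunk = chunk?;
//         // hand the chunk to the downstream executor
//     }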

#[derive(Debug)]
pub struct IcebergFileReader {}

#[async_trait]
impl SplitReader for IcebergFileReader {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        _props: IcebergProperties,
        _splits: Vec<IcebergSplit>,
        _parser_config: ParserConfig,
        _source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        unimplemented!()
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        unimplemented!()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, Schema};

    use super::*;

    fn create_file_scan_task(length: u64, id: u64) -> FileScanTask {
        FileScanTask {
            length,
            start: 0,
            record_count: Some(0),
            data_file_path: format!("test_{}.parquet", id),
            data_file_content: DataContentType::Data,
            data_file_format: iceberg::spec::DataFileFormat::Parquet,
            schema: Arc::new(Schema::builder().build().unwrap()),
            project_field_ids: vec![],
            predicate: None,
            deletes: vec![],
            sequence_number: 0,
            equality_ids: None,
            file_size_in_bytes: 0,
        }
    }

    #[test]
    fn test_split_n_vecs_basic() {
        let file_scan_tasks = (1..=12)
            .map(|i| create_file_scan_task(i + 100, i))
            .collect::<Vec<_>>();

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);

        assert_eq!(groups.len(), 3);

        let group_lengths: Vec<u64> = groups
            .iter()
            .map(|group| group.iter().map(|task| task.length).sum())
            .collect();

        let max_length = *group_lengths.iter().max().unwrap();
        let min_length = *group_lengths.iter().min().unwrap();
        assert!(max_length - min_length <= 10, "Groups should be balanced");

        let total_tasks: usize = groups.iter().map(|group| group.len()).sum();
        assert_eq!(total_tasks, 12);
    }

    #[test]
    fn test_split_n_vecs_empty() {
        let file_scan_tasks = Vec::new();
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert!(groups.iter().all(|group| group.is_empty()));
    }

    #[test]
    fn test_split_n_vecs_single_task() {
        let file_scan_tasks = vec![create_file_scan_task(100, 1)];
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert_eq!(groups.iter().filter(|group| !group.is_empty()).count(), 1);
    }

    #[test]
    fn test_split_n_vecs_uneven_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(1000, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 2);
        assert_eq!(groups.len(), 2);

        let group_with_large_task = groups
            .iter()
            .find(|group| group.iter().any(|task| task.length == 1000))
            .unwrap();
        assert_eq!(group_with_large_task.len(), 1);
    }

    #[test]
    fn test_split_n_vecs_same_files_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(100, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
            create_file_scan_task(100, 6),
            create_file_scan_task(100, 7),
            create_file_scan_task(100, 8),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
            .iter()
            .map(|g| {
                g.iter()
                    .map(|task| task.data_file_path.clone())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        for _ in 0..10000 {
            let groups_2 = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
                .iter()
                .map(|g| {
                    g.iter()
                        .map(|task| task.data_file_path.clone())
                        .collect::<Vec<_>>()
                })
                .collect::<Vec<_>>();

            assert_eq!(groups, groups_2);
        }
    }
}