risingwave_connector/source/iceberg/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod parquet_file_handler;

mod metrics;
use std::collections::{BTreeSet, BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::{for_await, try_stream};
use iceberg::Catalog;
use iceberg::expr::Predicate as IcebergPredicate;
use iceberg::scan::FileScanTask;
use iceberg::spec::DataContentType;
use iceberg::table::Table;
use itertools::Itertools;
pub use parquet_file_handler::*;
use phf::{Set, phf_set};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_array_iceberg::Array;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
    Schema,
};
use risingwave_common::types::{Datum, DatumRef, JsonbVal, ToDatumRef, ToOwnedDatum};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
use serde::{Deserialize, Serialize};

pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
};

pub const ICEBERG_CONNECTOR: &str = "iceberg";

#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct IcebergProperties {
    #[serde(flatten)]
    pub common: IcebergCommon,

    #[serde(flatten)]
    pub table: IcebergTableIdentifier,

    // For jdbc catalog
    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl EnforceSecret for IcebergProperties {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "catalog.jdbc.password",
    };

    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
            if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
                return Err(EnforceSecretError {
                    key: prop.to_owned(),
                }
                .into());
            }
        }
        Ok(())
    }
}

impl IcebergProperties {
    pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        // TODO: support path_style_access and java_catalog_props for iceberg source
        self.common.create_catalog(&java_catalog_props).await
    }

    pub async fn load_table(&self) -> ConnectorResult<Table> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        // TODO: support java_catalog_props for iceberg source
        self.common
            .load_table(&self.table, &java_catalog_props)
            .await
    }
}

impl SourceProperties for IcebergProperties {
    type Split = IcebergSplit;
    type SplitEnumerator = IcebergSplitEnumerator;
    type SplitReader = IcebergFileReader;

    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}

impl UnknownFields for IcebergProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergFileScanTaskJsonStr(String);

impl IcebergFileScanTaskJsonStr {
    pub fn deserialize(&self) -> FileScanTask {
        serde_json::from_str(&self.0).unwrap()
    }

    pub fn serialize(task: &FileScanTask) -> Self {
        Self(serde_json::to_string(task).unwrap())
    }
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
    Data(Vec<FileScanTask>),
    EqualityDelete(Vec<FileScanTask>),
    PositionDelete(Vec<FileScanTask>),
    CountStar(u64),
}

impl IcebergFileScanTask {
    pub fn new_count_star(count_sum: u64) -> Self {
        IcebergFileScanTask::CountStar(count_sum)
    }

    pub fn new_scan_with_scan_type(
        iceberg_scan_type: IcebergScanType,
        data_files: Vec<FileScanTask>,
        equality_delete_files: Vec<FileScanTask>,
        position_delete_files: Vec<FileScanTask>,
    ) -> Self {
        match iceberg_scan_type {
            IcebergScanType::EqualityDeleteScan => {
                IcebergFileScanTask::EqualityDelete(equality_delete_files)
            }
            IcebergScanType::DataScan => IcebergFileScanTask::Data(data_files),
            IcebergScanType::PositionDeleteScan => {
                IcebergFileScanTask::PositionDelete(position_delete_files)
            }
            IcebergScanType::Unspecified | IcebergScanType::CountStar => {
                unreachable!("Unspecified or CountStar iceberg scan type")
            }
        }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            IcebergFileScanTask::Data(data_files) => data_files.is_empty(),
            IcebergFileScanTask::EqualityDelete(equality_delete_files) => {
                equality_delete_files.is_empty()
            }
            IcebergFileScanTask::PositionDelete(position_delete_files) => {
                position_delete_files.is_empty()
            }
            IcebergFileScanTask::CountStar(_) => false,
        }
    }

    pub fn files(&self) -> Vec<String> {
        match self {
            IcebergFileScanTask::Data(file_scan_tasks)
            | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
            | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks
                .iter()
                .map(|task| task.data_file_path.clone())
                .collect(),
            IcebergFileScanTask::CountStar(_) => vec![],
        }
    }
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IcebergSplit {
    pub split_id: i64,
    pub snapshot_id: i64,
    pub task: IcebergFileScanTask,
}

impl IcebergSplit {
    pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
        if let IcebergScanType::CountStar = iceberg_scan_type {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_count_star(0),
            }
        } else {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_scan_with_scan_type(
                    iceberg_scan_type,
                    vec![],
                    vec![],
                    vec![],
                ),
            }
        }
    }
}

impl SplitMetaData for IcebergSplit {
    fn id(&self) -> SplitId {
        self.split_id.to_string().into()
    }

    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
    }

    fn encode_to_json(&self) -> JsonbVal {
        serde_json::to_value(self.clone()).unwrap().into()
    }

    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
        unimplemented!()
    }
}

#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {
    config: IcebergProperties,
}

#[derive(Debug, Clone)]
pub struct IcebergDeleteParameters {
    pub equality_delete_columns: Vec<String>,
    pub has_position_delete: bool,
    pub snapshot_id: Option<i64>,
}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<Self> {
        Ok(Self::new_inner(properties, context))
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
        // Like the file source, the iceberg streaming source has a List executor and a Fetch
        // executor instead of relying on the `SplitEnumerator` on meta.
        // TODO: add some validation logic here.
        Ok(vec![])
    }
}

impl IcebergSplitEnumerator {
    pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
        Self { config: properties }
    }
}

pub enum IcebergTimeTravelInfo {
    Version(i64),
    TimestampMs(i64),
}

impl IcebergSplitEnumerator {
    pub fn get_snapshot_id(
        table: &Table,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<Option<i64>> {
        let current_snapshot = table.metadata().current_snapshot();
        if current_snapshot.is_none() {
            return Ok(None);
        }

        let snapshot_id = match time_travel_info {
            Some(IcebergTimeTravelInfo::Version(version)) => {
                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
                    bail!("Cannot find the snapshot id in the iceberg table.");
                };
                snapshot.snapshot_id()
            }
            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
                let snapshot = table
                    .metadata()
                    .snapshots()
                    .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
                    .max_by_key(|snapshot| snapshot.timestamp_ms());
                match snapshot {
                    Some(snapshot) => snapshot.snapshot_id(),
                    None => {
                        // convert unix time to human-readable time
                        let time = chrono::DateTime::from_timestamp_millis(timestamp);
                        if let Some(time) = time {
                            bail!("Cannot find a snapshot older than {}", time);
                        } else {
                            bail!("Cannot find a snapshot");
                        }
                    }
                }
            }
            None => {
                assert!(current_snapshot.is_some());
                current_snapshot.unwrap().snapshot_id()
            }
        };
        Ok(Some(snapshot_id))
    }

    pub async fn list_splits_batch(
        &self,
        schema: Schema,
        snapshot_id: Option<i64>,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        if batch_parallelism == 0 {
            bail!("Batch parallelism is 0. Cannot split the iceberg files.");
        }
        let table = self.config.load_table().await?;
        if snapshot_id.is_none() {
            // If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        if let IcebergScanType::CountStar = iceberg_scan_type {
            self.list_splits_batch_count_star(&table, snapshot_id.unwrap())
                .await
        } else {
            self.list_splits_batch_scan(
                &table,
                snapshot_id.unwrap(),
                schema,
                batch_parallelism,
                iceberg_scan_type,
                predicate,
            )
            .await
        }
    }

    async fn list_splits_batch_scan(
        &self,
        table: &Table,
        snapshot_id: i64,
        schema: Schema,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let schema_names = schema.names();
        let require_names = schema_names
            .iter()
            .filter(|name| {
                name.ne(&ICEBERG_SEQUENCE_NUM_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_PATH_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_POS_COLUMN_NAME)
            })
            .cloned()
            .collect_vec();

        let table_schema = table.metadata().current_schema();
        tracing::debug!("iceberg_table_schema: {:?}", table_schema);

        let mut position_delete_files = vec![];
        let mut position_delete_files_set = HashSet::new();
        let mut data_files = vec![];
        let mut equality_delete_files = vec![];
        let mut equality_delete_files_set = HashSet::new();
        let scan = table
            .scan()
            .with_filter(predicate)
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .select(require_names)
            .build()
            .map_err(|e| anyhow!(e))?;

        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;

            // Collect delete files for separate scan types, but keep task.deletes intact
            for delete_file in &task.deletes {
                let delete_file = delete_file.as_ref().clone();
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {
                        bail!("Data file should not be in task deletes");
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            equality_delete_files.push(delete_file);
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        let mut delete_file = delete_file;
                        if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            delete_file.project_field_ids = Vec::default();
                            position_delete_files.push(delete_file);
                        }
                    }
                }
            }

            match task.data_file_content {
                iceberg::spec::DataContentType::Data => {
                    // Keep the original task with its deletes field intact
                    data_files.push(task);
                }
                iceberg::spec::DataContentType::EqualityDeletes => {
                    bail!("Equality delete files should not be in the data files");
                }
                iceberg::spec::DataContentType::PositionDeletes => {
                    bail!("Position delete files should not be in the data files");
                }
            }
        }
        // evenly split the files into splits based on the parallelism.
        let data_files = Self::split_n_vecs(data_files, batch_parallelism);
        let equality_delete_files = Self::split_n_vecs(equality_delete_files, batch_parallelism);
        let position_delete_files = Self::split_n_vecs(position_delete_files, batch_parallelism);

        let splits = data_files
            .into_iter()
            .zip_eq_fast(equality_delete_files.into_iter())
            .zip_eq_fast(position_delete_files.into_iter())
            .enumerate()
            .map(
                |(index, ((data_file, equality_delete_file), position_delete_file))| IcebergSplit {
                    split_id: index as i64,
                    snapshot_id,
                    task: IcebergFileScanTask::new_scan_with_scan_type(
                        iceberg_scan_type,
                        data_file,
                        equality_delete_file,
                        position_delete_file,
                    ),
                },
            )
            .filter(|split| !split.task.is_empty())
            .collect_vec();

        if splits.is_empty() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        Ok(splits)
    }

    pub async fn list_splits_batch_count_star(
        &self,
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let mut record_counts = 0;
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            assert_eq!(task.data_file_content, DataContentType::Data);
            assert!(task.deletes.is_empty());
            record_counts += task.record_count.expect("must have");
        }
        let split = IcebergSplit {
            split_id: 0,
            snapshot_id,
            task: IcebergFileScanTask::new_count_star(record_counts),
        };
        Ok(vec![split])
    }

    /// List all files in the snapshot to check if there are deletes.
    pub async fn all_delete_parameters(
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
        let schema = scan.snapshot().schema(table.metadata())?;
        let mut equality_ids = vec![];
        let mut have_position_delete = false;
        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for delete_file in task.deletes {
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {}
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_ids.is_empty() {
                            equality_ids = delete_file.equality_ids.clone();
                        } else if equality_ids != delete_file.equality_ids {
                            bail!("The schema of iceberg equality delete files must be consistent");
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        have_position_delete = true;
                    }
                }
            }
        }
        let delete_columns = equality_ids
            .into_iter()
            .map(|id| match schema.name_by_field_id(id) {
                Some(name) => Ok::<std::string::String, ConnectorError>(name.to_owned()),
                None => bail!("Delete field id {} not found in schema", id),
            })
            .collect::<ConnectorResult<Vec<_>>>()?;

        Ok((delete_columns, have_position_delete))
    }

    pub async fn get_delete_parameters(
        &self,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<IcebergDeleteParameters> {
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        match snapshot_id {
            Some(snapshot_id) => {
                let (delete_columns, have_position_delete) =
                    Self::all_delete_parameters(&table, snapshot_id).await?;
                Ok(IcebergDeleteParameters {
                    equality_delete_columns: delete_columns,
                    has_position_delete: have_position_delete,
                    snapshot_id: Some(snapshot_id),
                })
            }
            None => Ok(IcebergDeleteParameters {
                equality_delete_columns: vec![],
                has_position_delete: false,
                snapshot_id: None,
            }),
        }
    }

    /// Uniformly distribute scan tasks to compute nodes.
    /// It is deterministic, so it can best utilize data locality.
    ///
    /// # Arguments
    /// * `file_scan_tasks`: The file scan tasks to be split.
    /// * `split_num`: The number of splits to be created.
    ///
    /// The algorithm is based on a min-heap of groups keyed by total length: each file scan task
    /// is appended to the group that currently has the smallest total length (ties broken by
    /// group index), which keeps the total length of each group as balanced as possible.
    /// The time complexity is O(n log k) and the space complexity is O(k), where n is the number
    /// of file scan tasks and k is the number of splits.
    /// The algorithm is stable, so the order of the file scan tasks within each group is
    /// preserved.
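    ///
    /// For example, with task lengths `[1000, 100, 100, 100, 100]` and `split_num = 2`, the
    /// task of length 1000 ends up alone in one group and the four tasks of length 100 in the
    /// other, as exercised by `test_split_n_vecs_uneven_distribution` below.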
    fn split_n_vecs(
        file_scan_tasks: Vec<FileScanTask>,
        split_num: usize,
    ) -> Vec<Vec<FileScanTask>> {
        use std::cmp::{Ordering, Reverse};

        #[derive(Default)]
        struct FileScanTaskGroup {
            idx: usize,
            tasks: Vec<FileScanTask>,
            total_length: u64,
        }

        impl Ord for FileScanTaskGroup {
            fn cmp(&self, other: &Self) -> Ordering {
                // when total_length is the same, we will sort by index
                if self.total_length == other.total_length {
                    self.idx.cmp(&other.idx)
                } else {
                    self.total_length.cmp(&other.total_length)
                }
            }
        }

        impl PartialOrd for FileScanTaskGroup {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }

        impl Eq for FileScanTaskGroup {}

        impl PartialEq for FileScanTaskGroup {
            fn eq(&self, other: &Self) -> bool {
                self.total_length == other.total_length
            }
        }

        let mut heap = BinaryHeap::new();
        // push all groups into heap
        for idx in 0..split_num {
            heap.push(Reverse(FileScanTaskGroup {
                idx,
                tasks: vec![],
                total_length: 0,
            }));
        }

        for file_task in file_scan_tasks {
            let mut group = heap.peek_mut().unwrap();
            group.0.total_length += file_task.length;
            group.0.tasks.push(file_task);
        }

        // convert heap into vec and extract tasks
        heap.into_vec()
            .into_iter()
            .map(|reverse_group| reverse_group.0.tasks)
            .collect()
    }
}

pub struct IcebergScanOpts {
    pub chunk_size: usize,
    pub need_seq_num: bool,
    pub need_file_path_and_pos: bool,
    pub handle_delete_files: bool,
}

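/// Equality delete keys grouped by their equality columns: each entry pairs the list of column
/// names with the set of deleted key tuples read from the corresponding equality delete files.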
type EqualityDeleteEntries = Vec<(Vec<String>, HashSet<Vec<Datum>>)>;

/// Scan a data file and optionally apply delete files (both position delete and equality delete).
/// This implementation follows the iceberg-rust `process_file_scan_task` logic for proper delete handling.
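///
/// Position deletes are keyed by `(data_file_path, row_position)` and are applied by masking the
/// matching row offsets in the visibility bitmap; equality deletes are keyed by the values of the
/// file's `equality_ids` columns and drop every data row whose key matches a deleted key.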
#[try_stream(ok = DataChunk, error = ConnectorError)]
pub async fn scan_task_to_chunk_with_deletes(
    table: Table,
    data_file_scan_task: FileScanTask,
    IcebergScanOpts {
        chunk_size,
        need_seq_num,
        need_file_path_and_pos,
        handle_delete_files,
    }: IcebergScanOpts,
    metrics: Option<Arc<IcebergScanMetrics>>,
) {
    let table_name = table.identifier().name().to_owned();

    let mut read_bytes = scopeguard::guard(0, |read_bytes| {
        if let Some(metrics) = metrics.clone() {
            metrics
                .iceberg_read_bytes
                .with_guarded_label_values(&[&table_name])
                .inc_by(read_bytes as _);
        }
    });

    let data_file_path = data_file_scan_task.data_file_path.clone();
    let data_sequence_number = data_file_scan_task.sequence_number;

    tracing::debug!(
        "scan_task_to_chunk_with_deletes: data_file={}, handle_delete_files={}, total_delete_files={}",
        data_file_path,
        handle_delete_files,
        data_file_scan_task.deletes.len()
    );

    // Step 1: Load and process delete files
    // We build both position deletes (sorted set of row positions) and equality deletes (hash set of row keys)

    // Build position delete vector - using BTreeSet for sorted storage (similar to DeleteVector in iceberg-rust)
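    // A position delete row is conceptually a `(file_path, pos)` pair, e.g.
    // ("s3://bucket/data-00001.parquet", 42) marks row 42 of that data file as deleted, so only
    // positions whose `file_path` matches the data file currently being scanned are kept.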
    let position_delete_set: BTreeSet<i64> = if handle_delete_files {
        let mut deletes = BTreeSet::new();

        // Filter position delete tasks for this specific data file
        let position_delete_tasks: Vec<_> = data_file_scan_task
            .deletes
            .iter()
            .filter(|delete| delete.data_file_content == DataContentType::PositionDeletes)
            .cloned()
            .collect();

        tracing::debug!(
            "Processing position deletes for data file: {}, found {} position delete tasks",
            data_file_path,
            position_delete_tasks.len()
        );

        for delete_task in position_delete_tasks {
            let delete_task_file_path = delete_task.data_file_path.clone();

            let delete_reader = table.reader_builder().with_batch_size(chunk_size).build();
            // Clone the FileScanTask (not Arc) to create a proper stream
            let task_clone: FileScanTask = (*delete_task).clone();
            let delete_stream = tokio_stream::once(Ok(task_clone));
            let mut delete_record_stream = delete_reader.read(Box::pin(delete_stream)).await?;

            while let Some(record_batch) = delete_record_stream.next().await {
                let record_batch = record_batch?;

                // Position delete files have schema: file_path (string), pos (long)
                if let Some(file_path_col) = record_batch.column_by_name("file_path")
                    && let Some(pos_col) = record_batch.column_by_name("pos")
                {
                    let file_paths = file_path_col
                            .as_any()
                            .downcast_ref::<risingwave_common::array::arrow::arrow_array_iceberg::StringArray>()
                            .with_context(|| "file_path column is not StringArray")?;
                    let positions = pos_col
                            .as_any()
                            .downcast_ref::<risingwave_common::array::arrow::arrow_array_iceberg::Int64Array>()
                            .with_context(|| "pos column is not Int64Array")?;

                    // Only include positions that match the current data file
                    for idx in 0..record_batch.num_rows() {
                        if !file_paths.is_null(idx) && !positions.is_null(idx) {
                            let file_path = file_paths.value(idx);
                            let pos = positions.value(idx);

                            if file_path == data_file_path {
                                deletes.insert(pos);
                            }
                        } else {
                            tracing::warn!(
                                "Position delete file {} at row {}: file_path or pos is null",
                                delete_task_file_path,
                                idx
                            );
                        }
                    }
                }
            }
        }

        tracing::debug!(
            "Built position delete set for data file {}: {:?}",
            data_file_path,
            deletes
        );

        deletes
    } else {
        BTreeSet::new()
    };

    // Build equality delete predicates
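    // An equality delete row carries values for the columns named by `equality_ids`; e.g. if
    // `equality_ids` maps to the single column `id`, a delete row with `id = 5` removes every
    // data row whose `id` equals 5. Keys are grouped per distinct set of equality columns.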
    let equality_deletes: Option<EqualityDeleteEntries> = if handle_delete_files {
        let equality_delete_tasks: Vec<_> = data_file_scan_task
            .deletes
            .iter()
            .filter(|delete| delete.data_file_content == DataContentType::EqualityDeletes)
            .cloned()
            .collect();

        if !equality_delete_tasks.is_empty() {
            let mut delete_key_map: HashMap<Vec<String>, HashSet<Vec<Datum>>> = HashMap::new();

            for delete_task in equality_delete_tasks {
                let equality_ids = delete_task.equality_ids.clone();

                if equality_ids.is_empty() {
                    continue;
                }

                let delete_schema = delete_task.schema();
                let delete_name_vec = equality_ids
                    .iter()
                    .filter_map(|id| delete_schema.name_by_field_id(*id))
                    .map(|s| s.to_owned())
                    .collect_vec();

                if delete_name_vec.len() != equality_ids.len() {
                    tracing::warn!(
                        "Skip equality delete task due to missing column mappings: expected {} names, got {}",
                        equality_ids.len(),
                        delete_name_vec.len()
                    );
                    continue;
                }

                let delete_reader = table.reader_builder().with_batch_size(chunk_size).build();
                // Clone the FileScanTask (not Arc) to create a proper stream
                let task_clone: FileScanTask = delete_task.as_ref().clone();
                let delete_stream = tokio_stream::once(Ok(task_clone));
                let mut delete_record_stream = delete_reader.read(Box::pin(delete_stream)).await?;

                let mut task_delete_key_set: HashSet<Vec<Datum>> = HashSet::new();

                while let Some(record_batch) = delete_record_stream.next().await {
                    let record_batch = record_batch?;

                    let delete_chunk =
                        IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;

                    let field_indices = delete_name_vec
                        .iter()
                        .map(|field_name| {
                            record_batch
                                .schema()
                                .column_with_name(field_name)
                                .map(|(idx, _)| idx)
                                .unwrap()
                        })
                        .collect_vec();

                    // Build delete keys from equality columns
                    for row_idx in 0..delete_chunk.capacity() {
                        let mut key = Vec::with_capacity(field_indices.len());
                        for &col_idx in &field_indices {
                            let col = delete_chunk.column_at(col_idx);
                            key.push(col.value_at(row_idx).to_owned_datum());
                        }
                        task_delete_key_set.insert(key);
                    }
                }

                if !task_delete_key_set.is_empty() {
                    delete_key_map
                        .entry(delete_name_vec.clone())
                        .or_default()
                        .extend(task_delete_key_set);
                }
            }

            if delete_key_map.is_empty() {
                None
            } else {
                Some(delete_key_map.into_iter().collect())
            }
        } else {
            None
        }
    } else {
        None
    };

    // Step 2: Read the data file
    let reader = table.reader_builder().with_batch_size(chunk_size).build();
    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

    let mut record_batch_stream = reader.read(Box::pin(file_scan_stream)).await?.enumerate();

    // Step 3: Process each record batch and apply deletes
    while let Some((batch_index, record_batch)) = record_batch_stream.next().await {
        let record_batch = record_batch?;
        let batch_start_pos = (batch_index * chunk_size) as i64;
        let batch_num_rows = record_batch.num_rows();

        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;

        // Apply position deletes using efficient range-based filtering
        // Build visibility bitmap based on position deletes
        let mut visibility = vec![true; batch_num_rows];

        if !position_delete_set.is_empty() {
            let batch_end_pos = batch_start_pos + batch_num_rows as i64;

            tracing::debug!(
                "Applying position deletes to batch {}: range [{}, {}), total delete set size: {}",
                batch_index,
                batch_start_pos,
                batch_end_pos,
                position_delete_set.len()
            );

            let mut position_deleted_count = 0;

            // Use BTreeSet's range query for efficient filtering
            // Only check positions that fall within this batch's range
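            // For example, with `chunk_size = 1024`, batch 2 starts at global position 2048,
            // so a deleted global position 2050 maps to local index 2 in this batch.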
            for &deleted_pos in position_delete_set.range(batch_start_pos..batch_end_pos) {
                let local_idx = (deleted_pos - batch_start_pos) as usize;
                if local_idx < batch_num_rows {
                    visibility[local_idx] = false;
                    position_deleted_count += 1;
                }
            }

            tracing::debug!(
                "Position delete results for batch {}: deleted {} rows",
                batch_index,
                position_deleted_count
            );
        }

        // Apply equality deletes
        if let Some(ref equality_delete_entries) = equality_deletes {
            let (columns, _) = chunk.into_parts();

            let delete_entries_info: Vec<(Vec<usize>, HashSet<Vec<DatumRef<'_>>>)> =
                equality_delete_entries
                    .iter()
                    .map(|(delete_name_vec, delete_key_set)| {
                        let indices = delete_name_vec
                            .iter()
                            .map(|field_name| {
                                record_batch
                                    .schema()
                                    .column_with_name(field_name)
                                    .map(|(idx, _)| idx)
                                    .unwrap()
                            })
                            .collect_vec();
                        let delete_key_ref_set: HashSet<Vec<DatumRef<'_>>> = delete_key_set
                            .iter()
                            .map(|datum_vec| {
                                datum_vec.iter().map(|datum| datum.to_datum_ref()).collect()
                            })
                            .collect();
                        (indices, delete_key_ref_set)
                    })
                    .collect::<Vec<_>>();

            let mut deleted_count = 0;

            // Check each row against the delete sets built per equality delete task
            for (row_idx, item) in visibility.iter_mut().enumerate().take(batch_num_rows) {
                if !*item {
                    continue;
                }

                for (field_indices, delete_key_set) in &delete_entries_info {
                    let mut row_key = Vec::with_capacity(field_indices.len());
                    for &col_idx in field_indices {
                        let datum = columns[col_idx].value_at(row_idx);
                        row_key.push(datum);
                    }

                    if delete_key_set.contains(&row_key) {
                        *item = false;
                        deleted_count += 1;
                        break;
                    }
                }
            }

            tracing::debug!(
                "Equality delete results for batch {}: deleted {} rows",
                batch_index,
                deleted_count
            );

            let columns: Vec<_> = columns.into_iter().collect();
            chunk = DataChunk::from_parts(columns.into(), Bitmap::from_bool_slice(&visibility));
        } else {
            // Only position deletes to apply
            let (columns, _) = chunk.into_parts();
            let columns: Vec<_> = columns.into_iter().collect();
            chunk = DataChunk::from_parts(columns.into(), Bitmap::from_bool_slice(&visibility));
        }

        // Step 4: Add metadata columns if requested
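        // The optional columns appended below are assumed to correspond to the
        // `ICEBERG_SEQUENCE_NUM_COLUMN_NAME`, `ICEBERG_FILE_PATH_COLUMN_NAME` and
        // `ICEBERG_FILE_POS_COLUMN_NAME` metadata columns imported above, in the order expected
        // by the consumer of this stream.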
        if need_seq_num {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                vec![data_sequence_number; visibility.len()],
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        if need_file_path_and_pos {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
                vec![data_file_path.as_str(); visibility.len()],
            ))));

            // Generate position values for each row in the batch
            let positions: Vec<i64> =
                (batch_start_pos..(batch_start_pos + visibility.len() as i64)).collect();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(positions))));

            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        *read_bytes += chunk.estimated_heap_size() as u64;
        yield chunk;
    }
}

#[derive(Debug)]
pub struct IcebergFileReader {}

#[async_trait]
impl SplitReader for IcebergFileReader {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        _props: IcebergProperties,
        _splits: Vec<IcebergSplit>,
        _parser_config: ParserConfig,
        _source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        unimplemented!()
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        unimplemented!()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, Schema};

    use super::*;

    fn create_file_scan_task(length: u64, id: u64) -> FileScanTask {
        FileScanTask {
            length,
            start: 0,
            record_count: Some(0),
            data_file_path: format!("test_{}.parquet", id),
            data_file_content: DataContentType::Data,
            data_file_format: iceberg::spec::DataFileFormat::Parquet,
            schema: Arc::new(Schema::builder().build().unwrap()),
            project_field_ids: vec![],
            predicate: None,
            deletes: vec![],
            sequence_number: 0,
            equality_ids: vec![],
            file_size_in_bytes: 0,
        }
    }

    #[test]
    fn test_split_n_vecs_basic() {
        let file_scan_tasks = (1..=12)
            .map(|i| create_file_scan_task(i + 100, i))
            .collect::<Vec<_>>();

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);

        assert_eq!(groups.len(), 3);

        let group_lengths: Vec<u64> = groups
            .iter()
            .map(|group| group.iter().map(|task| task.length).sum())
            .collect();

        let max_length = *group_lengths.iter().max().unwrap();
        let min_length = *group_lengths.iter().min().unwrap();
        assert!(max_length - min_length <= 10, "Groups should be balanced");

        let total_tasks: usize = groups.iter().map(|group| group.len()).sum();
        assert_eq!(total_tasks, 12);
    }

    #[test]
    fn test_split_n_vecs_empty() {
        let file_scan_tasks = Vec::new();
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert!(groups.iter().all(|group| group.is_empty()));
    }

    #[test]
    fn test_split_n_vecs_single_task() {
        let file_scan_tasks = vec![create_file_scan_task(100, 1)];
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert_eq!(groups.iter().filter(|group| !group.is_empty()).count(), 1);
    }

    #[test]
    fn test_split_n_vecs_uneven_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(1000, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 2);
        assert_eq!(groups.len(), 2);

        let group_with_large_task = groups
            .iter()
            .find(|group| group.iter().any(|task| task.length == 1000))
            .unwrap();
        assert_eq!(group_with_large_task.len(), 1);
    }

    #[test]
    fn test_split_n_vecs_same_files_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(100, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
            create_file_scan_task(100, 6),
            create_file_scan_task(100, 7),
            create_file_scan_task(100, 8),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
            .iter()
            .map(|g| {
                g.iter()
                    .map(|task| task.data_file_path.clone())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        for _ in 0..10000 {
            let groups_2 = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
                .iter()
                .map(|g| {
                    g.iter()
                        .map(|task| task.data_file_path.clone())
                        .collect::<Vec<_>>()
                })
                .collect::<Vec<_>>();

            assert_eq!(groups, groups_2);
        }
    }
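
    // Additional illustrative check: an empty data-scan split reports an empty task, while a
    // count-star task is never considered empty.
    #[test]
    fn test_empty_split_task_is_empty() {
        let split = IcebergSplit::empty(IcebergScanType::DataScan);
        assert!(split.task.is_empty());
        assert!(split.task.files().is_empty());

        assert!(!IcebergFileScanTask::new_count_star(0).is_empty());
    }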
1156}