risingwave_connector/source/iceberg/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod parquet_file_handler;

mod metrics;
use std::collections::{BTreeSet, BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::{for_await, try_stream};
use iceberg::Catalog;
use iceberg::expr::Predicate as IcebergPredicate;
use iceberg::scan::FileScanTask;
use iceberg::spec::DataContentType;
use iceberg::table::Table;
use itertools::Itertools;
pub use parquet_file_handler::*;
use phf::{Set, phf_set};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_array_iceberg::Array;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
    Schema,
};
use risingwave_common::types::{Datum, DatumRef, JsonbVal, ToDatumRef, ToOwnedDatum};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
use serde::{Deserialize, Serialize};

pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
};

pub const ICEBERG_CONNECTOR: &str = "iceberg";

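/// Properties of the iceberg batch source, deserialized from the source's `WITH` options.
///
/// Catalog and table options are flattened from `IcebergCommon` and `IcebergTableIdentifier`.
/// The `catalog.jdbc.user` / `catalog.jdbc.password` options are only used for the JDBC catalog
/// and are forwarded to the Java catalog as `jdbc.user` / `jdbc.password`; any remaining options
/// end up in `unknown_fields`.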
#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct IcebergProperties {
    #[serde(flatten)]
    pub common: IcebergCommon,

    #[serde(flatten)]
    pub table: IcebergTableIdentifier,

    // For jdbc catalog
    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl EnforceSecret for IcebergProperties {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "catalog.jdbc.password",
    };

    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
            if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
                return Err(EnforceSecretError {
                    key: prop.to_owned(),
                }
                .into());
            }
        }
        Ok(())
    }
}

impl IcebergProperties {
    pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        // TODO: support path_style_access and java_catalog_props for iceberg source
        self.common.create_catalog(&java_catalog_props).await
    }

    pub async fn load_table(&self) -> ConnectorResult<Table> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        // TODO: support java_catalog_props for iceberg source
        self.common
            .load_table(&self.table, &java_catalog_props)
            .await
    }
}

impl SourceProperties for IcebergProperties {
    type Split = IcebergSplit;
    type SplitEnumerator = IcebergSplitEnumerator;
    type SplitReader = IcebergFileReader;

    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}

impl UnknownFields for IcebergProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergFileScanTaskJsonStr(String);

impl IcebergFileScanTaskJsonStr {
    pub fn deserialize(&self) -> FileScanTask {
        serde_json::from_str(&self.0).unwrap()
    }

    pub fn serialize(task: &FileScanTask) -> Self {
        Self(serde_json::to_string(task).unwrap())
    }
}

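/// The set of file scan tasks assigned to one split, grouped by scan type: data files,
/// equality-delete files, position-delete files, or a pre-computed row count for `COUNT(*)`.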
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
    Data(Vec<FileScanTask>),
    EqualityDelete(Vec<FileScanTask>),
    PositionDelete(Vec<FileScanTask>),
    CountStar(u64),
}

impl IcebergFileScanTask {
    pub fn new_count_star(count_sum: u64) -> Self {
        IcebergFileScanTask::CountStar(count_sum)
    }

    pub fn new_scan_with_scan_type(
        iceberg_scan_type: IcebergScanType,
        data_files: Vec<FileScanTask>,
        equality_delete_files: Vec<FileScanTask>,
        position_delete_files: Vec<FileScanTask>,
    ) -> Self {
        match iceberg_scan_type {
            IcebergScanType::EqualityDeleteScan => {
                IcebergFileScanTask::EqualityDelete(equality_delete_files)
            }
            IcebergScanType::DataScan => IcebergFileScanTask::Data(data_files),
            IcebergScanType::PositionDeleteScan => {
                IcebergFileScanTask::PositionDelete(position_delete_files)
            }
            IcebergScanType::Unspecified | IcebergScanType::CountStar => {
                unreachable!("Unspecified or CountStar scan type is not expected here")
            }
        }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            IcebergFileScanTask::Data(data_files) => data_files.is_empty(),
            IcebergFileScanTask::EqualityDelete(equality_delete_files) => {
                equality_delete_files.is_empty()
            }
            IcebergFileScanTask::PositionDelete(position_delete_files) => {
                position_delete_files.is_empty()
            }
            IcebergFileScanTask::CountStar(_) => false,
        }
    }

    pub fn files(&self) -> Vec<String> {
        match self {
            IcebergFileScanTask::Data(file_scan_tasks)
            | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
            | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks
                .iter()
                .map(|task| task.data_file_path.clone())
                .collect(),
            IcebergFileScanTask::CountStar(_) => vec![],
        }
    }
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IcebergSplit {
    pub split_id: i64,
    // TODO: remove this field. It seems unused.
    pub snapshot_id: i64,
    pub task: IcebergFileScanTask,
}

impl IcebergSplit {
    pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
        if let IcebergScanType::CountStar = iceberg_scan_type {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_count_star(0),
            }
        } else {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_scan_with_scan_type(
                    iceberg_scan_type,
                    vec![],
                    vec![],
                    vec![],
                ),
            }
        }
    }
}

impl SplitMetaData for IcebergSplit {
    fn id(&self) -> SplitId {
        self.split_id.to_string().into()
    }

    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
    }

    fn encode_to_json(&self) -> JsonbVal {
        serde_json::to_value(self.clone()).unwrap().into()
    }

    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
        unimplemented!()
    }
}

#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {
    config: IcebergProperties,
}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<Self> {
        Ok(Self::new_inner(properties, context))
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
        // Like the file source, the iceberg streaming source has a List executor and a Fetch
        // executor instead of relying on the SplitEnumerator on the meta node.
        // TODO: add some validation logic here.
        Ok(vec![])
    }
}

impl IcebergSplitEnumerator {
    pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
        Self { config: properties }
    }
}

pub enum IcebergTimeTravelInfo {
    Version(i64),
    TimestampMs(i64),
}

impl IcebergSplitEnumerator {
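    /// Resolve the snapshot to scan. With `Version`, the snapshot with that id is used; with
    /// `TimestampMs`, the latest snapshot committed at or before the timestamp is used; without
    /// time-travel info, the current snapshot is used. Returns `Ok(None)` if the table has no
    /// snapshot at all.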
    pub fn get_snapshot_id(
        table: &Table,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<Option<i64>> {
        let current_snapshot = table.metadata().current_snapshot();
        if current_snapshot.is_none() {
            return Ok(None);
        }

        let snapshot_id = match time_travel_info {
            Some(IcebergTimeTravelInfo::Version(version)) => {
                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
                    bail!("Cannot find the snapshot id in the iceberg table.");
                };
                snapshot.snapshot_id()
            }
            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
                let snapshot = table
                    .metadata()
                    .snapshots()
                    .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
                    .max_by_key(|snapshot| snapshot.timestamp_ms());
                match snapshot {
                    Some(snapshot) => snapshot.snapshot_id(),
                    None => {
                        // convert unix time to human-readable time
                        let time = chrono::DateTime::from_timestamp_millis(timestamp);
                        if let Some(time) = time {
                            bail!("Cannot find a snapshot older than {}", time);
                        } else {
                            bail!("Cannot find a snapshot");
                        }
                    }
                }
            }
            None => {
                assert!(current_snapshot.is_some());
                current_snapshot.unwrap().snapshot_id()
            }
        };
        Ok(Some(snapshot_id))
    }

    pub async fn list_splits_batch(
        &self,
        schema: Schema,
        time_travel_info: Option<IcebergTimeTravelInfo>,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        if batch_parallelism == 0 {
            bail!("Batch parallelism is 0. Cannot split the iceberg files.");
        }
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        if snapshot_id.is_none() {
            // If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        if let IcebergScanType::CountStar = iceberg_scan_type {
            self.list_splits_batch_count_star(&table, snapshot_id.unwrap())
                .await
        } else {
            self.list_splits_batch_scan(
                &table,
                snapshot_id.unwrap(),
                schema,
                batch_parallelism,
                iceberg_scan_type,
                predicate,
            )
            .await
        }
    }

    async fn list_splits_batch_scan(
        &self,
        table: &Table,
        snapshot_id: i64,
        schema: Schema,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let schema_names = schema.names();
        let require_names = schema_names
            .iter()
            .filter(|name| {
                name.ne(&ICEBERG_SEQUENCE_NUM_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_PATH_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_POS_COLUMN_NAME)
            })
            .cloned()
            .collect_vec();

        let table_schema = table.metadata().current_schema();
        tracing::debug!("iceberg_table_schema: {:?}", table_schema);

        let mut position_delete_files = vec![];
        let mut position_delete_files_set = HashSet::new();
        let mut data_files = vec![];
        let mut equality_delete_files = vec![];
        let mut equality_delete_files_set = HashSet::new();
        let scan = table
            .scan()
            .with_filter(predicate)
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .select(require_names)
            .build()
            .map_err(|e| anyhow!(e))?;

        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;

            // Collect delete files for separate scan types, but keep task.deletes intact
            for delete_file in &task.deletes {
                let delete_file = delete_file.as_ref().clone();
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {
                        bail!("Data files should not be in task deletes");
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            equality_delete_files.push(delete_file);
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        let mut delete_file = delete_file;
                        if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            delete_file.project_field_ids = Vec::default();
                            position_delete_files.push(delete_file);
                        }
                    }
                }
            }

            match task.data_file_content {
                iceberg::spec::DataContentType::Data => {
                    // Keep the original task with its deletes field intact
                    data_files.push(task);
                }
                iceberg::spec::DataContentType::EqualityDeletes => {
                    bail!("Equality delete files should not be in the data files");
                }
                iceberg::spec::DataContentType::PositionDeletes => {
                    bail!("Position delete files should not be in the data files");
                }
            }
        }
        // evenly split the files into splits based on the parallelism.
        let data_files = Self::split_n_vecs(data_files, batch_parallelism);
        let equality_delete_files = Self::split_n_vecs(equality_delete_files, batch_parallelism);
        let position_delete_files = Self::split_n_vecs(position_delete_files, batch_parallelism);

        let splits = data_files
            .into_iter()
            .zip_eq_fast(equality_delete_files.into_iter())
            .zip_eq_fast(position_delete_files.into_iter())
            .enumerate()
            .map(
                |(index, ((data_file, equality_delete_file), position_delete_file))| IcebergSplit {
                    split_id: index as i64,
                    snapshot_id,
                    task: IcebergFileScanTask::new_scan_with_scan_type(
                        iceberg_scan_type,
                        data_file,
                        equality_delete_file,
                        position_delete_file,
                    ),
                },
            )
            .filter(|split| !split.task.is_empty())
            .collect_vec();

        if splits.is_empty() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        Ok(splits)
    }

    pub async fn list_splits_batch_count_star(
        &self,
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let mut record_counts = 0;
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            assert_eq!(task.data_file_content, DataContentType::Data);
            assert!(task.deletes.is_empty());
            record_counts += task.record_count.expect("must have");
        }
        let split = IcebergSplit {
            split_id: 0,
            snapshot_id,
            task: IcebergFileScanTask::new_count_star(record_counts),
        };
        Ok(vec![split])
    }

    /// List all files in the snapshot to collect the equality-delete columns and check
    /// whether position-delete files exist.
    pub async fn all_delete_parameters(
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
        let schema = scan.snapshot().schema(table.metadata())?;
        let mut equality_ids = vec![];
        let mut have_position_delete = false;
        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for delete_file in task.deletes {
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {}
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_ids.is_empty() {
                            equality_ids = delete_file.equality_ids.clone();
                        } else if equality_ids != delete_file.equality_ids {
                            bail!("Schemas of iceberg equality delete files must be consistent");
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        have_position_delete = true;
                    }
                }
            }
        }
        let delete_columns = equality_ids
            .into_iter()
            .map(|id| match schema.name_by_field_id(id) {
                Some(name) => Ok::<std::string::String, ConnectorError>(name.to_owned()),
                None => bail!("Delete field id {} not found in schema", id),
            })
            .collect::<ConnectorResult<Vec<_>>>()?;

        Ok((delete_columns, have_position_delete))
    }

    pub async fn get_delete_parameters(
        &self,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        if snapshot_id.is_none() {
            return Ok((vec![], false));
        }
        let snapshot_id = snapshot_id.unwrap();
        Self::all_delete_parameters(&table, snapshot_id).await
    }

    /// Uniformly distribute scan tasks to compute nodes.
    /// It's deterministic, so it can best utilize data locality.
    ///
    /// # Arguments
    /// * `file_scan_tasks`: The file scan tasks to be split.
    /// * `split_num`: The number of splits to be created.
    ///
    /// The algorithm is based on a min-heap: all groups are pushed into the heap, and each file
    /// scan task is added to the currently smallest group, keeping the total length of the
    /// groups as balanced as possible.
    /// The time complexity is O(n log k), where n is the number of file scan tasks and k is the number of splits.
    /// The space complexity is O(k), where k is the number of splits.
    /// The algorithm is stable, so the order of the file scan tasks is preserved.
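    ///
    /// For illustration (hypothetical task lengths), with `split_num = 2` and tasks of lengths
    /// `[100, 80, 60, 40]`, each task goes to the currently smallest group (ties broken by group
    /// index), so one group ends up with `[100, 40]` and the other with `[80, 60]`.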
    fn split_n_vecs(
        file_scan_tasks: Vec<FileScanTask>,
        split_num: usize,
    ) -> Vec<Vec<FileScanTask>> {
        use std::cmp::{Ordering, Reverse};

        #[derive(Default)]
        struct FileScanTaskGroup {
            idx: usize,
            tasks: Vec<FileScanTask>,
            total_length: u64,
        }

        impl Ord for FileScanTaskGroup {
            fn cmp(&self, other: &Self) -> Ordering {
                // when total_length is the same, we will sort by index
                if self.total_length == other.total_length {
                    self.idx.cmp(&other.idx)
                } else {
                    self.total_length.cmp(&other.total_length)
                }
            }
        }

        impl PartialOrd for FileScanTaskGroup {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }

        impl Eq for FileScanTaskGroup {}

        impl PartialEq for FileScanTaskGroup {
            fn eq(&self, other: &Self) -> bool {
                self.total_length == other.total_length
            }
        }

        let mut heap = BinaryHeap::new();
        // push all groups into heap
        for idx in 0..split_num {
            heap.push(Reverse(FileScanTaskGroup {
                idx,
                tasks: vec![],
                total_length: 0,
            }));
        }

        for file_task in file_scan_tasks {
            let mut group = heap.peek_mut().unwrap();
            group.0.total_length += file_task.length;
            group.0.tasks.push(file_task);
        }

        // convert heap into vec and extract tasks
        heap.into_vec()
            .into_iter()
            .map(|reverse_group| reverse_group.0.tasks)
            .collect()
    }
}

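/// Options controlling how a data-file scan task is converted into `DataChunk`s: the chunk size,
/// whether to append the sequence-number column, whether to append the file-path and
/// file-position columns, and whether to apply delete files.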
pub struct IcebergScanOpts {
    pub chunk_size: usize,
    pub need_seq_num: bool,
    pub need_file_path_and_pos: bool,
    pub handle_delete_files: bool,
}

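/// Equality-delete keys grouped by their equality columns: each entry pairs a list of column
/// names with the set of deleted key tuples read from the corresponding equality-delete files.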
type EqualityDeleteEntries = Vec<(Vec<String>, HashSet<Vec<Datum>>)>;

/// Scan a data file and optionally apply delete files (both position delete and equality delete).
/// This implementation follows the iceberg-rust `process_file_scan_task` logic for proper delete handling.
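///
/// The overall flow is: (1) read the position-delete files attached to the task and collect the
/// deleted row positions for this data file, (2) read the equality-delete files and collect the
/// deleted key tuples, (3) stream record batches from the data file and mask deleted rows via the
/// visibility bitmap, and (4) append the sequence-number and file-path/position metadata columns
/// when requested.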
#[try_stream(ok = DataChunk, error = ConnectorError)]
pub async fn scan_task_to_chunk_with_deletes(
    table: Table,
    data_file_scan_task: FileScanTask,
    IcebergScanOpts {
        chunk_size,
        need_seq_num,
        need_file_path_and_pos,
        handle_delete_files,
    }: IcebergScanOpts,
    metrics: Option<Arc<IcebergScanMetrics>>,
) {
    let table_name = table.identifier().name().to_owned();

    let mut read_bytes = scopeguard::guard(0, |read_bytes| {
        if let Some(metrics) = metrics.clone() {
            metrics
                .iceberg_read_bytes
                .with_guarded_label_values(&[&table_name])
                .inc_by(read_bytes as _);
        }
    });

    let data_file_path = data_file_scan_task.data_file_path.clone();
    let data_sequence_number = data_file_scan_task.sequence_number;

    tracing::debug!(
        "scan_task_to_chunk_with_deletes: data_file={}, handle_delete_files={}, total_delete_files={}",
        data_file_path,
        handle_delete_files,
        data_file_scan_task.deletes.len()
    );

    // Step 1: Load and process delete files
    // We build both position deletes (sorted set of row positions) and equality deletes (hash set of row keys)

    // Build position delete vector - using BTreeSet for sorted storage (similar to DeleteVector in iceberg-rust)
    let position_delete_set: BTreeSet<i64> = if handle_delete_files {
        let mut deletes = BTreeSet::new();

        // Filter position delete tasks for this specific data file
        let position_delete_tasks: Vec<_> = data_file_scan_task
            .deletes
            .iter()
            .filter(|delete| delete.data_file_content == DataContentType::PositionDeletes)
            .cloned()
            .collect();

        tracing::debug!(
            "Processing position deletes for data file: {}, found {} position delete tasks",
            data_file_path,
            position_delete_tasks.len()
        );

        for delete_task in position_delete_tasks {
            let delete_task_file_path = delete_task.data_file_path.clone();

            let delete_reader = table.reader_builder().with_batch_size(chunk_size).build();
            // Clone the FileScanTask (not Arc) to create a proper stream
            let task_clone: FileScanTask = (*delete_task).clone();
            let delete_stream = tokio_stream::once(Ok(task_clone));
            let mut delete_record_stream = delete_reader.read(Box::pin(delete_stream)).await?;

            while let Some(record_batch) = delete_record_stream.next().await {
                let record_batch = record_batch?;

                // Position delete files have schema: file_path (string), pos (long)
                if let Some(file_path_col) = record_batch.column_by_name("file_path")
                    && let Some(pos_col) = record_batch.column_by_name("pos")
                {
                    let file_paths = file_path_col
                            .as_any()
                            .downcast_ref::<risingwave_common::array::arrow::arrow_array_iceberg::StringArray>()
                            .with_context(|| "file_path column is not StringArray")?;
                    let positions = pos_col
                            .as_any()
                            .downcast_ref::<risingwave_common::array::arrow::arrow_array_iceberg::Int64Array>()
                            .with_context(|| "pos column is not Int64Array")?;

                    // Only include positions that match the current data file
                    for idx in 0..record_batch.num_rows() {
                        if !file_paths.is_null(idx) && !positions.is_null(idx) {
                            let file_path = file_paths.value(idx);
                            let pos = positions.value(idx);

                            if file_path == data_file_path {
                                deletes.insert(pos);
                            }
                        } else {
                            tracing::warn!(
                                "Position delete file {} at line {}: file_path or pos is null",
                                delete_task_file_path,
                                idx
                            );
                        }
                    }
                }
            }
        }

        tracing::debug!(
            "Built position delete set for data file {}: {:?}",
            data_file_path,
            deletes
        );

        deletes
    } else {
        BTreeSet::new()
    };

    // Build equality delete predicates
    let equality_deletes: Option<EqualityDeleteEntries> = if handle_delete_files {
        let equality_delete_tasks: Vec<_> = data_file_scan_task
            .deletes
            .iter()
            .filter(|delete| delete.data_file_content == DataContentType::EqualityDeletes)
            .cloned()
            .collect();

        if !equality_delete_tasks.is_empty() {
            let mut delete_key_map: HashMap<Vec<String>, HashSet<Vec<Datum>>> = HashMap::new();

            for delete_task in equality_delete_tasks {
                let equality_ids = delete_task.equality_ids.clone();

                if equality_ids.is_empty() {
                    continue;
                }

                let delete_schema = delete_task.schema();
                let delete_name_vec = equality_ids
                    .iter()
                    .filter_map(|id| delete_schema.name_by_field_id(*id))
                    .map(|s| s.to_owned())
                    .collect_vec();

                if delete_name_vec.len() != equality_ids.len() {
                    tracing::warn!(
                        "Skip equality delete task due to missing column mappings: expected {} names, got {}",
                        equality_ids.len(),
                        delete_name_vec.len()
                    );
                    continue;
                }

                let delete_reader = table.reader_builder().with_batch_size(chunk_size).build();
                // Clone the FileScanTask (not Arc) to create a proper stream
                let task_clone: FileScanTask = delete_task.as_ref().clone();
                let delete_stream = tokio_stream::once(Ok(task_clone));
                let mut delete_record_stream = delete_reader.read(Box::pin(delete_stream)).await?;

                let mut task_delete_key_set: HashSet<Vec<Datum>> = HashSet::new();

                while let Some(record_batch) = delete_record_stream.next().await {
                    let record_batch = record_batch?;

                    let delete_chunk =
                        IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;

                    let field_indices = delete_name_vec
                        .iter()
                        .map(|field_name| {
                            record_batch
                                .schema()
                                .column_with_name(field_name)
                                .map(|(idx, _)| idx)
                                .unwrap()
                        })
                        .collect_vec();

                    // Build delete keys from equality columns
                    for row_idx in 0..delete_chunk.capacity() {
                        let mut key = Vec::with_capacity(field_indices.len());
                        for &col_idx in &field_indices {
                            let col = delete_chunk.column_at(col_idx);
                            key.push(col.value_at(row_idx).to_owned_datum());
                        }
                        task_delete_key_set.insert(key);
                    }
                }

                if !task_delete_key_set.is_empty() {
                    delete_key_map
                        .entry(delete_name_vec.clone())
                        .or_default()
                        .extend(task_delete_key_set);
                }
            }

            if delete_key_map.is_empty() {
                None
            } else {
                Some(delete_key_map.into_iter().collect())
            }
        } else {
            None
        }
    } else {
        None
    };

    // Step 2: Read the data file
    let reader = table.reader_builder().with_batch_size(chunk_size).build();
    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

    let mut record_batch_stream = reader.read(Box::pin(file_scan_stream)).await?.enumerate();

    // Step 3: Process each record batch and apply deletes
    while let Some((batch_index, record_batch)) = record_batch_stream.next().await {
        let record_batch = record_batch?;
        let batch_start_pos = (batch_index * chunk_size) as i64;
        let batch_num_rows = record_batch.num_rows();

        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;

        // Apply position deletes using efficient range-based filtering
        // Build visibility bitmap based on position deletes
        let mut visibility = vec![true; batch_num_rows];

        if !position_delete_set.is_empty() {
            let batch_end_pos = batch_start_pos + batch_num_rows as i64;

            tracing::debug!(
                "Applying position deletes to batch {}: range [{}, {}), total delete set size: {}",
                batch_index,
                batch_start_pos,
                batch_end_pos,
                position_delete_set.len()
            );

            let mut position_deleted_count = 0;

            // Use BTreeSet's range query for efficient filtering
            // Only check positions that fall within this batch's range
            for &deleted_pos in position_delete_set.range(batch_start_pos..batch_end_pos) {
                let local_idx = (deleted_pos - batch_start_pos) as usize;
                if local_idx < batch_num_rows {
                    visibility[local_idx] = false;
                    position_deleted_count += 1;
                }
            }

            tracing::debug!(
                "Position delete results for batch {}: deleted {} rows",
                batch_index,
                position_deleted_count
            );
        }

        // Apply equality deletes
        if let Some(ref equality_delete_entries) = equality_deletes {
            let (columns, _) = chunk.into_parts();

            let delete_entries_info: Vec<(Vec<usize>, HashSet<Vec<DatumRef<'_>>>)> =
                equality_delete_entries
                    .iter()
                    .map(|(delete_name_vec, delete_key_set)| {
                        let indices = delete_name_vec
                            .iter()
                            .map(|field_name| {
                                record_batch
                                    .schema()
                                    .column_with_name(field_name)
                                    .map(|(idx, _)| idx)
                                    .unwrap()
                            })
                            .collect_vec();
                        let delete_key_ref_set: HashSet<Vec<DatumRef<'_>>> = delete_key_set
                            .iter()
                            .map(|datum_vec| {
                                datum_vec.iter().map(|datum| datum.to_datum_ref()).collect()
                            })
                            .collect();
                        (indices, delete_key_ref_set)
                    })
                    .collect::<Vec<_>>();

            let mut deleted_count = 0;

            // Check each row against the delete sets built per equality delete task
            for (row_idx, item) in visibility.iter_mut().enumerate().take(batch_num_rows) {
                if !*item {
                    continue;
                }

                for (field_indices, delete_key_set) in &delete_entries_info {
                    let mut row_key = Vec::with_capacity(field_indices.len());
                    for &col_idx in field_indices {
                        let datum = columns[col_idx].value_at(row_idx);
                        row_key.push(datum);
                    }

                    if delete_key_set.contains(&row_key) {
                        *item = false;
                        deleted_count += 1;
                        break;
                    }
                }
            }

            tracing::debug!(
                "Equality delete results for batch {}: deleted {} rows",
                batch_index,
                deleted_count
            );

            let columns: Vec<_> = columns.into_iter().collect();
            chunk = DataChunk::from_parts(columns.into(), Bitmap::from_bool_slice(&visibility));
        } else {
            // Only position deletes to apply
            let (columns, _) = chunk.into_parts();
            let columns: Vec<_> = columns.into_iter().collect();
            chunk = DataChunk::from_parts(columns.into(), Bitmap::from_bool_slice(&visibility));
        }

        // Step 4: Add metadata columns if requested
        if need_seq_num {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                vec![data_sequence_number; visibility.len()],
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        if need_file_path_and_pos {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
                vec![data_file_path.as_str(); visibility.len()],
            ))));

            // Generate position values for each row in the batch
            let positions: Vec<i64> =
                (batch_start_pos..(batch_start_pos + visibility.len() as i64)).collect();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(positions))));

            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        *read_bytes += chunk.estimated_heap_size() as u64;
        yield chunk;
    }
}

#[derive(Debug)]
pub struct IcebergFileReader {}

#[async_trait]
impl SplitReader for IcebergFileReader {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        _props: IcebergProperties,
        _splits: Vec<IcebergSplit>,
        _parser_config: ParserConfig,
        _source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        unimplemented!()
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        unimplemented!()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, Schema};

    use super::*;

    fn create_file_scan_task(length: u64, id: u64) -> FileScanTask {
        FileScanTask {
            length,
            start: 0,
            record_count: Some(0),
            data_file_path: format!("test_{}.parquet", id),
            data_file_content: DataContentType::Data,
            data_file_format: iceberg::spec::DataFileFormat::Parquet,
            schema: Arc::new(Schema::builder().build().unwrap()),
            project_field_ids: vec![],
            predicate: None,
            deletes: vec![],
            sequence_number: 0,
            equality_ids: vec![],
            file_size_in_bytes: 0,
        }
    }

    #[test]
    fn test_split_n_vecs_basic() {
        let file_scan_tasks = (1..=12)
            .map(|i| create_file_scan_task(i + 100, i))
            .collect::<Vec<_>>();

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);

        assert_eq!(groups.len(), 3);

        let group_lengths: Vec<u64> = groups
            .iter()
            .map(|group| group.iter().map(|task| task.length).sum())
            .collect();

        let max_length = *group_lengths.iter().max().unwrap();
        let min_length = *group_lengths.iter().min().unwrap();
        assert!(max_length - min_length <= 10, "Groups should be balanced");

        let total_tasks: usize = groups.iter().map(|group| group.len()).sum();
        assert_eq!(total_tasks, 12);
    }

    #[test]
    fn test_split_n_vecs_empty() {
        let file_scan_tasks = Vec::new();
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert!(groups.iter().all(|group| group.is_empty()));
    }

    #[test]
    fn test_split_n_vecs_single_task() {
        let file_scan_tasks = vec![create_file_scan_task(100, 1)];
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert_eq!(groups.iter().filter(|group| !group.is_empty()).count(), 1);
    }

    #[test]
    fn test_split_n_vecs_uneven_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(1000, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 2);
        assert_eq!(groups.len(), 2);

        let group_with_large_task = groups
            .iter()
            .find(|group| group.iter().any(|task| task.length == 1000))
            .unwrap();
        assert_eq!(group_with_large_task.len(), 1);
    }

    #[test]
    fn test_split_n_vecs_same_files_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(100, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
            create_file_scan_task(100, 6),
            create_file_scan_task(100, 7),
            create_file_scan_task(100, 8),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
            .iter()
            .map(|g| {
                g.iter()
                    .map(|task| task.data_file_path.clone())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        for _ in 0..10000 {
            let groups_2 = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
                .iter()
                .map(|g| {
                    g.iter()
                        .map(|task| task.data_file_path.clone())
                        .collect::<Vec<_>>()
                })
                .collect::<Vec<_>>();

            assert_eq!(groups, groups_2);
        }
    }
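
    // A small additional sanity check, added as a sketch: an empty split produced by
    // `IcebergSplit::empty` carries an empty task for data/delete scan types, while a
    // count-star task is never reported as empty.
    #[test]
    fn test_empty_split_task() {
        let split = IcebergSplit::empty(IcebergScanType::DataScan);
        assert!(split.task.is_empty());
        assert!(split.task.files().is_empty());

        let count_star = IcebergSplit::empty(IcebergScanType::CountStar);
        assert!(!count_star.task.is_empty());
    }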
}