risingwave_connector/source/iceberg/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod parquet_file_handler;

mod metrics;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::{for_await, try_stream};
use iceberg::Catalog;
use iceberg::expr::Predicate as IcebergPredicate;
use iceberg::scan::FileScanTask;
use iceberg::spec::{DataContentType, ManifestList};
use iceberg::table::Table;
use itertools::Itertools;
pub use parquet_file_handler::*;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::bail;
use risingwave_common::catalog::{
    ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
    Schema,
};
use risingwave_common::types::JsonbVal;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
use serde::{Deserialize, Serialize};

pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
use crate::connector_common::IcebergCommon;
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
};

pub const ICEBERG_CONNECTOR: &str = "iceberg";

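/// Connection and catalog properties for the Iceberg source connector, parsed from the `WITH`
/// options. JDBC catalog credentials are accepted via `catalog.jdbc.user` and
/// `catalog.jdbc.password`; the remaining recognized options are flattened into
/// [`IcebergCommon`], and unrecognized options are collected into `unknown_fields`.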
#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct IcebergProperties {
    #[serde(flatten)]
    pub common: IcebergCommon,

    // For jdbc catalog
    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl IcebergProperties {
    pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        // TODO: support path_style_access and java_catalog_props for iceberg source
        self.common.create_catalog(&java_catalog_props).await
    }

    pub async fn load_table(&self) -> ConnectorResult<Table> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        // TODO: support java_catalog_props for iceberg source
        self.common.load_table(&java_catalog_props).await
    }
}

impl SourceProperties for IcebergProperties {
    type Split = IcebergSplit;
    type SplitEnumerator = IcebergSplitEnumerator;
    type SplitReader = IcebergFileReader;

    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}

impl UnknownFields for IcebergProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

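/// A [`FileScanTask`] serialized as a JSON string. [`Self::serialize`] and [`Self::deserialize`]
/// round-trip between the task and its JSON representation.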
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergFileScanTaskJsonStr(String);

impl IcebergFileScanTaskJsonStr {
    pub fn deserialize(&self) -> FileScanTask {
        serde_json::from_str(&self.0).unwrap()
    }

    pub fn serialize(task: &FileScanTask) -> Self {
        Self(serde_json::to_string(task).unwrap())
    }
}

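/// The work assigned to one split, grouped by scan type: data files, equality delete files,
/// position delete files, or a precomputed `COUNT(*)` result.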
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
    Data(Vec<FileScanTask>),
    EqualityDelete(Vec<FileScanTask>),
    PositionDelete(Vec<FileScanTask>),
    CountStar(u64),
}

impl IcebergFileScanTask {
    pub fn new_count_star(count_sum: u64) -> Self {
        IcebergFileScanTask::CountStar(count_sum)
    }

    pub fn new_scan_with_scan_type(
        iceberg_scan_type: IcebergScanType,
        data_files: Vec<FileScanTask>,
        equality_delete_files: Vec<FileScanTask>,
        position_delete_files: Vec<FileScanTask>,
    ) -> Self {
        match iceberg_scan_type {
            IcebergScanType::EqualityDeleteScan => {
                IcebergFileScanTask::EqualityDelete(equality_delete_files)
            }
            IcebergScanType::DataScan => IcebergFileScanTask::Data(data_files),
            IcebergScanType::PositionDeleteScan => {
                IcebergFileScanTask::PositionDelete(position_delete_files)
            }
            IcebergScanType::Unspecified | IcebergScanType::CountStar => {
                unreachable!("Unspecified iceberg scan type")
            }
        }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            IcebergFileScanTask::Data(data_files) => data_files.is_empty(),
            IcebergFileScanTask::EqualityDelete(equality_delete_files) => {
                equality_delete_files.is_empty()
            }
            IcebergFileScanTask::PositionDelete(position_delete_files) => {
                position_delete_files.is_empty()
            }
            IcebergFileScanTask::CountStar(_) => false,
        }
    }

    pub fn files(&self) -> Vec<String> {
        match self {
            IcebergFileScanTask::Data(file_scan_tasks)
            | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
            | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks
                .iter()
                .map(|task| task.data_file_path.clone())
                .collect(),
            IcebergFileScanTask::CountStar(_) => vec![],
        }
    }
}

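/// A single unit of parallel work for the Iceberg batch scan: one [`IcebergFileScanTask`] plus
/// the snapshot it was planned against.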
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IcebergSplit {
    pub split_id: i64,
    // TODO: remove this field. It seems not used.
    pub snapshot_id: i64,
    pub task: IcebergFileScanTask,
}

impl IcebergSplit {
    pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
        if let IcebergScanType::CountStar = iceberg_scan_type {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_count_star(0),
            }
        } else {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_scan_with_scan_type(
                    iceberg_scan_type,
                    vec![],
                    vec![],
                    vec![],
                ),
            }
        }
    }
}

impl SplitMetaData for IcebergSplit {
    fn id(&self) -> SplitId {
        self.split_id.to_string().into()
    }

    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
    }

    fn encode_to_json(&self) -> JsonbVal {
        serde_json::to_value(self.clone()).unwrap().into()
    }

    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
        unimplemented!()
    }
}

#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {
    config: IcebergProperties,
}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<Self> {
        Ok(Self::new_inner(properties, context))
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
        // Like the file source, the iceberg streaming source has a List Executor and a Fetch
        // Executor, instead of relying on the SplitEnumerator on meta.
        // TODO: add some validation logic here.
        Ok(vec![])
    }
}

impl IcebergSplitEnumerator {
    pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
        Self { config: properties }
    }
}

pub enum IcebergTimeTravelInfo {
    Version(i64),
    TimestampMs(i64),
}

impl IcebergSplitEnumerator {
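    /// Resolves the snapshot to scan. With no time-travel info this is the current snapshot;
    /// `Version` looks the snapshot up by id, and `TimestampMs` picks the newest snapshot at or
    /// before the given timestamp. Returns `Ok(None)` when the table has no snapshot at all.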
    pub fn get_snapshot_id(
        table: &Table,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<Option<i64>> {
        let current_snapshot = table.metadata().current_snapshot();
        if current_snapshot.is_none() {
            return Ok(None);
        }

        let snapshot_id = match time_travel_info {
            Some(IcebergTimeTravelInfo::Version(version)) => {
                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
                    bail!("Cannot find the snapshot id in the iceberg table.");
                };
                snapshot.snapshot_id()
            }
            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
                let snapshot = table
                    .metadata()
                    .snapshots()
                    .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
                    .max_by_key(|snapshot| snapshot.timestamp_ms());
                match snapshot {
                    Some(snapshot) => snapshot.snapshot_id(),
                    None => {
                        // convert unix time to human-readable time
                        match chrono::DateTime::from_timestamp_millis(timestamp) {
                            Some(time) => bail!("Cannot find a snapshot older than {}", time),
                            None => bail!("Cannot find a snapshot"),
                        }
                    }
                }
            }
            None => {
                assert!(current_snapshot.is_some());
                current_snapshot.unwrap().snapshot_id()
            }
        };
        Ok(Some(snapshot_id))
    }

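    /// Plans the batch scan for the resolved snapshot and returns the splits for
    /// `batch_parallelism` workers. `CountStar` scans are answered from manifest metadata;
    /// other scan types plan the actual files.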
    pub async fn list_splits_batch(
        &self,
        schema: Schema,
        time_travel_info: Option<IcebergTimeTravelInfo>,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        if batch_parallelism == 0 {
            bail!("Batch parallelism is 0. Cannot split the iceberg files.");
        }
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        if snapshot_id.is_none() {
            // If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        if let IcebergScanType::CountStar = iceberg_scan_type {
            self.list_splits_batch_count_star(&table, snapshot_id.unwrap())
                .await
        } else {
            self.list_splits_batch_scan(
                &table,
                snapshot_id.unwrap(),
                schema,
                batch_parallelism,
                iceberg_scan_type,
                predicate,
            )
            .await
        }
    }

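    /// Plans the file scan for one snapshot: data files are collected as-is, while the delete
    /// files attached to each task are de-duplicated by path and separated into equality and
    /// position deletes. The resulting file lists are then distributed evenly across
    /// `batch_parallelism` splits, and empty splits are dropped.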
    async fn list_splits_batch_scan(
        &self,
        table: &Table,
        snapshot_id: i64,
        schema: Schema,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let schema_names = schema.names();
        let require_names = schema_names
            .iter()
            .filter(|name| {
                name.ne(&ICEBERG_SEQUENCE_NUM_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_PATH_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_POS_COLUMN_NAME)
            })
            .cloned()
            .collect_vec();

        let table_schema = table.metadata().current_schema();
        tracing::debug!("iceberg_table_schema: {:?}", table_schema);

        let mut position_delete_files = vec![];
        let mut position_delete_files_set = HashSet::new();
        let mut data_files = vec![];
        let mut equality_delete_files = vec![];
        let mut equality_delete_files_set = HashSet::new();
        let scan = table
            .scan()
            .with_filter(predicate)
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .select(require_names)
            .build()
            .map_err(|e| anyhow!(e))?;

        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let mut task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for mut delete_file in task.deletes.drain(..) {
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {
                        bail!("Data files should not be in task deletes");
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            equality_delete_files.push(delete_file);
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            delete_file.project_field_ids = Vec::default();
                            position_delete_files.push(delete_file);
                        }
                    }
                }
            }
            match task.data_file_content {
                iceberg::spec::DataContentType::Data => {
                    data_files.push(task);
                }
                iceberg::spec::DataContentType::EqualityDeletes => {
                    bail!("Equality delete files should not be in the data files");
                }
                iceberg::spec::DataContentType::PositionDeletes => {
                    bail!("Position delete files should not be in the data files");
                }
            }
        }
        // Evenly split the files into splits based on the parallelism.
        let data_files = Self::split_n_vecs(data_files, batch_parallelism);
        let equality_delete_files = Self::split_n_vecs(equality_delete_files, batch_parallelism);
        let position_delete_files = Self::split_n_vecs(position_delete_files, batch_parallelism);

        let splits = data_files
            .into_iter()
            .zip_eq_fast(equality_delete_files.into_iter())
            .zip_eq_fast(position_delete_files.into_iter())
            .enumerate()
            .map(
                |(index, ((data_file, equality_delete_file), position_delete_file))| IcebergSplit {
                    split_id: index as i64,
                    snapshot_id,
                    task: IcebergFileScanTask::new_scan_with_scan_type(
                        iceberg_scan_type,
                        data_file,
                        equality_delete_file,
                        position_delete_file,
                    ),
                },
            )
            .filter(|split| !split.task.is_empty())
            .collect_vec();

        if splits.is_empty() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        Ok(splits)
    }

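    /// Computes `COUNT(*)` for the snapshot without reading data files, by summing the record
    /// counts of all live data-file entries in the snapshot's manifests. The result is returned
    /// as a single `CountStar` split.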
    pub async fn list_splits_batch_count_star(
        &self,
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let mut record_counts = 0;
        let manifest_list: ManifestList = table
            .metadata()
            .snapshot_by_id(snapshot_id)
            .unwrap()
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .map_err(|e| anyhow!(e))?;

        for entry in manifest_list.entries() {
            let manifest = entry
                .load_manifest(table.file_io())
                .await
                .map_err(|e| anyhow!(e))?;
            let mut manifest_entries_stream =
                futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));

            while let Some(manifest_entry) = manifest_entries_stream.next().await {
                let file = manifest_entry.data_file();
                assert_eq!(file.content_type(), DataContentType::Data);
                record_counts += file.record_count();
            }
        }
        let split = IcebergSplit {
            split_id: 0,
            snapshot_id,
            task: IcebergFileScanTask::new_count_star(record_counts),
        };
        Ok(vec![split])
    }

    /// List all files in the snapshot to check if there are deletes.
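    /// Returns the equality-delete column names (which must be consistent across all equality
    /// delete files) and whether any position delete files exist.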
    pub async fn all_delete_parameters(
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
        let schema = scan.snapshot().schema(table.metadata())?;
        let mut equality_ids = vec![];
        let mut have_position_delete = false;
        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for delete_file in task.deletes {
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {}
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_ids.is_empty() {
                            equality_ids = delete_file.equality_ids;
                        } else if equality_ids != delete_file.equality_ids {
                            bail!("The schema of iceberg equality delete file must be consistent");
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        have_position_delete = true;
                    }
                }
            }
        }
        let delete_columns = equality_ids
            .into_iter()
            .map(|id| match schema.name_by_field_id(id) {
                Some(name) => Ok::<std::string::String, ConnectorError>(name.to_owned()),
                None => bail!("Delete field id {} not found in schema", id),
            })
            .collect::<ConnectorResult<Vec<_>>>()?;

        Ok((delete_columns, have_position_delete))
    }

    pub async fn get_delete_parameters(
        &self,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        if snapshot_id.is_none() {
            return Ok((vec![], false));
        }
        let snapshot_id = snapshot_id.unwrap();
        Self::all_delete_parameters(&table, snapshot_id).await
    }

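    /// Distributes the file scan tasks across `split_num` buckets of near-equal size, e.g.
    /// 10 tasks with a parallelism of 3 yield buckets of sizes [4, 3, 3]. Buckets may be empty
    /// when there are fewer tasks than buckets.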
    fn split_n_vecs(vecs: Vec<FileScanTask>, split_num: usize) -> Vec<Vec<FileScanTask>> {
        let split_size = vecs.len() / split_num;
        let remaining = vecs.len() % split_num;
        let mut result_vecs = (0..split_num)
            .map(|i| {
                let start = i * split_size;
                let end = (i + 1) * split_size;
                vecs[start..end].to_vec()
            })
            .collect_vec();
        for i in 0..remaining {
            result_vecs[i].push(vecs[split_num * split_size + i].clone());
        }
        result_vecs
    }
}

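/// Options controlling how a file scan task is converted into chunks: the chunk size and
/// whether to append the Iceberg sequence-number and file-path/position metadata columns.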
pub struct IcebergScanOpts {
    pub chunk_size: usize,
    pub need_seq_num: bool,
    pub need_file_path_and_pos: bool,
}

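/// Reads a single data [`FileScanTask`] and yields its contents as [`DataChunk`]s of at most
/// `chunk_size` rows. Depending on [`IcebergScanOpts`], the Iceberg sequence number and the
/// file path / row position columns are appended to each chunk, and the bytes read are
/// reported to the scan metrics when provided.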
#[try_stream(ok = DataChunk, error = ConnectorError)]
pub async fn scan_task_to_chunk(
    table: Table,
    data_file_scan_task: FileScanTask,
    IcebergScanOpts {
        chunk_size,
        need_seq_num,
        need_file_path_and_pos,
    }: IcebergScanOpts,
    metrics: Option<Arc<IcebergScanMetrics>>,
) {
    let table_name = table.identifier().name().to_owned();

    // Report the total bytes read to the metrics when the stream is dropped.
    let mut read_bytes = scopeguard::guard(0, |read_bytes| {
        if let Some(metrics) = metrics {
            metrics
                .iceberg_read_bytes
                .with_guarded_label_values(&[&table_name])
                .inc_by(read_bytes as _);
        }
    });

    let data_file_path = data_file_scan_task.data_file_path.clone();
    let data_sequence_number = data_file_scan_task.sequence_number;

    let reader = table.reader_builder().with_batch_size(chunk_size).build();
    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

    // FIXME: what if the start position is not 0? The logic for the index seems not correct.
    let mut record_batch_stream = reader.read(Box::pin(file_scan_stream)).await?.enumerate();

    while let Some((index, record_batch)) = record_batch_stream.next().await {
        let record_batch = record_batch?;

        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
        if need_seq_num {
            // Append the data sequence number as an extra column.
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                vec![data_sequence_number; visibility.len()],
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility)
        };
        if need_file_path_and_pos {
            // Append the file path and the row position within the file as extra columns.
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
                vec![data_file_path.as_str(); visibility.len()],
            ))));
            let index_start = (index * chunk_size) as i64;
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                (index_start..(index_start + visibility.len() as i64)).collect::<Vec<i64>>(),
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility)
        }
        *read_bytes += chunk.estimated_heap_size() as u64;
        yield chunk;
    }
}

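/// Split reader for the Iceberg source. Both constructing it and turning it into a stream are
/// currently unimplemented: Iceberg data is read through dedicated scan executors (see the
/// comment in `list_splits` above) rather than through this generic `SplitReader` path.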
#[derive(Debug)]
pub struct IcebergFileReader {}

#[async_trait]
impl SplitReader for IcebergFileReader {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        _props: IcebergProperties,
        _splits: Vec<IcebergSplit>,
        _parser_config: ParserConfig,
        _source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        unimplemented!()
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        unimplemented!()
    }
}