pub mod parquet_file_handler;

mod metrics;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::{for_await, try_stream};
use iceberg::Catalog;
use iceberg::expr::{BoundPredicate, Predicate as IcebergPredicate};
use iceberg::scan::FileScanTask;
use iceberg::spec::FormatVersion;
use iceberg::table::Table;
pub use parquet_file_handler::*;
use phf::{Set, phf_set};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::bail;
use risingwave_common::types::JsonbVal;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
use serde::{Deserialize, Serialize};

pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
};

pub const ICEBERG_CONNECTOR: &str = "iceberg";

#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct IcebergProperties {
    #[serde(flatten)]
    pub common: IcebergCommon,

    #[serde(flatten)]
    pub table: IcebergTableIdentifier,

    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl EnforceSecret for IcebergProperties {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "catalog.jdbc.password",
    };

    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
            if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
                return Err(EnforceSecretError {
                    key: prop.to_owned(),
                }
                .into());
            }
        }
        Ok(())
    }
}

impl IcebergProperties {
    pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        self.common.create_catalog(&java_catalog_props).await
    }

    pub async fn load_table(&self) -> ConnectorResult<Table> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        self.common
            .load_table(&self.table, &java_catalog_props)
            .await
    }
}

impl SourceProperties for IcebergProperties {
    type Split = IcebergSplit;
    type SplitEnumerator = IcebergSplitEnumerator;
    type SplitReader = IcebergFileReader;

    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}

impl UnknownFields for IcebergProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergFileScanTaskJsonStr(String);

impl IcebergFileScanTaskJsonStr {
    pub fn deserialize(&self) -> FileScanTask {
        serde_json::from_str(&self.0).unwrap()
    }

    pub fn serialize(task: &FileScanTask) -> Self {
        Self(serde_json::to_string(task).unwrap())
    }
}

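/// File scan tasks grouped by the kind of Iceberg content they read: data files,
/// equality delete files, or position delete files.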
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
    Data(Vec<FileScanTask>),
    EqualityDelete(Vec<FileScanTask>),
    PositionDelete(Vec<FileScanTask>),
}

impl IcebergFileScanTask {
    pub fn tasks(&self) -> &[FileScanTask] {
        match self {
            IcebergFileScanTask::Data(file_scan_tasks)
            | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
            | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.tasks().is_empty()
    }

    pub fn files(&self) -> Vec<String> {
        self.tasks()
            .iter()
            .map(|task| task.data_file_path.clone())
            .collect()
    }

    pub fn predicate(&self) -> Option<&BoundPredicate> {
        let first_task = self.tasks().first()?;
        first_task.predicate.as_ref()
    }
}

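/// A unit of work handed to one reader: a numeric split id plus the scan tasks
/// assigned to that split.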
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IcebergSplit {
    pub split_id: i64,
    pub task: IcebergFileScanTask,
}

impl IcebergSplit {
    pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
        let task = match iceberg_scan_type {
            IcebergScanType::DataScan => IcebergFileScanTask::Data(vec![]),
            IcebergScanType::EqualityDeleteScan => IcebergFileScanTask::EqualityDelete(vec![]),
            IcebergScanType::PositionDeleteScan => IcebergFileScanTask::PositionDelete(vec![]),
            _ => unimplemented!(),
        };
        Self { split_id: 0, task }
    }
}

impl SplitMetaData for IcebergSplit {
    fn id(&self) -> SplitId {
        self.split_id.to_string().into()
    }

    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
    }

    fn encode_to_json(&self) -> JsonbVal {
        serde_json::to_value(self.clone()).unwrap().into()
    }

    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
        unimplemented!()
    }
}

#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {
    config: IcebergProperties,
}

#[derive(Debug, Clone)]
pub struct IcebergDeleteParameters {
    pub equality_delete_columns: Vec<String>,
    pub has_position_delete: bool,
    pub snapshot_id: Option<i64>,
}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<Self> {
        Ok(Self::new_inner(properties, context))
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
        Ok(vec![])
    }
}

impl IcebergSplitEnumerator {
    pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
        Self { config: properties }
    }
}

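/// Time-travel target for a scan: either an explicit snapshot id or a timestamp in
/// milliseconds used to pick the newest snapshot committed at or before that time.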
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IcebergTimeTravelInfo {
    Version(i64),
    TimestampMs(i64),
}

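/// Result of planning a snapshot scan: the data files to read, the delete files that
/// apply to them, the column names referenced by equality deletes, and the table's
/// format version.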
#[derive(Debug, Clone)]
pub struct IcebergListResult {
    pub data_files: Vec<FileScanTask>,
    pub equality_delete_files: Vec<FileScanTask>,
    pub position_delete_files: Vec<FileScanTask>,
    pub equality_delete_columns: Vec<String>,
    pub format_version: FormatVersion,
}

impl IcebergSplitEnumerator {
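    /// Resolve the snapshot id to scan. Without time-travel info this is the current
    /// snapshot; otherwise the snapshot is looked up by id, or by the newest snapshot
    /// whose commit timestamp is not later than the requested one. Returns `Ok(None)`
    /// if the table has no snapshot or no snapshot matches the timestamp.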
    pub fn get_snapshot_id(
        table: &Table,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<Option<i64>> {
        let current_snapshot = table.metadata().current_snapshot();
        let Some(current_snapshot) = current_snapshot else {
            return Ok(None);
        };

        let snapshot_id = match time_travel_info {
            Some(IcebergTimeTravelInfo::Version(version)) => {
                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
                    bail!("Cannot find the snapshot id in the iceberg table.");
                };
                snapshot.snapshot_id()
            }
            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
                let snapshot = table
                    .metadata()
                    .snapshots()
                    .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
                    .max_by_key(|snapshot| snapshot.timestamp_ms());
                match snapshot {
                    Some(snapshot) => snapshot.snapshot_id(),
                    None => {
                        let time = chrono::DateTime::from_timestamp_millis(timestamp);
                        if let Some(time) = time {
                            tracing::warn!("Cannot find a snapshot older than {}", time);
                        } else {
                            tracing::warn!("Cannot find a snapshot");
                        }
                        return Ok(None);
                    }
                }
            }
            None => current_snapshot.snapshot_id(),
        };
        Ok(Some(snapshot_id))
    }

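    /// Load the table, resolve the snapshot, and plan the file scan tasks, applying
    /// `predicate` as a scan filter. Returns `Ok(None)` when there is no snapshot to
    /// scan.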
    pub async fn list_scan_tasks(
        &self,
        time_travel_info: Option<IcebergTimeTravelInfo>,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Option<IcebergListResult>> {
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;

        let Some(snapshot_id) = snapshot_id else {
            return Ok(None);
        };
        let res = self
            .list_scan_tasks_inner(&table, snapshot_id, predicate)
            .await?;
        Ok(Some(res))
    }

    async fn list_scan_tasks_inner(
        &self,
        table: &Table,
        snapshot_id: i64,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<IcebergListResult> {
        let format_version = table.metadata().format_version();
        let table_schema = table.metadata().current_schema();
        tracing::debug!("iceberg_table_schema: {:?}", table_schema);

        let mut position_delete_files = vec![];
        let mut position_delete_files_set = HashSet::new();
        let mut data_files = vec![];
        let mut equality_delete_files = vec![];
        let mut equality_delete_files_set = HashSet::new();
        let mut equality_delete_ids = None;
        let mut scan_builder = table.scan().snapshot_id(snapshot_id).select_all();
        if predicate != IcebergPredicate::AlwaysTrue {
            scan_builder = scan_builder.with_filter(predicate.clone());
        }
        let scan = scan_builder.build()?;
        let file_scan_stream = scan.plan_files().await?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task?;

            for delete_file in &task.deletes {
                let delete_file = delete_file.as_ref().clone();
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {
                        bail!("Data files should not be in task deletes");
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            if equality_delete_ids.is_none() {
                                equality_delete_ids = delete_file.equality_ids.clone();
                            } else if equality_delete_ids != delete_file.equality_ids {
                                bail!(
                                    "The schema of iceberg equality delete files must be consistent"
                                );
                            }
                            equality_delete_files.push(delete_file);
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            position_delete_files.push(delete_file);
                        }
                    }
                }
            }

            match task.data_file_content {
                iceberg::spec::DataContentType::Data => {
                    data_files.push(task);
                }
                iceberg::spec::DataContentType::EqualityDeletes => {
                    bail!("Equality delete files should not be in the data files");
                }
                iceberg::spec::DataContentType::PositionDeletes => {
                    bail!("Position delete files should not be in the data files");
                }
            }
        }
        let schema = scan
            .snapshot()
            .context("snapshot not found")?
            .schema(table.metadata())?;
        let equality_delete_columns = equality_delete_ids
            .unwrap_or_default()
            .into_iter()
            .map(|id| match schema.name_by_field_id(id) {
                Some(name) => Ok::<std::string::String, ConnectorError>(name.to_owned()),
                None => bail!("Delete field id {} not found in schema", id),
            })
            .collect::<ConnectorResult<Vec<_>>>()?;

        Ok(IcebergListResult {
            data_files,
            equality_delete_files,
            position_delete_files,
            equality_delete_columns,
            format_version,
        })
    }

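    /// Distribute file scan tasks into `split_num` groups: each task is appended to the
    /// group with the smallest accumulated byte length (ties broken by group index via
    /// a min-heap), so the groups stay roughly balanced and the assignment is
    /// deterministic for a given input order.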
    pub fn split_n_vecs(
        file_scan_tasks: Vec<FileScanTask>,
        split_num: usize,
    ) -> Vec<Vec<FileScanTask>> {
        use std::cmp::{Ordering, Reverse};

        #[derive(Default)]
        struct FileScanTaskGroup {
            idx: usize,
            tasks: Vec<FileScanTask>,
            total_length: u64,
        }

        impl Ord for FileScanTaskGroup {
            fn cmp(&self, other: &Self) -> Ordering {
                if self.total_length == other.total_length {
                    self.idx.cmp(&other.idx)
                } else {
                    self.total_length.cmp(&other.total_length)
                }
            }
        }

        impl PartialOrd for FileScanTaskGroup {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }

        impl Eq for FileScanTaskGroup {}

        impl PartialEq for FileScanTaskGroup {
            fn eq(&self, other: &Self) -> bool {
                self.total_length == other.total_length
            }
        }

        let mut heap = BinaryHeap::new();
        for idx in 0..split_num {
            heap.push(Reverse(FileScanTaskGroup {
                idx,
                tasks: vec![],
                total_length: 0,
            }));
        }

        for file_task in file_scan_tasks {
            let mut group = heap.peek_mut().unwrap();
            group.0.total_length += file_task.length;
            group.0.tasks.push(file_task);
        }

        heap.into_vec()
            .into_iter()
            .map(|reverse_group| reverse_group.0.tasks)
            .collect()
    }
}

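/// Options controlling how a file scan task is converted into chunks: the chunk size,
/// whether to append the data sequence number and the file path / row position columns,
/// and whether the delete files attached to the task should be applied while reading.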
pub struct IcebergScanOpts {
    pub chunk_size: usize,
    pub need_seq_num: bool,
    pub need_file_path_and_pos: bool,
    pub handle_delete_files: bool,
}

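/// Read one data file scan task into a stream of `DataChunk`s, optionally applying the
/// task's delete files and appending sequence-number and file-path/row-position columns.
/// The estimated size of the produced chunks is reported to `IcebergScanMetrics` when
/// metrics are provided.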
#[try_stream(ok = DataChunk, error = ConnectorError)]
pub async fn scan_task_to_chunk_with_deletes(
    table: Table,
    mut data_file_scan_task: FileScanTask,
    IcebergScanOpts {
        chunk_size,
        need_seq_num,
        need_file_path_and_pos,
        handle_delete_files,
    }: IcebergScanOpts,
    metrics: Option<Arc<IcebergScanMetrics>>,
) {
    let table_name = table.identifier().name().to_owned();

    let mut read_bytes = scopeguard::guard(0, |read_bytes| {
        if let Some(metrics) = metrics.clone() {
            metrics
                .iceberg_read_bytes
                .with_guarded_label_values(&[&table_name])
                .inc_by(read_bytes as _);
        }
    });

    let data_file_path = data_file_scan_task.data_file_path.clone();
    let data_sequence_number = data_file_scan_task.sequence_number;

    tracing::debug!(
        "scan_task_to_chunk_with_deletes: data_file={}, handle_delete_files={}, total_delete_files={}",
        data_file_path,
        handle_delete_files,
        data_file_scan_task.deletes.len()
    );

    if !handle_delete_files {
        data_file_scan_task.deletes.clear();
    }

    let reader = table
        .reader_builder()
        .with_batch_size(chunk_size)
        .with_row_group_filtering_enabled(true)
        .with_row_selection_enabled(true)
        .build();
    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

    let record_batch_stream: iceberg::scan::ArrowRecordBatchStream =
        reader.read(Box::pin(file_scan_stream))?;
    let mut record_batch_stream = record_batch_stream.enumerate();

    while let Some((batch_index, record_batch)) = record_batch_stream.next().await {
        let record_batch = record_batch?;
        let batch_start_pos = (batch_index * chunk_size) as i64;

        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
        let row_count = chunk.capacity();

        if need_seq_num {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                std::iter::repeat_n(data_sequence_number, row_count),
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        if need_file_path_and_pos {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
                std::iter::repeat_n(data_file_path.as_str(), row_count),
            ))));

            let positions: Vec<i64> =
                (batch_start_pos..(batch_start_pos + row_count as i64)).collect();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(positions))));

            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        *read_bytes += chunk.estimated_heap_size() as u64;
        yield chunk;
    }
}

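/// Placeholder `SplitReader` for the Iceberg connector; both construction and
/// `into_stream` are currently `unimplemented!()`.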
#[derive(Debug)]
pub struct IcebergFileReader {}

#[async_trait]
impl SplitReader for IcebergFileReader {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        _props: IcebergProperties,
        _splits: Vec<IcebergSplit>,
        _parser_config: ParserConfig,
        _source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        unimplemented!()
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        unimplemented!()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, Schema};

    use super::*;

    fn create_file_scan_task(length: u64, id: u64) -> FileScanTask {
        FileScanTask {
            length,
            start: 0,
            record_count: Some(0),
            data_file_path: format!("test_{}.parquet", id),
            data_file_content: DataContentType::Data,
            data_file_format: iceberg::spec::DataFileFormat::Parquet,
            schema: Arc::new(Schema::builder().build().unwrap()),
            project_field_ids: vec![],
            predicate: None,
            deletes: vec![],
            sequence_number: 0,
            equality_ids: None,
            file_size_in_bytes: 0,
            partition: None,
            partition_spec: None,
            name_mapping: None,
        }
    }

    #[test]
    fn test_split_n_vecs_basic() {
        let file_scan_tasks = (1..=12)
            .map(|i| create_file_scan_task(i + 100, i))
            .collect::<Vec<_>>();

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);

        assert_eq!(groups.len(), 3);

        let group_lengths: Vec<u64> = groups
            .iter()
            .map(|group| group.iter().map(|task| task.length).sum())
            .collect();

        let max_length = *group_lengths.iter().max().unwrap();
        let min_length = *group_lengths.iter().min().unwrap();
        assert!(max_length - min_length <= 10, "Groups should be balanced");

        let total_tasks: usize = groups.iter().map(|group| group.len()).sum();
        assert_eq!(total_tasks, 12);
    }

    #[test]
    fn test_split_n_vecs_empty() {
        let file_scan_tasks = Vec::new();
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert!(groups.iter().all(|group| group.is_empty()));
    }

    #[test]
    fn test_split_n_vecs_single_task() {
        let file_scan_tasks = vec![create_file_scan_task(100, 1)];
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert_eq!(groups.iter().filter(|group| !group.is_empty()).count(), 1);
    }

    #[test]
    fn test_split_n_vecs_uneven_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(1000, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 2);
        assert_eq!(groups.len(), 2);

        let group_with_large_task = groups
            .iter()
            .find(|group| group.iter().any(|task| task.length == 1000))
            .unwrap();
        assert_eq!(group_with_large_task.len(), 1);
    }

    #[test]
    fn test_split_n_vecs_same_files_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(100, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
            create_file_scan_task(100, 6),
            create_file_scan_task(100, 7),
            create_file_scan_task(100, 8),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
            .iter()
            .map(|g| {
                g.iter()
                    .map(|task| task.data_file_path.clone())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        for _ in 0..10000 {
            let groups_2 = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
                .iter()
                .map(|g| {
                    g.iter()
                        .map(|task| task.data_file_path.clone())
                        .collect::<Vec<_>>()
                })
                .collect::<Vec<_>>();

            assert_eq!(groups, groups_2);
        }
    }
}