pub mod parquet_file_handler;

mod metrics;
use std::collections::{BTreeSet, BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::{for_await, try_stream};
use iceberg::Catalog;
use iceberg::expr::Predicate as IcebergPredicate;
use iceberg::scan::FileScanTask;
use iceberg::spec::DataContentType;
use iceberg::table::Table;
use itertools::Itertools;
pub use parquet_file_handler::*;
use phf::{Set, phf_set};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_array_iceberg::Array;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
    Schema,
};
use risingwave_common::types::{Datum, DatumRef, JsonbVal, ToDatumRef, ToOwnedDatum};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
use serde::{Deserialize, Serialize};

pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
};

pub const ICEBERG_CONNECTOR: &str = "iceberg";

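/// `WITH` options accepted by the Iceberg source connector: the shared
/// catalog/warehouse options, the table identifier, and optional JDBC catalog
/// credentials. Unrecognized options are collected into `unknown_fields`
/// rather than rejected at deserialization time.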
#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct IcebergProperties {
    #[serde(flatten)]
    pub common: IcebergCommon,

    #[serde(flatten)]
    pub table: IcebergTableIdentifier,

    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl EnforceSecret for IcebergProperties {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "catalog.jdbc.password",
    };

    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
            if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
                return Err(EnforceSecretError {
                    key: prop.to_owned(),
                }
                .into());
            }
        }
        Ok(())
    }
}

impl IcebergProperties {
    pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        self.common.create_catalog(&java_catalog_props).await
    }

    pub async fn load_table(&self) -> ConnectorResult<Table> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        self.common
            .load_table(&self.table, &java_catalog_props)
            .await
    }
}

impl SourceProperties for IcebergProperties {
    type Split = IcebergSplit;
    type SplitEnumerator = IcebergSplitEnumerator;
    type SplitReader = IcebergFileReader;

    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}

impl UnknownFields for IcebergProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

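/// A `FileScanTask` serialized as a JSON string so it can be embedded in a
/// split and shipped between nodes.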
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergFileScanTaskJsonStr(String);

impl IcebergFileScanTaskJsonStr {
    pub fn deserialize(&self) -> FileScanTask {
        serde_json::from_str(&self.0).unwrap()
    }

    pub fn serialize(task: &FileScanTask) -> Self {
        Self(serde_json::to_string(task).unwrap())
    }
}

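/// The work carried by one split: a batch of file scan tasks of a single
/// content type (data, equality deletes, or position deletes), or a
/// pre-computed `COUNT(*)` result.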
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
    Data(Vec<FileScanTask>),
    EqualityDelete(Vec<FileScanTask>),
    PositionDelete(Vec<FileScanTask>),
    CountStar(u64),
}

impl IcebergFileScanTask {
    pub fn new_count_star(count_sum: u64) -> Self {
        IcebergFileScanTask::CountStar(count_sum)
    }

    pub fn new_scan_with_scan_type(
        iceberg_scan_type: IcebergScanType,
        data_files: Vec<FileScanTask>,
        equality_delete_files: Vec<FileScanTask>,
        position_delete_files: Vec<FileScanTask>,
    ) -> Self {
        match iceberg_scan_type {
            IcebergScanType::EqualityDeleteScan => {
                IcebergFileScanTask::EqualityDelete(equality_delete_files)
            }
            IcebergScanType::DataScan => IcebergFileScanTask::Data(data_files),
            IcebergScanType::PositionDeleteScan => {
                IcebergFileScanTask::PositionDelete(position_delete_files)
            }
            IcebergScanType::Unspecified | IcebergScanType::CountStar => {
                unreachable!("unexpected iceberg scan type: {:?}", iceberg_scan_type)
            }
        }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            IcebergFileScanTask::Data(data_files) => data_files.is_empty(),
            IcebergFileScanTask::EqualityDelete(equality_delete_files) => {
                equality_delete_files.is_empty()
            }
            IcebergFileScanTask::PositionDelete(position_delete_files) => {
                position_delete_files.is_empty()
            }
            IcebergFileScanTask::CountStar(_) => false,
        }
    }

    pub fn files(&self) -> Vec<String> {
        match self {
            IcebergFileScanTask::Data(file_scan_tasks)
            | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
            | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks
                .iter()
                .map(|task| task.data_file_path.clone())
                .collect(),
            IcebergFileScanTask::CountStar(_) => vec![],
        }
    }
}

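/// A batch split of an Iceberg scan, pinned to one snapshot and carrying the
/// file scan tasks assigned to it.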
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IcebergSplit {
    pub split_id: i64,
    pub snapshot_id: i64,
    pub task: IcebergFileScanTask,
}

impl IcebergSplit {
    pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
        if let IcebergScanType::CountStar = iceberg_scan_type {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_count_star(0),
            }
        } else {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_scan_with_scan_type(
                    iceberg_scan_type,
                    vec![],
                    vec![],
                    vec![],
                ),
            }
        }
    }
}

impl SplitMetaData for IcebergSplit {
    fn id(&self) -> SplitId {
        self.split_id.to_string().into()
    }

    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
    }

    fn encode_to_json(&self) -> JsonbVal {
        serde_json::to_value(self.clone()).unwrap().into()
    }

    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
        unimplemented!()
    }
}

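/// Enumerates Iceberg splits. Batch planning happens in `list_splits_batch`;
/// the `SplitEnumerator::list_splits` implementation below returns no splits.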
#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {
    config: IcebergProperties,
}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<Self> {
        Ok(Self::new_inner(properties, context))
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
        Ok(vec![])
    }
}

impl IcebergSplitEnumerator {
    pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
        Self { config: properties }
    }
}

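/// Time-travel target for a scan: an explicit snapshot id, or a timestamp in
/// milliseconds (the latest snapshot at or before it is used).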
pub enum IcebergTimeTravelInfo {
    Version(i64),
    TimestampMs(i64),
}

impl IcebergSplitEnumerator {
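    /// Resolves which snapshot to scan. Returns `None` when the table has no
    /// current snapshot; otherwise applies the optional time-travel info.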
    pub fn get_snapshot_id(
        table: &Table,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<Option<i64>> {
        let current_snapshot = table.metadata().current_snapshot();
        if current_snapshot.is_none() {
            return Ok(None);
        }

        let snapshot_id = match time_travel_info {
            Some(IcebergTimeTravelInfo::Version(version)) => {
                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
                    bail!("Cannot find the snapshot id in the iceberg table.");
                };
                snapshot.snapshot_id()
            }
            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
                let snapshot = table
                    .metadata()
                    .snapshots()
                    .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
                    .max_by_key(|snapshot| snapshot.timestamp_ms());
                match snapshot {
                    Some(snapshot) => snapshot.snapshot_id(),
                    None => {
                        let time = chrono::DateTime::from_timestamp_millis(timestamp);
                        if let Some(time) = time {
                            bail!("Cannot find a snapshot older than {}", time);
                        } else {
                            bail!("Cannot find a snapshot");
                        }
                    }
                }
            }
            None => {
                assert!(current_snapshot.is_some());
                current_snapshot.unwrap().snapshot_id()
            }
        };
        Ok(Some(snapshot_id))
    }

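    /// Plans a batch scan: resolves the snapshot, then either computes a
    /// `COUNT(*)` split or distributes the planned file scan tasks over
    /// `batch_parallelism` splits. Returns a single empty split when the table
    /// has no snapshot yet.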
    pub async fn list_splits_batch(
        &self,
        schema: Schema,
        time_travel_info: Option<IcebergTimeTravelInfo>,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        if batch_parallelism == 0 {
            bail!("Batch parallelism is 0. Cannot split the iceberg files.");
        }
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        if snapshot_id.is_none() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        if let IcebergScanType::CountStar = iceberg_scan_type {
            self.list_splits_batch_count_star(&table, snapshot_id.unwrap())
                .await
        } else {
            self.list_splits_batch_scan(
                &table,
                snapshot_id.unwrap(),
                schema,
                batch_parallelism,
                iceberg_scan_type,
                predicate,
            )
            .await
        }
    }

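    /// Plans a regular scan: walks the snapshot's file plan, separating data
    /// files from equality- and position-delete files, then packs each kind
    /// into `batch_parallelism` groups of roughly equal total size.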
    async fn list_splits_batch_scan(
        &self,
        table: &Table,
        snapshot_id: i64,
        schema: Schema,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let schema_names = schema.names();
        let require_names = schema_names
            .iter()
            .filter(|name| {
                name.ne(&ICEBERG_SEQUENCE_NUM_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_PATH_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_POS_COLUMN_NAME)
            })
            .cloned()
            .collect_vec();

        let table_schema = table.metadata().current_schema();
        tracing::debug!("iceberg_table_schema: {:?}", table_schema);

        let mut position_delete_files = vec![];
        let mut position_delete_files_set = HashSet::new();
        let mut data_files = vec![];
        let mut equality_delete_files = vec![];
        let mut equality_delete_files_set = HashSet::new();
        let scan = table
            .scan()
            .with_filter(predicate)
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .select(require_names)
            .build()
            .map_err(|e| anyhow!(e))?;

        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;

            for delete_file in &task.deletes {
                let delete_file = delete_file.as_ref().clone();
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {
                        bail!("Data file should not be in task deletes");
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            equality_delete_files.push(delete_file);
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        let mut delete_file = delete_file;
                        if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            delete_file.project_field_ids = Vec::default();
                            position_delete_files.push(delete_file);
                        }
                    }
                }
            }

            match task.data_file_content {
                iceberg::spec::DataContentType::Data => {
                    data_files.push(task);
                }
                iceberg::spec::DataContentType::EqualityDeletes => {
                    bail!("Equality delete files should not be in the data files");
                }
                iceberg::spec::DataContentType::PositionDeletes => {
                    bail!("Position delete files should not be in the data files");
                }
            }
        }

        let data_files = Self::split_n_vecs(data_files, batch_parallelism);
        let equality_delete_files = Self::split_n_vecs(equality_delete_files, batch_parallelism);
        let position_delete_files = Self::split_n_vecs(position_delete_files, batch_parallelism);

        let splits = data_files
            .into_iter()
            .zip_eq_fast(equality_delete_files.into_iter())
            .zip_eq_fast(position_delete_files.into_iter())
            .enumerate()
            .map(
                |(index, ((data_file, equality_delete_file), position_delete_file))| IcebergSplit {
                    split_id: index as i64,
                    snapshot_id,
                    task: IcebergFileScanTask::new_scan_with_scan_type(
                        iceberg_scan_type,
                        data_file,
                        equality_delete_file,
                        position_delete_file,
                    ),
                },
            )
            .filter(|split| !split.task.is_empty())
            .collect_vec();

        if splits.is_empty() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        Ok(splits)
    }

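    /// Plans a `COUNT(*)` scan by summing the per-file record counts from the
    /// file plan; this is only valid when no delete files are attached to any
    /// task (asserted below).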
    pub async fn list_splits_batch_count_star(
        &self,
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let mut record_counts = 0;
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            assert_eq!(task.data_file_content, DataContentType::Data);
            assert!(task.deletes.is_empty());
            record_counts += task.record_count.expect("must have");
        }
        let split = IcebergSplit {
            split_id: 0,
            snapshot_id,
            task: IcebergFileScanTask::new_count_star(record_counts),
        };
        Ok(vec![split])
    }

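    /// Inspects the file plan of a snapshot and returns the equality-delete
    /// column names (which must be identical across all equality-delete files)
    /// together with whether any position-delete files exist.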
    pub async fn all_delete_parameters(
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
        let schema = scan.snapshot().schema(table.metadata())?;
        let mut equality_ids = vec![];
        let mut have_position_delete = false;
        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for delete_file in task.deletes {
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {}
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_ids.is_empty() {
                            equality_ids = delete_file.equality_ids.clone();
                        } else if equality_ids != delete_file.equality_ids {
                            bail!("The schema of iceberg equality delete file must be consistent");
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        have_position_delete = true;
                    }
                }
            }
        }
        let delete_columns = equality_ids
            .into_iter()
            .map(|id| match schema.name_by_field_id(id) {
                Some(name) => Ok::<std::string::String, ConnectorError>(name.to_owned()),
                None => bail!("Delete field id {} not found in schema", id),
            })
            .collect::<ConnectorResult<Vec<_>>>()?;

        Ok((delete_columns, have_position_delete))
    }

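    /// Convenience wrapper around `all_delete_parameters` that loads the table
    /// and resolves the snapshot first; returns empty parameters when the
    /// table has no snapshot.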
    pub async fn get_delete_parameters(
        &self,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        if snapshot_id.is_none() {
            return Ok((vec![], false));
        }
        let snapshot_id = snapshot_id.unwrap();
        Self::all_delete_parameters(&table, snapshot_id).await
    }

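    /// Distributes file scan tasks over `split_num` groups, greedily assigning
    /// each task to the group with the smallest total byte length so far (via
    /// a min-heap of groups). Ties are broken by group index, which keeps the
    /// result deterministic for identical inputs.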
    fn split_n_vecs(
        file_scan_tasks: Vec<FileScanTask>,
        split_num: usize,
    ) -> Vec<Vec<FileScanTask>> {
        use std::cmp::{Ordering, Reverse};

        #[derive(Default)]
        struct FileScanTaskGroup {
            idx: usize,
            tasks: Vec<FileScanTask>,
            total_length: u64,
        }

        impl Ord for FileScanTaskGroup {
            fn cmp(&self, other: &Self) -> Ordering {
                if self.total_length == other.total_length {
                    self.idx.cmp(&other.idx)
                } else {
                    self.total_length.cmp(&other.total_length)
                }
            }
        }

        impl PartialOrd for FileScanTaskGroup {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }

        impl Eq for FileScanTaskGroup {}

        impl PartialEq for FileScanTaskGroup {
            fn eq(&self, other: &Self) -> bool {
                self.total_length == other.total_length
            }
        }

        let mut heap = BinaryHeap::new();
        for idx in 0..split_num {
            heap.push(Reverse(FileScanTaskGroup {
                idx,
                tasks: vec![],
                total_length: 0,
            }));
        }

        for file_task in file_scan_tasks {
            let mut group = heap.peek_mut().unwrap();
            group.0.total_length += file_task.length;
            group.0.tasks.push(file_task);
        }

        heap.into_vec()
            .into_iter()
            .map(|reverse_group| reverse_group.0.tasks)
            .collect()
    }
}

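/// Options controlling how a single file scan task is turned into data chunks.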
pub struct IcebergScanOpts {
    pub chunk_size: usize,
    pub need_seq_num: bool,
    pub need_file_path_and_pos: bool,
    pub handle_delete_files: bool,
}

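/// Equality-delete keys grouped by the list of columns they apply to.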
type EqualityDeleteEntries = Vec<(Vec<String>, HashSet<Vec<Datum>>)>;

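/// Reads one data file scan task and yields `DataChunk`s, optionally applying
/// the position- and equality-delete files attached to the task and appending
/// the sequence-number / file-path / position metadata columns.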
#[try_stream(ok = DataChunk, error = ConnectorError)]
pub async fn scan_task_to_chunk_with_deletes(
    table: Table,
    data_file_scan_task: FileScanTask,
    IcebergScanOpts {
        chunk_size,
        need_seq_num,
        need_file_path_and_pos,
        handle_delete_files,
    }: IcebergScanOpts,
    metrics: Option<Arc<IcebergScanMetrics>>,
) {
    let table_name = table.identifier().name().to_owned();

    let mut read_bytes = scopeguard::guard(0, |read_bytes| {
        if let Some(metrics) = metrics.clone() {
            metrics
                .iceberg_read_bytes
                .with_guarded_label_values(&[&table_name])
                .inc_by(read_bytes as _);
        }
    });

    let data_file_path = data_file_scan_task.data_file_path.clone();
    let data_sequence_number = data_file_scan_task.sequence_number;

    tracing::debug!(
        "scan_task_to_chunk_with_deletes: data_file={}, handle_delete_files={}, total_delete_files={}",
        data_file_path,
        handle_delete_files,
        data_file_scan_task.deletes.len()
    );

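    // Collect the row positions deleted from this data file: read every
    // position-delete task attached to the scan task and keep the `pos` values
    // whose `file_path` matches the data file being scanned.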
    let position_delete_set: BTreeSet<i64> = if handle_delete_files {
        let mut deletes = BTreeSet::new();

        let position_delete_tasks: Vec<_> = data_file_scan_task
            .deletes
            .iter()
            .filter(|delete| delete.data_file_content == DataContentType::PositionDeletes)
            .cloned()
            .collect();

        tracing::debug!(
            "Processing position deletes for data file: {}, found {} position delete tasks",
            data_file_path,
            position_delete_tasks.len()
        );

        for delete_task in position_delete_tasks {
            let delete_task_file_path = delete_task.data_file_path.clone();

            let delete_reader = table.reader_builder().with_batch_size(chunk_size).build();
            let task_clone: FileScanTask = (*delete_task).clone();
            let delete_stream = tokio_stream::once(Ok(task_clone));
            let mut delete_record_stream = delete_reader.read(Box::pin(delete_stream)).await?;

            while let Some(record_batch) = delete_record_stream.next().await {
                let record_batch = record_batch?;

                if let Some(file_path_col) = record_batch.column_by_name("file_path")
                    && let Some(pos_col) = record_batch.column_by_name("pos")
                {
                    let file_paths = file_path_col
                        .as_any()
                        .downcast_ref::<risingwave_common::array::arrow::arrow_array_iceberg::StringArray>()
                        .with_context(|| "file_path column is not StringArray")?;
                    let positions = pos_col
                        .as_any()
                        .downcast_ref::<risingwave_common::array::arrow::arrow_array_iceberg::Int64Array>()
                        .with_context(|| "pos column is not Int64Array")?;

                    for idx in 0..record_batch.num_rows() {
                        if !file_paths.is_null(idx) && !positions.is_null(idx) {
                            let file_path = file_paths.value(idx);
                            let pos = positions.value(idx);

                            if file_path == data_file_path {
                                deletes.insert(pos);
                            }
                        } else {
                            tracing::warn!(
                                "Position delete file {} at line {}: file_path or pos is null",
                                delete_task_file_path,
                                idx
                            );
                        }
                    }
                }
            }
        }

        tracing::debug!(
            "Built position delete set for data file {}: {:?}",
            data_file_path,
            deletes
        );

        deletes
    } else {
        BTreeSet::new()
    };

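    // Collect equality-delete keys: for each equality-delete task, resolve the
    // delete columns from the field ids, read the delete file, and gather the
    // key tuples grouped by the column list they apply to.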
    let equality_deletes: Option<EqualityDeleteEntries> = if handle_delete_files {
        let equality_delete_tasks: Vec<_> = data_file_scan_task
            .deletes
            .iter()
            .filter(|delete| delete.data_file_content == DataContentType::EqualityDeletes)
            .cloned()
            .collect();

        if !equality_delete_tasks.is_empty() {
            let mut delete_key_map: HashMap<Vec<String>, HashSet<Vec<Datum>>> = HashMap::new();

            for delete_task in equality_delete_tasks {
                let equality_ids = delete_task.equality_ids.clone();

                if equality_ids.is_empty() {
                    continue;
                }

                let delete_schema = delete_task.schema();
                let delete_name_vec = equality_ids
                    .iter()
                    .filter_map(|id| delete_schema.name_by_field_id(*id))
                    .map(|s| s.to_owned())
                    .collect_vec();

                if delete_name_vec.len() != equality_ids.len() {
                    tracing::warn!(
                        "Skip equality delete task due to missing column mappings: expected {} names, got {}",
                        equality_ids.len(),
                        delete_name_vec.len()
                    );
                    continue;
                }

                let delete_reader = table.reader_builder().with_batch_size(chunk_size).build();
                let task_clone: FileScanTask = delete_task.as_ref().clone();
                let delete_stream = tokio_stream::once(Ok(task_clone));
                let mut delete_record_stream = delete_reader.read(Box::pin(delete_stream)).await?;

                let mut task_delete_key_set: HashSet<Vec<Datum>> = HashSet::new();

                while let Some(record_batch) = delete_record_stream.next().await {
                    let record_batch = record_batch?;

                    let delete_chunk =
                        IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;

                    let field_indices = delete_name_vec
                        .iter()
                        .map(|field_name| {
                            record_batch
                                .schema()
                                .column_with_name(field_name)
                                .map(|(idx, _)| idx)
                                .unwrap()
                        })
                        .collect_vec();

                    for row_idx in 0..delete_chunk.capacity() {
                        let mut key = Vec::with_capacity(field_indices.len());
                        for &col_idx in &field_indices {
                            let col = delete_chunk.column_at(col_idx);
                            key.push(col.value_at(row_idx).to_owned_datum());
                        }
                        task_delete_key_set.insert(key);
                    }
                }

                if !task_delete_key_set.is_empty() {
                    delete_key_map
                        .entry(delete_name_vec.clone())
                        .or_default()
                        .extend(task_delete_key_set);
                }
            }

            if delete_key_map.is_empty() {
                None
            } else {
                Some(delete_key_map.into_iter().collect())
            }
        } else {
            None
        }
    } else {
        None
    };

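    // Read the data file itself and convert each record batch into a chunk,
    // masking deleted rows out through the visibility bitmap and appending the
    // requested metadata columns.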
    let reader = table.reader_builder().with_batch_size(chunk_size).build();
    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

    let mut record_batch_stream = reader.read(Box::pin(file_scan_stream)).await?.enumerate();

    while let Some((batch_index, record_batch)) = record_batch_stream.next().await {
        let record_batch = record_batch?;
        let batch_start_pos = (batch_index * chunk_size) as i64;
        let batch_num_rows = record_batch.num_rows();

        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;

        let mut visibility = vec![true; batch_num_rows];

        if !position_delete_set.is_empty() {
            let batch_end_pos = batch_start_pos + batch_num_rows as i64;

            tracing::debug!(
                "Applying position deletes to batch {}: range [{}, {}), total delete set size: {}",
                batch_index,
                batch_start_pos,
                batch_end_pos,
                position_delete_set.len()
            );

            let mut position_deleted_count = 0;

            for &deleted_pos in position_delete_set.range(batch_start_pos..batch_end_pos) {
                let local_idx = (deleted_pos - batch_start_pos) as usize;
                if local_idx < batch_num_rows {
                    visibility[local_idx] = false;
                    position_deleted_count += 1;
                }
            }

            tracing::debug!(
                "Position delete results for batch {}: deleted {} rows",
                batch_index,
                position_deleted_count
            );
        }

        if let Some(ref equality_delete_entries) = equality_deletes {
            let (columns, _) = chunk.into_parts();

            let delete_entries_info: Vec<(Vec<usize>, HashSet<Vec<DatumRef<'_>>>)> =
                equality_delete_entries
                    .iter()
                    .map(|(delete_name_vec, delete_key_set)| {
                        let indices = delete_name_vec
                            .iter()
                            .map(|field_name| {
                                record_batch
                                    .schema()
                                    .column_with_name(field_name)
                                    .map(|(idx, _)| idx)
                                    .unwrap()
                            })
                            .collect_vec();
                        let delete_key_ref_set: HashSet<Vec<DatumRef<'_>>> = delete_key_set
                            .iter()
                            .map(|datum_vec| {
                                datum_vec.iter().map(|datum| datum.to_datum_ref()).collect()
                            })
                            .collect();
                        (indices, delete_key_ref_set)
                    })
                    .collect::<Vec<_>>();

            let mut deleted_count = 0;

            for (row_idx, item) in visibility.iter_mut().enumerate().take(batch_num_rows) {
                if !*item {
                    continue;
                }

                for (field_indices, delete_key_set) in &delete_entries_info {
                    let mut row_key = Vec::with_capacity(field_indices.len());
                    for &col_idx in field_indices {
                        let datum = columns[col_idx].value_at(row_idx);
                        row_key.push(datum);
                    }

                    if delete_key_set.contains(&row_key) {
                        *item = false;
                        deleted_count += 1;
                        break;
                    }
                }
            }

            tracing::debug!(
                "Equality delete results for batch {}: deleted {} rows",
                batch_index,
                deleted_count
            );

            let columns: Vec<_> = columns.into_iter().collect();
            chunk = DataChunk::from_parts(columns.into(), Bitmap::from_bool_slice(&visibility));
        } else {
            let (columns, _) = chunk.into_parts();
            let columns: Vec<_> = columns.into_iter().collect();
            chunk = DataChunk::from_parts(columns.into(), Bitmap::from_bool_slice(&visibility));
        }

        if need_seq_num {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                vec![data_sequence_number; visibility.len()],
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        if need_file_path_and_pos {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
                vec![data_file_path.as_str(); visibility.len()],
            ))));

            let positions: Vec<i64> =
                (batch_start_pos..(batch_start_pos + visibility.len() as i64)).collect();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(positions))));

            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        *read_bytes += chunk.estimated_heap_size() as u64;
        yield chunk;
    }
}

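/// Placeholder `SplitReader` implementation for the Iceberg source; both
/// constructing it and turning it into a chunk stream are currently
/// unimplemented.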
#[derive(Debug)]
pub struct IcebergFileReader {}

#[async_trait]
impl SplitReader for IcebergFileReader {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        _props: IcebergProperties,
        _splits: Vec<IcebergSplit>,
        _parser_config: ParserConfig,
        _source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        unimplemented!()
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        unimplemented!()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, Schema};

    use super::*;

    fn create_file_scan_task(length: u64, id: u64) -> FileScanTask {
        FileScanTask {
            length,
            start: 0,
            record_count: Some(0),
            data_file_path: format!("test_{}.parquet", id),
            data_file_content: DataContentType::Data,
            data_file_format: iceberg::spec::DataFileFormat::Parquet,
            schema: Arc::new(Schema::builder().build().unwrap()),
            project_field_ids: vec![],
            predicate: None,
            deletes: vec![],
            sequence_number: 0,
            equality_ids: vec![],
            file_size_in_bytes: 0,
        }
    }

    #[test]
    fn test_split_n_vecs_basic() {
        let file_scan_tasks = (1..=12)
            .map(|i| create_file_scan_task(i + 100, i))
            .collect::<Vec<_>>();

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);

        assert_eq!(groups.len(), 3);

        let group_lengths: Vec<u64> = groups
            .iter()
            .map(|group| group.iter().map(|task| task.length).sum())
            .collect();

        let max_length = *group_lengths.iter().max().unwrap();
        let min_length = *group_lengths.iter().min().unwrap();
        assert!(max_length - min_length <= 10, "Groups should be balanced");

        let total_tasks: usize = groups.iter().map(|group| group.len()).sum();
        assert_eq!(total_tasks, 12);
    }

    #[test]
    fn test_split_n_vecs_empty() {
        let file_scan_tasks = Vec::new();
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert!(groups.iter().all(|group| group.is_empty()));
    }

    #[test]
    fn test_split_n_vecs_single_task() {
        let file_scan_tasks = vec![create_file_scan_task(100, 1)];
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert_eq!(groups.iter().filter(|group| !group.is_empty()).count(), 1);
    }

    #[test]
    fn test_split_n_vecs_uneven_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(1000, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 2);
        assert_eq!(groups.len(), 2);

        let group_with_large_task = groups
            .iter()
            .find(|group| group.iter().any(|task| task.length == 1000))
            .unwrap();
        assert_eq!(group_with_large_task.len(), 1);
    }

    #[test]
    fn test_split_n_vecs_same_files_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(100, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
            create_file_scan_task(100, 6),
            create_file_scan_task(100, 7),
            create_file_scan_task(100, 8),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
            .iter()
            .map(|g| {
                g.iter()
                    .map(|task| task.data_file_path.clone())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        for _ in 0..10000 {
            let groups_2 = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
                .iter()
                .map(|g| {
                    g.iter()
                        .map(|task| task.data_file_path.clone())
                        .collect::<Vec<_>>()
                })
                .collect::<Vec<_>>();

            assert_eq!(groups, groups_2);
        }
    }
}
1140}