pub mod parquet_file_handler;

mod metrics;
use std::collections::{BTreeSet, BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::{for_await, try_stream};
use iceberg::Catalog;
use iceberg::expr::Predicate as IcebergPredicate;
use iceberg::scan::FileScanTask;
use iceberg::spec::DataContentType;
use iceberg::table::Table;
use itertools::Itertools;
pub use parquet_file_handler::*;
use phf::{Set, phf_set};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_array_iceberg::Array;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
    Schema,
};
use risingwave_common::types::{Datum, DatumRef, JsonbVal, ToDatumRef, ToOwnedDatum};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
use serde::{Deserialize, Serialize};

pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
};

pub const ICEBERG_CONNECTOR: &str = "iceberg";

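/// Connector properties for an Iceberg source, deserialized from the `WITH` options.
///
/// Catalog/warehouse settings and the table identifier are flattened into this struct;
/// `catalog.jdbc.user` / `catalog.jdbc.password` are forwarded to a JDBC catalog as
/// extra Java catalog properties. Unrecognized options are collected into
/// `unknown_fields` instead of failing deserialization.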
#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct IcebergProperties {
    #[serde(flatten)]
    pub common: IcebergCommon,

    #[serde(flatten)]
    pub table: IcebergTableIdentifier,

    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl EnforceSecret for IcebergProperties {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "catalog.jdbc.password",
    };

    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
            if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
                return Err(EnforceSecretError {
                    key: prop.to_owned(),
                }
                .into());
            }
        }
        Ok(())
    }
}

impl IcebergProperties {
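    /// Builds the Iceberg [`Catalog`] described by these properties, passing the
    /// optional JDBC credentials through as Java catalog properties.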
    pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        self.common.create_catalog(&java_catalog_props).await
    }

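    /// Loads the configured Iceberg [`Table`] via the catalog, using the same optional
    /// JDBC credentials as [`Self::create_catalog`].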
    pub async fn load_table(&self) -> ConnectorResult<Table> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        self.common
            .load_table(&self.table, &java_catalog_props)
            .await
    }
}

impl SourceProperties for IcebergProperties {
    type Split = IcebergSplit;
    type SplitEnumerator = IcebergSplitEnumerator;
    type SplitReader = IcebergFileReader;

    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}

impl UnknownFields for IcebergProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

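/// A [`FileScanTask`] serialized as a JSON string, so scan tasks can be passed around
/// in serialized form (e.g. embedded in plans).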
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergFileScanTaskJsonStr(String);

impl IcebergFileScanTaskJsonStr {
    pub fn deserialize(&self) -> FileScanTask {
        serde_json::from_str(&self.0).unwrap()
    }

    pub fn serialize(task: &FileScanTask) -> Self {
        Self(serde_json::to_string(task).unwrap())
    }
}

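/// The work assigned to one split, specialized by scan type: the data files to read,
/// the equality- or position-delete files to read, or a precomputed `COUNT(*)` result.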
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
    Data(Vec<FileScanTask>),
    EqualityDelete(Vec<FileScanTask>),
    PositionDelete(Vec<FileScanTask>),
    CountStar(u64),
}

impl IcebergFileScanTask {
    pub fn new_count_star(count_sum: u64) -> Self {
        IcebergFileScanTask::CountStar(count_sum)
    }

    pub fn new_scan_with_scan_type(
        iceberg_scan_type: IcebergScanType,
        data_files: Vec<FileScanTask>,
        equality_delete_files: Vec<FileScanTask>,
        position_delete_files: Vec<FileScanTask>,
    ) -> Self {
        match iceberg_scan_type {
            IcebergScanType::EqualityDeleteScan => {
                IcebergFileScanTask::EqualityDelete(equality_delete_files)
            }
            IcebergScanType::DataScan => IcebergFileScanTask::Data(data_files),
            IcebergScanType::PositionDeleteScan => {
                IcebergFileScanTask::PositionDelete(position_delete_files)
            }
            IcebergScanType::Unspecified | IcebergScanType::CountStar => {
                unreachable!("invalid iceberg scan type: {:?}", iceberg_scan_type)
            }
        }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            IcebergFileScanTask::Data(data_files) => data_files.is_empty(),
            IcebergFileScanTask::EqualityDelete(equality_delete_files) => {
                equality_delete_files.is_empty()
            }
            IcebergFileScanTask::PositionDelete(position_delete_files) => {
                position_delete_files.is_empty()
            }
            IcebergFileScanTask::CountStar(_) => false,
        }
    }

    pub fn files(&self) -> Vec<String> {
        match self {
            IcebergFileScanTask::Data(file_scan_tasks)
            | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
            | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks
                .iter()
                .map(|task| task.data_file_path.clone())
                .collect(),
            IcebergFileScanTask::CountStar(_) => vec![],
        }
    }
}

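/// A unit of parallelism for an Iceberg batch scan: the snapshot to read plus the file
/// scan tasks assigned to this split.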
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IcebergSplit {
    pub split_id: i64,
    pub snapshot_id: i64,
    pub task: IcebergFileScanTask,
}

impl IcebergSplit {
    pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
        if let IcebergScanType::CountStar = iceberg_scan_type {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_count_star(0),
            }
        } else {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_scan_with_scan_type(
                    iceberg_scan_type,
                    vec![],
                    vec![],
                    vec![],
                ),
            }
        }
    }
}

impl SplitMetaData for IcebergSplit {
    fn id(&self) -> SplitId {
        self.split_id.to_string().into()
    }

    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
    }

    fn encode_to_json(&self) -> JsonbVal {
        serde_json::to_value(self.clone()).unwrap().into()
    }

    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
        unimplemented!()
    }
}

#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {
    config: IcebergProperties,
}

#[derive(Debug, Clone)]
pub struct IcebergDeleteParameters {
    pub equality_delete_columns: Vec<String>,
    pub has_position_delete: bool,
    pub snapshot_id: Option<i64>,
}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<Self> {
        Ok(Self::new_inner(properties, context))
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
        Ok(vec![])
    }
}

impl IcebergSplitEnumerator {
    pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
        Self { config: properties }
    }
}

pub enum IcebergTimeTravelInfo {
    Version(i64),
    TimestampMs(i64),
}

impl IcebergSplitEnumerator {
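    /// Resolves the snapshot to scan. An explicit version is looked up directly; a
    /// timestamp picks the latest snapshot not newer than it; otherwise the table's
    /// current snapshot is used. Returns `Ok(None)` for tables with no snapshot yet.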
    pub fn get_snapshot_id(
        table: &Table,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<Option<i64>> {
        let current_snapshot = table.metadata().current_snapshot();
        if current_snapshot.is_none() {
            return Ok(None);
        }

        let snapshot_id = match time_travel_info {
            Some(IcebergTimeTravelInfo::Version(version)) => {
                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
                    bail!("Cannot find the snapshot id in the iceberg table.");
                };
                snapshot.snapshot_id()
            }
            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
                let snapshot = table
                    .metadata()
                    .snapshots()
                    .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
                    .max_by_key(|snapshot| snapshot.timestamp_ms());
                match snapshot {
                    Some(snapshot) => snapshot.snapshot_id(),
                    None => {
                        let time = chrono::DateTime::from_timestamp_millis(timestamp);
                        if let Some(time) = time {
                            bail!("Cannot find a snapshot older than {}", time);
                        } else {
                            bail!("Cannot find a snapshot");
                        }
                    }
                }
            }
            None => {
                assert!(current_snapshot.is_some());
                current_snapshot.unwrap().snapshot_id()
            }
        };
        Ok(Some(snapshot_id))
    }

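    /// Plans the splits for one batch scan. Returns a single empty split if the table
    /// has no snapshot, delegates to the `COUNT(*)` fast path for count-star scans, and
    /// otherwise partitions the file scan tasks across `batch_parallelism` splits.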
    pub async fn list_splits_batch(
        &self,
        schema: Schema,
        snapshot_id: Option<i64>,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        if batch_parallelism == 0 {
            bail!("Batch parallelism is 0. Cannot split the iceberg files.");
        }
        let table = self.config.load_table().await?;
        if snapshot_id.is_none() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        if let IcebergScanType::CountStar = iceberg_scan_type {
            self.list_splits_batch_count_star(&table, snapshot_id.unwrap())
                .await
        } else {
            self.list_splits_batch_scan(
                &table,
                snapshot_id.unwrap(),
                schema,
                batch_parallelism,
                iceberg_scan_type,
                predicate,
            )
            .await
        }
    }

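    /// Plans file scan tasks for a regular scan: collects data files plus deduplicated
    /// equality/position delete files, then packs each kind into `batch_parallelism`
    /// balanced groups (see [`Self::split_n_vecs`]) and zips them into splits.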
    async fn list_splits_batch_scan(
        &self,
        table: &Table,
        snapshot_id: i64,
        schema: Schema,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let schema_names = schema.names();
        let require_names = schema_names
            .iter()
            .filter(|name| {
                name.ne(&ICEBERG_SEQUENCE_NUM_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_PATH_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_POS_COLUMN_NAME)
            })
            .cloned()
            .collect_vec();

        let table_schema = table.metadata().current_schema();
        tracing::debug!("iceberg_table_schema: {:?}", table_schema);

        let mut position_delete_files = vec![];
        let mut position_delete_files_set = HashSet::new();
        let mut data_files = vec![];
        let mut equality_delete_files = vec![];
        let mut equality_delete_files_set = HashSet::new();
        let scan = table
            .scan()
            .with_filter(predicate)
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .select(require_names)
            .build()
            .map_err(|e| anyhow!(e))?;

        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;

            for delete_file in &task.deletes {
                let delete_file = delete_file.as_ref().clone();
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {
                        bail!("Data files should not be in task deletes");
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            equality_delete_files.push(delete_file);
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        let mut delete_file = delete_file;
                        if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            delete_file.project_field_ids = Vec::default();
                            position_delete_files.push(delete_file);
                        }
                    }
                }
            }

            match task.data_file_content {
                iceberg::spec::DataContentType::Data => {
                    data_files.push(task);
                }
                iceberg::spec::DataContentType::EqualityDeletes => {
                    bail!("Equality delete files should not be in the data files");
                }
                iceberg::spec::DataContentType::PositionDeletes => {
                    bail!("Position delete files should not be in the data files");
                }
            }
        }

        let data_files = Self::split_n_vecs(data_files, batch_parallelism);
        let equality_delete_files = Self::split_n_vecs(equality_delete_files, batch_parallelism);
        let position_delete_files = Self::split_n_vecs(position_delete_files, batch_parallelism);

        let splits = data_files
            .into_iter()
            .zip_eq_fast(equality_delete_files.into_iter())
            .zip_eq_fast(position_delete_files.into_iter())
            .enumerate()
            .map(
                |(index, ((data_file, equality_delete_file), position_delete_file))| IcebergSplit {
                    split_id: index as i64,
                    snapshot_id,
                    task: IcebergFileScanTask::new_scan_with_scan_type(
                        iceberg_scan_type,
                        data_file,
                        equality_delete_file,
                        position_delete_file,
                    ),
                },
            )
            .filter(|split| !split.task.is_empty())
            .collect_vec();

        if splits.is_empty() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        Ok(splits)
    }

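    /// Fast path for `COUNT(*)`: sums the record counts from file metadata instead of
    /// reading the files. Only valid when no delete files are in play, which the
    /// assertions below enforce.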
    pub async fn list_splits_batch_count_star(
        &self,
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let mut record_counts = 0;
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            assert_eq!(task.data_file_content, DataContentType::Data);
            assert!(task.deletes.is_empty());
            record_counts += task.record_count.expect("must have");
        }
        let split = IcebergSplit {
            split_id: 0,
            snapshot_id,
            task: IcebergFileScanTask::new_count_star(record_counts),
        };
        Ok(vec![split])
    }

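    /// Scans the snapshot's planned delete files and returns the equality-delete column
    /// names (which must agree across all equality delete files) together with whether
    /// any position delete file exists.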
    pub async fn all_delete_parameters(
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
        let schema = scan.snapshot().schema(table.metadata())?;
        let mut equality_ids = vec![];
        let mut have_position_delete = false;
        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for delete_file in task.deletes {
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {}
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_ids.is_empty() {
                            equality_ids = delete_file.equality_ids.clone();
                        } else if equality_ids != delete_file.equality_ids {
                            bail!("The schema of iceberg equality delete file must be consistent");
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        have_position_delete = true;
                    }
                }
            }
        }
        let delete_columns = equality_ids
            .into_iter()
            .map(|id| match schema.name_by_field_id(id) {
                Some(name) => Ok::<String, ConnectorError>(name.to_owned()),
                None => bail!("Delete field id {} not found in schema", id),
            })
            .collect::<ConnectorResult<Vec<_>>>()?;

        Ok((delete_columns, have_position_delete))
    }

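    /// Resolves the snapshot for the optional time-travel specification and gathers its
    /// delete parameters; yields empty parameters when the table has no snapshot.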
    pub async fn get_delete_parameters(
        &self,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<IcebergDeleteParameters> {
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        match snapshot_id {
            Some(snapshot_id) => {
                let (delete_columns, have_position_delete) =
                    Self::all_delete_parameters(&table, snapshot_id).await?;
                Ok(IcebergDeleteParameters {
                    equality_delete_columns: delete_columns,
                    has_position_delete: have_position_delete,
                    snapshot_id: Some(snapshot_id),
                })
            }
            None => Ok(IcebergDeleteParameters {
                equality_delete_columns: vec![],
                has_position_delete: false,
                snapshot_id: None,
            }),
        }
    }

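    /// Distributes file scan tasks into `split_num` groups, greedily assigning each
    /// task to the group with the smallest total byte length (a min-heap variant of
    /// longest-processing-time scheduling). Ties break on the group index, which keeps
    /// the assignment deterministic across runs.
    ///
    /// A minimal sketch of the balancing behavior (illustrative only; the task lengths
    /// are made up):
    ///
    /// ```ignore
    /// // Tasks of 1000, 100, 100, 100, 100 bytes split two ways end up as
    /// // [1000] and [100, 100, 100, 100]: the large task fills one group while
    /// // the small ones accumulate in the other.
    /// let groups = IcebergSplitEnumerator::split_n_vecs(tasks, 2);
    /// assert_eq!(groups.len(), 2);
    /// ```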
    fn split_n_vecs(
        file_scan_tasks: Vec<FileScanTask>,
        split_num: usize,
    ) -> Vec<Vec<FileScanTask>> {
        use std::cmp::{Ordering, Reverse};

        #[derive(Default)]
        struct FileScanTaskGroup {
            idx: usize,
            tasks: Vec<FileScanTask>,
            total_length: u64,
        }

        impl Ord for FileScanTaskGroup {
            fn cmp(&self, other: &Self) -> Ordering {
                if self.total_length == other.total_length {
                    self.idx.cmp(&other.idx)
                } else {
                    self.total_length.cmp(&other.total_length)
                }
            }
        }

        impl PartialOrd for FileScanTaskGroup {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }

        impl Eq for FileScanTaskGroup {}

        impl PartialEq for FileScanTaskGroup {
            fn eq(&self, other: &Self) -> bool {
                self.total_length == other.total_length
            }
        }

        let mut heap = BinaryHeap::new();
        for idx in 0..split_num {
            heap.push(Reverse(FileScanTaskGroup {
                idx,
                tasks: vec![],
                total_length: 0,
            }));
        }

        for file_task in file_scan_tasks {
            let mut group = heap.peek_mut().unwrap();
            group.0.total_length += file_task.length;
            group.0.tasks.push(file_task);
        }

        heap.into_vec()
            .into_iter()
            .map(|reverse_group| reverse_group.0.tasks)
            .collect()
    }
}

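/// Options controlling how a single file scan task is turned into chunks: the chunk
/// size, whether to append the sequence-number and file-path/position metadata columns,
/// and whether delete files should be applied during the read.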
pub struct IcebergScanOpts {
    pub chunk_size: usize,
    pub need_seq_num: bool,
    pub need_file_path_and_pos: bool,
    pub handle_delete_files: bool,
}

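/// Equality-delete keys grouped by the column names they apply to.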
type EqualityDeleteEntries = Vec<(Vec<String>, HashSet<Vec<Datum>>)>;

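/// Reads one data file scan task and yields [`DataChunk`]s, optionally applying the
/// task's delete files inline. Position deletes are materialized as a set of row
/// positions for this data file; equality deletes are materialized as key sets per
/// column group. Each record batch becomes a chunk whose visibility bitmap masks out
/// deleted rows, with the requested metadata columns appended afterwards.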
#[try_stream(ok = DataChunk, error = ConnectorError)]
pub async fn scan_task_to_chunk_with_deletes(
    table: Table,
    data_file_scan_task: FileScanTask,
    IcebergScanOpts {
        chunk_size,
        need_seq_num,
        need_file_path_and_pos,
        handle_delete_files,
    }: IcebergScanOpts,
    metrics: Option<Arc<IcebergScanMetrics>>,
) {
    let table_name = table.identifier().name().to_owned();

    let mut read_bytes = scopeguard::guard(0, |read_bytes| {
        if let Some(metrics) = metrics.clone() {
            metrics
                .iceberg_read_bytes
                .with_guarded_label_values(&[&table_name])
                .inc_by(read_bytes as _);
        }
    });

    let data_file_path = data_file_scan_task.data_file_path.clone();
    let data_sequence_number = data_file_scan_task.sequence_number;

    tracing::debug!(
        "scan_task_to_chunk_with_deletes: data_file={}, handle_delete_files={}, total_delete_files={}",
        data_file_path,
        handle_delete_files,
        data_file_scan_task.deletes.len()
    );

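    // Materialize the positions deleted from this data file. Position delete files
    // store `(file_path, pos)` pairs, so only rows whose `file_path` matches the data
    // file being scanned are kept.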
    let position_delete_set: BTreeSet<i64> = if handle_delete_files {
        let mut deletes = BTreeSet::new();

        let position_delete_tasks: Vec<_> = data_file_scan_task
            .deletes
            .iter()
            .filter(|delete| delete.data_file_content == DataContentType::PositionDeletes)
            .cloned()
            .collect();

        tracing::debug!(
            "Processing position deletes for data file: {}, found {} position delete tasks",
            data_file_path,
            position_delete_tasks.len()
        );

        for delete_task in position_delete_tasks {
            let delete_task_file_path = delete_task.data_file_path.clone();

            let delete_reader = table.reader_builder().with_batch_size(chunk_size).build();
            let task_clone: FileScanTask = (*delete_task).clone();
            let delete_stream = tokio_stream::once(Ok(task_clone));
            let mut delete_record_stream = delete_reader.read(Box::pin(delete_stream)).await?;

            while let Some(record_batch) = delete_record_stream.next().await {
                let record_batch = record_batch?;

                if let Some(file_path_col) = record_batch.column_by_name("file_path")
                    && let Some(pos_col) = record_batch.column_by_name("pos")
                {
                    let file_paths = file_path_col
                        .as_any()
                        .downcast_ref::<risingwave_common::array::arrow::arrow_array_iceberg::StringArray>()
                        .with_context(|| "file_path column is not StringArray")?;
                    let positions = pos_col
                        .as_any()
                        .downcast_ref::<risingwave_common::array::arrow::arrow_array_iceberg::Int64Array>()
                        .with_context(|| "pos column is not Int64Array")?;

                    for idx in 0..record_batch.num_rows() {
                        if !file_paths.is_null(idx) && !positions.is_null(idx) {
                            let file_path = file_paths.value(idx);
                            let pos = positions.value(idx);

                            if file_path == data_file_path {
                                deletes.insert(pos);
                            }
                        } else {
                            tracing::warn!(
                                "Position delete file {} at row {}: file_path or pos is null",
                                delete_task_file_path,
                                idx
                            );
                        }
                    }
                }
            }
        }

        tracing::debug!(
            "Built position delete set for data file {}: {:?}",
            data_file_path,
            deletes
        );

        deletes
    } else {
        BTreeSet::new()
    };

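    // Materialize equality-delete keys, grouped by the column names each delete file
    // applies to. A data row is filtered out if its values on those columns equal any
    // collected key.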
    let equality_deletes: Option<EqualityDeleteEntries> = if handle_delete_files {
        let equality_delete_tasks: Vec<_> = data_file_scan_task
            .deletes
            .iter()
            .filter(|delete| delete.data_file_content == DataContentType::EqualityDeletes)
            .cloned()
            .collect();

        if !equality_delete_tasks.is_empty() {
            let mut delete_key_map: HashMap<Vec<String>, HashSet<Vec<Datum>>> = HashMap::new();

            for delete_task in equality_delete_tasks {
                let equality_ids = delete_task.equality_ids.clone();

                if equality_ids.is_empty() {
                    continue;
                }

                let delete_schema = delete_task.schema();
                let delete_name_vec = equality_ids
                    .iter()
                    .filter_map(|id| delete_schema.name_by_field_id(*id))
                    .map(|s| s.to_owned())
                    .collect_vec();

                if delete_name_vec.len() != equality_ids.len() {
                    tracing::warn!(
                        "Skip equality delete task due to missing column mappings: expected {} names, got {}",
                        equality_ids.len(),
                        delete_name_vec.len()
                    );
                    continue;
                }

                let delete_reader = table.reader_builder().with_batch_size(chunk_size).build();
                let task_clone: FileScanTask = delete_task.as_ref().clone();
                let delete_stream = tokio_stream::once(Ok(task_clone));
                let mut delete_record_stream = delete_reader.read(Box::pin(delete_stream)).await?;

                let mut task_delete_key_set: HashSet<Vec<Datum>> = HashSet::new();

                while let Some(record_batch) = delete_record_stream.next().await {
                    let record_batch = record_batch?;

                    let delete_chunk =
                        IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;

                    let field_indices = delete_name_vec
                        .iter()
                        .map(|field_name| {
                            record_batch
                                .schema()
                                .column_with_name(field_name)
                                .map(|(idx, _)| idx)
                                .unwrap()
                        })
                        .collect_vec();

                    for row_idx in 0..delete_chunk.capacity() {
                        let mut key = Vec::with_capacity(field_indices.len());
                        for &col_idx in &field_indices {
                            let col = delete_chunk.column_at(col_idx);
                            key.push(col.value_at(row_idx).to_owned_datum());
                        }
                        task_delete_key_set.insert(key);
                    }
                }

                if !task_delete_key_set.is_empty() {
                    delete_key_map
                        .entry(delete_name_vec.clone())
                        .or_default()
                        .extend(task_delete_key_set);
                }
            }

            if delete_key_map.is_empty() {
                None
            } else {
                Some(delete_key_map.into_iter().collect())
            }
        } else {
            None
        }
    } else {
        None
    };

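    // Read the data file itself, batch by batch. `batch_index * chunk_size` gives the
    // absolute position of the first row in each batch, which is what both position
    // deletes and the `_pos` metadata column are computed from.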
    let reader = table.reader_builder().with_batch_size(chunk_size).build();
    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

    let mut record_batch_stream = reader.read(Box::pin(file_scan_stream)).await?.enumerate();

    while let Some((batch_index, record_batch)) = record_batch_stream.next().await {
        let record_batch = record_batch?;
        let batch_start_pos = (batch_index * chunk_size) as i64;
        let batch_num_rows = record_batch.num_rows();

        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;

        let mut visibility = vec![true; batch_num_rows];

        if !position_delete_set.is_empty() {
            let batch_end_pos = batch_start_pos + batch_num_rows as i64;

            tracing::debug!(
                "Applying position deletes to batch {}: range [{}, {}), total delete set size: {}",
                batch_index,
                batch_start_pos,
                batch_end_pos,
                position_delete_set.len()
            );

            let mut position_deleted_count = 0;

            for &deleted_pos in position_delete_set.range(batch_start_pos..batch_end_pos) {
                let local_idx = (deleted_pos - batch_start_pos) as usize;
                if local_idx < batch_num_rows {
                    visibility[local_idx] = false;
                    position_deleted_count += 1;
                }
            }

            tracing::debug!(
                "Position delete results for batch {}: deleted {} rows",
                batch_index,
                position_deleted_count
            );
        }

        if let Some(ref equality_delete_entries) = equality_deletes {
            let (columns, _) = chunk.into_parts();

            let delete_entries_info: Vec<(Vec<usize>, HashSet<Vec<DatumRef<'_>>>)> =
                equality_delete_entries
                    .iter()
                    .map(|(delete_name_vec, delete_key_set)| {
                        let indices = delete_name_vec
                            .iter()
                            .map(|field_name| {
                                record_batch
                                    .schema()
                                    .column_with_name(field_name)
                                    .map(|(idx, _)| idx)
                                    .unwrap()
                            })
                            .collect_vec();
                        let delete_key_ref_set: HashSet<Vec<DatumRef<'_>>> = delete_key_set
                            .iter()
                            .map(|datum_vec| {
                                datum_vec.iter().map(|datum| datum.to_datum_ref()).collect()
                            })
                            .collect();
                        (indices, delete_key_ref_set)
                    })
                    .collect::<Vec<_>>();

            let mut deleted_count = 0;

            for (row_idx, item) in visibility.iter_mut().enumerate().take(batch_num_rows) {
                if !*item {
                    continue;
                }

                for (field_indices, delete_key_set) in &delete_entries_info {
                    let mut row_key = Vec::with_capacity(field_indices.len());
                    for &col_idx in field_indices {
                        let datum = columns[col_idx].value_at(row_idx);
                        row_key.push(datum);
                    }

                    if delete_key_set.contains(&row_key) {
                        *item = false;
                        deleted_count += 1;
                        break;
                    }
                }
            }

            tracing::debug!(
                "Equality delete results for batch {}: deleted {} rows",
                batch_index,
                deleted_count
            );

            let columns: Vec<_> = columns.into_iter().collect();
            chunk = DataChunk::from_parts(columns.into(), Bitmap::from_bool_slice(&visibility));
        } else {
            let (columns, _) = chunk.into_parts();
            let columns: Vec<_> = columns.into_iter().collect();
            chunk = DataChunk::from_parts(columns.into(), Bitmap::from_bool_slice(&visibility));
        }

        if need_seq_num {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                vec![data_sequence_number; visibility.len()],
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        if need_file_path_and_pos {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
                vec![data_file_path.as_str(); visibility.len()],
            ))));

            let positions: Vec<i64> =
                (batch_start_pos..(batch_start_pos + visibility.len() as i64)).collect();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(positions))));

            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        *read_bytes += chunk.estimated_heap_size() as u64;
        yield chunk;
    }
}

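/// Placeholder [`SplitReader`] for the Iceberg connector. Reading is handled elsewhere
/// (by the dedicated Iceberg scan paths rather than this source reader), so both
/// methods are intentionally unimplemented.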
#[derive(Debug)]
pub struct IcebergFileReader {}

#[async_trait]
impl SplitReader for IcebergFileReader {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        _props: IcebergProperties,
        _splits: Vec<IcebergSplit>,
        _parser_config: ParserConfig,
        _source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        unimplemented!()
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        unimplemented!()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, Schema};

    use super::*;

    fn create_file_scan_task(length: u64, id: u64) -> FileScanTask {
        FileScanTask {
            length,
            start: 0,
            record_count: Some(0),
            data_file_path: format!("test_{}.parquet", id),
            data_file_content: DataContentType::Data,
            data_file_format: iceberg::spec::DataFileFormat::Parquet,
            schema: Arc::new(Schema::builder().build().unwrap()),
            project_field_ids: vec![],
            predicate: None,
            deletes: vec![],
            sequence_number: 0,
            equality_ids: vec![],
            file_size_in_bytes: 0,
        }
    }

    #[test]
    fn test_split_n_vecs_basic() {
        let file_scan_tasks = (1..=12)
            .map(|i| create_file_scan_task(i + 100, i))
            .collect::<Vec<_>>();

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);

        assert_eq!(groups.len(), 3);

        let group_lengths: Vec<u64> = groups
            .iter()
            .map(|group| group.iter().map(|task| task.length).sum())
            .collect();

        let max_length = *group_lengths.iter().max().unwrap();
        let min_length = *group_lengths.iter().min().unwrap();
        assert!(max_length - min_length <= 10, "Groups should be balanced");

        let total_tasks: usize = groups.iter().map(|group| group.len()).sum();
        assert_eq!(total_tasks, 12);
    }

    #[test]
    fn test_split_n_vecs_empty() {
        let file_scan_tasks = Vec::new();
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert!(groups.iter().all(|group| group.is_empty()));
    }

    #[test]
    fn test_split_n_vecs_single_task() {
        let file_scan_tasks = vec![create_file_scan_task(100, 1)];
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert_eq!(groups.iter().filter(|group| !group.is_empty()).count(), 1);
    }

    #[test]
    fn test_split_n_vecs_uneven_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(1000, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 2);
        assert_eq!(groups.len(), 2);

        let group_with_large_task = groups
            .iter()
            .find(|group| group.iter().any(|task| task.length == 1000))
            .unwrap();
        assert_eq!(group_with_large_task.len(), 1);
    }

    #[test]
    fn test_split_n_vecs_same_files_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(100, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
            create_file_scan_task(100, 6),
            create_file_scan_task(100, 7),
            create_file_scan_task(100, 8),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
            .iter()
            .map(|g| {
                g.iter()
                    .map(|task| task.data_file_path.clone())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        for _ in 0..10000 {
            let groups_2 = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
                .iter()
                .map(|g| {
                    g.iter()
                        .map(|task| task.data_file_path.clone())
                        .collect::<Vec<_>>()
                })
                .collect::<Vec<_>>();

            assert_eq!(groups, groups_2);
        }
    }
}