pub mod parquet_file_handler;

mod metrics;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::{for_await, try_stream};
use iceberg::Catalog;
use iceberg::expr::Predicate as IcebergPredicate;
use iceberg::scan::FileScanTask;
use iceberg::spec::DataContentType;
use iceberg::table::Table;
use itertools::Itertools;
pub use parquet_file_handler::*;
use phf::{Set, phf_set};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::bail;
use risingwave_common::catalog::{
    ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
    Schema,
};
use risingwave_common::types::JsonbVal;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
use serde::{Deserialize, Serialize};

pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
};

pub const ICEBERG_CONNECTOR: &str = "iceberg";

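/// Properties (`WITH` options) of the `iceberg` source connector.
///
/// Catalog and table options are flattened from [`IcebergCommon`] and
/// [`IcebergTableIdentifier`]; `catalog.jdbc.user` / `catalog.jdbc.password` are forwarded
/// to the JDBC catalog, and any unrecognized options are collected into `unknown_fields`.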
#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct IcebergProperties {
    #[serde(flatten)]
    pub common: IcebergCommon,

    #[serde(flatten)]
    pub table: IcebergTableIdentifier,

    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl EnforceSecret for IcebergProperties {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "catalog.jdbc.password",
    };

    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
            if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
                return Err(EnforceSecretError {
                    key: prop.to_owned(),
                }
                .into());
            }
        }
        Ok(())
    }
}

impl IcebergProperties {
    pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        self.common.create_catalog(&java_catalog_props).await
    }

    pub async fn load_table(&self) -> ConnectorResult<Table> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        self.common
            .load_table(&self.table, &java_catalog_props)
            .await
    }
}

impl SourceProperties for IcebergProperties {
    type Split = IcebergSplit;
    type SplitEnumerator = IcebergSplitEnumerator;
    type SplitReader = IcebergFileReader;

    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}

impl UnknownFields for IcebergProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

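/// A [`FileScanTask`] stored as its JSON string representation; `serialize` and
/// `deserialize` round-trip through `serde_json`.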
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergFileScanTaskJsonStr(String);

impl IcebergFileScanTaskJsonStr {
    pub fn deserialize(&self) -> FileScanTask {
        serde_json::from_str(&self.0).unwrap()
    }

    pub fn serialize(task: &FileScanTask) -> Self {
        Self(serde_json::to_string(task).unwrap())
    }
}

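/// The file scan tasks carried by one [`IcebergSplit`], grouped by scan kind
/// (data, equality-delete, or position-delete files), or a pre-computed row count
/// for count-star scans.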
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
    Data(Vec<FileScanTask>),
    EqualityDelete(Vec<FileScanTask>),
    PositionDelete(Vec<FileScanTask>),
    CountStar(u64),
}

impl IcebergFileScanTask {
    pub fn new_count_star(count_sum: u64) -> Self {
        IcebergFileScanTask::CountStar(count_sum)
    }

    pub fn new_scan_with_scan_type(
        iceberg_scan_type: IcebergScanType,
        data_files: Vec<FileScanTask>,
        equality_delete_files: Vec<FileScanTask>,
        position_delete_files: Vec<FileScanTask>,
    ) -> Self {
        match iceberg_scan_type {
            IcebergScanType::EqualityDeleteScan => {
                IcebergFileScanTask::EqualityDelete(equality_delete_files)
            }
            IcebergScanType::DataScan => IcebergFileScanTask::Data(data_files),
            IcebergScanType::PositionDeleteScan => {
                IcebergFileScanTask::PositionDelete(position_delete_files)
            }
            IcebergScanType::Unspecified | IcebergScanType::CountStar => {
                unreachable!("Unspecified iceberg scan type")
            }
        }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            IcebergFileScanTask::Data(data_files) => data_files.is_empty(),
            IcebergFileScanTask::EqualityDelete(equality_delete_files) => {
                equality_delete_files.is_empty()
            }
            IcebergFileScanTask::PositionDelete(position_delete_files) => {
                position_delete_files.is_empty()
            }
            IcebergFileScanTask::CountStar(_) => false,
        }
    }

    pub fn files(&self) -> Vec<String> {
        match self {
            IcebergFileScanTask::Data(file_scan_tasks)
            | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
            | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks
                .iter()
                .map(|task| task.data_file_path.clone())
                .collect(),
            IcebergFileScanTask::CountStar(_) => vec![],
        }
    }
}

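/// One unit of work for a reader: the snapshot to read and the file scan tasks assigned
/// to this split.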
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IcebergSplit {
    pub split_id: i64,
    pub snapshot_id: i64,
    pub task: IcebergFileScanTask,
}

impl IcebergSplit {
    pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
        if let IcebergScanType::CountStar = iceberg_scan_type {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_count_star(0),
            }
        } else {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_scan_with_scan_type(
                    iceberg_scan_type,
                    vec![],
                    vec![],
                    vec![],
                ),
            }
        }
    }
}

impl SplitMetaData for IcebergSplit {
    fn id(&self) -> SplitId {
        self.split_id.to_string().into()
    }

    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
    }

    fn encode_to_json(&self) -> JsonbVal {
        serde_json::to_value(self.clone()).unwrap().into()
    }

    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
        unimplemented!()
    }
}

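/// Enumerates [`IcebergSplit`]s. The streaming [`SplitEnumerator`] implementation returns
/// no splits; batch scans are planned explicitly through [`Self::list_splits_batch`].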
#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {
    config: IcebergProperties,
}

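/// Delete-related information for a snapshot: the equality-delete columns, whether any
/// position delete files exist, and the resolved snapshot id (if any).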
#[derive(Debug, Clone)]
pub struct IcebergDeleteParameters {
    pub equality_delete_columns: Vec<String>,
    pub has_position_delete: bool,
    pub snapshot_id: Option<i64>,
}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<Self> {
        Ok(Self::new_inner(properties, context))
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
        Ok(vec![])
    }
}

impl IcebergSplitEnumerator {
    pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
        Self { config: properties }
    }
}

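/// Time-travel target: either an explicit snapshot/version id or a timestamp in
/// milliseconds (meaning "the latest snapshot at or before this time").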
pub enum IcebergTimeTravelInfo {
    Version(i64),
    TimestampMs(i64),
}

impl IcebergSplitEnumerator {
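    /// Resolves the snapshot to scan: `Ok(None)` if the table has no current snapshot;
    /// otherwise the requested version, the newest snapshot at or before the given
    /// timestamp, or the current snapshot when no time-travel info is provided.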
    pub fn get_snapshot_id(
        table: &Table,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<Option<i64>> {
        let current_snapshot = table.metadata().current_snapshot();
        if current_snapshot.is_none() {
            return Ok(None);
        }

        let snapshot_id = match time_travel_info {
            Some(IcebergTimeTravelInfo::Version(version)) => {
                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
                    bail!("Cannot find the snapshot id in the iceberg table.");
                };
                snapshot.snapshot_id()
            }
            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
                let snapshot = table
                    .metadata()
                    .snapshots()
                    .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
                    .max_by_key(|snapshot| snapshot.timestamp_ms());
                match snapshot {
                    Some(snapshot) => snapshot.snapshot_id(),
                    None => {
                        let time = chrono::DateTime::from_timestamp_millis(timestamp);
                        if let Some(time) = time {
                            bail!("Cannot find a snapshot older than {}", time);
                        } else {
                            bail!("Cannot find a snapshot");
                        }
                    }
                }
            }
            None => {
                assert!(current_snapshot.is_some());
                current_snapshot.unwrap().snapshot_id()
            }
        };
        Ok(Some(snapshot_id))
    }

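    /// Plans a batch scan. Depending on `iceberg_scan_type`, either computes a single
    /// count-star split or plans file scan tasks and packs them into at most
    /// `batch_parallelism` splits; if no snapshot is available, a single empty split is
    /// returned.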
    pub async fn list_splits_batch(
        &self,
        schema: Schema,
        snapshot_id: Option<i64>,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        if batch_parallelism == 0 {
            bail!("Batch parallelism is 0. Cannot split the iceberg files.");
        }
        let table = self.config.load_table().await?;
        if snapshot_id.is_none() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        if let IcebergScanType::CountStar = iceberg_scan_type {
            self.list_splits_batch_count_star(&table, snapshot_id.unwrap())
                .await
        } else {
            self.list_splits_batch_scan(
                &table,
                snapshot_id.unwrap(),
                schema,
                batch_parallelism,
                iceberg_scan_type,
                predicate,
            )
            .await
        }
    }

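    /// Plans data, equality-delete and position-delete file scan tasks for the snapshot
    /// (deduplicating delete files across tasks), distributes each kind over
    /// `batch_parallelism` groups via [`Self::split_n_vecs`], and zips them into splits.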
    async fn list_splits_batch_scan(
        &self,
        table: &Table,
        snapshot_id: i64,
        schema: Schema,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let schema_names = schema.names();
        let require_names = schema_names
            .iter()
            .filter(|name| {
                name.ne(&ICEBERG_SEQUENCE_NUM_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_PATH_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_POS_COLUMN_NAME)
            })
            .cloned()
            .collect_vec();

        let table_schema = table.metadata().current_schema();
        tracing::debug!("iceberg_table_schema: {:?}", table_schema);

        let mut position_delete_files = vec![];
        let mut position_delete_files_set = HashSet::new();
        let mut data_files = vec![];
        let mut equality_delete_files = vec![];
        let mut equality_delete_files_set = HashSet::new();
        let scan = table
            .scan()
            .with_filter(predicate)
            .snapshot_id(snapshot_id)
            .select(require_names)
            .build()
            .map_err(|e| anyhow!(e))?;

        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;

            for delete_file in &task.deletes {
                let delete_file = delete_file.as_ref().clone();
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {
421 bail!("Data file should not in task deletes");
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            equality_delete_files.push(delete_file);
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        let mut delete_file = delete_file;
                        if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            delete_file.project_field_ids = Vec::default();
                            position_delete_files.push(delete_file);
                        }
                    }
                }
            }

            match task.data_file_content {
                iceberg::spec::DataContentType::Data => {
                    data_files.push(task);
                }
                iceberg::spec::DataContentType::EqualityDeletes => {
                    bail!("Equality delete files should not be in the data files");
                }
                iceberg::spec::DataContentType::PositionDeletes => {
                    bail!("Position delete files should not be in the data files");
                }
            }
        }
        let data_files = Self::split_n_vecs(data_files, batch_parallelism);
        let equality_delete_files = Self::split_n_vecs(equality_delete_files, batch_parallelism);
        let position_delete_files = Self::split_n_vecs(position_delete_files, batch_parallelism);

        let splits = data_files
            .into_iter()
            .zip_eq_fast(equality_delete_files.into_iter())
            .zip_eq_fast(position_delete_files.into_iter())
            .enumerate()
            .map(
                |(index, ((data_file, equality_delete_file), position_delete_file))| IcebergSplit {
                    split_id: index as i64,
                    snapshot_id,
                    task: IcebergFileScanTask::new_scan_with_scan_type(
                        iceberg_scan_type,
                        data_file,
                        equality_delete_file,
                        position_delete_file,
                    ),
                },
            )
            .filter(|split| !split.task.is_empty())
            .collect_vec();

        if splits.is_empty() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        Ok(splits)
    }

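    /// Answers a count-star scan without reading data files: sums `record_count` over all
    /// planned data files of the snapshot and wraps the total in a single split.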
    pub async fn list_splits_batch_count_star(
        &self,
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let mut record_counts = 0;
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            assert_eq!(task.data_file_content, DataContentType::Data);
            assert!(task.deletes.is_empty());
            record_counts += task.record_count.expect("must have");
        }
        let split = IcebergSplit {
            split_id: 0,
            snapshot_id,
            task: IcebergFileScanTask::new_count_star(record_counts),
        };
        Ok(vec![split])
    }

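    /// Scans the planned tasks of a snapshot to collect the equality-delete column names
    /// (requiring a consistent set of equality ids across delete files) and whether any
    /// position delete files exist.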
    pub async fn all_delete_parameters(
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
        let schema = scan
            .snapshot()
            .context("snapshot not found")?
            .schema(table.metadata())?;
        let mut equality_ids: Option<Vec<i32>> = None;
        let mut have_position_delete = false;
        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for delete_file in task.deletes {
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {}
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_ids.is_none() {
                            equality_ids = delete_file.equality_ids.clone();
                        } else if equality_ids != delete_file.equality_ids {
                            bail!("The schema of iceberg equality delete file must be consistent");
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        have_position_delete = true;
                    }
                }
            }
        }
        let delete_columns = equality_ids
            .unwrap_or_default()
            .into_iter()
            .map(|id| match schema.name_by_field_id(id) {
                Some(name) => Ok::<std::string::String, ConnectorError>(name.to_owned()),
                None => bail!("Delete field id {} not found in schema", id),
            })
            .collect::<ConnectorResult<Vec<_>>>()?;

        Ok((delete_columns, have_position_delete))
    }

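    /// Resolves the snapshot via [`Self::get_snapshot_id`] and collects its delete
    /// parameters, returning empty parameters when the table has no snapshot.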
    pub async fn get_delete_parameters(
        &self,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<IcebergDeleteParameters> {
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        match snapshot_id {
            Some(snapshot_id) => {
                let (delete_columns, have_position_delete) =
                    Self::all_delete_parameters(&table, snapshot_id).await?;
                Ok(IcebergDeleteParameters {
                    equality_delete_columns: delete_columns,
                    has_position_delete: have_position_delete,
                    snapshot_id: Some(snapshot_id),
                })
            }
            None => Ok(IcebergDeleteParameters {
                equality_delete_columns: vec![],
                has_position_delete: false,
                snapshot_id: None,
            }),
        }
    }

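    /// Distributes file scan tasks into `split_num` groups, greedily assigning each task to
    /// the group with the smallest total byte length (ties broken by group index through a
    /// min-heap), which keeps the groups balanced and the assignment deterministic.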
    pub fn split_n_vecs(
        file_scan_tasks: Vec<FileScanTask>,
        split_num: usize,
    ) -> Vec<Vec<FileScanTask>> {
        use std::cmp::{Ordering, Reverse};

        #[derive(Default)]
        struct FileScanTaskGroup {
            idx: usize,
            tasks: Vec<FileScanTask>,
            total_length: u64,
        }

        impl Ord for FileScanTaskGroup {
            fn cmp(&self, other: &Self) -> Ordering {
                if self.total_length == other.total_length {
                    self.idx.cmp(&other.idx)
                } else {
                    self.total_length.cmp(&other.total_length)
                }
            }
        }

        impl PartialOrd for FileScanTaskGroup {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }

        impl Eq for FileScanTaskGroup {}

        impl PartialEq for FileScanTaskGroup {
            fn eq(&self, other: &Self) -> bool {
                self.total_length == other.total_length
            }
        }

        let mut heap = BinaryHeap::new();
        for idx in 0..split_num {
            heap.push(Reverse(FileScanTaskGroup {
                idx,
                tasks: vec![],
                total_length: 0,
            }));
        }

        for file_task in file_scan_tasks {
            let mut group = heap.peek_mut().unwrap();
            group.0.total_length += file_task.length;
            group.0.tasks.push(file_task);
        }

        heap.into_vec()
            .into_iter()
            .map(|reverse_group| reverse_group.0.tasks)
            .collect()
    }
}

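/// Options for converting a single [`FileScanTask`] into [`DataChunk`]s: chunk size,
/// whether to append the sequence-number and file-path/position columns, and whether the
/// delete files attached to the task should be handled.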
pub struct IcebergScanOpts {
    pub chunk_size: usize,
    pub need_seq_num: bool,
    pub need_file_path_and_pos: bool,
    pub handle_delete_files: bool,
}

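/// Reads one data [`FileScanTask`] and yields its rows as [`DataChunk`]s of at most
/// `chunk_size` rows. When `handle_delete_files` is false the task's delete files are
/// cleared before reading; otherwise they are passed through to the Iceberg reader. The
/// sequence-number and file-path/position columns are appended when the corresponding
/// flags are set, and the bytes read are reported to `metrics` when the stream is dropped.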
#[try_stream(ok = DataChunk, error = ConnectorError)]
pub async fn scan_task_to_chunk_with_deletes(
    table: Table,
    mut data_file_scan_task: FileScanTask,
    IcebergScanOpts {
        chunk_size,
        need_seq_num,
        need_file_path_and_pos,
        handle_delete_files,
    }: IcebergScanOpts,
    metrics: Option<Arc<IcebergScanMetrics>>,
) {
    let table_name = table.identifier().name().to_owned();

    let mut read_bytes = scopeguard::guard(0, |read_bytes| {
        if let Some(metrics) = metrics.clone() {
            metrics
                .iceberg_read_bytes
                .with_guarded_label_values(&[&table_name])
                .inc_by(read_bytes as _);
        }
    });

    let data_file_path = data_file_scan_task.data_file_path.clone();
    let data_sequence_number = data_file_scan_task.sequence_number;

    tracing::debug!(
        "scan_task_to_chunk_with_deletes: data_file={}, handle_delete_files={}, total_delete_files={}",
        data_file_path,
        handle_delete_files,
        data_file_scan_task.deletes.len()
    );

    if !handle_delete_files {
        data_file_scan_task.deletes.clear();
    }

    let reader = table.reader_builder().with_batch_size(chunk_size).build();
    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

    let record_batch_stream: iceberg::scan::ArrowRecordBatchStream =
        reader.read(Box::pin(file_scan_stream))?;
    let mut record_batch_stream = record_batch_stream.enumerate();

    while let Some((batch_index, record_batch)) = record_batch_stream.next().await {
        let record_batch = record_batch?;
        let batch_start_pos = (batch_index * chunk_size) as i64;

        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
        let row_count = chunk.capacity();

        if need_seq_num {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                std::iter::repeat_n(data_sequence_number, row_count),
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        if need_file_path_and_pos {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
                std::iter::repeat_n(data_file_path.as_str(), row_count),
            ))));

            let positions: Vec<i64> =
                (batch_start_pos..(batch_start_pos + row_count as i64)).collect();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(positions))));

            chunk = DataChunk::from_parts(columns.into(), visibility);
        }

        *read_bytes += chunk.estimated_heap_size() as u64;
        yield chunk;
    }
}

#[derive(Debug)]
pub struct IcebergFileReader {}

#[async_trait]
impl SplitReader for IcebergFileReader {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        _props: IcebergProperties,
        _splits: Vec<IcebergSplit>,
        _parser_config: ParserConfig,
        _source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        unimplemented!()
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        unimplemented!()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, Schema};

    use super::*;

    fn create_file_scan_task(length: u64, id: u64) -> FileScanTask {
        FileScanTask {
            length,
            start: 0,
            record_count: Some(0),
            data_file_path: format!("test_{}.parquet", id),
            data_file_content: DataContentType::Data,
            data_file_format: iceberg::spec::DataFileFormat::Parquet,
            schema: Arc::new(Schema::builder().build().unwrap()),
            project_field_ids: vec![],
            predicate: None,
            deletes: vec![],
            sequence_number: 0,
            equality_ids: None,
            file_size_in_bytes: 0,
        }
    }

    #[test]
    fn test_split_n_vecs_basic() {
        let file_scan_tasks = (1..=12)
            .map(|i| create_file_scan_task(i + 100, i))
            .collect::<Vec<_>>();

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);

        assert_eq!(groups.len(), 3);

        let group_lengths: Vec<u64> = groups
            .iter()
            .map(|group| group.iter().map(|task| task.length).sum())
            .collect();

        let max_length = *group_lengths.iter().max().unwrap();
        let min_length = *group_lengths.iter().min().unwrap();
        assert!(max_length - min_length <= 10, "Groups should be balanced");

        let total_tasks: usize = groups.iter().map(|group| group.len()).sum();
        assert_eq!(total_tasks, 12);
    }

    #[test]
    fn test_split_n_vecs_empty() {
        let file_scan_tasks = Vec::new();
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert!(groups.iter().all(|group| group.is_empty()));
    }

    #[test]
    fn test_split_n_vecs_single_task() {
        let file_scan_tasks = vec![create_file_scan_task(100, 1)];
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert_eq!(groups.iter().filter(|group| !group.is_empty()).count(), 1);
    }

    #[test]
    fn test_split_n_vecs_uneven_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(1000, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 2);
        assert_eq!(groups.len(), 2);

        let group_with_large_task = groups
            .iter()
            .find(|group| group.iter().any(|task| task.length == 1000))
            .unwrap();
        assert_eq!(group_with_large_task.len(), 1);
    }

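    // Illustrative example (not part of the original test suite): an `IcebergSplit` should
    // round-trip through the `SplitMetaData` JSON encoding.
    #[test]
    fn test_iceberg_split_json_roundtrip() {
        let split = IcebergSplit {
            split_id: 0,
            snapshot_id: 42,
            task: IcebergFileScanTask::new_count_star(100),
        };
        let restored = IcebergSplit::restore_from_json(split.encode_to_json()).unwrap();
        assert_eq!(restored, split);
    }
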
    #[test]
    fn test_split_n_vecs_same_files_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(100, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
            create_file_scan_task(100, 6),
            create_file_scan_task(100, 7),
            create_file_scan_task(100, 8),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
            .iter()
            .map(|g| {
                g.iter()
                    .map(|task| task.data_file_path.clone())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        for _ in 0..10000 {
            let groups_2 = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
                .iter()
                .map(|g| {
                    g.iter()
                        .map(|task| task.data_file_path.clone())
                        .collect::<Vec<_>>()
                })
                .collect::<Vec<_>>();

            assert_eq!(groups, groups_2);
        }
    }
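
    // Illustrative example (not part of the original test suite): behaviour of
    // `IcebergFileScanTask::is_empty` and `files` across variants.
    #[test]
    fn test_iceberg_file_scan_task_helpers() {
        let data_task = IcebergFileScanTask::Data(vec![create_file_scan_task(100, 1)]);
        assert!(!data_task.is_empty());
        assert_eq!(data_task.files(), vec!["test_1.parquet".to_owned()]);

        let empty_deletes = IcebergFileScanTask::EqualityDelete(vec![]);
        assert!(empty_deletes.is_empty());

        // A count-star task carries no file list but is never considered empty.
        let count_star = IcebergFileScanTask::new_count_star(0);
        assert!(!count_star.is_empty());
        assert!(count_star.files().is_empty());
    }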
}