pub mod parquet_file_handler;

mod metrics;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::{for_await, try_stream};
use iceberg::Catalog;
use iceberg::expr::Predicate as IcebergPredicate;
use iceberg::scan::FileScanTask;
use iceberg::spec::DataContentType;
use iceberg::table::Table;
use itertools::Itertools;
pub use parquet_file_handler::*;
use phf::{Set, phf_set};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::bail;
use risingwave_common::catalog::{
    ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
    Schema,
};
use risingwave_common::types::JsonbVal;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
use serde::{Deserialize, Serialize};

pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
};

pub const ICEBERG_CONNECTOR: &str = "iceberg";

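/// Connector properties of the Iceberg source: the flattened catalog
/// configuration (`IcebergCommon`), the table identifier, optional JDBC
/// catalog credentials, and any unrecognized options.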
#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct IcebergProperties {
    #[serde(flatten)]
    pub common: IcebergCommon,

    #[serde(flatten)]
    pub table: IcebergTableIdentifier,

    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl EnforceSecret for IcebergProperties {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "catalog.jdbc.password",
    };

    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
            if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
                return Err(EnforceSecretError {
                    key: prop.to_owned(),
                }
                .into());
            }
        }
        Ok(())
    }
}

impl IcebergProperties {
    pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        self.common.create_catalog(&java_catalog_props).await
    }

    pub async fn load_table(&self) -> ConnectorResult<Table> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        self.common
            .load_table(&self.table, &java_catalog_props)
            .await
    }
}

impl SourceProperties for IcebergProperties {
    type Split = IcebergSplit;
    type SplitEnumerator = IcebergSplitEnumerator;
    type SplitReader = IcebergFileReader;

    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}

impl UnknownFields for IcebergProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

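/// An Iceberg `FileScanTask` serialized to a JSON string, with helpers to
/// serialize and deserialize it.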
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergFileScanTaskJsonStr(String);

impl IcebergFileScanTaskJsonStr {
    pub fn deserialize(&self) -> FileScanTask {
        serde_json::from_str(&self.0).unwrap()
    }

    pub fn serialize(task: &FileScanTask) -> Self {
        Self(serde_json::to_string(task).unwrap())
    }
}

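/// The file scan tasks carried by a split, grouped by the kind of scan that
/// consumes them: data files, equality-delete files, position-delete files,
/// or a pre-computed `COUNT(*)` result.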
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
    Data(Vec<FileScanTask>),
    EqualityDelete(Vec<FileScanTask>),
    PositionDelete(Vec<FileScanTask>),
    CountStar(u64),
}

impl IcebergFileScanTask {
    pub fn new_count_star(count_sum: u64) -> Self {
        IcebergFileScanTask::CountStar(count_sum)
    }

    pub fn new_scan_with_scan_type(
        iceberg_scan_type: IcebergScanType,
        data_files: Vec<FileScanTask>,
        equality_delete_files: Vec<FileScanTask>,
        position_delete_files: Vec<FileScanTask>,
    ) -> Self {
        match iceberg_scan_type {
            IcebergScanType::EqualityDeleteScan => {
                IcebergFileScanTask::EqualityDelete(equality_delete_files)
            }
            IcebergScanType::DataScan => IcebergFileScanTask::Data(data_files),
            IcebergScanType::PositionDeleteScan => {
                IcebergFileScanTask::PositionDelete(position_delete_files)
            }
            IcebergScanType::Unspecified | IcebergScanType::CountStar => {
                unreachable!("unexpected iceberg scan type: {:?}", iceberg_scan_type)
            }
        }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            IcebergFileScanTask::Data(data_files) => data_files.is_empty(),
            IcebergFileScanTask::EqualityDelete(equality_delete_files) => {
                equality_delete_files.is_empty()
            }
            IcebergFileScanTask::PositionDelete(position_delete_files) => {
                position_delete_files.is_empty()
            }
            IcebergFileScanTask::CountStar(_) => false,
        }
    }

    pub fn files(&self) -> Vec<String> {
        match self {
            IcebergFileScanTask::Data(file_scan_tasks)
            | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
            | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks
                .iter()
                .map(|task| task.data_file_path.clone())
                .collect(),
            IcebergFileScanTask::CountStar(_) => vec![],
        }
    }
}

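/// A batch-scan split for the Iceberg source: the file scan tasks planned
/// against one snapshot.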
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IcebergSplit {
    pub split_id: i64,
    pub snapshot_id: i64,
    pub task: IcebergFileScanTask,
}

impl IcebergSplit {
    pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
        if let IcebergScanType::CountStar = iceberg_scan_type {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_count_star(0),
            }
        } else {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_scan_with_scan_type(
                    iceberg_scan_type,
                    vec![],
                    vec![],
                    vec![],
                ),
            }
        }
    }
}

impl SplitMetaData for IcebergSplit {
    fn id(&self) -> SplitId {
        self.split_id.to_string().into()
    }

    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
    }

    fn encode_to_json(&self) -> JsonbVal {
        serde_json::to_value(self.clone()).unwrap().into()
    }

    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
        unimplemented!()
    }
}

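/// Split enumerator for the Iceberg source. `list_splits` returns no splits;
/// batch scans are planned through `list_splits_batch` instead.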
#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {
    config: IcebergProperties,
}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<Self> {
        Ok(Self::new_inner(properties, context))
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
        Ok(vec![])
    }
}

impl IcebergSplitEnumerator {
    pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
        Self { config: properties }
    }
}

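/// Time-travel target for a scan: either a specific snapshot id (`Version`)
/// or a timestamp in milliseconds (`TimestampMs`).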
pub enum IcebergTimeTravelInfo {
    Version(i64),
    TimestampMs(i64),
}

impl IcebergSplitEnumerator {
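    /// Resolves the snapshot id to scan: the snapshot selected by `time_travel_info`
    /// if provided, otherwise the current snapshot. Returns `Ok(None)` when the table
    /// has no current snapshot.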
    pub fn get_snapshot_id(
        table: &Table,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<Option<i64>> {
        let current_snapshot = table.metadata().current_snapshot();
        if current_snapshot.is_none() {
            return Ok(None);
        }

        let snapshot_id = match time_travel_info {
            Some(IcebergTimeTravelInfo::Version(version)) => {
                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
                    bail!("Cannot find the snapshot id in the iceberg table.");
                };
                snapshot.snapshot_id()
            }
            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
                let snapshot = table
                    .metadata()
                    .snapshots()
                    .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
                    .max_by_key(|snapshot| snapshot.timestamp_ms());
                match snapshot {
                    Some(snapshot) => snapshot.snapshot_id(),
                    None => {
                        let time = chrono::DateTime::from_timestamp_millis(timestamp);
                        if let Some(time) = time {
                            bail!("Cannot find a snapshot older than {}", time);
                        } else {
                            bail!("Cannot find a snapshot");
                        }
                    }
                }
            }
            None => {
                assert!(current_snapshot.is_some());
                current_snapshot.unwrap().snapshot_id()
            }
        };
        Ok(Some(snapshot_id))
    }

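    /// Plans a batch scan of the table at the resolved snapshot and distributes the
    /// planned file scan tasks into at most `batch_parallelism` splits. A single
    /// empty split is returned when there is no snapshot or nothing to scan.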
    pub async fn list_splits_batch(
        &self,
        schema: Schema,
        time_travel_info: Option<IcebergTimeTravelInfo>,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        if batch_parallelism == 0 {
            bail!("Batch parallelism is 0. Cannot split the iceberg files.");
        }
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        if snapshot_id.is_none() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        if let IcebergScanType::CountStar = iceberg_scan_type {
            self.list_splits_batch_count_star(&table, snapshot_id.unwrap())
                .await
        } else {
            self.list_splits_batch_scan(
                &table,
                snapshot_id.unwrap(),
                schema,
                batch_parallelism,
                iceberg_scan_type,
                predicate,
            )
            .await
        }
    }

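    /// Plans the data files and the deduplicated equality-/position-delete files for
    /// the snapshot, then balances each kind across `batch_parallelism` groups via
    /// `split_n_vecs`.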
    async fn list_splits_batch_scan(
        &self,
        table: &Table,
        snapshot_id: i64,
        schema: Schema,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let schema_names = schema.names();
        let require_names = schema_names
            .iter()
            .filter(|name| {
                name.ne(&ICEBERG_SEQUENCE_NUM_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_PATH_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_POS_COLUMN_NAME)
            })
            .cloned()
            .collect_vec();

        let table_schema = table.metadata().current_schema();
        tracing::debug!("iceberg_table_schema: {:?}", table_schema);

        let mut position_delete_files = vec![];
        let mut position_delete_files_set = HashSet::new();
        let mut data_files = vec![];
        let mut equality_delete_files = vec![];
        let mut equality_delete_files_set = HashSet::new();
        let scan = table
            .scan()
            .with_filter(predicate)
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .select(require_names)
            .build()
            .map_err(|e| anyhow!(e))?;

        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let mut task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for delete_file in task.deletes.drain(..) {
                let mut delete_file = delete_file.as_ref().clone();
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {
                        bail!("Data file should not be in task deletes");
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            equality_delete_files.push(delete_file);
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            delete_file.project_field_ids = Vec::default();
                            position_delete_files.push(delete_file);
                        }
                    }
                }
            }
            match task.data_file_content {
                iceberg::spec::DataContentType::Data => {
                    data_files.push(task);
                }
                iceberg::spec::DataContentType::EqualityDeletes => {
                    bail!("Equality delete files should not be in the data files");
                }
                iceberg::spec::DataContentType::PositionDeletes => {
                    bail!("Position delete files should not be in the data files");
                }
            }
        }
        let data_files = Self::split_n_vecs(data_files, batch_parallelism);
        let equality_delete_files = Self::split_n_vecs(equality_delete_files, batch_parallelism);
        let position_delete_files = Self::split_n_vecs(position_delete_files, batch_parallelism);

        let splits = data_files
            .into_iter()
            .zip_eq_fast(equality_delete_files.into_iter())
            .zip_eq_fast(position_delete_files.into_iter())
            .enumerate()
            .map(
                |(index, ((data_file, equality_delete_file), position_delete_file))| IcebergSplit {
                    split_id: index as i64,
                    snapshot_id,
                    task: IcebergFileScanTask::new_scan_with_scan_type(
                        iceberg_scan_type,
                        data_file,
                        equality_delete_file,
                        position_delete_file,
                    ),
                },
            )
            .filter(|split| !split.task.is_empty())
            .collect_vec();

        if splits.is_empty() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        Ok(splits)
    }

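    /// Plans a `COUNT(*)` scan: sums `record_count` from the planned data files'
    /// metadata instead of reading the files, and returns a single `CountStar` split.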
    pub async fn list_splits_batch_count_star(
        &self,
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let mut record_counts = 0;
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            assert_eq!(task.data_file_content, DataContentType::Data);
            assert!(task.deletes.is_empty());
            record_counts += task.record_count.expect("must have");
        }
        let split = IcebergSplit {
            split_id: 0,
            snapshot_id,
            task: IcebergFileScanTask::new_count_star(record_counts),
        };
        Ok(vec![split])
    }

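    /// Collects the equality-delete column names (which must be consistent across
    /// all equality-delete files) and whether the snapshot has any position-delete
    /// files.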
    pub async fn all_delete_parameters(
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
        let schema = scan.snapshot().schema(table.metadata())?;
        let mut equality_ids = vec![];
        let mut have_position_delete = false;
        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for delete_file in task.deletes {
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {}
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_ids.is_empty() {
                            equality_ids = delete_file.equality_ids.clone();
                        } else if equality_ids != delete_file.equality_ids {
                            bail!("The schema of iceberg equality delete file must be consistent");
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        have_position_delete = true;
                    }
                }
            }
        }
        let delete_columns = equality_ids
            .into_iter()
            .map(|id| match schema.name_by_field_id(id) {
                Some(name) => Ok::<std::string::String, ConnectorError>(name.to_owned()),
                None => bail!("Delete field id {} not found in schema", id),
            })
            .collect::<ConnectorResult<Vec<_>>>()?;

        Ok((delete_columns, have_position_delete))
    }

    pub async fn get_delete_parameters(
        &self,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        if snapshot_id.is_none() {
            return Ok((vec![], false));
        }
        let snapshot_id = snapshot_id.unwrap();
        Self::all_delete_parameters(&table, snapshot_id).await
    }

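    /// Distributes file scan tasks into `split_num` groups, greedily assigning each
    /// task to the group with the smallest total byte length so far (ties broken by
    /// group index, which keeps the grouping deterministic for the same input order).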
    fn split_n_vecs(
        file_scan_tasks: Vec<FileScanTask>,
        split_num: usize,
    ) -> Vec<Vec<FileScanTask>> {
        use std::cmp::{Ordering, Reverse};

        #[derive(Default)]
        struct FileScanTaskGroup {
            idx: usize,
            tasks: Vec<FileScanTask>,
            total_length: u64,
        }

        impl Ord for FileScanTaskGroup {
            fn cmp(&self, other: &Self) -> Ordering {
                if self.total_length == other.total_length {
                    self.idx.cmp(&other.idx)
                } else {
                    self.total_length.cmp(&other.total_length)
                }
            }
        }

        impl PartialOrd for FileScanTaskGroup {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }

        impl Eq for FileScanTaskGroup {}

        impl PartialEq for FileScanTaskGroup {
            fn eq(&self, other: &Self) -> bool {
                self.total_length == other.total_length
            }
        }

        let mut heap = BinaryHeap::new();
        for idx in 0..split_num {
            heap.push(Reverse(FileScanTaskGroup {
                idx,
                tasks: vec![],
                total_length: 0,
            }));
        }

        for file_task in file_scan_tasks {
            let mut group = heap.peek_mut().unwrap();
            group.0.total_length += file_task.length;
            group.0.tasks.push(file_task);
        }

        heap.into_vec()
            .into_iter()
            .map(|reverse_group| reverse_group.0.tasks)
            .collect()
    }
}

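/// Options controlling how a file scan task is converted into `DataChunk`s: the
/// chunk size, and whether to append the sequence-number and file-path/row-position
/// metadata columns.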
pub struct IcebergScanOpts {
    pub chunk_size: usize,
    pub need_seq_num: bool,
    pub need_file_path_and_pos: bool,
}

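/// Reads a single data-file scan task and yields `DataChunk`s. Depending on the
/// options, each chunk is extended with the file's sequence number and/or its file
/// path and row positions; the estimated bytes of produced chunks are reported to
/// the scan metrics (if provided) when the stream is dropped.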
#[try_stream(ok = DataChunk, error = ConnectorError)]
pub async fn scan_task_to_chunk(
    table: Table,
    data_file_scan_task: FileScanTask,
    IcebergScanOpts {
        chunk_size,
        need_seq_num,
        need_file_path_and_pos,
    }: IcebergScanOpts,
    metrics: Option<Arc<IcebergScanMetrics>>,
) {
    let table_name = table.identifier().name().to_owned();

    let mut read_bytes = scopeguard::guard(0, |read_bytes| {
        if let Some(metrics) = metrics {
            metrics
                .iceberg_read_bytes
                .with_guarded_label_values(&[&table_name])
                .inc_by(read_bytes as _);
        }
    });

    let data_file_path = data_file_scan_task.data_file_path.clone();
    let data_sequence_number = data_file_scan_task.sequence_number;

    let reader = table.reader_builder().with_batch_size(chunk_size).build();
    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

    let mut record_batch_stream = reader.read(Box::pin(file_scan_stream)).await?.enumerate();

    while let Some((index, record_batch)) = record_batch_stream.next().await {
        let record_batch = record_batch?;

        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
        if need_seq_num {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                vec![data_sequence_number; visibility.len()],
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility)
        };
        if need_file_path_and_pos {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
                vec![data_file_path.as_str(); visibility.len()],
            ))));
            let index_start = (index * chunk_size) as i64;
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                (index_start..(index_start + visibility.len() as i64)).collect::<Vec<i64>>(),
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility)
        }
        *read_bytes += chunk.estimated_heap_size() as u64;
        yield chunk;
    }
}

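/// Split reader stub for the Iceberg source; both `new` and `into_stream` are
/// currently unimplemented.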
#[derive(Debug)]
pub struct IcebergFileReader {}

#[async_trait]
impl SplitReader for IcebergFileReader {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        _props: IcebergProperties,
        _splits: Vec<IcebergSplit>,
        _parser_config: ParserConfig,
        _source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        unimplemented!()
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        unimplemented!()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, Schema};

    use super::*;

    fn create_file_scan_task(length: u64, id: u64) -> FileScanTask {
        FileScanTask {
            length,
            start: 0,
            record_count: Some(0),
            data_file_path: format!("test_{}.parquet", id),
            data_file_content: DataContentType::Data,
            data_file_format: iceberg::spec::DataFileFormat::Parquet,
            schema: Arc::new(Schema::builder().build().unwrap()),
            project_field_ids: vec![],
            predicate: None,
            deletes: vec![],
            sequence_number: 0,
            equality_ids: vec![],
            file_size_in_bytes: 0,
        }
    }

    #[test]
    fn test_split_n_vecs_basic() {
        let file_scan_tasks = (1..=12)
            .map(|i| create_file_scan_task(i + 100, i))
            .collect::<Vec<_>>();

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);

        assert_eq!(groups.len(), 3);

        let group_lengths: Vec<u64> = groups
            .iter()
            .map(|group| group.iter().map(|task| task.length).sum())
            .collect();

        let max_length = *group_lengths.iter().max().unwrap();
        let min_length = *group_lengths.iter().min().unwrap();
        assert!(max_length - min_length <= 10, "Groups should be balanced");

        let total_tasks: usize = groups.iter().map(|group| group.len()).sum();
        assert_eq!(total_tasks, 12);
    }

    #[test]
    fn test_split_n_vecs_empty() {
        let file_scan_tasks = Vec::new();
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert!(groups.iter().all(|group| group.is_empty()));
    }

    #[test]
    fn test_split_n_vecs_single_task() {
        let file_scan_tasks = vec![create_file_scan_task(100, 1)];
        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 3);
        assert_eq!(groups.len(), 3);
        assert_eq!(groups.iter().filter(|group| !group.is_empty()).count(), 1);
    }

    #[test]
    fn test_split_n_vecs_uneven_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(1000, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks, 2);
        assert_eq!(groups.len(), 2);

        let group_with_large_task = groups
            .iter()
            .find(|group| group.iter().any(|task| task.length == 1000))
            .unwrap();
        assert_eq!(group_with_large_task.len(), 1);
    }

    #[test]
    fn test_split_n_vecs_same_files_distribution() {
        let file_scan_tasks = vec![
            create_file_scan_task(100, 1),
            create_file_scan_task(100, 2),
            create_file_scan_task(100, 3),
            create_file_scan_task(100, 4),
            create_file_scan_task(100, 5),
            create_file_scan_task(100, 6),
            create_file_scan_task(100, 7),
            create_file_scan_task(100, 8),
        ];

        let groups = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
            .iter()
            .map(|g| {
                g.iter()
                    .map(|task| task.data_file_path.clone())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        for _ in 0..10000 {
            let groups_2 = IcebergSplitEnumerator::split_n_vecs(file_scan_tasks.clone(), 4)
                .iter()
                .map(|g| {
                    g.iter()
                        .map(|task| task.data_file_path.clone())
                        .collect::<Vec<_>>()
                })
                .collect::<Vec<_>>();

            assert_eq!(groups, groups_2);
        }
    }
}
842}