pub mod parquet_file_handler;

mod metrics;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::{for_await, try_stream};
use iceberg::Catalog;
use iceberg::expr::Predicate as IcebergPredicate;
use iceberg::scan::FileScanTask;
use iceberg::spec::{DataContentType, ManifestList};
use iceberg::table::Table;
use itertools::Itertools;
pub use parquet_file_handler::*;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::bail;
use risingwave_common::catalog::{
    ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
    Schema,
};
use risingwave_common::types::JsonbVal;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
use serde::{Deserialize, Serialize};

pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
use crate::connector_common::IcebergCommon;
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
};

pub const ICEBERG_CONNECTOR: &str = "iceberg";

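/// `WITH` options for the Iceberg source connector: the shared [`IcebergCommon`] options
/// plus optional JDBC catalog credentials. Options that are not recognized are collected
/// into `unknown_fields`.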
#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct IcebergProperties {
    #[serde(flatten)]
    pub common: IcebergCommon,

    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl IcebergProperties {
    pub async fn create_catalog(&self) -> ConnectorResult<Arc<dyn Catalog>> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        self.common.create_catalog(&java_catalog_props).await
    }

    pub async fn load_table(&self) -> ConnectorResult<Table> {
        let mut java_catalog_props = HashMap::new();
        if let Some(jdbc_user) = self.jdbc_user.clone() {
            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
        }
        if let Some(jdbc_password) = self.jdbc_password.clone() {
            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
        }
        self.common.load_table(&java_catalog_props).await
    }
}

impl SourceProperties for IcebergProperties {
    type Split = IcebergSplit;
    type SplitEnumerator = IcebergSplitEnumerator;
    type SplitReader = IcebergFileReader;

    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}

impl UnknownFields for IcebergProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

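/// A [`FileScanTask`] kept as its JSON string representation. (De)serialization goes
/// through `serde_json` and panics if the task cannot be (de)serialized.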
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergFileScanTaskJsonStr(String);

impl IcebergFileScanTaskJsonStr {
    pub fn deserialize(&self) -> FileScanTask {
        serde_json::from_str(&self.0).unwrap()
    }

    pub fn serialize(task: &FileScanTask) -> Self {
        Self(serde_json::to_string(task).unwrap())
    }
}

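/// The file scan tasks carried by one split, grouped by the kind of scan they serve.
/// `CountStar` carries a precomputed row count instead of a file list.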
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum IcebergFileScanTask {
    Data(Vec<FileScanTask>),
    EqualityDelete(Vec<FileScanTask>),
    PositionDelete(Vec<FileScanTask>),
    CountStar(u64),
}

impl IcebergFileScanTask {
    pub fn new_count_star(count_sum: u64) -> Self {
        IcebergFileScanTask::CountStar(count_sum)
    }

    pub fn new_scan_with_scan_type(
        iceberg_scan_type: IcebergScanType,
        data_files: Vec<FileScanTask>,
        equality_delete_files: Vec<FileScanTask>,
        position_delete_files: Vec<FileScanTask>,
    ) -> Self {
        match iceberg_scan_type {
            IcebergScanType::EqualityDeleteScan => {
                IcebergFileScanTask::EqualityDelete(equality_delete_files)
            }
            IcebergScanType::DataScan => IcebergFileScanTask::Data(data_files),
            IcebergScanType::PositionDeleteScan => {
                IcebergFileScanTask::PositionDelete(position_delete_files)
            }
            IcebergScanType::Unspecified | IcebergScanType::CountStar => {
                unreachable!("Unspecified iceberg scan type")
            }
        }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            IcebergFileScanTask::Data(data_files) => data_files.is_empty(),
            IcebergFileScanTask::EqualityDelete(equality_delete_files) => {
                equality_delete_files.is_empty()
            }
            IcebergFileScanTask::PositionDelete(position_delete_files) => {
                position_delete_files.is_empty()
            }
            IcebergFileScanTask::CountStar(_) => false,
        }
    }

    pub fn files(&self) -> Vec<String> {
        match self {
            IcebergFileScanTask::Data(file_scan_tasks)
            | IcebergFileScanTask::EqualityDelete(file_scan_tasks)
            | IcebergFileScanTask::PositionDelete(file_scan_tasks) => file_scan_tasks
                .iter()
                .map(|task| task.data_file_path.clone())
                .collect(),
            IcebergFileScanTask::CountStar(_) => vec![],
        }
    }
}

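/// A batch-scan split: a set of file scan tasks pinned to one table snapshot.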
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IcebergSplit {
    pub split_id: i64,
    pub snapshot_id: i64,
    pub task: IcebergFileScanTask,
}

impl IcebergSplit {
    pub fn empty(iceberg_scan_type: IcebergScanType) -> Self {
        if let IcebergScanType::CountStar = iceberg_scan_type {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_count_star(0),
            }
        } else {
            Self {
                split_id: 0,
                snapshot_id: 0,
                task: IcebergFileScanTask::new_scan_with_scan_type(
                    iceberg_scan_type,
                    vec![],
                    vec![],
                    vec![],
                ),
            }
        }
    }
}

impl SplitMetaData for IcebergSplit {
    fn id(&self) -> SplitId {
        self.split_id.to_string().into()
    }

    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
    }

    fn encode_to_json(&self) -> JsonbVal {
        serde_json::to_value(self.clone()).unwrap().into()
    }

    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
        unimplemented!()
    }
}

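/// Split enumerator for the Iceberg connector. `SplitEnumerator::list_splits` returns no
/// splits; batch scans obtain their splits through [`Self::list_splits_batch`] instead.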
#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {
    config: IcebergProperties,
}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<Self> {
        Ok(Self::new_inner(properties, context))
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
        Ok(vec![])
    }
}

impl IcebergSplitEnumerator {
    pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self {
        Self { config: properties }
    }
}

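/// Time-travel target for reading a historical snapshot: either a snapshot id (`Version`)
/// or a timestamp in milliseconds (`TimestampMs`).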
pub enum IcebergTimeTravelInfo {
    Version(i64),
    TimestampMs(i64),
}

impl IcebergSplitEnumerator {
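    /// Resolves which snapshot to read. Returns `Ok(None)` if the table has no current
    /// snapshot. For `TimestampMs`, the newest snapshot committed at or before the given
    /// timestamp is chosen; for `Version`, the snapshot with that id must exist.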
    pub fn get_snapshot_id(
        table: &Table,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<Option<i64>> {
        let current_snapshot = table.metadata().current_snapshot();
        if current_snapshot.is_none() {
            return Ok(None);
        }

        let snapshot_id = match time_travel_info {
            Some(IcebergTimeTravelInfo::Version(version)) => {
                let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
                    bail!("Cannot find the snapshot id in the iceberg table.");
                };
                snapshot.snapshot_id()
            }
            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
                let snapshot = table
                    .metadata()
                    .snapshots()
                    .filter(|snapshot| snapshot.timestamp_ms() <= timestamp)
                    .max_by_key(|snapshot| snapshot.timestamp_ms());
                match snapshot {
                    Some(snapshot) => snapshot.snapshot_id(),
                    None => {
                        let time = chrono::DateTime::from_timestamp_millis(timestamp);
                        if time.is_some() {
                            bail!("Cannot find a snapshot older than {}", time.unwrap());
                        } else {
                            bail!("Cannot find a snapshot");
                        }
                    }
                }
            }
            None => {
                assert!(current_snapshot.is_some());
                current_snapshot.unwrap().snapshot_id()
            }
        };
        Ok(Some(snapshot_id))
    }

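    /// Plans the splits for a batch scan. Returns a single empty split when the table has
    /// no snapshot, and delegates `COUNT(*)`-only scans to
    /// [`Self::list_splits_batch_count_star`].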
    pub async fn list_splits_batch(
        &self,
        schema: Schema,
        time_travel_info: Option<IcebergTimeTravelInfo>,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        if batch_parallelism == 0 {
            bail!("Batch parallelism is 0. Cannot split the iceberg files.");
        }
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        if snapshot_id.is_none() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        if let IcebergScanType::CountStar = iceberg_scan_type {
            self.list_splits_batch_count_star(&table, snapshot_id.unwrap())
                .await
        } else {
            self.list_splits_batch_scan(
                &table,
                snapshot_id.unwrap(),
                schema,
                batch_parallelism,
                iceberg_scan_type,
                predicate,
            )
            .await
        }
    }

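    /// Plans a regular batch scan: data, equality-delete and position-delete files are
    /// collected from the snapshot (deduplicated by file path), spread over
    /// `batch_parallelism` splits, and filtered down to the task kind requested by
    /// `iceberg_scan_type`.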
    async fn list_splits_batch_scan(
        &self,
        table: &Table,
        snapshot_id: i64,
        schema: Schema,
        batch_parallelism: usize,
        iceberg_scan_type: IcebergScanType,
        predicate: IcebergPredicate,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let schema_names = schema.names();
        let require_names = schema_names
            .iter()
            .filter(|name| {
                name.ne(&ICEBERG_SEQUENCE_NUM_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_PATH_COLUMN_NAME)
                    && name.ne(&ICEBERG_FILE_POS_COLUMN_NAME)
            })
            .cloned()
            .collect_vec();

        let table_schema = table.metadata().current_schema();
        tracing::debug!("iceberg_table_schema: {:?}", table_schema);

        let mut position_delete_files = vec![];
        let mut position_delete_files_set = HashSet::new();
        let mut data_files = vec![];
        let mut equality_delete_files = vec![];
        let mut equality_delete_files_set = HashSet::new();
        let scan = table
            .scan()
            .with_filter(predicate)
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .select(require_names)
            .build()
            .map_err(|e| anyhow!(e))?;

        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;

        #[for_await]
        for task in file_scan_stream {
            let mut task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for mut delete_file in task.deletes.drain(..) {
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {
                        bail!("Data file should not be in task deletes");
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            equality_delete_files.push(delete_file);
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
                            delete_file.project_field_ids = Vec::default();
                            position_delete_files.push(delete_file);
                        }
                    }
                }
            }
            match task.data_file_content {
                iceberg::spec::DataContentType::Data => {
                    data_files.push(task);
                }
                iceberg::spec::DataContentType::EqualityDeletes => {
                    bail!("Equality delete files should not be in the data files");
                }
                iceberg::spec::DataContentType::PositionDeletes => {
                    bail!("Position delete files should not be in the data files");
                }
            }
        }
        let data_files = Self::split_n_vecs(data_files, batch_parallelism);
        let equality_delete_files = Self::split_n_vecs(equality_delete_files, batch_parallelism);
        let position_delete_files = Self::split_n_vecs(position_delete_files, batch_parallelism);

        let splits = data_files
            .into_iter()
            .zip_eq_fast(equality_delete_files.into_iter())
            .zip_eq_fast(position_delete_files.into_iter())
            .enumerate()
            .map(
                |(index, ((data_file, equality_delete_file), position_delete_file))| IcebergSplit {
                    split_id: index as i64,
                    snapshot_id,
                    task: IcebergFileScanTask::new_scan_with_scan_type(
                        iceberg_scan_type,
                        data_file,
                        equality_delete_file,
                        position_delete_file,
                    ),
                },
            )
            .filter(|split| !split.task.is_empty())
            .collect_vec();

        if splits.is_empty() {
            return Ok(vec![IcebergSplit::empty(iceberg_scan_type)]);
        }
        Ok(splits)
    }

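    /// Answers `COUNT(*)` from manifest metadata only: sums the record counts of all live
    /// data files in the snapshot and returns them as a single `CountStar` split, so no
    /// data file has to be read.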
    pub async fn list_splits_batch_count_star(
        &self,
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<Vec<IcebergSplit>> {
        let mut record_counts = 0;
        let manifest_list: ManifestList = table
            .metadata()
            .snapshot_by_id(snapshot_id)
            .unwrap()
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .map_err(|e| anyhow!(e))?;

        for entry in manifest_list.entries() {
            let manifest = entry
                .load_manifest(table.file_io())
                .await
                .map_err(|e| anyhow!(e))?;
            let mut manifest_entries_stream =
                futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));

            while let Some(manifest_entry) = manifest_entries_stream.next().await {
                let file = manifest_entry.data_file();
                assert_eq!(file.content_type(), DataContentType::Data);
                record_counts += file.record_count();
            }
        }
        let split = IcebergSplit {
            split_id: 0,
            snapshot_id,
            task: IcebergFileScanTask::new_count_star(record_counts),
        };
        Ok(vec![split])
    }

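    /// Scans the snapshot's planned files to determine the equality-delete columns in use
    /// (which must be identical across all equality delete files) and whether any position
    /// delete files are present.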
    pub async fn all_delete_parameters(
        table: &Table,
        snapshot_id: i64,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .with_delete_file_processing_enabled(true)
            .build()
            .map_err(|e| anyhow!(e))?;
        let file_scan_stream = scan.plan_files().await.map_err(|e| anyhow!(e))?;
        let schema = scan.snapshot().schema(table.metadata())?;
        let mut equality_ids = vec![];
        let mut have_position_delete = false;
        #[for_await]
        for task in file_scan_stream {
            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
            for delete_file in task.deletes {
                match delete_file.data_file_content {
                    iceberg::spec::DataContentType::Data => {}
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        if equality_ids.is_empty() {
                            equality_ids = delete_file.equality_ids;
                        } else if equality_ids != delete_file.equality_ids {
                            bail!("The schema of iceberg equality delete file must be consistent");
                        }
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        have_position_delete = true;
                    }
                }
            }
        }
        let delete_columns = equality_ids
            .into_iter()
            .map(|id| match schema.name_by_field_id(id) {
                Some(name) => Ok::<std::string::String, ConnectorError>(name.to_owned()),
                None => bail!("Delete field id {} not found in schema", id),
            })
            .collect::<ConnectorResult<Vec<_>>>()?;

        Ok((delete_columns, have_position_delete))
    }

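    /// Like [`Self::all_delete_parameters`], but loads the table and resolves the snapshot
    /// from the optional time-travel info first; returns empty parameters when the table
    /// has no snapshot.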
    pub async fn get_delete_parameters(
        &self,
        time_travel_info: Option<IcebergTimeTravelInfo>,
    ) -> ConnectorResult<(Vec<String>, bool)> {
        let table = self.config.load_table().await?;
        let snapshot_id = Self::get_snapshot_id(&table, time_travel_info)?;
        if snapshot_id.is_none() {
            return Ok((vec![], false));
        }
        let snapshot_id = snapshot_id.unwrap();
        Self::all_delete_parameters(&table, snapshot_id).await
    }

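    /// Distributes the file scan tasks into `split_num` groups as evenly as possible,
    /// e.g. 10 tasks over 4 groups gives group sizes [3, 3, 2, 2]. `split_num` must be
    /// non-zero; `list_splits_batch` guarantees this by rejecting zero parallelism.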
    fn split_n_vecs(vecs: Vec<FileScanTask>, split_num: usize) -> Vec<Vec<FileScanTask>> {
        let split_size = vecs.len() / split_num;
        let remaining = vecs.len() % split_num;
        let mut result_vecs = (0..split_num)
            .map(|i| {
                let start = i * split_size;
                let end = (i + 1) * split_size;
                vecs[start..end].to_vec()
            })
            .collect_vec();
        for i in 0..remaining {
            result_vecs[i].push(vecs[split_num * split_size + i].clone());
        }
        result_vecs
    }
}

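/// Options for converting a file scan task into [`DataChunk`]s: the chunk size and whether
/// to append the sequence-number and file-path/row-position metadata columns.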
pub struct IcebergScanOpts {
    pub chunk_size: usize,
    pub need_seq_num: bool,
    pub need_file_path_and_pos: bool,
}

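/// Reads a single data file scan task and yields it as a stream of [`DataChunk`]s,
/// optionally appending the Iceberg sequence-number column and the file-path / row-position
/// columns. The estimated size of the produced chunks is reported to `metrics` (when
/// provided) once the stream completes or is dropped.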
#[try_stream(ok = DataChunk, error = ConnectorError)]
pub async fn scan_task_to_chunk(
    table: Table,
    data_file_scan_task: FileScanTask,
    IcebergScanOpts {
        chunk_size,
        need_seq_num,
        need_file_path_and_pos,
    }: IcebergScanOpts,
    metrics: Option<Arc<IcebergScanMetrics>>,
) {
    let table_name = table.identifier().name().to_owned();

    let mut read_bytes = scopeguard::guard(0, |read_bytes| {
        if let Some(metrics) = metrics {
            metrics
                .iceberg_read_bytes
                .with_guarded_label_values(&[&table_name])
                .inc_by(read_bytes as _);
        }
    });

    let data_file_path = data_file_scan_task.data_file_path.clone();
    let data_sequence_number = data_file_scan_task.sequence_number;

    let reader = table.reader_builder().with_batch_size(chunk_size).build();
    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

    let mut record_batch_stream = reader.read(Box::pin(file_scan_stream)).await?.enumerate();

    while let Some((index, record_batch)) = record_batch_stream.next().await {
        let record_batch = record_batch?;

        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
        if need_seq_num {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                vec![data_sequence_number; visibility.len()],
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility)
        };
        if need_file_path_and_pos {
            let (mut columns, visibility) = chunk.into_parts();
            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
                vec![data_file_path.as_str(); visibility.len()],
            ))));
            let index_start = (index * chunk_size) as i64;
            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                (index_start..(index_start + visibility.len() as i64)).collect::<Vec<i64>>(),
            ))));
            chunk = DataChunk::from_parts(columns.into(), visibility)
        }
        *read_bytes += chunk.estimated_heap_size() as u64;
        yield chunk;
    }
}

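/// Placeholder [`SplitReader`] for the Iceberg connector: both `new` and `into_stream` are
/// `unimplemented!()`, so this reader is not expected to be instantiated directly.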
#[derive(Debug)]
pub struct IcebergFileReader {}

#[async_trait]
impl SplitReader for IcebergFileReader {
    type Properties = IcebergProperties;
    type Split = IcebergSplit;

    async fn new(
        _props: IcebergProperties,
        _splits: Vec<IcebergSplit>,
        _parser_config: ParserConfig,
        _source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        unimplemented!()
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        unimplemented!()
    }
}