1use std::ops::Bound;
16
17use either::Either;
18use futures::{StreamExt, TryStreamExt, stream};
19use futures_async_stream::try_stream;
20use iceberg::scan::FileScanTask;
21use itertools::Itertools;
22use risingwave_common::array::{DataChunk, Op, SerialArray};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{
25 ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
26};
27use risingwave_common::config::StreamingConfig;
28use risingwave_common::hash::VnodeBitmapExt;
29use risingwave_common::id::SourceId;
30use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
31use risingwave_connector::source::iceberg::metrics::GLOBAL_ICEBERG_SCAN_METRICS;
32use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
33use risingwave_connector::source::reader::desc::SourceDesc;
34use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
35use risingwave_pb::common::ThrottleType;
36use risingwave_storage::store::PrefetchOptions;
37use thiserror_ext::AsReport;
38
39use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
40use crate::common::rate_limit::limited_chunk_size;
41use crate::executor::prelude::*;
42use crate::executor::stream_reader::StreamReaderWithPause;
43
/// Executor that fetches and scans Iceberg file scan tasks delivered by its upstream.
///
/// Upstream chunks carry `(file name, JSONB-encoded task)` rows, which are written to the
/// source state table. Tasks are read back from the state table in batches, scanned, and
/// emitted as insert chunks; a task's state entry is deleted when its scan result arrives.
pub struct IcebergFetchExecutor<S: StateStore> {
    /// Actor context (actor id, fragment id) used for source contexts, throttle lookups and
    /// vnode-bitmap updates.
    actor_ctx: ActorContextRef,

    /// Source core holding the state table handler, source descriptor builder and column ids.
    /// Taken (left as `None`) when the executor starts running.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream executor producing the task rows. Taken when the executor starts running.
    upstream: Option<Executor>,

    /// Current source rate limit; updated when a matching `Throttle` mutation arrives.
    rate_limit_rps: Option<u32>,

    /// Streaming config; `developer.iceberg_fetch_batch_size` and `developer.chunk_size` are
    /// read from it.
    streaming_config: Arc<StreamingConfig>,
}
65
/// Result of scanning a single Iceberg file scan task; one value is produced per task.
pub(crate) struct ChunksWithState {
    /// Insert chunks produced from the task. Chunks with zero cardinality are not included,
    /// so this may be empty.
    pub chunks: Vec<StreamChunk>,

    /// Data file path of the task. The executor deletes the state entry with this key when
    /// the value is received.
    pub data_file_path: String,

    /// File position of the last row read, or `None` when the task produced no rows.
    /// Currently not consumed by the executor.
    #[expect(dead_code)]
    pub last_read_pos: Datum,
}
81
82pub(super) use state::PersistedFileScanTask;
83mod state {
84 use std::sync::Arc;
85
86 use anyhow::Context;
87 use iceberg::expr::BoundPredicate;
88 use iceberg::scan::FileScanTask;
89 use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
90 use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
91 use serde::{Deserialize, Serialize};
92
93 use crate::executor::StreamExecutorResult;
94
95 fn default_case_sensitive() -> bool {
96 true
97 }
98 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
106 pub struct PersistedFileScanTask {
107 pub start: u64,
109 pub length: u64,
111 pub record_count: Option<u64>,
116
117 pub data_file_path: String,
119
120 #[serde(default, skip_serializing_if = "Option::is_none")]
122 pub referenced_data_file: Option<String>,
123
124 pub data_file_content: DataContentType,
126
127 pub data_file_format: DataFileFormat,
129
130 pub schema: SchemaRef,
132 pub project_field_ids: Vec<i32>,
134 #[serde(skip_serializing_if = "Option::is_none")]
136 pub predicate: Option<BoundPredicate>,
137
138 pub deletes: Vec<PersistedFileScanTask>,
140 pub sequence_number: i64,
142 pub equality_ids: Option<Vec<i32>>,
144
145 pub file_size_in_bytes: u64,
146
147 #[serde(default = "default_case_sensitive")]
148 pub case_sensitive: bool,
149 }
150
151 impl PersistedFileScanTask {
152 pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
154 let persisted_task: Self =
155 serde_json::from_value(jsonb_ref.to_owned_scalar().take())
156 .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
157 Ok(Self::to_task(persisted_task))
158 }
159
160 pub fn encode(task: FileScanTask) -> JsonbVal {
162 let persisted_task = Self::from_task(task);
163 serde_json::to_value(persisted_task).unwrap().into()
164 }
165
166 fn to_task(
168 Self {
169 start,
170 length,
171 record_count,
172 data_file_path,
173 referenced_data_file,
174 data_file_content,
175 data_file_format,
176 schema,
177 project_field_ids,
178 predicate,
179 deletes,
180 sequence_number,
181 equality_ids,
182 file_size_in_bytes,
183 case_sensitive,
184 }: Self,
185 ) -> FileScanTask {
186 FileScanTask {
187 start,
188 length,
189 record_count,
190 data_file_path,
191 referenced_data_file,
192 data_file_content,
193 data_file_format,
194 schema,
195 project_field_ids,
196 predicate,
197 deletes: deletes
198 .into_iter()
199 .map(|task| Arc::new(PersistedFileScanTask::to_task(task)))
200 .collect(),
201 sequence_number,
202 equality_ids,
203 file_size_in_bytes,
204 partition: None,
205 partition_spec: None,
206 name_mapping: None,
207 case_sensitive,
208 }
209 }
210
211 fn from_task(
213 FileScanTask {
214 start,
215 length,
216 record_count,
217 data_file_path,
218 referenced_data_file,
219 data_file_content,
220 data_file_format,
221 schema,
222 project_field_ids,
223 predicate,
224 deletes,
225 sequence_number,
226 equality_ids,
227 file_size_in_bytes,
228 case_sensitive,
229 ..
230 }: FileScanTask,
231 ) -> Self {
232 Self {
233 start,
234 length,
235 record_count,
236 data_file_path,
237 referenced_data_file,
238 data_file_content,
239 data_file_format,
240 schema,
241 project_field_ids,
242 predicate,
243 deletes: deletes
244 .into_iter()
245 .map(PersistedFileScanTask::from_task_ref)
246 .collect(),
247 sequence_number,
248 equality_ids,
249 file_size_in_bytes,
250 case_sensitive,
251 }
252 }
253
254 fn from_task_ref(task: Arc<FileScanTask>) -> Self {
255 Self {
256 start: task.start,
257 length: task.length,
258 record_count: task.record_count,
259 data_file_path: task.data_file_path.clone(),
260 referenced_data_file: task.referenced_data_file.clone(),
261 data_file_content: task.data_file_content,
262 data_file_format: task.data_file_format,
263 schema: task.schema.clone(),
264 project_field_ids: task.project_field_ids.clone(),
265 predicate: task.predicate.clone(),
266 deletes: task
267 .deletes
268 .iter()
269 .cloned()
270 .map(PersistedFileScanTask::from_task_ref)
271 .collect(),
272 sequence_number: task.sequence_number,
273 equality_ids: task.equality_ids.clone(),
274 file_size_in_bytes: task.file_size_in_bytes,
275 case_sensitive: task.case_sensitive,
276 }
277 }
278 }
279}
280
281impl<S: StateStore> IcebergFetchExecutor<S> {
282 pub fn new(
283 actor_ctx: ActorContextRef,
284 stream_source_core: StreamSourceCore<S>,
285 upstream: Executor,
286 rate_limit_rps: Option<u32>,
287 streaming_config: Arc<StreamingConfig>,
288 ) -> Self {
289 Self {
290 actor_ctx,
291 stream_source_core: Some(stream_source_core),
292 upstream: Some(upstream),
293 rate_limit_rps,
294 streaming_config,
295 }
296 }
297
298 #[expect(clippy::too_many_arguments)]
299 async fn replace_with_new_batch_reader<const BIASED: bool>(
300 splits_on_fetch: &mut usize,
301 state_store_handler: &SourceStateTableHandler<S>,
302 column_ids: Vec<ColumnId>,
303 source_ctx: SourceContext,
304 source_desc: SourceDesc,
305 stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
306 rate_limit_rps: Option<u32>,
307 streaming_config: Arc<StreamingConfig>,
308 ) -> StreamExecutorResult<()> {
309 let mut batch =
310 Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
311 let state_table = state_store_handler.state_table();
312 'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
313 let table_iter = state_table
314 .iter_with_vnode(
315 vnode,
316 &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
317 PrefetchOptions::prefetch_for_small_range_scan(),
319 )
320 .await?;
321 pin_mut!(table_iter);
322 while let Some(item) = table_iter.next().await {
323 let row = item?;
324 let task = match row.datum_at(1) {
325 Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
326 PersistedFileScanTask::decode(jsonb_ref)?
327 }
328 _ => unreachable!(),
329 };
330 batch.push(task);
331
332 if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
333 break 'vnodes;
334 }
335 }
336 }
337 if batch.is_empty() {
338 stream.replace_data_stream(stream::pending().boxed());
339 } else {
340 *splits_on_fetch += batch.len();
341 let batch_reader = Self::build_batched_stream_reader(
342 column_ids,
343 source_ctx,
344 source_desc,
345 batch,
346 rate_limit_rps,
347 streaming_config,
348 )
349 .map_err(StreamExecutorError::connector_error);
350 stream.replace_data_stream(batch_reader);
351 }
352
353 Ok(())
354 }
355
356 #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
357 async fn build_batched_stream_reader(
358 _column_ids: Vec<ColumnId>,
359 _source_ctx: SourceContext,
360 source_desc: SourceDesc,
361 batch: Vec<FileScanTask>,
362 _rate_limit_rps: Option<u32>,
363 streaming_config: Arc<StreamingConfig>,
364 ) {
365 let file_path_idx = source_desc
366 .columns
367 .iter()
368 .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
369 .unwrap();
370 let file_pos_idx = source_desc
371 .columns
372 .iter()
373 .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
374 .unwrap();
375 let properties = source_desc.source.config.clone();
376 let properties = match properties {
377 risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
378 iceberg_properties
379 }
380 _ => unreachable!(),
381 };
382 let table = properties.load_table().await?;
383 let metrics = Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone());
384
385 for task in batch {
386 let task_data_file_path = task.data_file_path.clone();
389 let mut chunks = vec![];
390 #[for_await]
391 for chunk in scan_task_to_chunk_with_deletes(
392 table.clone(),
393 task,
394 IcebergScanOpts {
395 chunk_size: streaming_config.developer.chunk_size,
396 need_seq_num: true, need_file_path_and_pos: true,
398 handle_delete_files: table.metadata().format_version()
399 >= iceberg::spec::FormatVersion::V3,
400 },
401 Some(metrics.clone()),
402 ) {
403 let chunk = chunk?;
404 if chunk.cardinality() == 0 {
410 continue;
411 }
412 chunks.push(StreamChunk::from_parts(
413 itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
414 chunk,
415 ));
416 }
417 let (data_file_path, last_read_pos) = if let Some(last_chunk) = chunks.last() {
423 let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
424 let path = last_row
425 .datum_at(file_path_idx)
426 .unwrap()
427 .into_utf8()
428 .to_owned();
429 let pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
430 (path, pos)
431 } else {
432 (task_data_file_path, None)
435 };
436 yield ChunksWithState {
437 chunks,
438 data_file_path,
439 last_read_pos,
440 };
441 }
442 }
443
444 fn build_source_ctx(
445 &self,
446 source_desc: &SourceDesc,
447 source_id: SourceId,
448 source_name: &str,
449 ) -> SourceContext {
450 SourceContext::new(
451 self.actor_ctx.id,
452 source_id,
453 self.actor_ctx.fragment_id,
454 source_name.to_owned(),
455 source_desc.metrics.clone(),
456 SourceCtrlOpts {
457 chunk_size: limited_chunk_size(self.rate_limit_rps),
458 split_txn: self.rate_limit_rps.is_some(), },
460 source_desc.source.config.clone(),
461 None,
462 )
463 }
464
465 #[try_stream(ok = Message, error = StreamExecutorError)]
466 async fn into_stream(mut self) {
467 let mut upstream = self.upstream.take().unwrap().execute();
468 let barrier = expect_first_barrier(&mut upstream).await?;
469 let first_epoch = barrier.epoch;
470 let is_pause_on_startup = barrier.is_pause_on_startup();
471 yield Message::Barrier(barrier);
472
473 let mut core = self.stream_source_core.take().unwrap();
474 let mut state_store_handler = core.split_state_store;
475
476 let source_desc_builder = core.source_desc_builder.take().unwrap();
478
479 let source_desc = source_desc_builder
480 .build()
481 .map_err(StreamExecutorError::connector_error)?;
482
483 let file_path_idx = source_desc
484 .columns
485 .iter()
486 .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
487 .unwrap();
488 let file_pos_idx = source_desc
489 .columns
490 .iter()
491 .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
492 .unwrap();
493 let row_id_idx = source_desc
495 .columns
496 .iter()
497 .position(|c| c.name == ROW_ID_COLUMN_NAME)
498 .unwrap();
499 tracing::trace!(
500 "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
501 source_desc.columns,
502 file_path_idx,
503 file_pos_idx,
504 row_id_idx
505 );
506 state_store_handler.init_epoch(first_epoch).await?;
508
509 let iceberg_metrics = &GLOBAL_ICEBERG_SCAN_METRICS;
511 let iceberg_table_name = {
512 match &source_desc.source.config {
513 risingwave_connector::source::ConnectorProperties::Iceberg(props) => {
514 props.table.table_name().to_owned()
515 }
516 _ => unreachable!("IcebergFetchExecutor must be built with Iceberg properties"),
517 }
518 };
519 let source_id_str = core.source_id.to_string();
520 let source_name_str = core.source_name.clone();
521 let metrics_labels = [
522 source_id_str.as_str(),
523 source_name_str.as_str(),
524 iceberg_table_name.as_str(),
525 ];
526
527 let mut splits_on_fetch: usize = 0;
528 let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
529 upstream,
530 stream::pending().boxed(),
531 );
532
533 if is_pause_on_startup {
534 stream.pause_stream();
535 }
536
537 Self::replace_with_new_batch_reader(
541 &mut splits_on_fetch,
542 &state_store_handler, core.column_ids.clone(),
544 self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
545 source_desc.clone(),
546 &mut stream,
547 self.rate_limit_rps,
548 self.streaming_config.clone(),
549 )
550 .await?;
551 iceberg_metrics
552 .iceberg_source_inflight_file_count
553 .with_guarded_label_values(&metrics_labels)
554 .set(splits_on_fetch as i64);
555
556 while let Some(msg) = stream.next().await {
557 match msg {
558 Err(e) => {
559 tracing::error!(error = %e.as_report(), "Fetch Error");
560 iceberg_metrics
561 .iceberg_source_scan_errors_total
562 .with_guarded_label_values(&[
563 metrics_labels[0],
564 metrics_labels[1],
565 metrics_labels[2],
566 "fetch_error",
567 ])
568 .inc();
569 splits_on_fetch = 0;
570 iceberg_metrics
571 .iceberg_source_inflight_file_count
572 .with_guarded_label_values(&metrics_labels)
573 .set(0);
574 }
575 Ok(msg) => {
576 match msg {
577 Either::Left(msg) => {
579 match msg {
580 Message::Barrier(barrier) => {
581 let mut need_rebuild_reader = false;
582
583 if let Some(mutation) = barrier.mutation.as_deref() {
584 match mutation {
585 Mutation::Pause => stream.pause_stream(),
586 Mutation::Resume => stream.resume_stream(),
587 Mutation::Throttle(fragment_to_apply) => {
588 if let Some(entry) = fragment_to_apply
589 .get(&self.actor_ctx.fragment_id)
590 && entry.throttle_type() == ThrottleType::Source
591 && entry.rate_limit != self.rate_limit_rps
592 {
593 tracing::debug!(
594 "updating rate limit from {:?} to {:?}",
595 self.rate_limit_rps,
596 entry.rate_limit
597 );
598 self.rate_limit_rps = entry.rate_limit;
599 need_rebuild_reader = true;
600 }
601 }
602 _ => (),
603 }
604 }
605
606 let post_commit = state_store_handler
607 .commit_may_update_vnode_bitmap(barrier.epoch)
608 .await?;
609
610 let update_vnode_bitmap =
611 barrier.as_update_vnode_bitmap(self.actor_ctx.id);
612 yield Message::Barrier(barrier);
614
615 if post_commit
616 .post_yield_barrier(update_vnode_bitmap)
617 .await?
618 .is_some()
619 {
620 splits_on_fetch = 0;
624 }
625
626 if splits_on_fetch == 0 || need_rebuild_reader {
627 Self::replace_with_new_batch_reader(
628 &mut splits_on_fetch,
629 &state_store_handler,
630 core.column_ids.clone(),
631 self.build_source_ctx(
632 &source_desc,
633 core.source_id,
634 &core.source_name,
635 ),
636 source_desc.clone(),
637 &mut stream,
638 self.rate_limit_rps,
639 self.streaming_config.clone(),
640 )
641 .await?;
642 iceberg_metrics
643 .iceberg_source_inflight_file_count
644 .with_guarded_label_values(&metrics_labels)
645 .set(splits_on_fetch as i64);
646 }
647 }
648 Message::Chunk(chunk) => {
651 let jsonb_values: Vec<(String, JsonbVal)> = chunk
652 .data_chunk()
653 .rows()
654 .map(|row| {
655 let file_name = row.datum_at(0).unwrap().into_utf8();
656 let split = row.datum_at(1).unwrap().into_jsonb();
657 (file_name.to_owned(), split.to_owned_scalar())
658 })
659 .collect();
660 state_store_handler.set_states_json(jsonb_values).await?;
661 state_store_handler.try_flush().await?;
662 }
663 Message::Watermark(_) => unreachable!(),
664 }
665 }
666 Either::Right(ChunksWithState {
668 chunks,
669 data_file_path,
670 last_read_pos: _,
671 }) => {
672 if true {
674 splits_on_fetch = splits_on_fetch.saturating_sub(1);
675 state_store_handler.delete(&data_file_path).await?;
676 iceberg_metrics
677 .iceberg_source_inflight_file_count
678 .with_guarded_label_values(&metrics_labels)
679 .set(splits_on_fetch as i64);
680 }
681
682 for chunk in &chunks {
683 let chunk = prune_additional_cols(
684 chunk,
685 &[file_path_idx, file_pos_idx],
686 &source_desc.columns,
687 );
688 let (chunk, op) = chunk.into_parts();
690 let (mut columns, visibility) = chunk.into_parts();
691 columns.insert(
692 row_id_idx,
693 Arc::new(
694 SerialArray::from_iter_bitmap(
695 itertools::repeat_n(Serial::from(0), columns[0].len()),
696 Bitmap::zeros(columns[0].len()),
697 )
698 .into(),
699 ),
700 );
701 let chunk = StreamChunk::from_parts(
702 op,
703 DataChunk::from_parts(columns.into(), visibility),
704 );
705
706 yield Message::Chunk(chunk);
707 }
708 }
709 }
710 }
711 }
712 }
713 }
714}
715
716impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
717 fn execute(self: Box<Self>) -> BoxedMessageStream {
718 self.into_stream().boxed()
719 }
720}
721
722impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
723 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
724 if let Some(core) = &self.stream_source_core {
725 f.debug_struct("IcebergFetchExecutor")
726 .field("source_id", &core.source_id)
727 .field("column_ids", &core.column_ids)
728 .finish()
729 } else {
730 f.debug_struct("IcebergFetchExecutor").finish()
731 }
732 }
733}
734
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::array::{DataChunk, Op, StreamChunk};

    use super::ChunksWithState;

    /// A `ChunksWithState` with no chunks forwards nothing but still carries the task's
    /// data file path.
    #[test]
    fn test_empty_chunks_with_state_satisfies_into_stream_contract() {
        let expected_path = String::from("s3://bucket/empty.parquet");

        let state = ChunksWithState {
            chunks: Vec::new(),
            data_file_path: expected_path.clone(),
            last_read_pos: None,
        };

        assert!(
            state.chunks.is_empty(),
            "empty ChunksWithState must not forward any rows"
        );
        assert_eq!(
            state.data_file_path, expected_path,
            "data_file_path must match the original task path"
        );
    }

    /// A `ChunksWithState` wrapping one three-row chunk exposes exactly that chunk.
    #[test]
    fn test_non_empty_chunks_with_state() {
        let three_inserts = vec![Op::Insert; 3];
        let state = ChunksWithState {
            chunks: vec![StreamChunk::from_parts(
                three_inserts,
                DataChunk::new_dummy(3),
            )],
            data_file_path: "s3://bucket/data.parquet".to_owned(),
            last_read_pos: None,
        };

        assert_eq!(state.chunks.len(), 1);
        assert_eq!(state.chunks[0].cardinality(), 3);
    }

    /// Applies the same zero-cardinality filter as the batch reader: such a chunk is not
    /// collected, yet the resulting `ChunksWithState` still records the file path.
    #[test]
    fn test_zero_cardinality_chunks_are_excluded() {
        let expected_path = "s3://bucket/mostly-deleted.parquet".to_owned();

        let mut collected: Vec<StreamChunk> = vec![];

        let zero_row_chunk = DataChunk::new_dummy(0);
        if zero_row_chunk.cardinality() != 0 {
            collected.push(StreamChunk::from_parts(
                itertools::repeat_n(Op::Insert, zero_row_chunk.cardinality()).collect_vec(),
                zero_row_chunk,
            ));
        }

        assert!(
            collected.is_empty(),
            "zero-cardinality chunk must not be added to the chunks vec"
        );

        let state = ChunksWithState {
            chunks: collected,
            data_file_path: expected_path.clone(),
            last_read_pos: None,
        };

        assert_eq!(
            state.data_file_path, expected_path,
            "data_file_path must be set even when all chunks are zero-cardinality"
        );
        assert!(state.chunks.is_empty());
    }
}