1use std::ops::Bound;
16
17use either::Either;
18use futures::{StreamExt, TryStreamExt, stream};
19use futures_async_stream::try_stream;
20use iceberg::scan::FileScanTask;
21use itertools::Itertools;
22use risingwave_common::array::{DataChunk, Op, SerialArray};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{
25 ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
26};
27use risingwave_common::config::StreamingConfig;
28use risingwave_common::hash::VnodeBitmapExt;
29use risingwave_common::id::SourceId;
30use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
31use risingwave_connector::source::iceberg::metrics::GLOBAL_ICEBERG_SCAN_METRICS;
32use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
33use risingwave_connector::source::reader::desc::SourceDesc;
34use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
35use risingwave_pb::common::ThrottleType;
36use risingwave_storage::store::PrefetchOptions;
37use thiserror_ext::AsReport;
38
39use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
40use crate::common::rate_limit::limited_chunk_size;
41use crate::executor::prelude::*;
42use crate::executor::stream_reader::StreamReaderWithPause;
43
44pub struct IcebergFetchExecutor<S: StateStore> {
50 actor_ctx: ActorContextRef,
51
52 stream_source_core: Option<StreamSourceCore<S>>,
54
55 upstream: Option<Executor>,
58
59 rate_limit_rps: Option<u32>,
61
62 streaming_config: Arc<StreamingConfig>,
64}
65
66pub(crate) struct ChunksWithState {
71 pub chunks: Vec<StreamChunk>,
73
74 pub data_file_path: String,
76
77 #[expect(dead_code)]
79 pub last_read_pos: Datum,
80}
81
82pub(super) use state::PersistedFileScanTask;
83mod state {
84 use std::sync::Arc;
85
86 use anyhow::Context;
87 use iceberg::expr::BoundPredicate;
88 use iceberg::scan::FileScanTask;
89 use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
90 use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
91 use serde::{Deserialize, Serialize};
92
93 use crate::executor::StreamExecutorResult;
94
95 fn default_case_sensitive() -> bool {
96 true
97 }
98 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
106 pub struct PersistedFileScanTask {
107 pub start: u64,
109 pub length: u64,
111 pub record_count: Option<u64>,
116
117 pub data_file_path: String,
119
120 pub data_file_content: DataContentType,
122
123 pub data_file_format: DataFileFormat,
125
126 pub schema: SchemaRef,
128 pub project_field_ids: Vec<i32>,
130 #[serde(skip_serializing_if = "Option::is_none")]
132 pub predicate: Option<BoundPredicate>,
133
134 pub deletes: Vec<PersistedFileScanTask>,
136 pub sequence_number: i64,
138 pub equality_ids: Option<Vec<i32>>,
140
141 pub file_size_in_bytes: u64,
142
143 #[serde(default = "default_case_sensitive")]
144 pub case_sensitive: bool,
145 }
146
147 impl PersistedFileScanTask {
148 pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
150 let persisted_task: Self =
151 serde_json::from_value(jsonb_ref.to_owned_scalar().take())
152 .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
153 Ok(Self::to_task(persisted_task))
154 }
155
156 pub fn encode(task: FileScanTask) -> JsonbVal {
158 let persisted_task = Self::from_task(task);
159 serde_json::to_value(persisted_task).unwrap().into()
160 }
161
162 fn to_task(
164 Self {
165 start,
166 length,
167 record_count,
168 data_file_path,
169 data_file_content,
170 data_file_format,
171 schema,
172 project_field_ids,
173 predicate,
174 deletes,
175 sequence_number,
176 equality_ids,
177 file_size_in_bytes,
178 case_sensitive,
179 }: Self,
180 ) -> FileScanTask {
181 FileScanTask {
182 start,
183 length,
184 record_count,
185 data_file_path,
186 data_file_content,
187 data_file_format,
188 schema,
189 project_field_ids,
190 predicate,
191 deletes: deletes
192 .into_iter()
193 .map(|task| Arc::new(PersistedFileScanTask::to_task(task)))
194 .collect(),
195 sequence_number,
196 equality_ids,
197 file_size_in_bytes,
198 partition: None,
199 partition_spec: None,
200 name_mapping: None,
201 case_sensitive,
202 }
203 }
204
205 fn from_task(
207 FileScanTask {
208 start,
209 length,
210 record_count,
211 data_file_path,
212 data_file_content,
213 data_file_format,
214 schema,
215 project_field_ids,
216 predicate,
217 deletes,
218 sequence_number,
219 equality_ids,
220 file_size_in_bytes,
221 case_sensitive,
222 ..
223 }: FileScanTask,
224 ) -> Self {
225 Self {
226 start,
227 length,
228 record_count,
229 data_file_path,
230 data_file_content,
231 data_file_format,
232 schema,
233 project_field_ids,
234 predicate,
235 deletes: deletes
236 .into_iter()
237 .map(PersistedFileScanTask::from_task_ref)
238 .collect(),
239 sequence_number,
240 equality_ids,
241 file_size_in_bytes,
242 case_sensitive,
243 }
244 }
245
246 fn from_task_ref(task: Arc<FileScanTask>) -> Self {
247 Self {
248 start: task.start,
249 length: task.length,
250 record_count: task.record_count,
251 data_file_path: task.data_file_path.clone(),
252 data_file_content: task.data_file_content,
253 data_file_format: task.data_file_format,
254 schema: task.schema.clone(),
255 project_field_ids: task.project_field_ids.clone(),
256 predicate: task.predicate.clone(),
257 deletes: task
258 .deletes
259 .iter()
260 .cloned()
261 .map(PersistedFileScanTask::from_task_ref)
262 .collect(),
263 sequence_number: task.sequence_number,
264 equality_ids: task.equality_ids.clone(),
265 file_size_in_bytes: task.file_size_in_bytes,
266 case_sensitive: task.case_sensitive,
267 }
268 }
269 }
270}
271
272impl<S: StateStore> IcebergFetchExecutor<S> {
273 pub fn new(
274 actor_ctx: ActorContextRef,
275 stream_source_core: StreamSourceCore<S>,
276 upstream: Executor,
277 rate_limit_rps: Option<u32>,
278 streaming_config: Arc<StreamingConfig>,
279 ) -> Self {
280 Self {
281 actor_ctx,
282 stream_source_core: Some(stream_source_core),
283 upstream: Some(upstream),
284 rate_limit_rps,
285 streaming_config,
286 }
287 }
288
289 #[expect(clippy::too_many_arguments)]
290 async fn replace_with_new_batch_reader<const BIASED: bool>(
291 splits_on_fetch: &mut usize,
292 state_store_handler: &SourceStateTableHandler<S>,
293 column_ids: Vec<ColumnId>,
294 source_ctx: SourceContext,
295 source_desc: SourceDesc,
296 stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
297 rate_limit_rps: Option<u32>,
298 streaming_config: Arc<StreamingConfig>,
299 ) -> StreamExecutorResult<()> {
300 let mut batch =
301 Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
302 let state_table = state_store_handler.state_table();
303 'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
304 let table_iter = state_table
305 .iter_with_vnode(
306 vnode,
307 &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
308 PrefetchOptions::prefetch_for_small_range_scan(),
310 )
311 .await?;
312 pin_mut!(table_iter);
313 while let Some(item) = table_iter.next().await {
314 let row = item?;
315 let task = match row.datum_at(1) {
316 Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
317 PersistedFileScanTask::decode(jsonb_ref)?
318 }
319 _ => unreachable!(),
320 };
321 batch.push(task);
322
323 if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
324 break 'vnodes;
325 }
326 }
327 }
328 if batch.is_empty() {
329 stream.replace_data_stream(stream::pending().boxed());
330 } else {
331 *splits_on_fetch += batch.len();
332 let batch_reader = Self::build_batched_stream_reader(
333 column_ids,
334 source_ctx,
335 source_desc,
336 batch,
337 rate_limit_rps,
338 streaming_config,
339 )
340 .map_err(StreamExecutorError::connector_error);
341 stream.replace_data_stream(batch_reader);
342 }
343
344 Ok(())
345 }
346
347 #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
348 async fn build_batched_stream_reader(
349 _column_ids: Vec<ColumnId>,
350 _source_ctx: SourceContext,
351 source_desc: SourceDesc,
352 batch: Vec<FileScanTask>,
353 _rate_limit_rps: Option<u32>,
354 streaming_config: Arc<StreamingConfig>,
355 ) {
356 let file_path_idx = source_desc
357 .columns
358 .iter()
359 .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
360 .unwrap();
361 let file_pos_idx = source_desc
362 .columns
363 .iter()
364 .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
365 .unwrap();
366 let properties = source_desc.source.config.clone();
367 let properties = match properties {
368 risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
369 iceberg_properties
370 }
371 _ => unreachable!(),
372 };
373 let table = properties.load_table().await?;
374 let metrics = Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone());
375
376 for task in batch {
377 let task_data_file_path = task.data_file_path.clone();
380 let mut chunks = vec![];
381 #[for_await]
382 for chunk in scan_task_to_chunk_with_deletes(
383 table.clone(),
384 task,
385 IcebergScanOpts {
386 chunk_size: streaming_config.developer.chunk_size,
387 need_seq_num: true, need_file_path_and_pos: true,
389 handle_delete_files: table.metadata().format_version()
390 >= iceberg::spec::FormatVersion::V3,
391 },
392 Some(metrics.clone()),
393 ) {
394 let chunk = chunk?;
395 if chunk.cardinality() == 0 {
401 continue;
402 }
403 chunks.push(StreamChunk::from_parts(
404 itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
405 chunk,
406 ));
407 }
408 let (data_file_path, last_read_pos) = if let Some(last_chunk) = chunks.last() {
414 let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
415 let path = last_row
416 .datum_at(file_path_idx)
417 .unwrap()
418 .into_utf8()
419 .to_owned();
420 let pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
421 (path, pos)
422 } else {
423 (task_data_file_path, None)
426 };
427 yield ChunksWithState {
428 chunks,
429 data_file_path,
430 last_read_pos,
431 };
432 }
433 }
434
435 fn build_source_ctx(
436 &self,
437 source_desc: &SourceDesc,
438 source_id: SourceId,
439 source_name: &str,
440 ) -> SourceContext {
441 SourceContext::new(
442 self.actor_ctx.id,
443 source_id,
444 self.actor_ctx.fragment_id,
445 source_name.to_owned(),
446 source_desc.metrics.clone(),
447 SourceCtrlOpts {
448 chunk_size: limited_chunk_size(self.rate_limit_rps),
449 split_txn: self.rate_limit_rps.is_some(), },
451 source_desc.source.config.clone(),
452 None,
453 )
454 }
455
456 #[try_stream(ok = Message, error = StreamExecutorError)]
457 async fn into_stream(mut self) {
458 let mut upstream = self.upstream.take().unwrap().execute();
459 let barrier = expect_first_barrier(&mut upstream).await?;
460 let first_epoch = barrier.epoch;
461 let is_pause_on_startup = barrier.is_pause_on_startup();
462 yield Message::Barrier(barrier);
463
464 let mut core = self.stream_source_core.take().unwrap();
465 let mut state_store_handler = core.split_state_store;
466
467 let source_desc_builder = core.source_desc_builder.take().unwrap();
469
470 let source_desc = source_desc_builder
471 .build()
472 .map_err(StreamExecutorError::connector_error)?;
473
474 let file_path_idx = source_desc
475 .columns
476 .iter()
477 .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
478 .unwrap();
479 let file_pos_idx = source_desc
480 .columns
481 .iter()
482 .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
483 .unwrap();
484 let row_id_idx = source_desc
486 .columns
487 .iter()
488 .position(|c| c.name == ROW_ID_COLUMN_NAME)
489 .unwrap();
490 tracing::trace!(
491 "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
492 source_desc.columns,
493 file_path_idx,
494 file_pos_idx,
495 row_id_idx
496 );
497 state_store_handler.init_epoch(first_epoch).await?;
499
500 let iceberg_metrics = &GLOBAL_ICEBERG_SCAN_METRICS;
502 let iceberg_table_name = {
503 match &source_desc.source.config {
504 risingwave_connector::source::ConnectorProperties::Iceberg(props) => {
505 props.table.table_name().to_owned()
506 }
507 _ => unreachable!("IcebergFetchExecutor must be built with Iceberg properties"),
508 }
509 };
510 let source_id_str = core.source_id.to_string();
511 let source_name_str = core.source_name.clone();
512 let metrics_labels = [
513 source_id_str.as_str(),
514 source_name_str.as_str(),
515 iceberg_table_name.as_str(),
516 ];
517
518 let mut splits_on_fetch: usize = 0;
519 let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
520 upstream,
521 stream::pending().boxed(),
522 );
523
524 if is_pause_on_startup {
525 stream.pause_stream();
526 }
527
528 Self::replace_with_new_batch_reader(
532 &mut splits_on_fetch,
533 &state_store_handler, core.column_ids.clone(),
535 self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
536 source_desc.clone(),
537 &mut stream,
538 self.rate_limit_rps,
539 self.streaming_config.clone(),
540 )
541 .await?;
542 iceberg_metrics
543 .iceberg_source_inflight_file_count
544 .with_guarded_label_values(&metrics_labels)
545 .set(splits_on_fetch as i64);
546
547 while let Some(msg) = stream.next().await {
548 match msg {
549 Err(e) => {
550 tracing::error!(error = %e.as_report(), "Fetch Error");
551 iceberg_metrics
552 .iceberg_source_scan_errors_total
553 .with_guarded_label_values(&[
554 metrics_labels[0],
555 metrics_labels[1],
556 metrics_labels[2],
557 "fetch_error",
558 ])
559 .inc();
560 splits_on_fetch = 0;
561 iceberg_metrics
562 .iceberg_source_inflight_file_count
563 .with_guarded_label_values(&metrics_labels)
564 .set(0);
565 }
566 Ok(msg) => {
567 match msg {
568 Either::Left(msg) => {
570 match msg {
571 Message::Barrier(barrier) => {
572 let mut need_rebuild_reader = false;
573
574 if let Some(mutation) = barrier.mutation.as_deref() {
575 match mutation {
576 Mutation::Pause => stream.pause_stream(),
577 Mutation::Resume => stream.resume_stream(),
578 Mutation::Throttle(fragment_to_apply) => {
579 if let Some(entry) = fragment_to_apply
580 .get(&self.actor_ctx.fragment_id)
581 && entry.throttle_type() == ThrottleType::Source
582 && entry.rate_limit != self.rate_limit_rps
583 {
584 tracing::debug!(
585 "updating rate limit from {:?} to {:?}",
586 self.rate_limit_rps,
587 entry.rate_limit
588 );
589 self.rate_limit_rps = entry.rate_limit;
590 need_rebuild_reader = true;
591 }
592 }
593 _ => (),
594 }
595 }
596
597 let post_commit = state_store_handler
598 .commit_may_update_vnode_bitmap(barrier.epoch)
599 .await?;
600
601 let update_vnode_bitmap =
602 barrier.as_update_vnode_bitmap(self.actor_ctx.id);
603 yield Message::Barrier(barrier);
605
606 if post_commit
607 .post_yield_barrier(update_vnode_bitmap)
608 .await?
609 .is_some()
610 {
611 splits_on_fetch = 0;
615 }
616
617 if splits_on_fetch == 0 || need_rebuild_reader {
618 Self::replace_with_new_batch_reader(
619 &mut splits_on_fetch,
620 &state_store_handler,
621 core.column_ids.clone(),
622 self.build_source_ctx(
623 &source_desc,
624 core.source_id,
625 &core.source_name,
626 ),
627 source_desc.clone(),
628 &mut stream,
629 self.rate_limit_rps,
630 self.streaming_config.clone(),
631 )
632 .await?;
633 iceberg_metrics
634 .iceberg_source_inflight_file_count
635 .with_guarded_label_values(&metrics_labels)
636 .set(splits_on_fetch as i64);
637 }
638 }
639 Message::Chunk(chunk) => {
642 let jsonb_values: Vec<(String, JsonbVal)> = chunk
643 .data_chunk()
644 .rows()
645 .map(|row| {
646 let file_name = row.datum_at(0).unwrap().into_utf8();
647 let split = row.datum_at(1).unwrap().into_jsonb();
648 (file_name.to_owned(), split.to_owned_scalar())
649 })
650 .collect();
651 state_store_handler.set_states_json(jsonb_values).await?;
652 state_store_handler.try_flush().await?;
653 }
654 Message::Watermark(_) => unreachable!(),
655 }
656 }
657 Either::Right(ChunksWithState {
659 chunks,
660 data_file_path,
661 last_read_pos: _,
662 }) => {
663 if true {
665 splits_on_fetch = splits_on_fetch.saturating_sub(1);
666 state_store_handler.delete(&data_file_path).await?;
667 iceberg_metrics
668 .iceberg_source_inflight_file_count
669 .with_guarded_label_values(&metrics_labels)
670 .set(splits_on_fetch as i64);
671 }
672
673 for chunk in &chunks {
674 let chunk = prune_additional_cols(
675 chunk,
676 &[file_path_idx, file_pos_idx],
677 &source_desc.columns,
678 );
679 let (chunk, op) = chunk.into_parts();
681 let (mut columns, visibility) = chunk.into_parts();
682 columns.insert(
683 row_id_idx,
684 Arc::new(
685 SerialArray::from_iter_bitmap(
686 itertools::repeat_n(Serial::from(0), columns[0].len()),
687 Bitmap::zeros(columns[0].len()),
688 )
689 .into(),
690 ),
691 );
692 let chunk = StreamChunk::from_parts(
693 op,
694 DataChunk::from_parts(columns.into(), visibility),
695 );
696
697 yield Message::Chunk(chunk);
698 }
699 }
700 }
701 }
702 }
703 }
704 }
705}
706
707impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
708 fn execute(self: Box<Self>) -> BoxedMessageStream {
709 self.into_stream().boxed()
710 }
711}
712
713impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
714 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
715 if let Some(core) = &self.stream_source_core {
716 f.debug_struct("IcebergFetchExecutor")
717 .field("source_id", &core.source_id)
718 .field("column_ids", &core.column_ids)
719 .finish()
720 } else {
721 f.debug_struct("IcebergFetchExecutor").finish()
722 }
723 }
724}
725
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::array::{DataChunk, Op, StreamChunk};

    use super::ChunksWithState;

    /// A task that produced no rows is represented by a `ChunksWithState` with no chunks
    /// and the task's own path: nothing is forwarded, yet the path needed to delete the
    /// file's state entry is present.
    #[test]
    fn test_empty_chunks_with_state_satisfies_into_stream_contract() {
        let path = "s3://bucket/empty.parquet".to_owned();

        let cws = ChunksWithState {
            chunks: vec![],
            data_file_path: path.clone(),
            last_read_pos: None,
        };

        assert!(
            cws.chunks.is_empty(),
            "empty ChunksWithState must not forward any rows"
        );

        assert_eq!(
            cws.data_file_path, path,
            "data_file_path must match the original task path"
        );
    }

    /// A chunk with three insert rows is kept as-is.
    #[test]
    fn test_non_empty_chunks_with_state() {
        let chunk = StreamChunk::from_parts(
            vec![Op::Insert, Op::Insert, Op::Insert],
            DataChunk::new_dummy(3),
        );
        let cws = ChunksWithState {
            chunks: vec![chunk],
            data_file_path: "s3://bucket/data.parquet".to_owned(),
            last_read_pos: None,
        };

        assert_eq!(cws.chunks.len(), 1);
        assert_eq!(cws.chunks[0].cardinality(), 3);
    }

    /// Chunks without visible rows are not collected, and the file path is still set.
    #[test]
    fn test_zero_cardinality_chunks_are_excluded() {
        let path = "s3://bucket/mostly-deleted.parquet".to_owned();

        let mut chunks: Vec<StreamChunk> = vec![];

        let zero_row_chunk = DataChunk::new_dummy(0);
        // Only chunks with at least one visible row are kept; ops cover every physical row.
        if zero_row_chunk.cardinality() != 0 {
            chunks.push(StreamChunk::from_parts(
                itertools::repeat_n(Op::Insert, zero_row_chunk.capacity()).collect_vec(),
                zero_row_chunk,
            ));
        }

        assert!(
            chunks.is_empty(),
            "zero-cardinality chunk must not be added to the chunks vec"
        );

        let cws = ChunksWithState {
            chunks,
            data_file_path: path.clone(),
            last_read_pos: None,
        };

        assert_eq!(
            cws.data_file_path, path,
            "data_file_path must be set even when all chunks are zero-cardinality"
        );
        assert!(cws.chunks.is_empty());
    }
}