// risingwave_stream/executor/source/batch_source/batch_iceberg_fetch.rs

use std::collections::VecDeque;

use either::Either;
use futures::stream;
use iceberg::scan::FileScanTask;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::array::Op;
use risingwave_common::catalog::{ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME};
use risingwave_common::config::StreamingConfig;
use risingwave_common::id::TableId;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::{JsonbVal, Scalar, ScalarRef};
use risingwave_connector::source::iceberg::metrics::{
    GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics,
};
use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
use risingwave_connector::source::reader::desc::SourceDesc;
use thiserror_ext::AsReport;

use crate::executor::prelude::*;
use crate::executor::source::{
    ChunksWithState, PersistedFileScanTask, StreamSourceCore, prune_additional_cols,
};
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;
/// A file assignment received from the upstream list executor: the data file
/// path plus the JSONB-encoded scan task (decoded later via
/// `PersistedFileScanTask::decode` in `start_batch_reader`).
type FileEntry = (String, JsonbVal);
/// Mutable bookkeeping for one refresh/fetch cycle of the batch iceberg source.
struct FetchState {
    // True between a `RefreshStart` mutation and the load-finished report.
    is_refreshing: bool,

    // Set when the upstream `ListFinish` mutation arrives.
    is_list_finished: bool,

    // Number of files handed to the current batch reader and not yet fetched.
    splits_on_fetch: usize,

    // Shared with the batch reader stream, which flips it to `true` once the
    // whole batch has been yielded.
    is_batch_finished: Arc<RwLock<bool>>,

    // Files listed by upstream but not yet handed to a batch reader.
    file_queue: VecDeque<FileEntry>,

    // Files currently being fetched; kept so they can be re-queued on a fetch
    // error (at-least-once semantics).
    in_flight_files: Vec<FileEntry>,
}
65
66impl FetchState {
67 fn new() -> Self {
68 Self {
69 is_refreshing: false,
70 is_list_finished: false,
71 splits_on_fetch: 0,
72 is_batch_finished: Arc::new(RwLock::new(false)),
73 file_queue: VecDeque::new(),
74 in_flight_files: Vec::new(),
75 }
76 }
77
78 fn reset_for_refresh(&mut self) {
80 tracing::info!(
81 "reset_for_refresh: clearing file_queue_len={}, in_flight_files_len={}, splits_on_fetch={}",
82 self.file_queue.len(),
83 self.in_flight_files.len(),
84 self.splits_on_fetch
85 );
86 self.file_queue.clear();
87 self.in_flight_files.clear();
88 self.splits_on_fetch = 0;
89 self.is_refreshing = true;
90 self.is_list_finished = false;
91 *self.is_batch_finished.write() = false;
92 }
93
94 fn should_report_load_finished(&self) -> bool {
96 self.splits_on_fetch == 0
97 && self.file_queue.is_empty()
98 && self.in_flight_files.is_empty()
99 && self.is_list_finished
100 && self.is_refreshing
101 }
102
103 fn mark_refresh_complete(&mut self) {
105 self.is_list_finished = false;
106 self.is_refreshing = false;
107 }
108
109 fn should_start_batch_reader(&self, need_rebuild: bool) -> bool {
111 need_rebuild
112 || (self.splits_on_fetch == 0 && !self.file_queue.is_empty() && self.is_refreshing)
113 }
114
115 fn mark_file_fetched(&mut self) {
117 self.splits_on_fetch -= 1;
118
119 if self.splits_on_fetch == 0 {
123 tracing::info!("All files fetched successfully, clearing in_flight_files");
124 self.in_flight_files.clear();
125 }
126 }
127
128 fn handle_error_recovery(&mut self) {
131 if !self.in_flight_files.is_empty() {
132 for file in self.in_flight_files.drain(..).rev() {
134 self.file_queue.push_front(file);
135 }
136 }
137 self.splits_on_fetch = 0;
138 *self.is_batch_finished.write() = false;
139 }
140
141 fn enqueue_files(&mut self, files: impl IntoIterator<Item = FileEntry>) {
143 self.file_queue.extend(files);
144 }
145}
146
/// Cached positions of the hidden iceberg file-path / file-pos columns in the
/// source schema; both are pruned from chunks before they are emitted
/// downstream (see `to_prune`).
struct ColumnIndices {
    file_path_idx: usize,
    file_pos_idx: usize,
}
156
157impl ColumnIndices {
158 fn from_source_desc(source_desc: &SourceDesc) -> Self {
159 let file_path_idx = source_desc
160 .columns
161 .iter()
162 .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
163 .expect("file path column not found");
164 let file_pos_idx = source_desc
165 .columns
166 .iter()
167 .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
168 .expect("file pos column not found");
169 Self {
170 file_path_idx,
171 file_pos_idx,
172 }
173 }
174
175 fn to_prune(&self) -> [usize; 2] {
176 [self.file_path_idx, self.file_pos_idx]
177 }
178}
179
/// Executor that fetches iceberg data files assigned by an upstream list
/// executor, in bounded batches, for a refreshable batch source.
pub struct BatchIcebergFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    // Taken (`Option::take`) when the stream starts running.
    stream_source_core: Option<StreamSourceCore<S>>,

    // Upstream executor producing file assignments and barriers; taken on start.
    upstream: Option<Executor>,

    // Used to report "source load finished" at checkpoint barriers.
    barrier_manager: LocalBarrierManager,

    streaming_config: Arc<StreamingConfig>,

    // Table this refreshable source populates; mandatory (enforced in `new`).
    associated_table_id: TableId,
}
215
216impl<S: StateStore> BatchIcebergFetchExecutor<S> {
217 pub fn new(
218 actor_ctx: ActorContextRef,
219 stream_source_core: StreamSourceCore<S>,
220 upstream: Executor,
221 barrier_manager: LocalBarrierManager,
222 streaming_config: Arc<StreamingConfig>,
223 associated_table_id: Option<TableId>,
224 ) -> Self {
225 assert!(associated_table_id.is_some());
226 Self {
227 actor_ctx,
228 stream_source_core: Some(stream_source_core),
229 upstream: Some(upstream),
230 barrier_manager,
231 streaming_config,
232 associated_table_id: associated_table_id.unwrap(),
233 }
234 }
235}
236
impl<S: StateStore> BatchIcebergFetchExecutor<S> {
    /// Main executor loop: merges the upstream (file assignments + barriers)
    /// with a self-managed batch-reader stream and emits pruned data chunks.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let first_barrier = expect_first_barrier(&mut upstream).await?;
        yield Message::Barrier(first_barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let source_desc = core
            .source_desc_builder
            .take()
            .unwrap()
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        // Positions of the hidden file-path/file-pos columns; pruned from
        // chunks before they are emitted downstream.
        let column_indices = ColumnIndices::from_source_desc(&source_desc);

        let iceberg_metrics = &GLOBAL_ICEBERG_SCAN_METRICS;
        let iceberg_table_name = match &source_desc.source.config {
            risingwave_connector::source::ConnectorProperties::Iceberg(props) => {
                props.table.table_name().to_owned()
            }
            _ => unreachable!("BatchIcebergFetchExecutor must be built with Iceberg properties"),
        };
        let source_id_str = core.source_id.to_string();
        let source_name_str = core.source_name.clone();
        // Label set shared by all iceberg scan metrics below.
        let metrics_labels = [
            source_id_str.as_str(),
            source_name_str.as_str(),
            iceberg_table_name.as_str(),
        ];

        let mut state = FetchState::new();
        // The data side starts as a pending stream; a real batch reader is
        // swapped in by `start_batch_reader` once files are queued.
        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
            upstream,
            stream::pending().boxed(),
        );

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    // Fetch error: record metrics, re-queue in-flight files
                    // (at-least-once), park the data stream, and wait for the
                    // next barrier to rebuild the reader.
                    tracing::error!(error = %e.as_report(), "Fetch Error");

                    GLOBAL_ERROR_METRICS.user_source_error.report([
                        e.variant_name().to_owned(),
                        core.source_id.to_string(),
                        self.actor_ctx.fragment_id.to_string(),
                        self.associated_table_id.to_string(),
                    ]);

                    iceberg_metrics
                        .iceberg_source_scan_errors_total
                        .with_guarded_label_values(&[
                            metrics_labels[0],
                            metrics_labels[1],
                            metrics_labels[2],
                            "fetch_error",
                        ])
                        .inc();

                    let in_flight_count = state.in_flight_files.len();
                    state.handle_error_recovery();

                    if in_flight_count > 0 {
                        tracing::info!(
                            source_id = %core.source_id,
                            table_id = %self.associated_table_id,
                            in_flight_count = %in_flight_count,
                            "re-queued in-flight files for retry to ensure at-least-once semantics"
                        );
                    }

                    stream.replace_data_stream(stream::pending().boxed());

                    tracing::info!(
                        source_id = %core.source_id,
                        table_id = %self.associated_table_id,
                        remaining_files = %state.file_queue.len(),
                        "attempting to recover from fetch error, will retry on next barrier"
                    );

                    continue;
                }

                Ok(Either::Left(msg)) => match msg {
                    Message::Barrier(barrier) => {
                        // `need_rebuild` is true only for a matching
                        // `RefreshStart` mutation.
                        let need_rebuild = Self::handle_barrier_mutations(
                            &barrier,
                            &core,
                            &mut state,
                            &mut stream,
                        );

                        // Load-finished is reported only on checkpoint
                        // barriers.
                        if barrier.is_checkpoint() && state.should_report_load_finished() {
                            tracing::info!(
                                ?barrier.epoch,
                                actor_id = %self.actor_ctx.id,
                                source_id = %core.source_id,
                                table_id = %self.associated_table_id,
                                "Reporting load finished"
                            );
                            self.barrier_manager.report_source_load_finished(
                                barrier.epoch,
                                self.actor_ctx.id,
                                self.associated_table_id,
                                core.source_id,
                            );
                            state.mark_refresh_complete();
                        }

                        yield Message::Barrier(barrier);

                        // (Re)build the batch reader after the barrier has
                        // been forwarded downstream.
                        if state.should_start_batch_reader(need_rebuild) {
                            Self::start_batch_reader(
                                &mut state,
                                &mut stream,
                                source_desc.clone(),
                                &self.streaming_config,
                            )?;

                            iceberg_metrics
                                .iceberg_source_inflight_file_count
                                .with_guarded_label_values(&metrics_labels)
                                .set(state.splits_on_fetch as i64);
                        }
                    }

                    Message::Chunk(chunk) => {
                        // Upstream chunks carry file assignments, not data.
                        let files = Self::parse_file_assignments(&chunk);
                        tracing::debug!("Received {} file assignments from upstream", files.len());
                        state.enqueue_files(files);
                    }

                    Message::Watermark(_) => unreachable!(),
                },

                Ok(Either::Right(ChunksWithState { chunks, .. })) => {
                    // One file finished fetching; forward its chunks with the
                    // hidden file-path/file-pos columns pruned.
                    state.mark_file_fetched();

                    iceberg_metrics
                        .iceberg_source_inflight_file_count
                        .with_guarded_label_values(&metrics_labels)
                        .set(state.splits_on_fetch as i64);

                    for chunk in &chunks {
                        let pruned = prune_additional_cols(
                            chunk,
                            &column_indices.to_prune(),
                            &source_desc.columns,
                        );
                        yield Message::Chunk(pruned);
                    }
                }
            }
        }
    }

    /// Applies barrier mutations to the fetch state. Returns `true` when the
    /// batch reader must be rebuilt (currently only on a matching
    /// `RefreshStart`).
    fn handle_barrier_mutations(
        barrier: &Barrier,
        core: &StreamSourceCore<S>,
        state: &mut FetchState,
        stream: &mut StreamReaderWithPause<true, ChunksWithState>,
    ) -> bool {
        let Some(mutation) = barrier.mutation.as_deref() else {
            return false;
        };

        match mutation {
            Mutation::Pause => {
                stream.pause_stream();
                false
            }
            Mutation::Resume => {
                stream.resume_stream();
                false
            }
            // Mutations for other sources are ignored via the guard.
            Mutation::RefreshStart {
                associated_source_id,
                ..
            } if associated_source_id == &core.source_id => {
                tracing::info!(
                    ?barrier.epoch,
                    source_id = %core.source_id,
                    is_checkpoint = barrier.is_checkpoint(),
                    "RefreshStart: resetting state for new refresh cycle"
                );
                state.reset_for_refresh();
                true
            }
            Mutation::ListFinish {
                associated_source_id,
            } if associated_source_id == &core.source_id => {
                tracing::info!(
                    ?barrier.epoch,
                    source_id = %core.source_id,
                    is_checkpoint = barrier.is_checkpoint(),
                    "ListFinish: upstream finished listing files"
                );
                state.is_list_finished = true;
                false
            }
            _ => false,
        }
    }

    /// Decodes upstream rows into `(file_path, jsonb_scan_task)` pairs:
    /// column 0 is the file path, column 1 the persisted scan task.
    fn parse_file_assignments(chunk: &StreamChunk) -> Vec<FileEntry> {
        chunk
            .data_chunk()
            .rows()
            .map(|row| {
                let file_name = row.datum_at(0).unwrap().into_utf8().to_owned();
                let scan_task = row.datum_at(1).unwrap().into_jsonb().to_owned_scalar();
                (file_name, scan_task)
            })
            .collect()
    }

    /// Pops up to `iceberg_fetch_batch_size` files off the queue, records them
    /// as in-flight, and installs a reader stream for them. If the queue is
    /// empty, the data side is parked on a pending stream instead.
    fn start_batch_reader(
        state: &mut FetchState,
        stream: &mut StreamReaderWithPause<true, ChunksWithState>,
        source_desc: SourceDesc,
        streaming_config: &StreamingConfig,
    ) -> StreamExecutorResult<()> {
        let metrics = Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone());
        // Drop any stale in-flight bookkeeping from a previous reader.
        state.in_flight_files.clear();

        let batch_size = streaming_config.developer.iceberg_fetch_batch_size as usize;
        let mut batch = Vec::with_capacity(batch_size);

        for _ in 0..batch_size {
            let Some(file_entry) = state.file_queue.pop_front() else {
                break;
            };
            // Keep a copy so the file can be re-queued on fetch error.
            state.in_flight_files.push(file_entry.clone());
            batch.push(PersistedFileScanTask::decode(file_entry.1.as_scalar_ref())?);
        }

        if batch.is_empty() {
            tracing::info!("Batch is empty, setting stream to pending");
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            tracing::debug!("Starting batch reader with {} files", batch.len());
            state.splits_on_fetch += batch.len();
            *state.is_batch_finished.write() = false;

            let batch_reader = Self::build_batched_stream_reader(
                source_desc,
                batch,
                streaming_config.developer.chunk_size,
                state.is_batch_finished.clone(),
                metrics,
            );
            stream.replace_data_stream(batch_reader.boxed());
        }

        Ok(())
    }

    /// Reads the given scan tasks sequentially, yielding one `ChunksWithState`
    /// per file, then flips `batch_finished` to true.
    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        source_desc: SourceDesc,
        tasks: Vec<FileScanTask>,
        chunk_size: usize,
        batch_finished: Arc<RwLock<bool>>,
        metrics: Arc<IcebergScanMetrics>,
    ) {
        let properties = match source_desc.source.config.clone() {
            risingwave_connector::source::ConnectorProperties::Iceberg(props) => props,
            _ => unreachable!("Expected Iceberg connector properties"),
        };
        let table = properties.load_table().await?;

        for task in tasks {
            let mut chunks = vec![];
            #[for_await]
            for chunk_result in scan_task_to_chunk_with_deletes(
                table.clone(),
                task,
                IcebergScanOpts {
                    chunk_size,
                    need_seq_num: true,
                    need_file_path_and_pos: true,
                    handle_delete_files: true,
                },
                Some(metrics.clone()),
            ) {
                let chunk = chunk_result?;
                // Every scanned row is emitted as an insert.
                let ops = itertools::repeat_n(Op::Insert, chunk.capacity()).collect_vec();
                chunks.push(StreamChunk::from_parts(ops, chunk));
            }

            yield ChunksWithState {
                chunks,
                // No per-file read position is tracked here; the fields are
                // filled with neutral values.
                data_file_path: String::new(),
                last_read_pos: None,
            };
        }

        *batch_finished.write() = true;
    }
}
554
impl<S: StateStore> Execute for BatchIcebergFetchExecutor<S> {
    // Thin adapter: the whole executor logic lives in `into_stream`.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}
560
561impl<S: StateStore> Debug for BatchIcebergFetchExecutor<S> {
562 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
563 if let Some(core) = &self.stream_source_core {
564 f.debug_struct("BatchIcebergFetchExecutor")
565 .field("source_id", &core.source_id)
566 .field("column_ids", &core.column_ids)
567 .finish()
568 } else {
569 f.debug_struct("BatchIcebergFetchExecutor").finish()
570 }
571 }
572}