risingwave_stream/executor/source/batch_source/batch_iceberg_fetch.rs

use std::collections::VecDeque;

use either::Either;
use futures::stream;
use iceberg::scan::FileScanTask;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::array::Op;
use risingwave_common::catalog::{ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME};
use risingwave_common::config::StreamingConfig;
use risingwave_common::id::TableId;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::{JsonbVal, Scalar, ScalarRef};
use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
use risingwave_connector::source::reader::desc::SourceDesc;
use thiserror_ext::AsReport;

use crate::executor::prelude::*;
use crate::executor::source::{
    ChunksWithState, PersistedFileScanTask, StreamSourceCore, prune_additional_cols,
};
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;

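/// A file assignment received from the upstream list executor: the data file path together
/// with the JSONB-encoded scan task for that file.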
type FileEntry = (String, JsonbVal);

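/// Bookkeeping for one refresh cycle: which files are queued, which are currently being
/// read, and whether listing and fetching have completed.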
struct FetchState {
    /// Whether a refresh cycle is currently in progress (set on `RefreshStart`).
    is_refreshing: bool,

    /// Whether the upstream list executor has finished listing files (set on `ListFinish`).
    is_list_finished: bool,

    /// Number of in-flight files handed to the batch reader that are not yet fully read.
    splits_on_fetch: usize,

    /// Shared flag the batch reader sets once it has drained every task in the batch.
    is_batch_finished: Arc<RwLock<bool>>,

    /// Files assigned by upstream that are still waiting to be read.
    file_queue: VecDeque<FileEntry>,

    /// Files currently being read; re-queued on error for at-least-once delivery.
    in_flight_files: Vec<FileEntry>,
}

impl FetchState {
    fn new() -> Self {
        Self {
            is_refreshing: false,
            is_list_finished: false,
            splits_on_fetch: 0,
            is_batch_finished: Arc::new(RwLock::new(false)),
            file_queue: VecDeque::new(),
            in_flight_files: Vec::new(),
        }
    }

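    /// Clears all queued and in-flight files and re-arms the state for a new refresh cycle.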
    fn reset_for_refresh(&mut self) {
        tracing::info!(
            "reset_for_refresh: clearing file_queue_len={}, in_flight_files_len={}, splits_on_fetch={}",
            self.file_queue.len(),
            self.in_flight_files.len(),
            self.splits_on_fetch
        );
        self.file_queue.clear();
        self.in_flight_files.clear();
        self.splits_on_fetch = 0;
        self.is_refreshing = true;
        self.is_list_finished = false;
        *self.is_batch_finished.write() = false;
    }

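    /// Returns `true` when every listed file of the current refresh cycle has been fetched,
    /// i.e. the load-finished signal can be reported to the barrier manager.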
    fn should_report_load_finished(&self) -> bool {
        self.splits_on_fetch == 0
            && self.file_queue.is_empty()
            && self.in_flight_files.is_empty()
            && self.is_list_finished
            && self.is_refreshing
    }

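    /// Marks the current refresh cycle as finished after load-finished has been reported.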
    fn mark_refresh_complete(&mut self) {
        self.is_list_finished = false;
        self.is_refreshing = false;
    }

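    /// Returns `true` when a (new) batch reader should be built: either a rebuild was
    /// requested, or no batch is in flight, files are queued, and a refresh is in progress.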
    fn should_start_batch_reader(&self, need_rebuild: bool) -> bool {
        need_rebuild
            || (self.splits_on_fetch == 0 && !self.file_queue.is_empty() && self.is_refreshing)
    }

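    /// Records that one in-flight file has been fully read.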
    fn mark_file_fetched(&mut self) {
        self.splits_on_fetch -= 1;

        if self.splits_on_fetch == 0 {
            // The whole batch has been read, so the in-flight list is no longer needed.
            tracing::info!("All files fetched successfully, clearing in_flight_files");
            self.in_flight_files.clear();
        }
    }

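    /// Rolls back after a fetch error: in-flight files are pushed back to the front of the
    /// queue so they will be retried, preserving at-least-once semantics.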
    fn handle_error_recovery(&mut self) {
        if !self.in_flight_files.is_empty() {
            // Push back in reverse so the original fetch order is preserved at the queue front.
            for file in self.in_flight_files.drain(..).rev() {
                self.file_queue.push_front(file);
            }
        }
        self.splits_on_fetch = 0;
        *self.is_batch_finished.write() = false;
    }

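    /// Appends newly assigned files from upstream to the fetch queue.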
    fn enqueue_files(&mut self, files: impl IntoIterator<Item = FileEntry>) {
        self.file_queue.extend(files);
    }
}

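/// Positions of the Iceberg file-path and file-pos metadata columns in the source schema;
/// both are pruned from output chunks after fetching.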
struct ColumnIndices {
    file_path_idx: usize,
    file_pos_idx: usize,
}

impl ColumnIndices {
    fn from_source_desc(source_desc: &SourceDesc) -> Self {
        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .expect("file path column not found");
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .expect("file pos column not found");
        Self {
            file_path_idx,
            file_pos_idx,
        }
    }

    fn to_prune(&self) -> [usize; 2] {
        [self.file_path_idx, self.file_pos_idx]
    }
}

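/// Fetch side of the batched Iceberg source: consumes file assignments from the upstream
/// list executor, reads each assigned data file into stream chunks, and reports
/// "load finished" to the barrier manager once every file listed in the current refresh
/// cycle has been emitted.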
pub struct BatchIcebergFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Streaming source state; taken out of the `Option` when the stream starts.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor that assigns files to fetch.
    upstream: Option<Executor>,

    /// Used to report source load-finished events.
    barrier_manager: LocalBarrierManager,

    streaming_config: Arc<StreamingConfig>,

    /// Table associated with this batch source; required for load-finished reporting.
    associated_table_id: TableId,
}

impl<S: StateStore> BatchIcebergFetchExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        barrier_manager: LocalBarrierManager,
        streaming_config: Arc<StreamingConfig>,
        associated_table_id: Option<TableId>,
    ) -> Self {
        assert!(associated_table_id.is_some());
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            barrier_manager,
            streaming_config,
            associated_table_id: associated_table_id.unwrap(),
        }
    }
}

impl<S: StateStore> BatchIcebergFetchExecutor<S> {
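    /// Executor loop: merges upstream messages with the current batch-reader stream, applies
    /// barrier mutations, queues incoming file assignments, and forwards fetched chunks
    /// downstream after pruning the file-path/file-pos metadata columns.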
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();

        // Wait for the first barrier and propagate it before doing any work.
        let first_barrier = expect_first_barrier(&mut upstream).await?;
        yield Message::Barrier(first_barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let source_desc = core
            .source_desc_builder
            .take()
            .unwrap()
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let column_indices = ColumnIndices::from_source_desc(&source_desc);

        let mut state = FetchState::new();

        // Start with a pending data stream; a real batch reader is swapped in once files arrive.
        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
            upstream,
            stream::pending().boxed(),
        );

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");

                    GLOBAL_ERROR_METRICS.user_source_error.report([
                        e.variant_name().to_owned(),
                        core.source_id.to_string(),
                        self.actor_ctx.fragment_id.to_string(),
                        self.associated_table_id.to_string(),
                    ]);

                    let in_flight_count = state.in_flight_files.len();
                    state.handle_error_recovery();

                    if in_flight_count > 0 {
                        tracing::info!(
                            source_id = %core.source_id,
                            table_id = %self.associated_table_id,
                            in_flight_count = %in_flight_count,
                            "re-queued in-flight files for retry to ensure at-least-once semantics"
                        );
                    }

                    stream.replace_data_stream(stream::pending().boxed());

                    tracing::info!(
                        source_id = %core.source_id,
                        table_id = %self.associated_table_id,
                        remaining_files = %state.file_queue.len(),
                        "attempting to recover from fetch error, will retry on next barrier"
                    );

                    continue;
                }

                // Messages from the upstream list executor.
                Ok(Either::Left(msg)) => match msg {
                    Message::Barrier(barrier) => {
                        let need_rebuild = Self::handle_barrier_mutations(
                            &barrier,
                            &core,
                            &mut state,
                            &mut stream,
                        );

                        if barrier.is_checkpoint() && state.should_report_load_finished() {
                            tracing::info!(
                                ?barrier.epoch,
                                actor_id = %self.actor_ctx.id,
                                source_id = %core.source_id,
                                table_id = %self.associated_table_id,
                                "Reporting load finished"
                            );
                            self.barrier_manager.report_source_load_finished(
                                barrier.epoch,
                                self.actor_ctx.id,
                                self.associated_table_id,
                                core.source_id,
                            );
                            state.mark_refresh_complete();
                        }

                        yield Message::Barrier(barrier);

                        if state.should_start_batch_reader(need_rebuild) {
                            Self::start_batch_reader(
                                &mut state,
                                &mut stream,
                                source_desc.clone(),
                                &self.streaming_config,
                            )?;
                        }
                    }

                    Message::Chunk(chunk) => {
                        let files = Self::parse_file_assignments(&chunk);
                        tracing::debug!("Received {} file assignments from upstream", files.len());
                        state.enqueue_files(files);
                    }

                    Message::Watermark(_) => unreachable!(),
                },

                // Chunks produced by the batch reader for one fetched file.
                Ok(Either::Right(ChunksWithState { chunks, .. })) => {
                    state.mark_file_fetched();

                    for chunk in &chunks {
                        let pruned = prune_additional_cols(
                            chunk,
                            &column_indices.to_prune(),
                            &source_desc.columns,
                        );
                        yield Message::Chunk(pruned);
                    }
                }
            }
        }
    }

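    /// Applies barrier mutations (pause/resume, `RefreshStart`, `ListFinish`) to the fetch
    /// state. Returns `true` if the batch reader must be rebuilt.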
    fn handle_barrier_mutations(
        barrier: &Barrier,
        core: &StreamSourceCore<S>,
        state: &mut FetchState,
        stream: &mut StreamReaderWithPause<true, ChunksWithState>,
    ) -> bool {
        let Some(mutation) = barrier.mutation.as_deref() else {
            return false;
        };

        match mutation {
            Mutation::Pause => {
                stream.pause_stream();
                false
            }
            Mutation::Resume => {
                stream.resume_stream();
                false
            }
            Mutation::RefreshStart {
                associated_source_id,
                ..
            } if associated_source_id == &core.source_id => {
                tracing::info!(
                    ?barrier.epoch,
                    source_id = %core.source_id,
                    is_checkpoint = barrier.is_checkpoint(),
                    "RefreshStart: resetting state for new refresh cycle"
                );
                state.reset_for_refresh();
                true
            }
            Mutation::ListFinish {
                associated_source_id,
            } if associated_source_id == &core.source_id => {
                tracing::info!(
                    ?barrier.epoch,
                    source_id = %core.source_id,
                    is_checkpoint = barrier.is_checkpoint(),
                    "ListFinish: upstream finished listing files"
                );
                state.is_list_finished = true;
                false
            }
            _ => false,
        }
    }

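    /// Decodes one upstream chunk of file assignments into `(file path, scan task)` pairs.
    /// Column 0 holds the file path and column 1 the JSONB-encoded scan task.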
    fn parse_file_assignments(chunk: &StreamChunk) -> Vec<FileEntry> {
        chunk
            .data_chunk()
            .rows()
            .map(|row| {
                let file_name = row.datum_at(0).unwrap().into_utf8().to_owned();
                let scan_task = row.datum_at(1).unwrap().into_jsonb().to_owned_scalar();
                (file_name, scan_task)
            })
            .collect()
    }

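    /// Pops up to `iceberg_fetch_batch_size` files off the queue, records them as in-flight,
    /// and swaps the paused data stream for a reader over the decoded scan tasks.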
    fn start_batch_reader(
        state: &mut FetchState,
        stream: &mut StreamReaderWithPause<true, ChunksWithState>,
        source_desc: SourceDesc,
        streaming_config: &StreamingConfig,
    ) -> StreamExecutorResult<()> {
        state.in_flight_files.clear();

        let batch_size = streaming_config.developer.iceberg_fetch_batch_size as usize;
        let mut batch = Vec::with_capacity(batch_size);

        for _ in 0..batch_size {
            let Some(file_entry) = state.file_queue.pop_front() else {
                break;
            };
            // Remember the file as in-flight so it can be re-queued if the fetch fails.
            state.in_flight_files.push(file_entry.clone());
            batch.push(PersistedFileScanTask::decode(file_entry.1.as_scalar_ref())?);
        }

        if batch.is_empty() {
            tracing::info!("Batch is empty, setting stream to pending");
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            tracing::debug!("Starting batch reader with {} files", batch.len());
            state.splits_on_fetch += batch.len();
            *state.is_batch_finished.write() = false;

            let batch_reader = Self::build_batched_stream_reader(
                source_desc,
                batch,
                streaming_config.developer.chunk_size,
                state.is_batch_finished.clone(),
            );
            stream.replace_data_stream(batch_reader.boxed());
        }

        Ok(())
    }

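    /// Reads the given scan tasks one by one, converting each file into a list of insert-only
    /// stream chunks, and marks the batch as finished once all tasks are drained.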
    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        source_desc: SourceDesc,
        tasks: Vec<FileScanTask>,
        chunk_size: usize,
        batch_finished: Arc<RwLock<bool>>,
    ) {
        let properties = match source_desc.source.config.clone() {
            risingwave_connector::source::ConnectorProperties::Iceberg(props) => props,
            _ => unreachable!("Expected Iceberg connector properties"),
        };
        let table = properties.load_table().await?;

        for task in tasks {
            let mut chunks = vec![];
            #[for_await]
            for chunk_result in scan_task_to_chunk_with_deletes(
                table.clone(),
                task,
                IcebergScanOpts {
                    chunk_size,
                    need_seq_num: true,
                    need_file_path_and_pos: true,
                    handle_delete_files: true,
                },
                None,
            ) {
                let chunk = chunk_result?;
                let ops = itertools::repeat_n(Op::Insert, chunk.capacity()).collect_vec();
                chunks.push(StreamChunk::from_parts(ops, chunk));
            }

            yield ChunksWithState {
                chunks,
                data_file_path: String::new(),
                last_read_pos: None,
            };
        }

        *batch_finished.write() = true;
    }
}

impl<S: StateStore> Execute for BatchIcebergFetchExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for BatchIcebergFetchExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("BatchIcebergFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("BatchIcebergFetchExecutor").finish()
        }
    }
}