risingwave_stream/executor/source/batch_source/
batch_iceberg_fetch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16
17use either::Either;
18use futures::stream;
19use iceberg::scan::FileScanTask;
20use itertools::Itertools;
21use parking_lot::RwLock;
22use risingwave_common::array::Op;
23use risingwave_common::catalog::{ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME};
24use risingwave_common::config::StreamingConfig;
25use risingwave_common::id::TableId;
26use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
27use risingwave_common::types::{JsonbVal, Scalar, ScalarRef};
28use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
29use risingwave_connector::source::reader::desc::SourceDesc;
30use thiserror_ext::AsReport;
31
32use crate::executor::prelude::*;
33use crate::executor::source::{
34    ChunksWithState, PersistedFileScanTask, StreamSourceCore, prune_additional_cols,
35};
36use crate::executor::stream_reader::StreamReaderWithPause;
37use crate::task::LocalBarrierManager;
38
39/// Type alias for file entries in the queue: (`file_name`, `scan_task_json`)
40type FileEntry = (String, JsonbVal);
41
42struct FetchState {
43    /// Whether we are in a refresh cycle (started by `RefreshStart`, ended by load finished report)
44    is_refreshing: bool,
45
46    /// Whether the upstream list executor has finished listing all files
47    is_list_finished: bool,
48
49    /// Number of files currently being fetched in the active batch reader
50    splits_on_fetch: usize,
51
52    /// Shared flag indicating whether the current batch reader has finished reading all files
53    is_batch_finished: Arc<RwLock<bool>>,
54
55    /// Queue of files waiting to be processed
56    file_queue: VecDeque<FileEntry>,
57
58    /// Files currently being fetched by the batch reader.
59    /// Used for at-least-once recovery: on error, these files are re-queued.
60    in_flight_files: Vec<FileEntry>,
61}
62
63impl FetchState {
64    fn new() -> Self {
65        Self {
66            is_refreshing: false,
67            is_list_finished: false,
68            splits_on_fetch: 0,
69            is_batch_finished: Arc::new(RwLock::new(false)),
70            file_queue: VecDeque::new(),
71            in_flight_files: Vec::new(),
72        }
73    }
74
75    /// Reset all state for a new refresh cycle.
76    fn reset_for_refresh(&mut self) {
77        tracing::info!(
78            "reset_for_refresh: clearing file_queue_len={}, in_flight_files_len={}, splits_on_fetch={}",
79            self.file_queue.len(),
80            self.in_flight_files.len(),
81            self.splits_on_fetch
82        );
83        self.file_queue.clear();
84        self.in_flight_files.clear();
85        self.splits_on_fetch = 0;
86        self.is_refreshing = true;
87        self.is_list_finished = false;
88        *self.is_batch_finished.write() = false;
89    }
90
91    /// Check if we should report load finished to the barrier manager.
92    fn should_report_load_finished(&self) -> bool {
93        self.splits_on_fetch == 0
94            && self.file_queue.is_empty()
95            && self.in_flight_files.is_empty()
96            && self.is_list_finished
97            && self.is_refreshing
98    }
99
100    /// Mark the refresh cycle as complete after reporting load finished.
101    fn mark_refresh_complete(&mut self) {
102        self.is_list_finished = false;
103        self.is_refreshing = false;
104    }
105
106    /// Check if we should start a new batch reader.
107    fn should_start_batch_reader(&self, need_rebuild: bool) -> bool {
108        need_rebuild
109            || (self.splits_on_fetch == 0 && !self.file_queue.is_empty() && self.is_refreshing)
110    }
111
112    /// Mark one file as successfully fetched.
113    fn mark_file_fetched(&mut self) {
114        self.splits_on_fetch -= 1;
115
116        // When all files in the current batch complete successfully, clear in-flight tracking.
117        // We don't need to wait for is_batch_finished because by the time splits_on_fetch reaches 0,
118        // all files have been processed successfully.
119        if self.splits_on_fetch == 0 {
120            tracing::info!("All files fetched successfully, clearing in_flight_files");
121            self.in_flight_files.clear();
122        }
123    }
124
125    /// Handle fetch error with at-least-once recovery.
126    /// Re-queues in-flight files to ensure no file is skipped.
127    fn handle_error_recovery(&mut self) {
128        if !self.in_flight_files.is_empty() {
129            // Re-queue in-flight files to the front (reverse to maintain original order)
130            for file in self.in_flight_files.drain(..).rev() {
131                self.file_queue.push_front(file);
132            }
133        }
134        self.splits_on_fetch = 0;
135        *self.is_batch_finished.write() = false;
136    }
137
138    /// Enqueue new file assignments from upstream.
139    fn enqueue_files(&mut self, files: impl IntoIterator<Item = FileEntry>) {
140        self.file_queue.extend(files);
141    }
142}
143
144// ============================================================================
145// Column Indices Helper
146// ============================================================================
147
148/// Indices of special columns that need to be pruned from output.
149struct ColumnIndices {
150    file_path_idx: usize,
151    file_pos_idx: usize,
152}
153
154impl ColumnIndices {
155    fn from_source_desc(source_desc: &SourceDesc) -> Self {
156        let file_path_idx = source_desc
157            .columns
158            .iter()
159            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
160            .expect("file path column not found");
161        let file_pos_idx = source_desc
162            .columns
163            .iter()
164            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
165            .expect("file pos column not found");
166        Self {
167            file_path_idx,
168            file_pos_idx,
169        }
170    }
171
172    fn to_prune(&self) -> [usize; 2] {
173        [self.file_path_idx, self.file_pos_idx]
174    }
175}
176
177// ============================================================================
178// Batch Iceberg Fetch Executor
179// ============================================================================
180
181/// Executor that fetches data from Iceberg files discovered by an upstream list executor.
182///
183///
184/// # Refresh Cycle
185///
186/// 1. Receives `RefreshStart` mutation - clears state and starts new cycle
187/// 2. Receives file chunks from upstream list executor - queues files for processing
188/// 3. On each barrier, starts batch reader if files are pending
189/// 4. Receives `ListFinish` mutation - marks listing as complete
190/// 5. When all files processed, reports load finished
191///
192/// # At-Least-Once Semantics
193///
194/// On fetch errors, in-flight files are re-queued to ensure no file is skipped.
195/// This may cause duplicate reads, but guarantees data completeness.
196pub struct BatchIcebergFetchExecutor<S: StateStore> {
197    actor_ctx: ActorContextRef,
198
199    /// Core component for managing external streaming source state
200    stream_source_core: Option<StreamSourceCore<S>>,
201
202    /// Upstream list executor that provides file scan tasks
203    upstream: Option<Executor>,
204
205    /// Barrier manager for reporting load finished
206    barrier_manager: LocalBarrierManager,
207
208    streaming_config: Arc<StreamingConfig>,
209
210    associated_table_id: TableId,
211}
212
213impl<S: StateStore> BatchIcebergFetchExecutor<S> {
214    pub fn new(
215        actor_ctx: ActorContextRef,
216        stream_source_core: StreamSourceCore<S>,
217        upstream: Executor,
218        barrier_manager: LocalBarrierManager,
219        streaming_config: Arc<StreamingConfig>,
220        associated_table_id: Option<TableId>,
221    ) -> Self {
222        assert!(associated_table_id.is_some());
223        Self {
224            actor_ctx,
225            stream_source_core: Some(stream_source_core),
226            upstream: Some(upstream),
227            barrier_manager,
228            streaming_config,
229            associated_table_id: associated_table_id.unwrap(),
230        }
231    }
232}
233
234impl<S: StateStore> BatchIcebergFetchExecutor<S> {
235    #[try_stream(ok = Message, error = StreamExecutorError)]
236    async fn into_stream(mut self) {
237        // Initialize upstream and wait for first barrier
238        let mut upstream = self.upstream.take().unwrap().execute();
239        let first_barrier = expect_first_barrier(&mut upstream).await?;
240        yield Message::Barrier(first_barrier);
241
242        // Initialize source description
243        let mut core = self.stream_source_core.take().unwrap();
244        let source_desc = core
245            .source_desc_builder
246            .take()
247            .unwrap()
248            .build()
249            .map_err(StreamExecutorError::connector_error)?;
250
251        // Find column indices for pruning
252        let column_indices = ColumnIndices::from_source_desc(&source_desc);
253
254        // Initialize state and stream reader
255        let mut state = FetchState::new();
256        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
257            upstream,
258            stream::pending().boxed(),
259        );
260
261        // Main processing loop
262        while let Some(msg) = stream.next().await {
263            match msg {
264                // ----- Error Handling with At-Least-Once Recovery -----
265                Err(e) => {
266                    tracing::error!(error = %e.as_report(), "Fetch Error");
267
268                    GLOBAL_ERROR_METRICS.user_source_error.report([
269                        e.variant_name().to_owned(),
270                        core.source_id.to_string(),
271                        self.actor_ctx.fragment_id.to_string(),
272                        self.associated_table_id.to_string(),
273                    ]);
274
275                    let in_flight_count = state.in_flight_files.len();
276                    state.handle_error_recovery();
277
278                    if in_flight_count > 0 {
279                        tracing::info!(
280                            source_id = %core.source_id,
281                            table_id = %self.associated_table_id,
282                            in_flight_count = %in_flight_count,
283                            "re-queued in-flight files for retry to ensure at-least-once semantics"
284                        );
285                    }
286
287                    stream.replace_data_stream(stream::pending().boxed());
288
289                    tracing::info!(
290                        source_id = %core.source_id,
291                        table_id = %self.associated_table_id,
292                        remaining_files = %state.file_queue.len(),
293                        "attempting to recover from fetch error, will retry on next barrier"
294                    );
295
296                    continue;
297                }
298
299                // ----- Upstream Messages (barriers, file assignments) -----
300                Ok(Either::Left(msg)) => match msg {
301                    Message::Barrier(barrier) => {
302                        let need_rebuild = Self::handle_barrier_mutations(
303                            &barrier,
304                            &core,
305                            &mut state,
306                            &mut stream,
307                        );
308
309                        if barrier.is_checkpoint() && state.should_report_load_finished() {
310                            tracing::info!(
311                                ?barrier.epoch,
312                                actor_id = %self.actor_ctx.id,
313                                source_id = %core.source_id,
314                                table_id = %self.associated_table_id,
315                                "Reporting load finished"
316                            );
317                            self.barrier_manager.report_source_load_finished(
318                                barrier.epoch,
319                                self.actor_ctx.id,
320                                self.associated_table_id,
321                                core.source_id,
322                            );
323                            state.mark_refresh_complete();
324                        }
325
326                        yield Message::Barrier(barrier);
327
328                        if state.should_start_batch_reader(need_rebuild) {
329                            Self::start_batch_reader(
330                                &mut state,
331                                &mut stream,
332                                source_desc.clone(),
333                                &self.streaming_config,
334                            )?;
335                        }
336                    }
337
338                    Message::Chunk(chunk) => {
339                        let files = Self::parse_file_assignments(&chunk);
340                        tracing::debug!("Received {} file assignments from upstream", files.len());
341                        state.enqueue_files(files);
342                    }
343
344                    Message::Watermark(_) => unreachable!(),
345                },
346
347                // ----- Fetched Data from Iceberg Files -----
348                Ok(Either::Right(ChunksWithState { chunks, .. })) => {
349                    state.mark_file_fetched();
350
351                    for chunk in &chunks {
352                        let pruned = prune_additional_cols(
353                            chunk,
354                            &column_indices.to_prune(),
355                            &source_desc.columns,
356                        );
357                        yield Message::Chunk(pruned);
358                    }
359                }
360            }
361        }
362    }
363
364    /// Handle barrier mutations and return whether reader needs to be rebuilt.
365    fn handle_barrier_mutations(
366        barrier: &Barrier,
367        core: &StreamSourceCore<S>,
368        state: &mut FetchState,
369        stream: &mut StreamReaderWithPause<true, ChunksWithState>,
370    ) -> bool {
371        let Some(mutation) = barrier.mutation.as_deref() else {
372            return false;
373        };
374
375        match mutation {
376            Mutation::Pause => {
377                stream.pause_stream();
378                false
379            }
380            Mutation::Resume => {
381                stream.resume_stream();
382                false
383            }
384            Mutation::RefreshStart {
385                associated_source_id,
386                ..
387            } if associated_source_id == &core.source_id => {
388                tracing::info!(
389                    ?barrier.epoch,
390                    source_id = %core.source_id,
391                    is_checkpoint = barrier.is_checkpoint(),
392                    "RefreshStart: resetting state for new refresh cycle"
393                );
394                state.reset_for_refresh();
395                true
396            }
397            Mutation::ListFinish {
398                associated_source_id,
399            } if associated_source_id == &core.source_id => {
400                tracing::info!(
401                    ?barrier.epoch,
402                    source_id = %core.source_id,
403                    is_checkpoint = barrier.is_checkpoint(),
404                    "ListFinish: upstream finished listing files"
405                );
406                state.is_list_finished = true;
407                false
408            }
409            _ => false,
410        }
411    }
412
413    /// Parse file assignments from an upstream chunk.
414    fn parse_file_assignments(chunk: &StreamChunk) -> Vec<FileEntry> {
415        chunk
416            .data_chunk()
417            .rows()
418            .map(|row| {
419                let file_name = row.datum_at(0).unwrap().into_utf8().to_owned();
420                let scan_task = row.datum_at(1).unwrap().into_jsonb().to_owned_scalar();
421                (file_name, scan_task)
422            })
423            .collect()
424    }
425
426    /// Start a new batch reader for pending files.
427    fn start_batch_reader(
428        state: &mut FetchState,
429        stream: &mut StreamReaderWithPause<true, ChunksWithState>,
430        source_desc: SourceDesc,
431        streaming_config: &StreamingConfig,
432    ) -> StreamExecutorResult<()> {
433        // Clear previous in-flight files (should already be empty on success, re-queued on error)
434        state.in_flight_files.clear();
435
436        // Collect batch of files to process
437        let batch_size = streaming_config.developer.iceberg_fetch_batch_size as usize;
438        let mut batch = Vec::with_capacity(batch_size);
439
440        for _ in 0..batch_size {
441            let Some(file_entry) = state.file_queue.pop_front() else {
442                break;
443            };
444            // Track as in-flight for at-least-once recovery
445            state.in_flight_files.push(file_entry.clone());
446            batch.push(PersistedFileScanTask::decode(file_entry.1.as_scalar_ref())?);
447        }
448
449        if batch.is_empty() {
450            tracing::info!("Batch is empty, setting stream to pending");
451            stream.replace_data_stream(stream::pending().boxed());
452        } else {
453            tracing::debug!("Starting batch reader with {} files", batch.len());
454            state.splits_on_fetch += batch.len();
455            *state.is_batch_finished.write() = false;
456
457            let batch_reader = Self::build_batched_stream_reader(
458                source_desc,
459                batch,
460                streaming_config.developer.chunk_size,
461                state.is_batch_finished.clone(),
462            );
463            stream.replace_data_stream(batch_reader.boxed());
464        }
465
466        Ok(())
467    }
468
469    /// Build a stream reader that reads multiple Iceberg files in sequence.
470    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
471    async fn build_batched_stream_reader(
472        source_desc: SourceDesc,
473        tasks: Vec<FileScanTask>,
474        chunk_size: usize,
475        batch_finished: Arc<RwLock<bool>>,
476    ) {
477        let properties = match source_desc.source.config.clone() {
478            risingwave_connector::source::ConnectorProperties::Iceberg(props) => props,
479            _ => unreachable!("Expected Iceberg connector properties"),
480        };
481        let table = properties.load_table().await?;
482
483        for task in tasks {
484            let mut chunks = vec![];
485            #[for_await]
486            for chunk_result in scan_task_to_chunk_with_deletes(
487                table.clone(),
488                task,
489                IcebergScanOpts {
490                    chunk_size,
491                    need_seq_num: true, // Keep for potential future usage
492                    need_file_path_and_pos: true,
493                    handle_delete_files: true,
494                },
495                None,
496            ) {
497                let chunk = chunk_result?;
498                let ops = itertools::repeat_n(Op::Insert, chunk.capacity()).collect_vec();
499                chunks.push(StreamChunk::from_parts(ops, chunk));
500            }
501
502            yield ChunksWithState {
503                chunks,
504                data_file_path: String::new(), // Not needed for refreshable iceberg fetch
505                last_read_pos: None,
506            };
507        }
508
509        *batch_finished.write() = true;
510    }
511}
512
513impl<S: StateStore> Execute for BatchIcebergFetchExecutor<S> {
514    fn execute(self: Box<Self>) -> BoxedMessageStream {
515        self.into_stream().boxed()
516    }
517}
518
519impl<S: StateStore> Debug for BatchIcebergFetchExecutor<S> {
520    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
521        if let Some(core) = &self.stream_source_core {
522            f.debug_struct("BatchIcebergFetchExecutor")
523                .field("source_id", &core.source_id)
524                .field("column_ids", &core.column_ids)
525                .finish()
526        } else {
527            f.debug_struct("BatchIcebergFetchExecutor").finish()
528        }
529    }
530}