risingwave_stream/executor/source/batch_source/
batch_iceberg_fetch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16
17use either::Either;
18use futures::stream;
19use iceberg::scan::FileScanTask;
20use itertools::Itertools;
21use parking_lot::RwLock;
22use risingwave_common::array::Op;
23use risingwave_common::catalog::{ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME};
24use risingwave_common::config::StreamingConfig;
25use risingwave_common::id::TableId;
26use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
27use risingwave_common::types::{JsonbVal, Scalar, ScalarRef};
28use risingwave_connector::source::iceberg::metrics::{
29    GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics,
30};
31use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
32use risingwave_connector::source::reader::desc::SourceDesc;
33use thiserror_ext::AsReport;
34
35use crate::executor::prelude::*;
36use crate::executor::source::{
37    ChunksWithState, PersistedFileScanTask, StreamSourceCore, prune_additional_cols,
38};
39use crate::executor::stream_reader::StreamReaderWithPause;
40use crate::task::LocalBarrierManager;
41
/// Type alias for file entries in the queue: (`file_name`, `scan_task_json`).
///
/// The JSONB value is decoded back into a [`FileScanTask`] via
/// [`PersistedFileScanTask::decode`] when the batch reader is started.
type FileEntry = (String, JsonbVal);
44
/// Mutable bookkeeping for one refresh cycle of the fetch executor.
///
/// Load-finished is reported once the queue and in-flight set are empty,
/// listing is done, and a refresh cycle is active (see
/// `should_report_load_finished`).
struct FetchState {
    /// Whether we are in a refresh cycle (started by `RefreshStart`, ended by load finished report)
    is_refreshing: bool,

    /// Whether the upstream list executor has finished listing all files
    is_list_finished: bool,

    /// Number of files currently being fetched in the active batch reader
    splits_on_fetch: usize,

    /// Shared flag indicating whether the current batch reader has finished reading all files.
    /// NOTE(review): within this file the flag is only ever written, never read —
    /// confirm whether an external reader exists or the field can be dropped.
    is_batch_finished: Arc<RwLock<bool>>,

    /// Queue of files waiting to be processed
    file_queue: VecDeque<FileEntry>,

    /// Files currently being fetched by the batch reader.
    /// Used for at-least-once recovery: on error, these files are re-queued.
    in_flight_files: Vec<FileEntry>,
}
65
66impl FetchState {
67    fn new() -> Self {
68        Self {
69            is_refreshing: false,
70            is_list_finished: false,
71            splits_on_fetch: 0,
72            is_batch_finished: Arc::new(RwLock::new(false)),
73            file_queue: VecDeque::new(),
74            in_flight_files: Vec::new(),
75        }
76    }
77
78    /// Reset all state for a new refresh cycle.
79    fn reset_for_refresh(&mut self) {
80        tracing::info!(
81            "reset_for_refresh: clearing file_queue_len={}, in_flight_files_len={}, splits_on_fetch={}",
82            self.file_queue.len(),
83            self.in_flight_files.len(),
84            self.splits_on_fetch
85        );
86        self.file_queue.clear();
87        self.in_flight_files.clear();
88        self.splits_on_fetch = 0;
89        self.is_refreshing = true;
90        self.is_list_finished = false;
91        *self.is_batch_finished.write() = false;
92    }
93
94    /// Check if we should report load finished to the barrier manager.
95    fn should_report_load_finished(&self) -> bool {
96        self.splits_on_fetch == 0
97            && self.file_queue.is_empty()
98            && self.in_flight_files.is_empty()
99            && self.is_list_finished
100            && self.is_refreshing
101    }
102
103    /// Mark the refresh cycle as complete after reporting load finished.
104    fn mark_refresh_complete(&mut self) {
105        self.is_list_finished = false;
106        self.is_refreshing = false;
107    }
108
109    /// Check if we should start a new batch reader.
110    fn should_start_batch_reader(&self, need_rebuild: bool) -> bool {
111        need_rebuild
112            || (self.splits_on_fetch == 0 && !self.file_queue.is_empty() && self.is_refreshing)
113    }
114
115    /// Mark one file as successfully fetched.
116    fn mark_file_fetched(&mut self) {
117        self.splits_on_fetch -= 1;
118
119        // When all files in the current batch complete successfully, clear in-flight tracking.
120        // We don't need to wait for is_batch_finished because by the time splits_on_fetch reaches 0,
121        // all files have been processed successfully.
122        if self.splits_on_fetch == 0 {
123            tracing::info!("All files fetched successfully, clearing in_flight_files");
124            self.in_flight_files.clear();
125        }
126    }
127
128    /// Handle fetch error with at-least-once recovery.
129    /// Re-queues in-flight files to ensure no file is skipped.
130    fn handle_error_recovery(&mut self) {
131        if !self.in_flight_files.is_empty() {
132            // Re-queue in-flight files to the front (reverse to maintain original order)
133            for file in self.in_flight_files.drain(..).rev() {
134                self.file_queue.push_front(file);
135            }
136        }
137        self.splits_on_fetch = 0;
138        *self.is_batch_finished.write() = false;
139    }
140
141    /// Enqueue new file assignments from upstream.
142    fn enqueue_files(&mut self, files: impl IntoIterator<Item = FileEntry>) {
143        self.file_queue.extend(files);
144    }
145}
146
147// ============================================================================
148// Column Indices Helper
149// ============================================================================
150
/// Indices of special columns that need to be pruned from output.
///
/// These are the metadata columns named by [`ICEBERG_FILE_PATH_COLUMN_NAME`] and
/// [`ICEBERG_FILE_POS_COLUMN_NAME`]; they are produced by the scan
/// (`need_file_path_and_pos: true`) but stripped before chunks are yielded.
struct ColumnIndices {
    /// Position of the file-path column within `source_desc.columns`.
    file_path_idx: usize,
    /// Position of the file-pos column within `source_desc.columns`.
    file_pos_idx: usize,
}
156
157impl ColumnIndices {
158    fn from_source_desc(source_desc: &SourceDesc) -> Self {
159        let file_path_idx = source_desc
160            .columns
161            .iter()
162            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
163            .expect("file path column not found");
164        let file_pos_idx = source_desc
165            .columns
166            .iter()
167            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
168            .expect("file pos column not found");
169        Self {
170            file_path_idx,
171            file_pos_idx,
172        }
173    }
174
175    fn to_prune(&self) -> [usize; 2] {
176        [self.file_path_idx, self.file_pos_idx]
177    }
178}
179
180// ============================================================================
181// Batch Iceberg Fetch Executor
182// ============================================================================
183
/// Executor that fetches data from Iceberg files discovered by an upstream list executor.
///
/// # Refresh Cycle
///
/// 1. Receives `RefreshStart` mutation - clears state and starts new cycle
/// 2. Receives file chunks from upstream list executor - queues files for processing
/// 3. On each barrier, starts batch reader if files are pending
/// 4. Receives `ListFinish` mutation - marks listing as complete
/// 5. When all files processed, reports load finished
///
/// # At-Least-Once Semantics
///
/// On fetch errors, in-flight files are re-queued to ensure no file is skipped.
/// This may cause duplicate reads, but guarantees data completeness.
pub struct BatchIcebergFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core component for managing external streaming source state.
    /// Taken (`Option::take`) once when the stream starts.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor that provides file scan tasks.
    /// Taken once when the stream starts.
    upstream: Option<Executor>,

    /// Barrier manager for reporting load finished
    barrier_manager: LocalBarrierManager,

    /// Streaming config; supplies `iceberg_fetch_batch_size` and `chunk_size`.
    streaming_config: Arc<StreamingConfig>,

    /// Table associated with this refreshable source; used in the load-finished
    /// report and in error metrics labels.
    associated_table_id: TableId,
}
215
216impl<S: StateStore> BatchIcebergFetchExecutor<S> {
217    pub fn new(
218        actor_ctx: ActorContextRef,
219        stream_source_core: StreamSourceCore<S>,
220        upstream: Executor,
221        barrier_manager: LocalBarrierManager,
222        streaming_config: Arc<StreamingConfig>,
223        associated_table_id: Option<TableId>,
224    ) -> Self {
225        assert!(associated_table_id.is_some());
226        Self {
227            actor_ctx,
228            stream_source_core: Some(stream_source_core),
229            upstream: Some(upstream),
230            barrier_manager,
231            streaming_config,
232            associated_table_id: associated_table_id.unwrap(),
233        }
234    }
235}
236
impl<S: StateStore> BatchIcebergFetchExecutor<S> {
    /// Main executor loop.
    ///
    /// Merges two inputs via [`StreamReaderWithPause`]:
    /// - the upstream list executor (barriers and file-assignment chunks), and
    /// - the data stream of [`ChunksWithState`] from the active batch reader
    ///   (initially `stream::pending()`, i.e. no reader running).
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        // Initialize upstream and wait for first barrier
        let mut upstream = self.upstream.take().unwrap().execute();
        let first_barrier = expect_first_barrier(&mut upstream).await?;
        yield Message::Barrier(first_barrier);

        // Initialize source description
        let mut core = self.stream_source_core.take().unwrap();
        let source_desc = core
            .source_desc_builder
            .take()
            .unwrap()
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        // Find column indices for pruning
        let column_indices = ColumnIndices::from_source_desc(&source_desc);

        // Initialize metrics context
        let iceberg_metrics = &GLOBAL_ICEBERG_SCAN_METRICS;
        let iceberg_table_name = match &source_desc.source.config {
            risingwave_connector::source::ConnectorProperties::Iceberg(props) => {
                props.table.table_name().to_owned()
            }
            _ => unreachable!("BatchIcebergFetchExecutor must be built with Iceberg properties"),
        };
        let source_id_str = core.source_id.to_string();
        let source_name_str = core.source_name.clone();
        // Label order: (source id, source name, iceberg table name).
        let metrics_labels = [
            source_id_str.as_str(),
            source_name_str.as_str(),
            iceberg_table_name.as_str(),
        ];

        // Initialize state and stream reader
        let mut state = FetchState::new();
        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
            upstream,
            stream::pending().boxed(),
        );

        // Main processing loop
        while let Some(msg) = stream.next().await {
            match msg {
                // ----- Error Handling with At-Least-Once Recovery -----
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");

                    GLOBAL_ERROR_METRICS.user_source_error.report([
                        e.variant_name().to_owned(),
                        core.source_id.to_string(),
                        self.actor_ctx.fragment_id.to_string(),
                        self.associated_table_id.to_string(),
                    ]);

                    iceberg_metrics
                        .iceberg_source_scan_errors_total
                        .with_guarded_label_values(&[
                            metrics_labels[0],
                            metrics_labels[1],
                            metrics_labels[2],
                            "fetch_error",
                        ])
                        .inc();

                    // Re-queue all in-flight files so the next batch reader retries
                    // them (at-least-once; may duplicate already-emitted rows).
                    let in_flight_count = state.in_flight_files.len();
                    state.handle_error_recovery();

                    if in_flight_count > 0 {
                        tracing::info!(
                            source_id = %core.source_id,
                            table_id = %self.associated_table_id,
                            in_flight_count = %in_flight_count,
                            "re-queued in-flight files for retry to ensure at-least-once semantics"
                        );
                    }

                    // Drop the failed reader; a fresh one is built on the next
                    // barrier via `should_start_batch_reader`.
                    stream.replace_data_stream(stream::pending().boxed());

                    tracing::info!(
                        source_id = %core.source_id,
                        table_id = %self.associated_table_id,
                        remaining_files = %state.file_queue.len(),
                        "attempting to recover from fetch error, will retry on next barrier"
                    );

                    continue;
                }

                // ----- Upstream Messages (barriers, file assignments) -----
                Ok(Either::Left(msg)) => match msg {
                    Message::Barrier(barrier) => {
                        // Apply pause/resume/refresh mutations; `need_rebuild` forces
                        // a reader rebuild (set by `RefreshStart`).
                        let need_rebuild = Self::handle_barrier_mutations(
                            &barrier,
                            &core,
                            &mut state,
                            &mut stream,
                        );

                        // Report load-finished only on checkpoint barriers, and
                        // before the barrier is forwarded downstream.
                        if barrier.is_checkpoint() && state.should_report_load_finished() {
                            tracing::info!(
                                ?barrier.epoch,
                                actor_id = %self.actor_ctx.id,
                                source_id = %core.source_id,
                                table_id = %self.associated_table_id,
                                "Reporting load finished"
                            );
                            self.barrier_manager.report_source_load_finished(
                                barrier.epoch,
                                self.actor_ctx.id,
                                self.associated_table_id,
                                core.source_id,
                            );
                            state.mark_refresh_complete();
                        }

                        yield Message::Barrier(barrier);

                        // After the barrier is yielded, (re)start the batch reader
                        // if forced or if queued files are waiting.
                        if state.should_start_batch_reader(need_rebuild) {
                            Self::start_batch_reader(
                                &mut state,
                                &mut stream,
                                source_desc.clone(),
                                &self.streaming_config,
                            )?;

                            iceberg_metrics
                                .iceberg_source_inflight_file_count
                                .with_guarded_label_values(&metrics_labels)
                                .set(state.splits_on_fetch as i64);
                        }
                    }

                    // File assignments from the upstream list executor.
                    Message::Chunk(chunk) => {
                        let files = Self::parse_file_assignments(&chunk);
                        tracing::debug!("Received {} file assignments from upstream", files.len());
                        state.enqueue_files(files);
                    }

                    Message::Watermark(_) => unreachable!(),
                },

                // ----- Fetched Data from Iceberg Files -----
                // One `ChunksWithState` per completed file (see
                // `build_batched_stream_reader`).
                Ok(Either::Right(ChunksWithState { chunks, .. })) => {
                    state.mark_file_fetched();

                    iceberg_metrics
                        .iceberg_source_inflight_file_count
                        .with_guarded_label_values(&metrics_labels)
                        .set(state.splits_on_fetch as i64);

                    // Strip the file-path/file-pos metadata columns before emitting.
                    for chunk in &chunks {
                        let pruned = prune_additional_cols(
                            chunk,
                            &column_indices.to_prune(),
                            &source_desc.columns,
                        );
                        yield Message::Chunk(pruned);
                    }
                }
            }
        }
    }

    /// Handle barrier mutations and return whether reader needs to be rebuilt.
    ///
    /// Only `RefreshStart` returns `true`; `RefreshStart`/`ListFinish` are ignored
    /// unless addressed to this executor's source id.
    fn handle_barrier_mutations(
        barrier: &Barrier,
        core: &StreamSourceCore<S>,
        state: &mut FetchState,
        stream: &mut StreamReaderWithPause<true, ChunksWithState>,
    ) -> bool {
        let Some(mutation) = barrier.mutation.as_deref() else {
            return false;
        };

        match mutation {
            Mutation::Pause => {
                stream.pause_stream();
                false
            }
            Mutation::Resume => {
                stream.resume_stream();
                false
            }
            Mutation::RefreshStart {
                associated_source_id,
                ..
            } if associated_source_id == &core.source_id => {
                tracing::info!(
                    ?barrier.epoch,
                    source_id = %core.source_id,
                    is_checkpoint = barrier.is_checkpoint(),
                    "RefreshStart: resetting state for new refresh cycle"
                );
                state.reset_for_refresh();
                // Force a reader rebuild so any in-progress batch is discarded.
                true
            }
            Mutation::ListFinish {
                associated_source_id,
            } if associated_source_id == &core.source_id => {
                tracing::info!(
                    ?barrier.epoch,
                    source_id = %core.source_id,
                    is_checkpoint = barrier.is_checkpoint(),
                    "ListFinish: upstream finished listing files"
                );
                state.is_list_finished = true;
                false
            }
            _ => false,
        }
    }

    /// Parse file assignments from an upstream chunk.
    ///
    /// Expected layout: column 0 is the file name (utf8), column 1 the persisted
    /// scan task (jsonb). The `unwrap`s panic on NULLs — presumably the list
    /// executor never emits them; TODO(review): confirm.
    fn parse_file_assignments(chunk: &StreamChunk) -> Vec<FileEntry> {
        chunk
            .data_chunk()
            .rows()
            .map(|row| {
                let file_name = row.datum_at(0).unwrap().into_utf8().to_owned();
                let scan_task = row.datum_at(1).unwrap().into_jsonb().to_owned_scalar();
                (file_name, scan_task)
            })
            .collect()
    }

    /// Start a new batch reader for pending files.
    ///
    /// Pops up to `iceberg_fetch_batch_size` entries off the queue, records them as
    /// in-flight (for error recovery), and installs a reader over the decoded scan
    /// tasks. If the queue is empty, the data stream is reset to `pending`, which
    /// also cancels any previously running reader.
    fn start_batch_reader(
        state: &mut FetchState,
        stream: &mut StreamReaderWithPause<true, ChunksWithState>,
        source_desc: SourceDesc,
        streaming_config: &StreamingConfig,
    ) -> StreamExecutorResult<()> {
        let metrics = Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone());
        // Clear previous in-flight files (should already be empty on success, re-queued on error)
        state.in_flight_files.clear();

        // Collect batch of files to process
        let batch_size = streaming_config.developer.iceberg_fetch_batch_size as usize;
        let mut batch = Vec::with_capacity(batch_size);

        for _ in 0..batch_size {
            let Some(file_entry) = state.file_queue.pop_front() else {
                break;
            };
            // Track as in-flight for at-least-once recovery
            state.in_flight_files.push(file_entry.clone());
            batch.push(PersistedFileScanTask::decode(file_entry.1.as_scalar_ref())?);
        }

        if batch.is_empty() {
            tracing::info!("Batch is empty, setting stream to pending");
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            tracing::debug!("Starting batch reader with {} files", batch.len());
            state.splits_on_fetch += batch.len();
            *state.is_batch_finished.write() = false;

            let batch_reader = Self::build_batched_stream_reader(
                source_desc,
                batch,
                streaming_config.developer.chunk_size,
                state.is_batch_finished.clone(),
                metrics,
            );
            stream.replace_data_stream(batch_reader.boxed());
        }

        Ok(())
    }

    /// Build a stream reader that reads multiple Iceberg files in sequence.
    ///
    /// Yields exactly one [`ChunksWithState`] per scan task, containing all chunks
    /// of that file; sets `batch_finished` after the last task.
    /// NOTE(review): `batch_finished` is written here but no reader of the flag is
    /// visible in this file — confirm it is still needed.
    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        source_desc: SourceDesc,
        tasks: Vec<FileScanTask>,
        chunk_size: usize,
        batch_finished: Arc<RwLock<bool>>,
        metrics: Arc<IcebergScanMetrics>,
    ) {
        let properties = match source_desc.source.config.clone() {
            risingwave_connector::source::ConnectorProperties::Iceberg(props) => props,
            _ => unreachable!("Expected Iceberg connector properties"),
        };
        let table = properties.load_table().await?;

        for task in tasks {
            let mut chunks = vec![];
            #[for_await]
            for chunk_result in scan_task_to_chunk_with_deletes(
                table.clone(),
                task,
                IcebergScanOpts {
                    chunk_size,
                    need_seq_num: true, // Keep for potential future usage
                    need_file_path_and_pos: true,
                    handle_delete_files: true,
                },
                Some(metrics.clone()),
            ) {
                let chunk = chunk_result?;
                // All rows from a batch scan are inserts.
                let ops = itertools::repeat_n(Op::Insert, chunk.capacity()).collect_vec();
                chunks.push(StreamChunk::from_parts(ops, chunk));
            }

            yield ChunksWithState {
                chunks,
                data_file_path: String::new(), // Not needed for refreshable iceberg fetch
                last_read_pos: None,
            };
        }

        *batch_finished.write() = true;
    }
}
554
555impl<S: StateStore> Execute for BatchIcebergFetchExecutor<S> {
556    fn execute(self: Box<Self>) -> BoxedMessageStream {
557        self.into_stream().boxed()
558    }
559}
560
561impl<S: StateStore> Debug for BatchIcebergFetchExecutor<S> {
562    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
563        if let Some(core) = &self.stream_source_core {
564            f.debug_struct("BatchIcebergFetchExecutor")
565                .field("source_id", &core.source_id)
566                .field("column_ids", &core.column_ids)
567                .finish()
568        } else {
569            f.debug_struct("BatchIcebergFetchExecutor").finish()
570        }
571    }
572}