risingwave_stream/executor/source/
fs_fetch_executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::marker::PhantomData;
17use std::ops::Bound;
18use std::sync::{Arc, Mutex};
19use std::time::Duration;
20
21use either::Either;
22use futures::TryStreamExt;
23use futures::stream::{self, StreamExt};
24use futures_async_stream::try_stream;
25use pin_project::pin_project;
26use risingwave_common::catalog::ColumnId;
27use risingwave_common::hash::VnodeBitmapExt;
28use risingwave_common::id::SourceId;
29use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
30use risingwave_common::types::ScalarRef;
31use risingwave_connector::source::filesystem::OpendalFsSplit;
32use risingwave_connector::source::filesystem::opendal_source::{
33    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
34};
35use risingwave_connector::source::reader::desc::SourceDesc;
36use risingwave_connector::source::{
37    BoxStreamingFileSourceChunkStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
38};
39use risingwave_pb::common::ThrottleType;
40use risingwave_storage::store::PrefetchOptions;
41use thiserror_ext::AsReport;
42
43use super::{
44    SourceStateTableHandler, StreamSourceCore,
45    apply_rate_limit_with_for_streaming_file_source_reader, get_split_offset_col_idx,
46    get_split_offset_mapping_from_chunk, prune_additional_cols,
47    source_reader_event_to_chunk_stream,
48};
49use crate::common::rate_limit::limited_chunk_size;
50use crate::executor::prelude::*;
51use crate::executor::stream_reader::StreamReaderWithPause;
52
/// Maximum number of file splits loaded from the state table into one fetch batch.
const SPLIT_BATCH_SIZE: usize = 1000;
/// How many times a failing split is retried before being marked dirty and skipped.
const MAX_RETRIES_PER_SPLIT: u32 = 3;
/// Base delay for the exponential backoff between retries of a failing split.
const RETRY_BASE_BACKOFF: Duration = Duration::from_millis(200);

/// Optional batch of splits handed to the source reader builder.
type SplitBatch = Option<Vec<SplitImpl>>;
58
/// Bundled arguments for `FsFetchExecutor::replace_with_new_batch_reader`.
struct ReplaceReaderArgs<'a, S: StateStore, const BIASED: bool> {
    /// Count of splits currently assigned to the active reader; incremented by the
    /// batch size when a new reader is installed.
    splits_on_fetch: &'a mut usize,
    /// Handler of the state table persisting file-split assignments and offsets.
    state_store_handler: &'a SourceStateTableHandler<S>,
    /// Splits that exhausted their retry budget and must be skipped when batching.
    dirty_splits: &'a HashSet<Arc<str>>,
    column_ids: Vec<ColumnId>,
    source_ctx: SourceContext,
    source_desc: &'a SourceDesc,
    /// The pause-aware merged stream whose data side gets replaced by the new reader.
    stream: &'a mut StreamReaderWithPause<BIASED, Option<StreamChunk>>,
    rate_limit_rps: Option<u32>,
    /// Shared slot recording which split is currently being read (for error attribution).
    reading_file: Arc<Mutex<Option<Arc<str>>>>,
}
70
/// A stream wrapper that sets `reading_file` before polling the underlying stream for the first time.
///
/// This helps attribute errors that happen before the first chunk is produced (e.g., invalid UTF-8)
/// to the corresponding split.
#[pin_project]
struct SetReadingFileOnPoll<S> {
    /// The wrapped per-file chunk stream.
    #[pin]
    inner: S,
    /// Shared slot, also observed by the executor's error handler.
    reading_file: Arc<Mutex<Option<Arc<str>>>>,
    /// Id of the split served by `inner`.
    split_id: Arc<str>,
    /// Whether `reading_file` has already been set (done exactly once, on first poll).
    is_set: bool,
}
83
84impl<S> SetReadingFileOnPoll<S> {
85    fn new(inner: S, reading_file: Arc<Mutex<Option<Arc<str>>>>, split_id: Arc<str>) -> Self {
86        Self {
87            inner,
88            reading_file,
89            split_id,
90            is_set: false,
91        }
92    }
93}
94
impl<S> futures::Stream for SetReadingFileOnPoll<S>
where
    S: futures::Stream,
{
    type Item = S::Item;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.project();
        // Record the split id exactly once, *before* the inner stream is first
        // polled, so that errors raised during the very first poll can still be
        // attributed to this split by the executor's error handler.
        if !*this.is_set {
            *this.reading_file.lock().expect("mutex poisoned") = Some(this.split_id.clone());
            *this.is_set = true;
        }
        this.inner.poll_next(cx)
    }
}
113
/// Executor that fetches file contents for splits assigned by the upstream list
/// executor, for opendal-backed file sources (S3 / GCS / Azblob / PosixFs).
pub struct FsFetchExecutor<S: StateStore, Src: OpendalSource> {
    actor_ctx: ActorContextRef,

    /// Streaming source core for the external file source; taken out when the
    /// executor is turned into a stream.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor.
    upstream: Option<Executor>,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    // Ties the concrete connector type `Src` to the executor without storing it.
    _marker: PhantomData<Src>,
}
128
129impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
130    pub fn new(
131        actor_ctx: ActorContextRef,
132        stream_source_core: StreamSourceCore<S>,
133        upstream: Executor,
134        rate_limit_rps: Option<u32>,
135    ) -> Self {
136        Self {
137            actor_ctx,
138            stream_source_core: Some(stream_source_core),
139            upstream: Some(upstream),
140            rate_limit_rps,
141            _marker: PhantomData,
142        }
143    }
144
    /// Scans the state table for pending file splits and installs a new data stream
    /// that reads them one file at a time.
    ///
    /// Collects up to [`SPLIT_BATCH_SIZE`] splits across the owned vnodes (skipping
    /// dirty ones), builds one single-file reader per split, and chains the readers
    /// serially into the pause-aware stream. If no splits are pending, a
    /// forever-pending stream is installed instead.
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        args: ReplaceReaderArgs<'_, S, BIASED>,
    ) -> StreamExecutorResult<()> {
        let ReplaceReaderArgs {
            splits_on_fetch,
            state_store_handler,
            dirty_splits,
            column_ids,
            source_ctx,
            source_desc,
            stream,
            rate_limit_rps,
            reading_file,
        } = args;
        let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE);
        let state_table = state_store_handler.state_table();
        // Full scan of every owned vnode until the batch is full or all rows visited.
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    // This usage is similar with `backfill`. So we only need to fetch a large data rather than establish a connection for a whole object.
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            while let Some(item) = table_iter.next().await {
                let row = item?;
                // Column 1 holds the JSONB-encoded split; decode it according to
                // the concrete connector type of this source.
                let split = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => match &source_desc.source.config {
                        risingwave_connector::source::ConnectorProperties::Gcs(_) => {
                            let split: OpendalFsSplit<OpendalGcs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
                            let split: OpendalFsSplit<OpendalS3> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::Azblob(_) => {
                            let split: OpendalFsSplit<OpendalAzblob> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
                            let split: OpendalFsSplit<OpendalPosixFs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        _ => unreachable!(),
                    },
                    _ => unreachable!(),
                };
                let split_id = split.id();
                // Splits that exceeded the retry budget are skipped permanently
                // (for this actor's lifetime) instead of being re-read.
                if dirty_splits.contains(&split_id) {
                    continue;
                }
                batch.push(split);

                if batch.len() >= SPLIT_BATCH_SIZE {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            // Nothing to read: park the data side until new assignments arrive.
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();

            let mut merged_stream =
                stream::empty::<StreamExecutorResult<Option<StreamChunk>>>().boxed();
            // Change the previous implementation where multiple files shared a single SourceReader
            // to a new approach where each SourceReader reads only one file.
            // Then, merge the streams of multiple files serially here.
            for split in batch {
                let split_id = split.id();
                let single_file_stream = Self::build_single_file_stream_reader(
                    column_ids.clone(),
                    source_ctx.clone(),
                    source_desc,
                    Some(vec![split]),
                    rate_limit_rps,
                )
                .await?
                .map_err(StreamExecutorError::connector_error);
                // Wrap each per-file stream so `reading_file` is set on its first
                // poll, enabling error attribution before the first chunk arrives.
                let single_file_stream =
                    SetReadingFileOnPoll::new(single_file_stream, reading_file.clone(), split_id)
                        .boxed();
                merged_stream = merged_stream.chain(single_file_stream).boxed();
            }

            stream.replace_data_stream(merged_stream);
        }

        Ok(())
    }
242
243    // Note: This change applies only to the file source.
244    //
245    // Each SourceReader (for the streaming file source, this is the `OpendalReader` struct)
246    // reads only one file. After the chunk stream returned by the SourceReader,
247    // chain a None to indicate that the file has been fully read.
248    async fn build_single_file_stream_reader(
249        column_ids: Vec<ColumnId>,
250        source_ctx: SourceContext,
251        source_desc: &SourceDesc,
252        batch: SplitBatch,
253        rate_limit_rps: Option<u32>,
254    ) -> StreamExecutorResult<BoxStreamingFileSourceChunkStream> {
255        let (stream, _) = source_desc
256            .source
257            .build_stream(batch, column_ids, Arc::new(source_ctx), false)
258            .await
259            .map_err(StreamExecutorError::connector_error)?;
260        let optional_stream: BoxStreamingFileSourceChunkStream =
261            source_reader_event_to_chunk_stream(stream)
262                .boxed()
263                .map(|item| item.map(Some))
264                .chain(stream::once(async { Ok(None) }))
265                .boxed();
266        Ok(
267            apply_rate_limit_with_for_streaming_file_source_reader(optional_stream, rate_limit_rps)
268                .boxed(),
269        )
270    }
271
272    fn build_source_ctx(
273        &self,
274        source_desc: &SourceDesc,
275        source_id: SourceId,
276        source_name: &str,
277    ) -> SourceContext {
278        SourceContext::new(
279            self.actor_ctx.id,
280            source_id,
281            self.actor_ctx.fragment_id,
282            source_name.to_owned(),
283            source_desc.metrics.clone(),
284            SourceCtrlOpts {
285                chunk_size: limited_chunk_size(self.rate_limit_rps),
286                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
287            },
288            source_desc.source.config.clone(),
289            None,
290        )
291    }
292
    /// Drives the fetch loop: receives file assignments from the upstream list
    /// executor, persists them in the state table, reads the assigned files one at
    /// a time, and yields their chunks downstream.
    ///
    /// On a fetch error, the current split is retried with exponential backoff up
    /// to [`MAX_RETRIES_PER_SPLIT`] times; after that it is marked dirty in memory
    /// and skipped for the lifetime of this actor.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        // Build source description from the builder.
        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;
        // Metrics tracking dirty (skipped) and failed splits, labelled per actor.
        let actor_id = self.actor_ctx.id.to_string();
        let fragment_id = self.actor_ctx.fragment_id.to_string();
        let source_id = core.source_id.to_string();
        let source_name = core.source_name.clone();
        let dirty_split_count_metrics = source_desc
            .metrics
            .file_source_dirty_split_count
            .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
        let failed_split_count_metrics = source_desc
            .metrics
            .file_source_failed_split_count
            .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
        dirty_split_count_metrics.set(0);

        // pulsar's `message_id_data_idx` is not used in this executor, so we don't need to get it.
        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };
        // Initialize state table.
        state_store_handler.init_epoch(first_epoch).await?;

        // In-memory bookkeeping: the split currently being read (shared with the
        // per-file stream wrappers), per-split retry counts, and splits given up on.
        let reading_file: Arc<Mutex<Option<Arc<str>>>> = Arc::new(Mutex::new(None));
        let mut retry_counts: HashMap<Arc<str>, u32> = HashMap::new();
        let mut dirty_splits: HashSet<Arc<str>> = HashSet::new();

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, Option<StreamChunk>>::new(
            upstream,
            stream::pending().boxed(),
        );
        if is_pause_on_startup {
            stream.pause_stream();
        }

        // If it is a recovery startup,
        // there can be file assignments in the state table.
        // Hence we try building a reader first.
        Self::replace_with_new_batch_reader(ReplaceReaderArgs {
            splits_on_fetch: &mut splits_on_fetch,
            state_store_handler: &state_store_handler,
            dirty_splits: &dirty_splits,
            column_ids: core.column_ids.clone(),
            source_ctx: self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            source_desc: &source_desc,
            stream: &mut stream,
            rate_limit_rps: self.rate_limit_rps,
            reading_file: reading_file.clone(),
        })
        .await?;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    // Attribute the error to the split currently being read. If it
                    // cannot be inferred, we can neither retry nor skip, so abort.
                    let cur_file = reading_file.lock().expect("mutex poisoned").clone();
                    let Some(split_id) = cur_file else {
                        tracing::error!(
                            source_id = %core.source_id,
                            source_name = %core.source_name,
                            fragment_id = %self.actor_ctx.fragment_id,
                            error = %e.as_report(),
                            "Fetch Error but failed to infer reading file; aborting actor"
                        );
                        return Err(e);
                    };

                    let retries_done = retry_counts.entry(split_id.clone()).or_insert(0);
                    if *retries_done < MAX_RETRIES_PER_SPLIT {
                        *retries_done = retries_done.saturating_add(1);
                        // Exponential backoff: base * 2^(retries_done - 1), capped
                        // at 60s if the multiplication overflows.
                        let backoff = RETRY_BASE_BACKOFF
                            .checked_mul(1u32 << (*retries_done - 1))
                            .unwrap_or(Duration::from_secs(60));
                        tracing::warn!(
                            source_id = %core.source_id,
                            source_name = %core.source_name,
                            fragment_id = %self.actor_ctx.fragment_id,
                            reading_file = %split_id,
                            retries_done = *retries_done,
                            max_retries = MAX_RETRIES_PER_SPLIT,
                            error = %e.as_report(),
                            "Fetch Error, retrying file split"
                        );
                        tokio::time::sleep(backoff).await;
                    } else {
                        // Exceeded max retries: mark dirty in memory and skip this split afterwards.
                        dirty_splits.insert(split_id.clone());
                        dirty_split_count_metrics.set(dirty_splits.len() as i64);
                        failed_split_count_metrics.inc();
                        retry_counts.remove(&split_id);
                        tracing::error!(
                            source_id = %core.source_id,
                            source_name = %core.source_name,
                            fragment_id = %self.actor_ctx.fragment_id,
                            reading_file = %split_id,
                            max_retries = MAX_RETRIES_PER_SPLIT,
                            error = %e.as_report(),
                            "Fetch Error, exceeded max retries; marking split dirty and skipping"
                        );
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            "File source dirty split".to_owned(),
                            core.source_id.to_string(),
                            core.source_name.clone(),
                            self.actor_ctx.fragment_id.to_string(),
                        ]);
                    }

                    // Clear current reading file and rebuild reader to continue with other splits.
                    *reading_file.lock().expect("mutex poisoned") = None;
                    splits_on_fetch = 0;
                    Self::replace_with_new_batch_reader(ReplaceReaderArgs {
                        splits_on_fetch: &mut splits_on_fetch,
                        state_store_handler: &state_store_handler,
                        dirty_splits: &dirty_splits,
                        column_ids: core.column_ids.clone(),
                        source_ctx: self.build_source_ctx(
                            &source_desc,
                            core.source_id,
                            &core.source_name,
                        ),
                        source_desc: &source_desc,
                        stream: &mut stream,
                        rate_limit_rps: self.rate_limit_rps,
                        reading_file: reading_file.clone(),
                    })
                    .await?;
                    continue;
                }
                Ok(msg) => {
                    match msg {
                        // This branch will be preferred.
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(fragment_to_apply) => {
                                                if let Some(entry) = fragment_to_apply
                                                    .get(&self.actor_ctx.fragment_id)
                                                    && entry.throttle_type() == ThrottleType::Source
                                                    && entry.rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::info!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        entry.rate_limit
                                                    );
                                                    self.rate_limit_rps = entry.rate_limit;
                                                    // Force a reader rebuild (below, at the
                                                    // `splits_on_fetch == 0` check) so the new
                                                    // rate limit takes effect.
                                                    splits_on_fetch = 0;
                                                    *reading_file.lock().expect("mutex poisoned") =
                                                        None;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    // Propagate the barrier.
                                    yield Message::Barrier(barrier);

                                    if post_commit
                                        .post_yield_barrier(update_vnode_bitmap)
                                        .await?
                                        .is_some()
                                    {
                                        // Vnode bitmap update changes which file assignments this executor
                                        // should read. Rebuild the reader to avoid reading splits that no
                                        // longer belong to this actor (e.g., during scale-out).
                                        splits_on_fetch = 0;
                                        *reading_file.lock().expect("mutex poisoned") = None;
                                    }

                                    if splits_on_fetch == 0 {
                                        Self::replace_with_new_batch_reader(ReplaceReaderArgs {
                                            splits_on_fetch: &mut splits_on_fetch,
                                            state_store_handler: &state_store_handler,
                                            dirty_splits: &dirty_splits,
                                            column_ids: core.column_ids.clone(),
                                            source_ctx: self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            source_desc: &source_desc,
                                            stream: &mut stream,
                                            rate_limit_rps: self.rate_limit_rps,
                                            reading_file: reading_file.clone(),
                                        })
                                        .await?;
                                    }
                                }
                                // Receiving file assignments from upstream list executor,
                                // store into state table.
                                Message::Chunk(chunk) => {
                                    // For Parquet encoding, the offset indicates the current row being read.
                                    // Rows with non-positive size are filtered out (nothing to read).
                                    let file_assignment: Vec<OpendalFsSplit<Src>> = chunk
                                        .data_chunk()
                                        .rows()
                                        .filter_map(|row| {
                                            let filename = row.datum_at(0).unwrap().into_utf8();
                                            let size = row.datum_at(2).unwrap().into_int64();

                                            if size > 0 {
                                                Some(OpendalFsSplit::<Src>::new(
                                                    filename.to_owned(),
                                                    0,
                                                    size as usize,
                                                ))
                                            } else {
                                                None
                                            }
                                        })
                                        .collect();

                                    state_store_handler.set_states(file_assignment).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        // StreamChunk from FsSourceReader, and the reader reads only one file.
                        // Motivation for the changes:
                        //
                        // Previously, the fetch executor determined whether a file was fully read by checking if the
                        // offset reached the size of the file. However, this approach had some issues related to
                        // maintaining the offset:
                        //
                        // 1. For files compressed with gzip, the size reported corresponds to the original uncompressed
                        //    size, not the actual size of the compressed file.
                        //
                        // 2. For Parquet files, the offset represents the number of rows. Therefore, when listing each
                        //    file, it was necessary to read the metadata to obtain the total number of rows in the file,
                        //    which is an expensive operation.
                        //
                        // To address these issues, we changes the approach to determining whether a file is fully
                        // read by moving the check outside the reader. The fetch executor's right stream has been
                        // changed from a chunk stream to an `Option<Chunk>` stream. When a file is completely read,
                        // a `None` value is added at the end of the file stream to signal to the fetch executor that the
                        // file has been fully read. Upon encountering `None`, the file is deleted.
                        Either::Right(optional_chunk) => match optional_chunk {
                            Some(chunk) => {
                                // Each reader serves one file, so the chunk maps to
                                // exactly one (split, offset) pair.
                                let mapping = get_split_offset_mapping_from_chunk(
                                    &chunk, split_idx, offset_idx,
                                )
                                .unwrap();
                                debug_assert_eq!(mapping.len(), 1);
                                if let Some((split_id, offset)) = mapping.into_iter().next() {
                                    *reading_file.lock().expect("mutex poisoned") =
                                        Some(split_id.clone());
                                    // Progress was made: reset the retry budget.
                                    retry_counts.remove(&split_id);
                                    let row = state_store_handler.get(&split_id).await?
                                        .unwrap_or_else(|| {
                                            panic!("The fs_split (file_name) {:?} should be in the state table.",
                                        split_id)
                                        });
                                    let mut fs_split = match row.datum_at(1) {
                                        Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                                            OpendalFsSplit::<Src>::restore_from_json(
                                                jsonb_ref.to_owned_scalar(),
                                            )?
                                        }
                                        _ => unreachable!(),
                                    };
                                    fs_split.update_offset(offset)?;

                                    // Persist the advanced offset for recovery.
                                    state_store_handler
                                        .set(&split_id, fs_split.encode_to_json())
                                        .await?;
                                }
                                // Strip the internal split/offset columns before emitting.
                                let chunk = prune_additional_cols(
                                    &chunk,
                                    &[split_idx, offset_idx],
                                    &source_desc.columns,
                                );
                                yield Message::Chunk(chunk);
                            }
                            None => {
                                // EOF sentinel: the current file has been fully read.
                                let cur_file = reading_file.lock().expect("mutex poisoned").clone();
                                tracing::debug!("Deleting file: {:?}", cur_file);
                                if let Some(ref delete_file_name) = cur_file {
                                    // NOTE(review): assumes the EOF sentinel only arrives while
                                    // `splits_on_fetch > 0`; a late sentinel after a counter reset
                                    // would underflow here — confirm.
                                    splits_on_fetch -= 1;
                                    state_store_handler.delete(delete_file_name).await?;
                                    // Clean up in-memory retry count.
                                    retry_counts.remove(delete_file_name);
                                }
                            }
                        },
                    }
                }
            }
        }
    }
609}
610
611impl<S: StateStore, Src: OpendalSource> Execute for FsFetchExecutor<S, Src> {
612    fn execute(self: Box<Self>) -> BoxedMessageStream {
613        self.into_stream().boxed()
614    }
615}
616
617impl<S: StateStore, Src: OpendalSource> Debug for FsFetchExecutor<S, Src> {
618    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
619        if let Some(core) = &self.stream_source_core {
620            f.debug_struct("FsFetchExecutor")
621                .field("source_id", &core.source_id)
622                .field("column_ids", &core.column_ids)
623                .finish()
624        } else {
625            f.debug_struct("FsFetchExecutor").finish()
626        }
627    }
628}