risingwave_stream/executor/source/fs_fetch_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use either::Either;
use futures::TryStreamExt;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use pin_project::pin_project;
use risingwave_common::catalog::ColumnId;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::id::SourceId;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::ScalarRef;
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::{
    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxStreamingFileSourceChunkStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_pb::common::ThrottleType;
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{
    SourceStateTableHandler, StreamSourceCore,
    apply_rate_limit_with_for_streaming_file_source_reader, get_split_offset_col_idx,
    get_split_offset_mapping_from_chunk, prune_additional_cols,
};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

const SPLIT_BATCH_SIZE: usize = 1000;
const MAX_RETRIES_PER_SPLIT: u32 = 3;
const RETRY_BASE_BACKOFF: Duration = Duration::from_millis(200);
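// With `MAX_RETRIES_PER_SPLIT` = 3 and exponential backoff starting at `RETRY_BASE_BACKOFF`,
// a failing split is retried after waits of 200ms, 400ms, and 800ms before being marked dirty.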

type SplitBatch = Option<Vec<SplitImpl>>;

struct ReplaceReaderArgs<'a, S: StateStore, const BIASED: bool> {
    splits_on_fetch: &'a mut usize,
    state_store_handler: &'a SourceStateTableHandler<S>,
    dirty_splits: &'a HashSet<Arc<str>>,
    column_ids: Vec<ColumnId>,
    source_ctx: SourceContext,
    source_desc: &'a SourceDesc,
    stream: &'a mut StreamReaderWithPause<BIASED, Option<StreamChunk>>,
    rate_limit_rps: Option<u32>,
    reading_file: Arc<Mutex<Option<Arc<str>>>>,
}

/// A stream wrapper that sets `reading_file` before polling the underlying stream for the first time.
///
/// This helps attribute errors that happen before the first chunk is produced (e.g., invalid UTF-8)
/// to the corresponding split.
#[pin_project]
struct SetReadingFileOnPoll<S> {
    #[pin]
    inner: S,
    reading_file: Arc<Mutex<Option<Arc<str>>>>,
    split_id: Arc<str>,
    is_set: bool,
}

impl<S> SetReadingFileOnPoll<S> {
    fn new(inner: S, reading_file: Arc<Mutex<Option<Arc<str>>>>, split_id: Arc<str>) -> Self {
        Self {
            inner,
            reading_file,
            split_id,
            is_set: false,
        }
    }
}

impl<S> futures::Stream for SetReadingFileOnPoll<S>
where
    S: futures::Stream,
{
    type Item = S::Item;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.project();
        if !*this.is_set {
            *this.reading_file.lock().expect("mutex poisoned") = Some(this.split_id.clone());
            *this.is_set = true;
        }
        this.inner.poll_next(cx)
    }
}

pub struct FsFetchExecutor<S: StateStore, Src: OpendalSource> {
    actor_ctx: ActorContextRef,

    /// Core of the streaming source for the external file system.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor.
    upstream: Option<Executor>,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    _marker: PhantomData<Src>,
}

impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            _marker: PhantomData,
        }
    }

    async fn replace_with_new_batch_reader<const BIASED: bool>(
        args: ReplaceReaderArgs<'_, S, BIASED>,
    ) -> StreamExecutorResult<()> {
        let ReplaceReaderArgs {
            splits_on_fetch,
            state_store_handler,
            dirty_splits,
            column_ids,
            source_ctx,
            source_desc,
            stream,
            rate_limit_rps,
            reading_file,
        } = args;
        let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    // This usage is similar to `backfill`: we only need to prefetch a batch of
                    // data rather than establish a connection for the whole object.
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            while let Some(item) = table_iter.next().await {
                let row = item?;
                let split = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => match &source_desc.source.config {
                        risingwave_connector::source::ConnectorProperties::Gcs(_) => {
                            let split: OpendalFsSplit<OpendalGcs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
                            let split: OpendalFsSplit<OpendalS3> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::Azblob(_) => {
                            let split: OpendalFsSplit<OpendalAzblob> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
                            let split: OpendalFsSplit<OpendalPosixFs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        _ => unreachable!(),
                    },
                    _ => unreachable!(),
                };
                let split_id = split.id();
                if dirty_splits.contains(&split_id) {
                    continue;
                }
                batch.push(split);

                if batch.len() >= SPLIT_BATCH_SIZE {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();

            let mut merged_stream =
                stream::empty::<StreamExecutorResult<Option<StreamChunk>>>().boxed();
            // Unlike the previous implementation, where multiple files shared a single
            // `SourceReader`, each `SourceReader` now reads exactly one file. The streams of
            // the individual files are then chained together serially here.
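            // The resulting merged stream yields, for each file, a run of `Some(chunk)` items
            // followed by a single `None` marking the end of that file (see
            // `build_single_file_stream_reader` below).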
            for split in batch {
                let split_id = split.id();
                let single_file_stream = Self::build_single_file_stream_reader(
                    column_ids.clone(),
                    source_ctx.clone(),
                    source_desc,
                    Some(vec![split]),
                    rate_limit_rps,
                )
                .await?
                .map_err(StreamExecutorError::connector_error);
                let single_file_stream =
                    SetReadingFileOnPoll::new(single_file_stream, reading_file.clone(), split_id)
                        .boxed();
                merged_stream = merged_stream.chain(single_file_stream).boxed();
            }

            stream.replace_data_stream(merged_stream);
        }

        Ok(())
    }

    // Note: This change applies only to the file source.
    //
    // Each `SourceReader` (for the streaming file source, this is the `OpendalReader` struct)
    // reads only one file. A `None` is chained after the chunk stream returned by the
    // `SourceReader` to signal that the file has been fully read.
    async fn build_single_file_stream_reader(
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        batch: SplitBatch,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<BoxStreamingFileSourceChunkStream> {
        let (stream, _) = source_desc
            .source
            .build_stream(batch, column_ids, Arc::new(source_ctx), false)
            .await
            .map_err(StreamExecutorError::connector_error)?;
        let optional_stream: BoxStreamingFileSourceChunkStream = stream
            .map(|item| item.map(Some))
            .chain(stream::once(async { Ok(None) }))
            .boxed();
        Ok(
            apply_rate_limit_with_for_streaming_file_source_reader(optional_stream, rate_limit_rps)
                .boxed(),
        )
    }

    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: SourceId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
            },
            source_desc.source.config.clone(),
            None,
        )
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        // Build source description from the builder.
        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;
        let actor_id = self.actor_ctx.id.to_string();
        let fragment_id = self.actor_ctx.fragment_id.to_string();
        let source_id = core.source_id.to_string();
        let source_name = core.source_name.clone();
        let dirty_split_count_metrics = source_desc
            .metrics
            .file_source_dirty_split_count
            .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
        let failed_split_count_metrics = source_desc
            .metrics
            .file_source_failed_split_count
            .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
        dirty_split_count_metrics.set(0);

        // pulsar's `message_id_data_idx` is not used in this executor, so we don't need to get it.
        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };
        // Initialize state table.
        state_store_handler.init_epoch(first_epoch).await?;

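        // In-memory bookkeeping for error handling:
        // - `reading_file`: the split currently being read, shared with `SetReadingFileOnPoll`.
        // - `retry_counts`: per-split count of consecutive fetch failures.
        // - `dirty_splits`: splits that exceeded `MAX_RETRIES_PER_SPLIT` and are skipped when the
        //   batch reader is rebuilt.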
        let reading_file: Arc<Mutex<Option<Arc<str>>>> = Arc::new(Mutex::new(None));
        let mut retry_counts: HashMap<Arc<str>, u32> = HashMap::new();
        let mut dirty_splits: HashSet<Arc<str>> = HashSet::new();

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, Option<StreamChunk>>::new(
            upstream,
            stream::pending().boxed(),
        );
        if is_pause_on_startup {
            stream.pause_stream();
        }

        // On recovery startup, there may already be file assignments in the state table,
        // so try building a reader first.
        Self::replace_with_new_batch_reader(ReplaceReaderArgs {
            splits_on_fetch: &mut splits_on_fetch,
            state_store_handler: &state_store_handler,
            dirty_splits: &dirty_splits,
            column_ids: core.column_ids.clone(),
            source_ctx: self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            source_desc: &source_desc,
            stream: &mut stream,
            rate_limit_rps: self.rate_limit_rps,
            reading_file: reading_file.clone(),
        })
        .await?;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    let cur_file = reading_file.lock().expect("mutex poisoned").clone();
                    let Some(split_id) = cur_file else {
                        tracing::error!(
                            source_id = %core.source_id,
                            source_name = %core.source_name,
                            fragment_id = %self.actor_ctx.fragment_id,
                            error = %e.as_report(),
                            "Fetch error, but failed to determine which file was being read; aborting actor"
                        );
                        return Err(e);
                    };

                    let retries_done = retry_counts.entry(split_id.clone()).or_insert(0);
                    if *retries_done < MAX_RETRIES_PER_SPLIT {
                        *retries_done = retries_done.saturating_add(1);
                        let backoff = RETRY_BASE_BACKOFF
                            .checked_mul(1u32 << (*retries_done - 1))
                            .unwrap_or(Duration::from_secs(60));
                        tracing::warn!(
                            source_id = %core.source_id,
                            source_name = %core.source_name,
                            fragment_id = %self.actor_ctx.fragment_id,
                            reading_file = %split_id,
                            retries_done = *retries_done,
                            max_retries = MAX_RETRIES_PER_SPLIT,
                            error = %e.as_report(),
                            "Fetch error, retrying file split"
                        );
                        tokio::time::sleep(backoff).await;
                    } else {
                        // Exceeded max retries: mark dirty in memory and skip this split afterwards.
                        dirty_splits.insert(split_id.clone());
                        dirty_split_count_metrics.set(dirty_splits.len() as i64);
                        failed_split_count_metrics.inc();
                        retry_counts.remove(&split_id);
                        tracing::error!(
                            source_id = %core.source_id,
                            source_name = %core.source_name,
                            fragment_id = %self.actor_ctx.fragment_id,
                            reading_file = %split_id,
                            max_retries = MAX_RETRIES_PER_SPLIT,
                            error = %e.as_report(),
                            "Fetch error, exceeded max retries; marking split dirty and skipping"
                        );
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            "File source dirty split".to_owned(),
                            core.source_id.to_string(),
                            core.source_name.clone(),
                            self.actor_ctx.fragment_id.to_string(),
                        ]);
                    }

                    // Clear current reading file and rebuild reader to continue with other splits.
                    *reading_file.lock().expect("mutex poisoned") = None;
                    splits_on_fetch = 0;
                    Self::replace_with_new_batch_reader(ReplaceReaderArgs {
                        splits_on_fetch: &mut splits_on_fetch,
                        state_store_handler: &state_store_handler,
                        dirty_splits: &dirty_splits,
                        column_ids: core.column_ids.clone(),
                        source_ctx: self.build_source_ctx(
                            &source_desc,
                            core.source_id,
                            &core.source_name,
                        ),
                        source_desc: &source_desc,
                        stream: &mut stream,
                        rate_limit_rps: self.rate_limit_rps,
                        reading_file: reading_file.clone(),
                    })
                    .await?;
                    continue;
                }
                Ok(msg) => {
                    match msg {
                        // This branch will be preferred.
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(fragment_to_apply) => {
                                                if let Some(entry) = fragment_to_apply
                                                    .get(&self.actor_ctx.fragment_id)
                                                    && entry.throttle_type() == ThrottleType::Source
                                                    && entry.rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::info!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        entry.rate_limit
                                                    );
                                                    self.rate_limit_rps = entry.rate_limit;
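                                                    // Reset so the reader is rebuilt at this
                                                    // barrier with the new rate limit, based on the
                                                    // splits currently recorded in the state table.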
                                                    splits_on_fetch = 0;
                                                    *reading_file.lock().expect("mutex poisoned") =
                                                        None;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    // Propagate the barrier.
                                    yield Message::Barrier(barrier);

                                    if post_commit
                                        .post_yield_barrier(update_vnode_bitmap)
                                        .await?
                                        .is_some()
                                    {
                                        // Vnode bitmap update changes which file assignments this executor
                                        // should read. Rebuild the reader to avoid reading splits that no
                                        // longer belong to this actor (e.g., during scale-out).
                                        splits_on_fetch = 0;
                                        *reading_file.lock().expect("mutex poisoned") = None;
                                    }

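                                    // Rebuild the reader at this barrier when nothing is currently
                                    // being fetched: on startup, after a throttle or vnode-bitmap
                                    // reset above, or once every file in the previous batch has
                                    // been fully read.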
                                    if splits_on_fetch == 0 {
                                        Self::replace_with_new_batch_reader(ReplaceReaderArgs {
                                            splits_on_fetch: &mut splits_on_fetch,
                                            state_store_handler: &state_store_handler,
                                            dirty_splits: &dirty_splits,
                                            column_ids: core.column_ids.clone(),
                                            source_ctx: self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            source_desc: &source_desc,
                                            stream: &mut stream,
                                            rate_limit_rps: self.rate_limit_rps,
                                            reading_file: reading_file.clone(),
                                        })
                                        .await?;
                                    }
                                }
                                // Receiving file assignments from the upstream list executor;
                                // store them into the state table.
                                Message::Chunk(chunk) => {
                                    // For Parquet encoding, the offset indicates the current row being read.
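                                    // Each row from the list executor carries the file name at
                                    // datum 0 and the reported file size at datum 2; files with a
                                    // non-positive reported size are skipped.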
                                    let file_assignment: Vec<OpendalFsSplit<Src>> = chunk
                                        .data_chunk()
                                        .rows()
                                        .filter_map(|row| {
                                            let filename = row.datum_at(0).unwrap().into_utf8();
                                            let size = row.datum_at(2).unwrap().into_int64();

                                            if size > 0 {
                                                Some(OpendalFsSplit::<Src>::new(
                                                    filename.to_owned(),
                                                    0,
                                                    size as usize,
                                                ))
                                            } else {
                                                None
                                            }
                                        })
                                        .collect();

                                    state_store_handler.set_states(file_assignment).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        // A `StreamChunk` (or end-of-file `None`) from the `FsSourceReader`; each reader reads
                        // only one file.
                        //
                        // Motivation for the change:
                        //
                        // Previously, the fetch executor determined whether a file was fully read by checking if the
                        // offset reached the size of the file. However, this approach had some issues related to
                        // maintaining the offset:
                        //
                        // 1. For files compressed with gzip, the size reported corresponds to the original uncompressed
                        //    size, not the actual size of the compressed file.
                        //
                        // 2. For Parquet files, the offset represents the number of rows. Therefore, when listing each
                        //    file, it was necessary to read the metadata to obtain the total number of rows in the file,
                        //    which is an expensive operation.
                        //
                        // To address these issues, we changed the approach to determining whether a file is fully
                        // read by moving the check outside the reader. The fetch executor's right-hand stream has been
                        // changed from a chunk stream to an `Option<Chunk>` stream. When a file is completely read,
                        // a `None` value is appended to the end of that file's stream to signal to the fetch executor
                        // that the file has been fully read. Upon encountering `None`, the file's entry is deleted from
                        // the state table.
                        Either::Right(optional_chunk) => match optional_chunk {
                            Some(chunk) => {
                                let mapping = get_split_offset_mapping_from_chunk(
                                    &chunk, split_idx, offset_idx,
                                )
                                .unwrap();
                                debug_assert_eq!(mapping.len(), 1);
                                if let Some((split_id, offset)) = mapping.into_iter().next() {
                                    *reading_file.lock().expect("mutex poisoned") =
                                        Some(split_id.clone());
                                    retry_counts.remove(&split_id);
                                    let row = state_store_handler.get(&split_id).await?
                                        .unwrap_or_else(|| {
                                            panic!(
                                                "The fs_split (file_name) {:?} should be in the state table.",
                                                split_id
                                            )
                                        });
                                    let mut fs_split = match row.datum_at(1) {
                                        Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                                            OpendalFsSplit::<Src>::restore_from_json(
                                                jsonb_ref.to_owned_scalar(),
                                            )?
                                        }
                                        _ => unreachable!(),
                                    };
                                    fs_split.update_offset(offset)?;

                                    state_store_handler
                                        .set(&split_id, fs_split.encode_to_json())
                                        .await?;
                                }
                                let chunk = prune_additional_cols(
                                    &chunk,
                                    &[split_idx, offset_idx],
                                    &source_desc.columns,
                                );
                                yield Message::Chunk(chunk);
                            }
                            None => {
                                let cur_file = reading_file.lock().expect("mutex poisoned").clone();
                                tracing::debug!("Deleting file: {:?}", cur_file);
                                if let Some(ref delete_file_name) = cur_file {
                                    splits_on_fetch -= 1;
                                    state_store_handler.delete(delete_file_name).await?;
                                    // Clean up in-memory retry count.
                                    retry_counts.remove(delete_file_name);
                                }
                            }
                        },
                    }
                }
            }
        }
    }
}

impl<S: StateStore, Src: OpendalSource> Execute for FsFetchExecutor<S, Src> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore, Src: OpendalSource> Debug for FsFetchExecutor<S, Src> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("FsFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("FsFetchExecutor").finish()
        }
    }
}
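
// A minimal, illustrative test sketch (not part of the original file) exercising
// `SetReadingFileOnPoll`: the wrapper records the split id on the very first poll, before the
// inner stream yields anything, so even early failures can be attributed to the file being read.
// The split id string below is a hypothetical example, and the test assumes the crate's usual
// `#[tokio::test]` harness is available.
#[cfg(test)]
mod set_reading_file_on_poll_tests {
    use super::*;

    #[tokio::test]
    async fn sets_reading_file_on_first_poll() {
        let reading_file: Arc<Mutex<Option<Arc<str>>>> = Arc::new(Mutex::new(None));
        let split_id: Arc<str> = Arc::from("dir/part-0000.parquet");
        let inner = stream::iter(vec![1u32, 2, 3]);
        let mut wrapped =
            SetReadingFileOnPoll::new(inner, reading_file.clone(), split_id.clone());

        // Nothing is recorded before the stream is polled.
        assert!(reading_file.lock().unwrap().is_none());

        // The first poll records the split id before the poll is forwarded to the inner stream.
        let first = wrapped.next().await;
        assert_eq!(first, Some(1));
        assert_eq!(
            reading_file.lock().unwrap().as_deref(),
            Some("dir/part-0000.parquet")
        );
    }
}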