risingwave_stream/executor/source/
fs_fetch_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;
use std::ops::Bound;

use either::Either;
use futures::TryStreamExt;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::ColumnId;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::id::SourceId;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::ScalarRef;
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::{
    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxStreamingFileSourceChunkStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{
    SourceStateTableHandler, StreamSourceCore,
    apply_rate_limit_with_for_streaming_file_source_reader, get_split_offset_col_idx,
    get_split_offset_mapping_from_chunk, prune_additional_cols,
};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

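/// Maximum number of file splits fetched from the state table each time the batch reader is rebuilt.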
const SPLIT_BATCH_SIZE: usize = 1000;

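/// A batch of splits assigned to a newly built source reader.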
type SplitBatch = Option<Vec<SplitImpl>>;

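/// Fetches the contents of files assigned by the upstream list executor: assignments are
/// persisted in the source state table, each assigned file is read by its own reader, and the
/// resulting chunks are emitted downstream.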
pub struct FsFetchExecutor<S: StateStore, Src: OpendalSource> {
    actor_ctx: ActorContextRef,

    /// Streaming source core for the external file source.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor.
    upstream: Option<Executor>,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    _marker: PhantomData<Src>,
}

impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            _marker: PhantomData,
        }
    }

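    /// Scans the state table for pending file splits (up to `SPLIT_BATCH_SIZE`), builds a
    /// single-file reader for each of them, and chains the per-file streams into one stream
    /// that replaces the current data stream in `stream`. If no split is pending, the data
    /// stream is replaced with a pending stream.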
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, Option<StreamChunk>>,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<()> {
        let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    // This usage is similar to `backfill`: we only need to fetch a batch of data
                    // rather than establish a connection for the whole object.
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            let properties = source_desc.source.config.clone();
            while let Some(item) = table_iter.next().await {
                let row = item?;
                let split = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => match properties {
                        risingwave_connector::source::ConnectorProperties::Gcs(_) => {
                            let split: OpendalFsSplit<OpendalGcs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
                            let split: OpendalFsSplit<OpendalS3> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::Azblob(_) => {
                            let split: OpendalFsSplit<OpendalAzblob> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
                            let split: OpendalFsSplit<OpendalPosixFs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        _ => unreachable!(),
                    },
                    _ => unreachable!(),
                };
                batch.push(split);

                if batch.len() >= SPLIT_BATCH_SIZE {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();

            let mut merged_stream =
                stream::empty::<StreamExecutorResult<Option<StreamChunk>>>().boxed();
            // Unlike the previous implementation, where multiple files shared a single SourceReader,
            // each SourceReader now reads only one file. The per-file streams are then chained
            // (merged serially) here.
            for split in batch {
                let single_file_stream = Self::build_single_file_stream_reader(
                    column_ids.clone(),
                    source_ctx.clone(),
                    source_desc,
                    Some(vec![split]),
                    rate_limit_rps,
                )
                .await?
                .map_err(StreamExecutorError::connector_error);
                merged_stream = merged_stream.chain(single_file_stream).boxed();
            }

            stream.replace_data_stream(merged_stream);
        }

        Ok(())
    }

    // Note: This change applies only to the file source.
    //
    // Each SourceReader (for the streaming file source, this is the `OpendalReader` struct)
    // reads only one file. After the chunk stream returned by the SourceReader ends, a `None`
    // is chained to indicate that the file has been fully read.
    async fn build_single_file_stream_reader(
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        batch: SplitBatch,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<BoxStreamingFileSourceChunkStream> {
        let (stream, _) = source_desc
            .source
            .build_stream(batch, column_ids, Arc::new(source_ctx), false)
            .await
            .map_err(StreamExecutorError::connector_error)?;
        let optional_stream: BoxStreamingFileSourceChunkStream = stream
            .map(|item| item.map(Some))
            .chain(stream::once(async { Ok(None) }))
            .boxed();
        Ok(
            apply_rate_limit_with_for_streaming_file_source_reader(optional_stream, rate_limit_rps)
                .boxed(),
        )
    }

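    /// Builds a `SourceContext` for a newly created reader, carrying the actor, fragment, and
    /// rate-limit settings.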
    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: SourceId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
            },
            source_desc.source.config.clone(),
            None,
        )
    }

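    /// Main loop: the left side of the paused stream yields barriers and file assignments from
    /// the upstream list executor, while the right side yields `Option<StreamChunk>` items from
    /// the per-file readers. State is committed on barriers, and a `None` item marks a file as
    /// fully read so its state entry can be deleted.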
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        // Build source description from the builder.
        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        // pulsar's `message_id_data_idx` is not used in this executor, so we don't need to get it.
        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };
        // Initialize state table.
        state_store_handler.init_epoch(first_epoch).await?;

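        // Number of file splits the current batch reader still has to finish. When it drops to
        // zero, the reader is rebuilt from the state table at the next barrier.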
        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, Option<StreamChunk>>::new(
            upstream,
            stream::pending().boxed(),
        );
        if is_pause_on_startup {
            stream.pause_stream();
        }

        // If it is a recovery startup,
        // there can be file assignments in the state table.
        // Hence we try building a reader first.
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler,
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            &source_desc,
            &mut stream,
            self.rate_limit_rps,
        )
        .await?;
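        // File currently being read by the right-hand stream; used to delete its state table
        // entry once a `None` marker signals that the file has been fully read.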
        let mut reading_file: Option<Arc<str>> = None;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(
                        source_id = %core.source_id,
                        source_name = %core.source_name,
                        fragment_id = %self.actor_ctx.fragment_id,
                        error = %e.as_report(),
                        "Fetch Error"
                    );
                    GLOBAL_ERROR_METRICS.user_source_error.report([
                        "File source fetch error".to_owned(),
                        core.source_id.to_string(),
                        core.source_name.clone(),
                        self.actor_ctx.fragment_id.to_string(),
                    ]);
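                    // Reset so that the batch reader is rebuilt from the state table at the
                    // next barrier.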
                    splits_on_fetch = 0;
                }
                Ok(msg) => {
                    match msg {
                        // This branch will be preferred.
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(actor_to_apply) => {
                                                if let Some(new_rate_limit) =
                                                    actor_to_apply.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::info!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        *new_rate_limit
                                                    );
                                                    self.rate_limit_rps = *new_rate_limit;
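                                                    // Force a reader rebuild at the next
                                                    // barrier so the new rate limit takes
                                                    // effect.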
                                                    splits_on_fetch = 0;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    // Propagate the barrier.
                                    yield Message::Barrier(barrier);

                                    if let Some((_, cache_may_stale)) =
                                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                                    {
                                        // if cache_may_stale, we must rebuild the stream to adjust vnode mappings
                                        if cache_may_stale {
                                            splits_on_fetch = 0;
                                        }
                                    }

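                                    // All files from the previous batch have been finished (or
                                    // a rebuild was requested), so fetch the next batch of
                                    // splits from the state table.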
                                    if splits_on_fetch == 0 {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            &source_desc,
                                            &mut stream,
                                            self.rate_limit_rps,
                                        )
                                        .await?;
                                    }
                                }
                                // Receive file assignments from the upstream list executor and
                                // store them in the state table.
                                Message::Chunk(chunk) => {
                                    // For Parquet encoding, the offset indicates the current row being read.
                                    let file_assignment = chunk
                                        .data_chunk()
                                        .rows()
                                        .filter_map(|row| {
                                            let filename = row.datum_at(0).unwrap().into_utf8();
                                            let size = row.datum_at(2).unwrap().into_int64();

                                            if size > 0 {
                                                Some(OpendalFsSplit::<Src>::new(
                                                    filename.to_owned(),
                                                    0,
                                                    size as usize,
                                                ))
                                            } else {
                                                None
                                            }
                                        })
                                        .collect();
                                    state_store_handler.set_states(file_assignment).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        // StreamChunk from FsSourceReader, and the reader reads only one file.
                        // Motivation for the changes:
                        //
                        // Previously, the fetch executor determined whether a file was fully read by checking if the
                        // offset reached the size of the file. However, this approach had some issues related to
                        // maintaining the offset:
                        //
                        // 1. For files compressed with gzip, the size reported corresponds to the original uncompressed
                        //    size, not the actual size of the compressed file.
                        //
                        // 2. For Parquet files, the offset represents the number of rows. Therefore, when listing each
                        //    file, it was necessary to read the metadata to obtain the total number of rows in the file,
                        //    which is an expensive operation.
                        //
                        // To address these issues, we changed the approach to determining whether a file is fully
                        // read by moving the check outside the reader. The fetch executor's right stream has been
                        // changed from a chunk stream to an `Option<Chunk>` stream. When a file is completely read,
                        // a `None` value is added at the end of the file stream to signal to the fetch executor that
                        // the file has been fully read. Upon encountering `None`, the file's entry is deleted from
                        // the state table.
                        Either::Right(optional_chunk) => match optional_chunk {
                            Some(chunk) => {
                                let mapping = get_split_offset_mapping_from_chunk(
                                    &chunk, split_idx, offset_idx,
                                )
                                .unwrap();
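                                // Each reader reads exactly one file, so the chunk should map
                                // to a single split.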
                                debug_assert_eq!(mapping.len(), 1);
                                if let Some((split_id, offset)) = mapping.into_iter().next() {
                                    reading_file = Some(split_id.clone());
                                    let row = state_store_handler.get(&split_id).await?
                                        .unwrap_or_else(|| {
                                            panic!("The fs_split (file_name) {:?} should be in the state table.",
                                        split_id)
                                        });
                                    let mut fs_split = match row.datum_at(1) {
                                        Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                                            OpendalFsSplit::<Src>::restore_from_json(
                                                jsonb_ref.to_owned_scalar(),
                                            )?
                                        }
                                        _ => unreachable!(),
                                    };
                                    fs_split.update_offset(offset)?;

                                    state_store_handler
                                        .set(&split_id, fs_split.encode_to_json())
                                        .await?;
                                }
                                let chunk = prune_additional_cols(
                                    &chunk,
                                    &[split_idx, offset_idx],
                                    &source_desc.columns,
                                );
                                yield Message::Chunk(chunk);
                            }
                            None => {
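                                // The reader finished this file; remove its entry from the
                                // state table so it is not read again.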
                                tracing::debug!("Deleting file: {:?}", reading_file);
                                if let Some(ref delete_file_name) = reading_file {
                                    splits_on_fetch -= 1;
                                    state_store_handler.delete(delete_file_name).await?;
                                }
                            }
                        },
                    }
                }
            }
        }
    }
}

impl<S: StateStore, Src: OpendalSource> Execute for FsFetchExecutor<S, Src> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore, Src: OpendalSource> Debug for FsFetchExecutor<S, Src> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("FsFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("FsFetchExecutor").finish()
        }
    }
}