risingwave_stream/executor/source/
fs_fetch_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;
use std::ops::Bound;

use either::Either;
use futures::TryStreamExt;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::ScalarRef;
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::{
    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxStreamingFileSourceChunkStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{
    SourceStateTableHandler, StreamSourceCore,
    apply_rate_limit_with_for_streaming_file_source_reader, get_split_offset_col_idx,
    get_split_offset_mapping_from_chunk, prune_additional_cols,
};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

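/// Maximum number of file splits loaded from the state table per reader rebuild.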
const SPLIT_BATCH_SIZE: usize = 1000;

type SplitBatch = Option<Vec<SplitImpl>>;

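/// Fetch executor for the file source.
///
/// It receives file assignments from the upstream list executor, persists them in the
/// split state table, reads the assigned files one by one, and emits their contents as
/// stream chunks downstream.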
pub struct FsFetchExecutor<S: StateStore, Src: OpendalSource> {
    actor_ctx: ActorContextRef,

    /// Streaming source core for the external file system source.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor.
    upstream: Option<Executor>,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    _marker: PhantomData<Src>,
}

impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            _marker: PhantomData,
        }
    }

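    /// Scan the state table for pending file splits (at most `SPLIT_BATCH_SIZE`), build one
    /// reader per file, and replace the data side of `stream` with the per-file streams
    /// chained together. If there is no pending split, a pending stream is installed instead.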
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, Option<StreamChunk>>,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<()> {
        let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    // This usage is similar to `backfill`: we only need to prefetch a small range
                    // of data rather than establish a connection for a whole object.
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            let properties = source_desc.source.config.clone();
            while let Some(item) = table_iter.next().await {
                let row = item?;
                let split = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => match properties {
                        risingwave_connector::source::ConnectorProperties::Gcs(_) => {
                            let split: OpendalFsSplit<OpendalGcs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
                            let split: OpendalFsSplit<OpendalS3> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::Azblob(_) => {
                            let split: OpendalFsSplit<OpendalAzblob> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
                            let split: OpendalFsSplit<OpendalPosixFs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        _ => unreachable!(),
                    },
                    _ => unreachable!(),
                };
                batch.push(split);

                if batch.len() >= SPLIT_BATCH_SIZE {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();

            let mut merged_stream =
                stream::empty::<StreamExecutorResult<Option<StreamChunk>>>().boxed();
            // Unlike the previous implementation, where multiple files shared a single
            // SourceReader, each SourceReader now reads exactly one file. The per-file
            // streams are then chained (merged serially) here.
            for split in batch {
                let single_file_stream = Self::build_single_file_stream_reader(
                    column_ids.clone(),
                    source_ctx.clone(),
                    source_desc,
                    Some(vec![split]),
                    rate_limit_rps,
                )
                .await?
                .map_err(StreamExecutorError::connector_error);
                merged_stream = merged_stream.chain(single_file_stream).boxed();
            }

            stream.replace_data_stream(merged_stream);
        }

        Ok(())
    }

    // Note: this logic applies only to the file source.
    //
    // Each SourceReader (for the streaming file source, this is the `OpendalReader` struct)
    // reads only one file. After the chunk stream returned by the SourceReader ends,
    // a `None` is chained on to indicate that the file has been fully read.
    async fn build_single_file_stream_reader(
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        batch: SplitBatch,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<BoxStreamingFileSourceChunkStream> {
        let (stream, _) = source_desc
            .source
            .build_stream(batch, column_ids, Arc::new(source_ctx), false)
            .await
            .map_err(StreamExecutorError::connector_error)?;
        let optional_stream: BoxStreamingFileSourceChunkStream = stream
            .map(|item| item.map(Some))
            .chain(stream::once(async { Ok(None) }))
            .boxed();
        Ok(
            apply_rate_limit_with_for_streaming_file_source_reader(optional_stream, rate_limit_rps)
                .boxed(),
        )
    }

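    /// Build a `SourceContext` for the per-file readers, carrying the actor and fragment
    /// identity, source metrics, and a rate-limit-aware chunk size.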
    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: TableId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
            },
            source_desc.source.config.clone(),
            None,
        )
    }

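    /// Main loop of the fetch executor.
    ///
    /// The left (preferred) side of the stream is the upstream: barriers and file assignments
    /// from the list executor. The right side yields `Option<StreamChunk>` from the per-file
    /// readers, where `None` marks the end of a file.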
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        // Build source description from the builder.
        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };
        // Initialize state table.
        state_store_handler.init_epoch(first_epoch).await?;

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, Option<StreamChunk>>::new(
            upstream,
            stream::pending().boxed(),
        );
        if is_pause_on_startup {
            stream.pause_stream();
        }

        // On a recovery startup, there may already be file assignments in the state table,
        // so try building a reader first.
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler,
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            &source_desc,
            &mut stream,
            self.rate_limit_rps,
        )
        .await?;
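        // The file currently being read; used to delete its split from the state table
        // once the reader signals end-of-file with a `None`.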
        let mut reading_file: Option<Arc<str>> = None;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(
                        source_id = %core.source_id,
                        source_name = %core.source_name,
                        fragment_id = %self.actor_ctx.fragment_id,
                        error = %e.as_report(),
                        "Fetch Error"
                    );
                    GLOBAL_ERROR_METRICS.user_source_error.report([
                        "File source fetch error".to_owned(),
                        core.source_id.to_string(),
                        core.source_name.to_owned(),
                        self.actor_ctx.fragment_id.to_string(),
                    ]);
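                    // Force the batch reader to be rebuilt at the next barrier.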
                    splits_on_fetch = 0;
                }
                Ok(msg) => {
                    match msg {
                        // This branch will be preferred.
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(actor_to_apply) => {
                                                if let Some(new_rate_limit) =
                                                    actor_to_apply.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::info!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        *new_rate_limit
                                                    );
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    splits_on_fetch = 0;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    // Propagate the barrier.
                                    yield Message::Barrier(barrier);

                                    if let Some((_, cache_may_stale)) =
                                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                                    {
                                        // If the cache may be stale (the vnode bitmap changed),
                                        // rebuild the stream to adjust the vnode mappings.
                                        if cache_may_stale {
                                            splits_on_fetch = 0;
                                        }
                                    }

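                                    // All fetched splits have been consumed (or invalidated
                                    // above); rebuild the batch reader from the state table.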
                                    if splits_on_fetch == 0 {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            &source_desc,
                                            &mut stream,
                                            self.rate_limit_rps,
                                        )
                                        .await?;
                                    }
                                }
                                // Receive file assignments from the upstream list executor
                                // and store them in the state table.
                                Message::Chunk(chunk) => {
                                    // For Parquet encoding, the offset indicates the current row being read.
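                                    // Each row from the list executor carries the file name
                                    // (column 0) and its size (column 2); empty files are skipped.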
                                    let file_assignment = chunk
                                        .data_chunk()
                                        .rows()
                                        .filter_map(|row| {
                                            let filename = row.datum_at(0).unwrap().into_utf8();
                                            let size = row.datum_at(2).unwrap().into_int64();

                                            if size > 0 {
                                                Some(OpendalFsSplit::<Src>::new(
                                                    filename.to_owned(),
                                                    0,
                                                    size as usize,
                                                ))
                                            } else {
                                                None
                                            }
                                        })
                                        .collect();
                                    state_store_handler.set_states(file_assignment).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        // StreamChunk from FsSourceReader; the reader reads only one file.
                        // Motivation for the changes:
                        //
                        // Previously, the fetch executor determined whether a file was fully read by checking if the
                        // offset reached the size of the file. However, this approach had some issues related to
                        // maintaining the offset:
                        //
                        // 1. For files compressed with gzip, the size reported corresponds to the original uncompressed
                        //    size, not the actual size of the compressed file.
                        //
                        // 2. For Parquet files, the offset represents the number of rows. Therefore, when listing each
                        //    file, it was necessary to read the metadata to obtain the total number of rows in the file,
                        //    which is an expensive operation.
                        //
                        // To address these issues, we changed the approach to determining whether a file is fully
                        // read by moving the check outside the reader. The fetch executor's right stream has been
                        // changed from a chunk stream to an `Option<Chunk>` stream. When a file is completely read,
                        // a `None` value is appended to the end of the file's stream to signal to the fetch executor
                        // that the file has been fully read. Upon encountering `None`, the corresponding split is
                        // deleted from the state table.
                        Either::Right(optional_chunk) => match optional_chunk {
                            Some(chunk) => {
                                let mapping = get_split_offset_mapping_from_chunk(
                                    &chunk, split_idx, offset_idx,
                                )
                                .unwrap();
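                                // The reader handles one file at a time, so the mapping contains
                                // exactly one (split, offset) entry.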
                                debug_assert_eq!(mapping.len(), 1);
                                if let Some((split_id, offset)) = mapping.into_iter().next() {
                                    reading_file = Some(split_id.clone());
                                    let row = state_store_handler
                                        .get(&split_id)
                                        .await?
                                        .unwrap_or_else(|| {
                                            panic!(
                                                "The fs_split (file_name) {:?} should be in the state table.",
                                                split_id
                                            )
                                        });
                                    let mut fs_split = match row.datum_at(1) {
                                        Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                                            OpendalFsSplit::<Src>::restore_from_json(
                                                jsonb_ref.to_owned_scalar(),
                                            )?
                                        }
                                        _ => unreachable!(),
                                    };
                                    fs_split.update_offset(offset)?;

                                    state_store_handler
                                        .set(&split_id, fs_split.encode_to_json())
                                        .await?;
                                }
                                let chunk = prune_additional_cols(
                                    &chunk,
                                    split_idx,
                                    offset_idx,
                                    &source_desc.columns,
                                );
                                yield Message::Chunk(chunk);
                            }
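                            // A `None` marks the end of the current file: drop its split from
                            // the state table so it will not be fetched again.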
                            None => {
                                tracing::debug!("Deleting file: {:?}", reading_file);
                                if let Some(ref delete_file_name) = reading_file {
                                    splits_on_fetch -= 1;
                                    state_store_handler.delete(delete_file_name).await?;
                                }
                            }
                        },
                    }
                }
            }
        }
    }
}

impl<S: StateStore, Src: OpendalSource> Execute for FsFetchExecutor<S, Src> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore, Src: OpendalSource> Debug for FsFetchExecutor<S, Src> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("FsFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("FsFetchExecutor").finish()
        }
    }
}