risingwave_stream/executor/source/fs_fetch_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;
use std::ops::Bound;

use either::Either;
use futures::TryStreamExt;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::types::ScalarRef;
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::{
    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxStreamingFileSourceChunkStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{
    SourceStateTableHandler, StreamSourceCore,
    apply_rate_limit_with_for_streaming_file_source_reader, get_split_offset_col_idx,
    get_split_offset_mapping_from_chunk, prune_additional_cols,
};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

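/// Maximum number of file splits loaded from the state table into a single batch reader.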
const SPLIT_BATCH_SIZE: usize = 1000;

type SplitBatch = Option<Vec<SplitImpl>>;

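/// `FsFetchExecutor` reads the file splits assigned by the upstream list executor: it restores
/// pending assignments from the state table, builds one reader per file, and emits the resulting
/// chunks downstream, deleting a file's state entry once the file has been fully read.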
pub struct FsFetchExecutor<S: StateStore, Src: OpendalSource> {
    actor_ctx: ActorContextRef,

    /// Core state of the streaming file source.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor.
    upstream: Option<Executor>,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    _marker: PhantomData<Src>,
}

impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            _marker: PhantomData,
        }
    }

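    /// Load up to `SPLIT_BATCH_SIZE` splits from the state table and replace the data side of
    /// `stream` with a serial chain of per-file readers; if there is nothing to read, install a
    /// pending stream instead.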
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, Option<StreamChunk>>,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<()> {
        let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    // This usage is similar to `backfill`: we only need to fetch a small range of data rather than scan the whole object.
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            let properties = source_desc.source.config.clone();
            while let Some(item) = table_iter.next().await {
                let row = item?;
                let split = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => match properties {
                        risingwave_connector::source::ConnectorProperties::Gcs(_) => {
                            let split: OpendalFsSplit<OpendalGcs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
                            let split: OpendalFsSplit<OpendalS3> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::Azblob(_) => {
                            let split: OpendalFsSplit<OpendalAzblob> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
                            let split: OpendalFsSplit<OpendalPosixFs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        _ => unreachable!(),
                    },
                    _ => unreachable!(),
                };
                batch.push(split);

                if batch.len() >= SPLIT_BATCH_SIZE {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();

            let mut merged_stream =
                stream::empty::<StreamExecutorResult<Option<StreamChunk>>>().boxed();
            // Unlike the previous implementation, where multiple files shared a single SourceReader,
            // each SourceReader now reads only one file. The streams of the files in the batch are
            // then chained together serially here.
            for split in batch {
                let single_file_stream = Self::build_single_file_stream_reader(
                    column_ids.clone(),
                    source_ctx.clone(),
                    source_desc,
                    Some(vec![split]),
                    rate_limit_rps,
                )
                .await?
                .map_err(StreamExecutorError::connector_error);
                merged_stream = merged_stream.chain(single_file_stream).boxed();
            }

            stream.replace_data_stream(merged_stream);
        }

        Ok(())
    }

    // Note: this change applies only to the file source.
    //
    // Each SourceReader (for the streaming file source, the `OpendalReader` struct)
    // reads only one file. After the chunk stream returned by the SourceReader ends,
    // a `None` is chained to indicate that the file has been fully read.
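    // The returned stream thus yields `Ok(Some(chunk))` for each chunk of the file, followed by a
    // single trailing `Ok(None)` once the file is exhausted.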
    async fn build_single_file_stream_reader(
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        batch: SplitBatch,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<BoxStreamingFileSourceChunkStream> {
        let (stream, _) = source_desc
            .source
            .build_stream(batch, column_ids, Arc::new(source_ctx), false)
            .await
            .map_err(StreamExecutorError::connector_error)?;
        let optional_stream: BoxStreamingFileSourceChunkStream = stream
            .map(|item| item.map(Some))
            .chain(stream::once(async { Ok(None) }))
            .boxed();
        Ok(
            apply_rate_limit_with_for_streaming_file_source_reader(optional_stream, rate_limit_rps)
                .boxed(),
        )
    }

    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: TableId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
            },
            source_desc.source.config.clone(),
            None,
        )
    }

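    /// Main execution loop: the upstream list stream and the per-file chunk stream are merged,
    /// with upstream messages taking priority. Chunks from upstream are persisted as file
    /// assignments in the state table; a `None` from the data side marks a fully read file,
    /// whose state entry is then deleted.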
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        // Build source description from the builder.
        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };
        // Initialize state table.
        state_store_handler.init_epoch(first_epoch).await?;

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, Option<StreamChunk>>::new(
            upstream,
            stream::pending().boxed(),
        );
        if is_pause_on_startup {
            stream.pause_stream();
        }

        // If this is a recovery startup, there may already be file assignments
        // in the state table, so we try to build a reader first.
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler,
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            &source_desc,
            &mut stream,
            self.rate_limit_rps,
        )
        .await?;
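        // Track the file currently being read so that its state-table entry can be deleted once
        // its stream yields `None`.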
        let mut reading_file: Arc<str> = "".into();

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    splits_on_fetch = 0;
                }
                Ok(msg) => {
                    match msg {
                        // This branch (messages from the upstream) is preferred by the biased select.
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(actor_to_apply) => {
                                                if let Some(new_rate_limit) =
                                                    actor_to_apply.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::info!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        *new_rate_limit
                                                    );
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    splits_on_fetch = 0;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    // Propagate the barrier.
                                    yield Message::Barrier(barrier);

                                    if let Some((_, cache_may_stale)) =
                                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                                    {
                                        // if cache_may_stale, we must rebuild the stream to adjust vnode mappings
                                        if cache_may_stale {
                                            splits_on_fetch = 0;
                                        }
                                    }

                                    if splits_on_fetch == 0 {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            &source_desc,
                                            &mut stream,
                                            self.rate_limit_rps,
                                        )
                                        .await?;
                                    }
                                }
                                // Receive file assignments from the upstream list executor
                                // and store them in the state table.
                                Message::Chunk(chunk) => {
                                    // For Parquet encoding, the offset indicates the current row being read.
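                                    // Each list row carries the file name in column 0 and the file
                                    // size in column 2; every newly assigned file starts at offset 0.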
                                    let file_assignment = chunk
                                        .data_chunk()
                                        .rows()
                                        .map(|row| {
                                            let filename = row.datum_at(0).unwrap().into_utf8();
                                            let size = row.datum_at(2).unwrap().into_int64();
                                            OpendalFsSplit::<Src>::new(
                                                filename.to_owned(),
                                                0,
                                                size as usize,
                                            )
                                        })
                                        .collect();
                                    state_store_handler.set_states(file_assignment).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        // A `StreamChunk` from the FsSourceReader; each reader reads only one file.
                        // Motivation for the changes:
                        //
                        // Previously, the fetch executor determined whether a file was fully read by checking if the
                        // offset reached the size of the file. However, this approach had some issues related to
                        // maintaining the offset:
                        //
                        // 1. For files compressed with gzip, the size reported corresponds to the original uncompressed
                        //    size, not the actual size of the compressed file.
                        //
                        // 2. For Parquet files, the offset represents the number of rows. Therefore, when listing each
                        //    file, it was necessary to read the metadata to obtain the total number of rows in the file,
                        //    which is an expensive operation.
                        //
                        // To address these issues, we changed the approach to determining whether a file is fully
                        // read by moving the check outside the reader. The fetch executor's right stream has been
                        // changed from a chunk stream to an `Option<Chunk>` stream. When a file is completely read,
                        // a `None` value is added at the end of the file stream to signal to the fetch executor that
                        // the file has been fully read. Upon encountering `None`, the file's entry is deleted from the state table.
                        Either::Right(optional_chunk) => match optional_chunk {
                            Some(chunk) => {
                                let mapping = get_split_offset_mapping_from_chunk(
                                    &chunk, split_idx, offset_idx,
                                )
                                .unwrap();
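                                // Each reader reads exactly one file, so the chunk maps to a single split.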
                                debug_assert_eq!(mapping.len(), 1);
                                if let Some((split_id, _offset)) = mapping.into_iter().next() {
                                    reading_file = split_id.clone();
                                    let row = state_store_handler.get(&split_id).await?
                                        .unwrap_or_else(|| {
                                            panic!(
                                                "The fs_split (file_name) {:?} should be in the state table.",
                                                split_id
                                            )
                                        });
                                    let fs_split = match row.datum_at(1) {
                                        Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                                            OpendalFsSplit::<Src>::restore_from_json(
                                                jsonb_ref.to_owned_scalar(),
                                            )?
                                        }
                                        _ => unreachable!(),
                                    };

                                    state_store_handler
                                        .set(&split_id, fs_split.encode_to_json())
                                        .await?;
                                }
                                let chunk = prune_additional_cols(
                                    &chunk,
                                    split_idx,
                                    offset_idx,
                                    &source_desc.columns,
                                );
                                yield Message::Chunk(chunk);
                            }
                            None => {
                                splits_on_fetch -= 1;
                                state_store_handler.delete(&reading_file).await?;
                            }
                        },
                    }
                }
            }
        }
    }
}

impl<S: StateStore, Src: OpendalSource> Execute for FsFetchExecutor<S, Src> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore, Src: OpendalSource> Debug for FsFetchExecutor<S, Src> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("FsFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("FsFetchExecutor").finish()
        }
    }
}