risingwave_stream/executor/source/batch_source/batch_posix_fs_fetch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::io::BufRead;
use std::path::Path;

use either::Either;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::types::{JsonbVal, ScalarRef};
use risingwave_connector::parser::{ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig};
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::OpendalPosixFs;
use risingwave_connector::source::{
    ConnectorProperties, SourceContext, SourceCtrlOpts, SourceMessage, SourceMeta, SplitMetaData,
};
use thiserror_ext::AsReport;
use tokio::fs;

use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::source::{StreamSourceCore, get_split_offset_col_idx, prune_additional_cols};
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;

/// Maximum number of files popped from the in-memory queue per reader rebuild;
/// any files beyond this limit wait in the queue for the next rebuild.
const BATCH_SIZE: usize = 1000;

/// Executor for fetching and processing files in batch mode for refreshable tables.
///
/// This executor receives file assignments from an upstream list executor,
/// reads the files, parses their contents, and emits stream chunks.
///
/// Key characteristics:
/// - Uses **ephemeral in-memory state** (no persistent state table)
/// - State is cleared on recovery and `RefreshStart` mutations
/// - Suitable for refreshable materialized views
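///
/// Rough dataflow, as implemented below:
/// 1. The upstream list executor sends chunks of `(file_path, split_json)`
///    assignments, which are buffered in `file_queue`.
/// 2. On each barrier, once no files are in flight, up to `BATCH_SIZE` queued
///    splits are popped and a new batch reader is installed.
/// 3. The reader parses each file line by line and the resulting chunks are
///    emitted downstream.
/// 4. After a `RefreshStart`, once the queue is drained and a `ListFinish`
///    mutation has been seen, "load finished" is reported to the barrier
///    manager.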
pub struct BatchPosixFsFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core component for managing external streaming source state
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor that provides the list of files to read
    upstream: Option<Executor>,

    /// Optional rate limit in rows/s to control data ingestion speed
    rate_limit_rps: Option<u32>,

    /// Local barrier manager for reporting load finished
    barrier_manager: LocalBarrierManager,

    /// In-memory queue of file assignments to process (`file_path`, `split_json`).
    /// This is ephemeral and cleared on recovery and `RefreshStart` mutations.
    file_queue: VecDeque<(String, JsonbVal)>,
}

/// Fetched data from a file, along with file path for logging
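///
/// One `FileData` is yielded per file, even for empty or unreadable files, so
/// the executor's `files_in_progress` counter stays balanced.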
struct FileData {
    /// The actual data chunks read from the file
    chunks: Vec<StreamChunk>,

    /// Path to the data file
    file_path: String,
}

impl<S: StateStore> BatchPosixFsFetchExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
        barrier_manager: LocalBarrierManager,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            barrier_manager,
            file_queue: VecDeque::new(),
        }
    }

    /// Pop files from the in-memory queue and create a batch reader for them.
    /// Reads up to `BATCH_SIZE` files sequentially in a single reader; invoked
    /// from the barrier handler once no files are in flight.
    fn replace_with_new_batch_reader<const BIASED: bool>(
        files_in_progress: &mut usize,
        file_queue: &mut VecDeque<(String, JsonbVal)>,
        stream: &mut StreamReaderWithPause<BIASED, FileData>,
        properties: ConnectorProperties,
        parser_config: ParserConfig,
        source_ctx: Arc<SourceContext>,
    ) -> StreamExecutorResult<()> {
        // Pop up to BATCH_SIZE files from the queue to process
        let mut batch = Vec::with_capacity(BATCH_SIZE);

        for _ in 0..BATCH_SIZE {
            if let Some((_file_path, split_json)) = file_queue.pop_front() {
                let split = OpendalFsSplit::<OpendalPosixFs>::restore_from_json(split_json)?;
                batch.push(split);
            } else {
                break;
            }
        }

        if batch.is_empty() {
            // No files to process: install a pending stream, which never yields
            // and never terminates, so the merged stream only produces upstream
            // messages until a new reader is installed.
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *files_in_progress += batch.len();
            let batch_reader =
                Self::build_batched_stream_reader(batch, properties, parser_config, source_ctx);
            stream.replace_data_stream(batch_reader.boxed());
        }

        Ok(())
    }

    /// Build a stream reader that reads multiple files in sequence
    #[try_stream(ok = FileData, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        batch: Vec<OpendalFsSplit<OpendalPosixFs>>,
        properties: ConnectorProperties,
        parser_config: ParserConfig,
        source_ctx: Arc<SourceContext>,
    ) {
        let ConnectorProperties::BatchPosixFs(batch_posix_fs_properties) = properties else {
            unreachable!()
        };

        let root_path = batch_posix_fs_properties.root.clone();

        for split in batch {
            let file_path = split.name.clone();
            let full_path = Path::new(&root_path).join(&file_path);

            // Read the entire file
            let content = match fs::read(&full_path).await {
                Ok(content) => content,
                Err(e) => {
                    tracing::error!(
                        error = %e.as_report(),
                        file_path = %full_path.display(),
                        "Failed to read file"
                    );
                    // Still yield an (empty) `FileData`: the executor counts one
                    // `FileData` per file, and skipping it would leave
                    // `files_in_progress` permanently above zero.
                    yield FileData {
                        chunks: vec![],
                        file_path,
                    };
                    continue;
                }
            };

            if content.is_empty() {
                // Empty file: nothing to parse, but still yield so the file is accounted for
                yield FileData {
                    chunks: vec![],
                    file_path,
                };
                continue;
            }

            let mut chunks = vec![];

            // Process the file line by line
            for line in content.lines() {
                let line =
                    line.map_err(|e| StreamExecutorError::connector_error(anyhow::Error::from(e)))?;

                let message = SourceMessage {
                    key: None,
                    payload: Some(line.as_bytes().to_vec()),
                    offset: "0".to_owned(),
                    split_id: split.id(),
                    meta: SourceMeta::Empty,
                };

                // TODO(tab): avoid rebuilding ByteStreamSourceParserImpl for each file
                // Parser is created per line because it's consumed by parse_stream
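                // A possible direction (untested sketch): build the messages for
                // the whole file first and call `parse_stream` once per file,
                // e.g. `parser.parse_stream(Box::pin(stream::once(async { Ok(messages) })))`
                // with `messages: Vec<SourceMessage>`, amortizing parser
                // construction across all lines.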
                let parser =
                    ByteStreamSourceParserImpl::create(parser_config.clone(), source_ctx.clone())
                        .await?;

                let chunk_stream = parser
                    .parse_stream(Box::pin(futures::stream::once(async { Ok(vec![message]) })));

                #[for_await]
                for chunk in chunk_stream {
                    chunks.push(chunk?);
                }
            }

            yield FileData { chunks, file_path };
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();

        // Build source description from the builder.
        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;
        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        let properties = source_desc.source.config.clone();
        let parser_config = ParserConfig {
            common: CommonParserConfig {
                rw_columns: source_desc.columns.clone(),
            },
            specific: source_desc.source.parser_config.clone(),
        };

        let mut files_in_progress: usize = 0;
        let mut stream =
            StreamReaderWithPause::<true, FileData>::new(upstream, stream::pending().boxed());

        if is_pause_on_startup {
            stream.pause_stream();
        }

        // For refreshable tables, always start fresh on recovery - no state restoration
        // File queue is empty by default (no restoration from persistent state)

        let mut list_finished = false;
        let mut is_refreshing = false;
        let mut file_queue = self.file_queue;

        // Extract fields we'll need later
        let actor_ctx = self.actor_ctx.clone();
        let barrier_manager = self.barrier_manager.clone();
        let rate_limit_rps = &mut self.rate_limit_rps;

        let source_ctx = Arc::new(SourceContext::new(
            actor_ctx.id,
            core.source_id,
            actor_ctx.fragment_id,
            core.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(*rate_limit_rps),
                split_txn: rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        ));
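
        // The merged stream yields upstream executor messages as `Either::Left`
        // and file reader output as `Either::Right`; the `true` const parameter
        // is the `BIASED` flag from `replace_with_new_batch_reader`'s signature.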

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "fetch error");
                    // The current batch reader failed; reset its accounting. A new
                    // reader over the remaining queued files is built at the next
                    // barrier.
                    files_in_progress = 0;
                }
                Ok(msg) => match msg {
                    // Barrier messages from upstream
                    Either::Left(msg) => match msg {
                        Message::Barrier(barrier) => {
                            // Note: currently always `false`; the reader below is
                            // rebuilt only when `files_in_progress == 0`.
                            let need_rebuild_reader = false;

                            if let Some(mutation) = barrier.mutation.as_deref() {
                                match mutation {
                                    Mutation::Pause => stream.pause_stream(),
                                    Mutation::Resume => stream.resume_stream(),
                                    Mutation::RefreshStart {
                                        associated_source_id,
                                        ..
                                    } if associated_source_id.table_id()
                                        == core.source_id.table_id() =>
                                    {
                                        tracing::info!(
                                            ?barrier.epoch,
                                            actor_id = actor_ctx.id,
                                            source_id = %core.source_id,
                                            queue_len = file_queue.len(),
                                            files_in_progress,
                                            "RefreshStart: clearing state and aborting workload"
                                        );

                                        // Clear all in-memory state
                                        file_queue.clear();
                                        files_in_progress = 0;
                                        list_finished = false;
                                        is_refreshing = true;

                                        // Abort current file reader
                                        stream.replace_data_stream(stream::pending().boxed());
                                    }
                                    Mutation::ListFinish {
                                        associated_source_id,
                                    } => {
                                        // Check if this ListFinish is for our source
                                        if associated_source_id.table_id()
                                            == core.source_id.table_id()
                                        {
                                            tracing::info!(
                                                ?barrier.epoch,
                                                actor_id = actor_ctx.id,
                                                source_id = %core.source_id,
                                                "received ListFinish mutation"
                                            );
                                            list_finished = true;
                                        }
                                    }
                                    _ => (),
                                }
                            }

                            let epoch = barrier.epoch;

                            // Report load finished BEFORE yielding barrier when:
                            // 1. All files have been processed (files_in_progress == 0 and file_queue is empty)
                            // 2. ListFinish mutation has been received
                            //
                            // IMPORTANT: Must report BEFORE yield to ensure epoch is still in inflight_barriers.
                            // If we yield first, the barrier worker may collect the barrier and remove the epoch
                            // from inflight_barriers, causing the report to be ignored with a warning.
                            if files_in_progress == 0
                                && file_queue.is_empty()
                                && list_finished
                                && is_refreshing
                            {
                                tracing::info!(
                                    ?epoch,
                                    actor_id = actor_ctx.id,
                                    source_id = %core.source_id,
                                    "Reporting source load finished"
                                );
                                barrier_manager.report_source_load_finished(
                                    epoch,
                                    actor_ctx.id,
                                    core.source_id.table_id(),
                                    core.source_id.table_id(),
                                );
                                // Reset the flags to avoid duplicate reports
                                list_finished = false;
                                is_refreshing = false;
                            }

                            // Propagate the barrier AFTER reporting progress.
                            yield Message::Barrier(barrier);

                            // Rebuild reader when all current files are processed
                            if files_in_progress == 0 || need_rebuild_reader {
                                Self::replace_with_new_batch_reader(
                                    &mut files_in_progress,
                                    &mut file_queue,
                                    &mut stream,
                                    properties.clone(),
                                    parser_config.clone(),
                                    source_ctx.clone(),
                                )?;
                            }
                        }
                        // Receiving file assignments from upstream list executor,
                        // store into in-memory queue (no persistent state).
                        Message::Chunk(chunk) => {
                            for row in chunk.data_chunk().rows() {
                                let file_name = row.datum_at(0).unwrap().into_utf8().to_owned();
                                let split = row.datum_at(1).unwrap().into_jsonb().to_owned_scalar();
                                file_queue.push_back((file_name, split));
                            }

                            tracing::debug!(
                                actor_id = actor_ctx.id,
                                queue_len = file_queue.len(),
                                "Added file assignments to queue"
                            );
                        }
                        Message::Watermark(_) => unreachable!(),
                    },
                    // Data from file reader
                    Either::Right(FileData { chunks, file_path }) => {
                        // Decrement counter after processing a file
                        files_in_progress -= 1;
                        tracing::debug!(
                            file_path = ?file_path,
                            "Processed file"
                        );

                        // Yield all chunks from the file
                        for chunk in chunks {
                            let chunk = prune_additional_cols(
                                &chunk,
                                split_idx,
                                offset_idx,
                                &source_desc.columns,
                            );
                            yield Message::Chunk(chunk);
                        }
                    }
                },
            }
        }
    }
}

impl<S: StateStore> Execute for BatchPosixFsFetchExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for BatchPosixFsFetchExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("BatchPosixFsFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("BatchPosixFsFetchExecutor").finish()
        }
    }
}