risingwave_stream/executor/source/batch_source/batch_posix_fs_fetch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::VecDeque;
16use std::io::BufRead;
17use std::path::Path;
18
19use either::Either;
20use futures::TryStreamExt;
21use futures::stream::{self, StreamExt};
22use futures_async_stream::try_stream;
23use risingwave_common::id::TableId;
24use risingwave_common::types::{JsonbVal, ScalarRef};
25use risingwave_connector::parser::{ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig};
26use risingwave_connector::source::filesystem::OpendalFsSplit;
27use risingwave_connector::source::filesystem::opendal_source::OpendalPosixFs;
28use risingwave_connector::source::{
29    ConnectorProperties, SourceChunkStream, SourceContext, SourceCtrlOpts, SourceMessage,
30    SourceMessageEvent, SourceMeta, SourceReaderEvent, SplitMetaData,
31};
32use thiserror_ext::AsReport;
33use tokio::fs;
34
35use crate::common::rate_limit::limited_chunk_size;
36use crate::executor::prelude::*;
37use crate::executor::source::{StreamSourceCore, get_split_offset_col_idx, prune_additional_cols};
38use crate::executor::stream_reader::StreamReaderWithPause;
39use crate::task::LocalBarrierManager;
40
/// Maximum number of files popped from the in-memory queue and handed to a
/// single batch reader at a time (see `replace_with_new_batch_reader`).
const BATCH_SIZE: usize = 1000;
43
44fn into_data_chunk_stream(
45    stream: impl futures::Stream<Item = risingwave_connector::error::ConnectorResult<SourceReaderEvent>>
46    + Send
47    + 'static,
48) -> impl SourceChunkStream {
49    stream
50        .try_filter_map(|event| async move {
51            Ok(match event {
52                SourceReaderEvent::DataChunk(chunk) => Some(chunk),
53                SourceReaderEvent::SplitProgress(_) => None,
54            })
55        })
56        .boxed()
57}
58
/// Executor for fetching and processing files in batch mode for refreshable tables.
///
/// This executor receives file assignments from an upstream list executor,
/// reads the files, parses their contents, and emits stream chunks.
///
/// Key characteristics:
/// - Uses **ephemeral in-memory state** (no persistent state table)
/// - State is cleared on recovery and `RefreshStart` mutations
/// - Suitable for refreshable materialized views
pub struct BatchPosixFsFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core component for managing external streaming source state.
    /// `Option` so it can be moved out (`take`) when execution starts.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor that provides the list of files to read.
    /// `Option` so it can be moved out (`take`) when execution starts.
    upstream: Option<Executor>,

    /// Optional rate limit in rows/s to control data ingestion speed
    rate_limit_rps: Option<u32>,

    /// Local barrier manager for reporting load finished
    barrier_manager: LocalBarrierManager,

    /// In-memory queue of file assignments to process (`file_path`, `split_json`).
    /// This is ephemeral and cleared on recovery and `RefreshStart` mutations.
    file_queue: VecDeque<(String, JsonbVal)>,

    /// Associated table ID for reporting load finished
    associated_table_id: TableId,
}
90
/// Fetched data from a single file, along with the file path for logging.
struct FileData {
    /// Parsed stream chunks read from the file (empty for an empty file).
    chunks: Vec<StreamChunk>,

    /// Path of the file, relative to the connector's configured root directory.
    file_path: String,
}
99
100impl<S: StateStore> BatchPosixFsFetchExecutor<S> {
101    pub fn new(
102        actor_ctx: ActorContextRef,
103        stream_source_core: StreamSourceCore<S>,
104        upstream: Executor,
105        rate_limit_rps: Option<u32>,
106        barrier_manager: LocalBarrierManager,
107        associated_table_id: Option<TableId>,
108    ) -> Self {
109        assert!(associated_table_id.is_some());
110        Self {
111            actor_ctx,
112            stream_source_core: Some(stream_source_core),
113            upstream: Some(upstream),
114            rate_limit_rps,
115            barrier_manager,
116            file_queue: VecDeque::new(),
117            associated_table_id: associated_table_id.unwrap(),
118        }
119    }
120
121    /// Pop files from the in-memory queue and create a batch reader for them.
122    /// Processes up to `BATCH_SIZE` files in parallel.
123    fn replace_with_new_batch_reader<const BIASED: bool>(
124        files_in_progress: &mut usize,
125        file_queue: &mut VecDeque<(String, JsonbVal)>,
126        stream: &mut StreamReaderWithPause<BIASED, FileData>,
127        properties: ConnectorProperties,
128        parser_config: ParserConfig,
129        source_ctx: Arc<SourceContext>,
130    ) -> StreamExecutorResult<()> {
131        // Pop up to BATCH_SIZE files from the queue to process
132        let mut batch = Vec::with_capacity(BATCH_SIZE);
133
134        for _ in 0..BATCH_SIZE {
135            if let Some((_file_path, split_json)) = file_queue.pop_front() {
136                let split = OpendalFsSplit::<OpendalPosixFs>::restore_from_json(split_json)?;
137                batch.push(split);
138            } else {
139                break;
140            }
141        }
142
143        if batch.is_empty() {
144            // No files to process, set stream to pending
145            stream.replace_data_stream(stream::pending().boxed());
146        } else {
147            *files_in_progress += batch.len();
148            let batch_reader =
149                Self::build_batched_stream_reader(batch, properties, parser_config, source_ctx);
150            stream.replace_data_stream(batch_reader.boxed());
151        }
152
153        Ok(())
154    }
155
156    /// Build a stream reader that reads multiple files in sequence
157    #[try_stream(ok = FileData, error = StreamExecutorError)]
158    async fn build_batched_stream_reader(
159        batch: Vec<OpendalFsSplit<OpendalPosixFs>>,
160        properties: ConnectorProperties,
161        parser_config: ParserConfig,
162        source_ctx: Arc<SourceContext>,
163    ) {
164        let ConnectorProperties::BatchPosixFs(batch_posix_fs_properties) = properties else {
165            unreachable!()
166        };
167
168        let root_path = batch_posix_fs_properties.root.clone();
169
170        for split in batch {
171            let file_path = split.name.clone();
172            let full_path = Path::new(&root_path).join(&file_path);
173
174            // Read the entire file
175            let content = match fs::read(&full_path).await {
176                Ok(content) => content,
177                Err(e) => {
178                    tracing::error!(
179                        error = %e.as_report(),
180                        file_path = %full_path.display(),
181                        "Failed to read file"
182                    );
183                    continue;
184                }
185            };
186
187            if content.is_empty() {
188                // Empty file, skip it
189                yield FileData {
190                    chunks: vec![],
191                    file_path,
192                };
193                continue;
194            }
195
196            let mut chunks = vec![];
197
198            // Process the file line by line
199            for line in content.lines() {
200                let line =
201                    line.map_err(|e| StreamExecutorError::connector_error(anyhow::Error::from(e)))?;
202
203                let message = SourceMessage {
204                    key: None,
205                    payload: Some(line.as_bytes().to_vec()),
206                    offset: "0".to_owned(),
207                    split_id: split.id(),
208                    meta: SourceMeta::Empty,
209                };
210
211                // TODO(tab): avoid rebuilding ByteStreamSourceParserImpl for each file
212                // Parser is rebuilt per line because `parse_stream_with_events` consumes it.
213                let parser =
214                    ByteStreamSourceParserImpl::create(parser_config.clone(), source_ctx.clone())
215                        .await?;
216
217                let chunk_stream = into_data_chunk_stream(parser.parse_stream_with_events(
218                    Box::pin(futures::stream::once(async {
219                        Ok(SourceMessageEvent::Data(vec![message]))
220                    })),
221                ));
222
223                #[for_await]
224                for chunk in chunk_stream {
225                    chunks.push(chunk?);
226                }
227            }
228
229            yield FileData { chunks, file_path };
230        }
231    }
232
233    #[try_stream(ok = Message, error = StreamExecutorError)]
234    async fn into_stream(mut self) {
235        let mut upstream = self.upstream.take().unwrap().execute();
236        let barrier = expect_first_barrier(&mut upstream).await?;
237        let is_pause_on_startup = barrier.is_pause_on_startup();
238        yield Message::Barrier(barrier);
239
240        let mut core = self.stream_source_core.take().unwrap();
241
242        // Build source description from the builder.
243        let source_desc_builder = core.source_desc_builder.take().unwrap();
244
245        let source_desc = source_desc_builder
246            .build()
247            .map_err(StreamExecutorError::connector_error)?;
248        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
249        else {
250            unreachable!("Partition and offset columns must be set.");
251        };
252
253        let properties = source_desc.source.config.clone();
254        let parser_config = ParserConfig {
255            common: CommonParserConfig {
256                rw_columns: source_desc.columns.clone(),
257            },
258            specific: source_desc.source.parser_config.clone(),
259        };
260
261        let mut files_in_progress: usize = 0;
262        let mut stream =
263            StreamReaderWithPause::<true, FileData>::new(upstream, stream::pending().boxed());
264
265        if is_pause_on_startup {
266            stream.pause_stream();
267        }
268
269        // For refreshable tables, always start fresh on recovery - no state restoration
270        // File queue is empty by default (no restoration from persistent state)
271
272        let mut list_finished = false;
273        let mut is_refreshing = false;
274        let mut file_queue = self.file_queue;
275
276        // Extract fields we'll need later
277        let actor_ctx = self.actor_ctx.clone();
278        let barrier_manager = self.barrier_manager.clone();
279        let rate_limit_rps = &mut self.rate_limit_rps;
280
281        let source_ctx = Arc::new(SourceContext::new(
282            actor_ctx.id,
283            core.source_id,
284            actor_ctx.fragment_id,
285            core.source_name.clone(),
286            source_desc.metrics.clone(),
287            SourceCtrlOpts {
288                chunk_size: limited_chunk_size(*rate_limit_rps),
289                split_txn: rate_limit_rps.is_some(),
290            },
291            source_desc.source.config.clone(),
292            None,
293        ));
294
295        while let Some(msg) = stream.next().await {
296            match msg {
297                Err(e) => {
298                    tracing::error!(error = %e.as_report(), "Fetch Error");
299                    files_in_progress = 0;
300                }
301                Ok(msg) => match msg {
302                    // Barrier messages from upstream
303                    Either::Left(msg) => match msg {
304                        Message::Barrier(barrier) => {
305                            let need_rebuild_reader = false;
306
307                            if let Some(mutation) = barrier.mutation.as_deref() {
308                                match mutation {
309                                    Mutation::Pause => stream.pause_stream(),
310                                    Mutation::Resume => stream.resume_stream(),
311                                    Mutation::RefreshStart {
312                                        associated_source_id,
313                                        ..
314                                    } if associated_source_id.as_raw_id()
315                                        == core.source_id.as_raw_id() =>
316                                    {
317                                        tracing::info!(
318                                            ?barrier.epoch,
319                                            actor_id = %actor_ctx.id,
320                                            source_id = %core.source_id,
321                                            queue_len = file_queue.len(),
322                                            files_in_progress,
323                                            "RefreshStart: clearing state and aborting workload"
324                                        );
325
326                                        // Clear all in-memory state
327                                        file_queue.clear();
328                                        files_in_progress = 0;
329                                        list_finished = false;
330                                        is_refreshing = true;
331
332                                        // Abort current file reader
333                                        stream.replace_data_stream(stream::pending().boxed());
334                                    }
335                                    Mutation::ListFinish {
336                                        associated_source_id,
337                                    } => {
338                                        // Check if this ListFinish is for our source
339                                        if associated_source_id.as_raw_id()
340                                            == core.source_id.as_raw_id()
341                                        {
342                                            tracing::info!(
343                                                ?barrier.epoch,
344                                                actor_id = %actor_ctx.id,
345                                                source_id = %core.source_id,
346                                                "received ListFinish mutation"
347                                            );
348                                            list_finished = true;
349                                        }
350                                    }
351                                    _ => (),
352                                }
353                            }
354
355                            let epoch = barrier.epoch;
356
357                            // Report load finished BEFORE yielding barrier when:
358                            // 1. All files have been processed (files_in_progress == 0 and file_queue is empty)
359                            // 2. ListFinish mutation has been received
360                            //
361                            // IMPORTANT: Must report BEFORE yield to ensure epoch is still in inflight_barriers.
362                            // If we yield first, the barrier worker may collect the barrier and remove the epoch
363                            // from inflight_barriers, causing the report to be ignored with a warning.
364                            if files_in_progress == 0
365                                && file_queue.is_empty()
366                                && list_finished
367                                && is_refreshing
368                                && barrier.is_checkpoint()
369                            {
370                                tracing::info!(
371                                    ?epoch,
372                                    actor_id = %actor_ctx.id,
373                                    source_id = %core.source_id,
374                                    "Reporting source load finished"
375                                );
376                                barrier_manager.report_source_load_finished(
377                                    epoch,
378                                    actor_ctx.id,
379                                    self.associated_table_id,
380                                    core.source_id,
381                                );
382                                // Reset the flag to avoid duplicate reports
383                                list_finished = false;
384                                is_refreshing = false;
385                            }
386
387                            // Propagate the barrier AFTER reporting progress.
388                            yield Message::Barrier(barrier);
389
390                            // Rebuild reader when all current files are processed
391                            if files_in_progress == 0 || need_rebuild_reader {
392                                Self::replace_with_new_batch_reader(
393                                    &mut files_in_progress,
394                                    &mut file_queue,
395                                    &mut stream,
396                                    properties.clone(),
397                                    parser_config.clone(),
398                                    source_ctx.clone(),
399                                )?;
400                            }
401                        }
402                        // Receiving file assignments from upstream list executor,
403                        // store into in-memory queue (no persistent state).
404                        Message::Chunk(chunk) => {
405                            for row in chunk.data_chunk().rows() {
406                                let file_name = row.datum_at(0).unwrap().into_utf8().to_owned();
407                                let split = row.datum_at(1).unwrap().into_jsonb().to_owned_scalar();
408                                file_queue.push_back((file_name, split));
409                            }
410
411                            tracing::debug!(
412                                actor_id = %actor_ctx.id,
413                                queue_len = file_queue.len(),
414                                "Added file assignments to queue"
415                            );
416                        }
417                        Message::Watermark(_) => unreachable!(),
418                    },
419                    // Data from file reader
420                    Either::Right(FileData { chunks, file_path }) => {
421                        // Decrement counter after processing a file
422                        files_in_progress -= 1;
423                        tracing::debug!(
424                            file_path = ?file_path,
425                            "Processed file"
426                        );
427
428                        // Yield all chunks from the file
429                        for chunk in chunks {
430                            let chunk = prune_additional_cols(
431                                &chunk,
432                                &[split_idx, offset_idx],
433                                &source_desc.columns,
434                            );
435                            yield Message::Chunk(chunk);
436                        }
437                    }
438                },
439            }
440        }
441    }
442}
443
444impl<S: StateStore> Execute for BatchPosixFsFetchExecutor<S> {
445    fn execute(self: Box<Self>) -> BoxedMessageStream {
446        self.into_stream().boxed()
447    }
448}
449
450impl<S: StateStore> Debug for BatchPosixFsFetchExecutor<S> {
451    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
452        if let Some(core) = &self.stream_source_core {
453            f.debug_struct("BatchPosixFsFetchExecutor")
454                .field("source_id", &core.source_id)
455                .field("column_ids", &core.column_ids)
456                .finish()
457        } else {
458            f.debug_struct("BatchPosixFsFetchExecutor").finish()
459        }
460    }
461}