risingwave_stream/executor/source/batch_source/batch_posix_fs_fetch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::io::BufRead;
use std::path::Path;

use either::Either;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::id::TableId;
use risingwave_common::types::{JsonbVal, ScalarRef};
use risingwave_connector::parser::{ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig};
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::OpendalPosixFs;
use risingwave_connector::source::{
    ConnectorProperties, SourceContext, SourceCtrlOpts, SourceMessage, SourceMeta, SplitMetaData,
};
use thiserror_ext::AsReport;
use tokio::fs;

use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::source::{StreamSourceCore, get_split_offset_col_idx, prune_additional_cols};
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;

/// Maximum number of files to process in a single batch
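/// Files are popped from the in-memory queue up to this many at a time; a new
/// batch is started (at a barrier) only after the previous batch has been fully read.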
const BATCH_SIZE: usize = 1000;

/// Executor for fetching and processing files in batch mode for refreshable tables.
///
/// This executor receives file assignments from an upstream list executor,
/// reads the files, parses their contents, and emits stream chunks.
///
/// Key characteristics:
/// - Uses **ephemeral in-memory state** (no persistent state table)
/// - State is cleared on recovery and `RefreshStart` mutations
/// - Suitable for refreshable materialized views
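///
/// Dataflow sketch (illustrative only):
///
/// ```text
/// upstream list executor --(file_path, split_json)--> file_queue (in memory)
/// file_queue --pop up to BATCH_SIZE--> batched file reader --StreamChunk--> downstream
/// ```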
pub struct BatchPosixFsFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core component for managing external streaming source state
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor that provides the list of files to read
    upstream: Option<Executor>,

    /// Optional rate limit in rows/s to control data ingestion speed
    rate_limit_rps: Option<u32>,

    /// Local barrier manager for reporting load finished
    barrier_manager: LocalBarrierManager,

    /// In-memory queue of file assignments to process (`file_path`, `split_json`).
    /// This is ephemeral and cleared on recovery and `RefreshStart` mutations.
    file_queue: VecDeque<(String, JsonbVal)>,

    /// Associated table ID for reporting load finished
    associated_table_id: TableId,
}

/// Fetched data from a file, along with file path for logging
struct FileData {
    /// The actual data chunks read from the file
    chunks: Vec<StreamChunk>,

    /// Path to the data file
    file_path: String,
}

impl<S: StateStore> BatchPosixFsFetchExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
        barrier_manager: LocalBarrierManager,
        associated_table_id: Option<TableId>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            barrier_manager,
            file_queue: VecDeque::new(),
            associated_table_id: associated_table_id
                .expect("associated_table_id must be set for BatchPosixFsFetchExecutor"),
        }
    }

    /// Pop files from the in-memory queue and create a batch reader for them.
    /// Up to `BATCH_SIZE` files are read sequentially within one batch.
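    /// If the queue is empty, the data stream is parked on `stream::pending()`
    /// so the executor idles until the next batch is scheduled.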
    fn replace_with_new_batch_reader<const BIASED: bool>(
        files_in_progress: &mut usize,
        file_queue: &mut VecDeque<(String, JsonbVal)>,
        stream: &mut StreamReaderWithPause<BIASED, FileData>,
        properties: ConnectorProperties,
        parser_config: ParserConfig,
        source_ctx: Arc<SourceContext>,
    ) -> StreamExecutorResult<()> {
        // Pop up to BATCH_SIZE files from the queue to process
        let mut batch = Vec::with_capacity(BATCH_SIZE);

        for _ in 0..BATCH_SIZE {
            if let Some((_file_path, split_json)) = file_queue.pop_front() {
                let split = OpendalFsSplit::<OpendalPosixFs>::restore_from_json(split_json)?;
                batch.push(split);
            } else {
                break;
            }
        }

        if batch.is_empty() {
            // No files to process, set stream to pending
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *files_in_progress += batch.len();
            let batch_reader =
                Self::build_batched_stream_reader(batch, properties, parser_config, source_ctx);
            stream.replace_data_stream(batch_reader.boxed());
        }

        Ok(())
    }

    /// Build a stream reader that reads multiple files in sequence
    #[try_stream(ok = FileData, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        batch: Vec<OpendalFsSplit<OpendalPosixFs>>,
        properties: ConnectorProperties,
        parser_config: ParserConfig,
        source_ctx: Arc<SourceContext>,
    ) {
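        // This executor is only built for `BatchPosixFs` sources, so any other
        // property variant here indicates a planning bug.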
        let ConnectorProperties::BatchPosixFs(batch_posix_fs_properties) = properties else {
            unreachable!()
        };

        let root_path = batch_posix_fs_properties.root.clone();

        for split in batch {
            let file_path = split.name.clone();
            let full_path = Path::new(&root_path).join(&file_path);

            // Read the entire file
            let content = match fs::read(&full_path).await {
                Ok(content) => content,
                Err(e) => {
                    tracing::error!(
                        error = %e.as_report(),
                        file_path = %full_path.display(),
                        "Failed to read file"
                    );
                    continue;
                }
            };

            if content.is_empty() {
                // Empty file: yield an empty `FileData` (no chunks) so the
                // caller still decrements its in-progress counter.
                yield FileData {
                    chunks: vec![],
                    file_path,
                };
                continue;
            }

            let mut chunks = vec![];

            // Process the file line by line
            for line in content.lines() {
                let line =
                    line.map_err(|e| StreamExecutorError::connector_error(anyhow::Error::from(e)))?;

                let message = SourceMessage {
                    key: None,
                    payload: Some(line.as_bytes().to_vec()),
                    offset: "0".to_owned(),
                    split_id: split.id(),
                    meta: SourceMeta::Empty,
                };

                // TODO(tab): avoid rebuilding ByteStreamSourceParserImpl for each line
                // The parser is recreated per line because it is consumed by `parse_stream`
                let parser =
                    ByteStreamSourceParserImpl::create(parser_config.clone(), source_ctx.clone())
                        .await?;

                let chunk_stream = parser
                    .parse_stream(Box::pin(futures::stream::once(async { Ok(vec![message]) })));

                #[for_await]
                for chunk in chunk_stream {
                    chunks.push(chunk?);
                }
            }

            yield FileData { chunks, file_path };
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();

        // Build source description from the builder.
        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;
        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        let properties = source_desc.source.config.clone();
        let parser_config = ParserConfig {
            common: CommonParserConfig {
                rw_columns: source_desc.columns.clone(),
            },
            specific: source_desc.source.parser_config.clone(),
        };

        let mut files_in_progress: usize = 0;
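        // `BIASED = true`: the select favors the upstream side, so barriers and
        // new file assignments are handled before buffered file data.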
        let mut stream =
            StreamReaderWithPause::<true, FileData>::new(upstream, stream::pending().boxed());

        if is_pause_on_startup {
            stream.pause_stream();
        }

        // For refreshable tables, always start fresh on recovery; no state restoration.
        // File queue is empty by default (no restoration from persistent state).

        let mut list_finished = false;
        let mut is_refreshing = false;
        let mut file_queue = self.file_queue;

        // Extract fields we'll need later
        let actor_ctx = self.actor_ctx.clone();
        let barrier_manager = self.barrier_manager.clone();
        let rate_limit_rps = &mut self.rate_limit_rps;

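        // Chunk size is capped by the rate limit so one chunk never exceeds the
        // permitted rows per second, and transactions are split when a rate
        // limit is configured.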
        let source_ctx = Arc::new(SourceContext::new(
            actor_ctx.id,
            core.source_id,
            actor_ctx.fragment_id,
            core.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(*rate_limit_rps),
                split_txn: rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        ));

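        // Main loop: the left arm carries upstream control messages (barriers and
        // file-assignment chunks); the right arm carries parsed data from the
        // current batch reader.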
        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    files_in_progress = 0;
                }
                Ok(msg) => match msg {
                    // Barrier messages from upstream
                    Either::Left(msg) => match msg {
                        Message::Barrier(barrier) => {
                            let need_rebuild_reader = false;

                            if let Some(mutation) = barrier.mutation.as_deref() {
                                match mutation {
                                    Mutation::Pause => stream.pause_stream(),
                                    Mutation::Resume => stream.resume_stream(),
                                    Mutation::RefreshStart {
                                        associated_source_id,
                                        ..
                                    } if associated_source_id.as_raw_id()
                                        == core.source_id.as_raw_id() =>
                                    {
                                        tracing::info!(
                                            ?barrier.epoch,
                                            actor_id = %actor_ctx.id,
                                            source_id = %core.source_id,
                                            queue_len = file_queue.len(),
                                            files_in_progress,
                                            "RefreshStart: clearing state and aborting workload"
                                        );

                                        // Clear all in-memory state
                                        file_queue.clear();
                                        files_in_progress = 0;
                                        list_finished = false;
                                        is_refreshing = true;

                                        // Abort current file reader
                                        stream.replace_data_stream(stream::pending().boxed());
                                    }
                                    Mutation::ListFinish {
                                        associated_source_id,
                                    } => {
                                        // Check if this ListFinish is for our source
                                        if associated_source_id.as_raw_id()
                                            == core.source_id.as_raw_id()
                                        {
                                            tracing::info!(
                                                ?barrier.epoch,
                                                actor_id = %actor_ctx.id,
                                                source_id = %core.source_id,
                                                "received ListFinish mutation"
                                            );
                                            list_finished = true;
                                        }
                                    }
                                    _ => (),
                                }
                            }

                            let epoch = barrier.epoch;

                            // Report load finished BEFORE yielding barrier when:
                            // 1. All files have been processed (files_in_progress == 0 and file_queue is empty)
                            // 2. ListFinish mutation has been received
                            //
                            // IMPORTANT: Must report BEFORE yield to ensure epoch is still in inflight_barriers.
                            // If we yield first, the barrier worker may collect the barrier and remove the epoch
                            // from inflight_barriers, causing the report to be ignored with a warning.
                            if files_in_progress == 0
                                && file_queue.is_empty()
                                && list_finished
                                && is_refreshing
                                && barrier.is_checkpoint()
                            {
                                tracing::info!(
                                    ?epoch,
                                    actor_id = %actor_ctx.id,
                                    source_id = %core.source_id,
                                    "Reporting source load finished"
                                );
                                barrier_manager.report_source_load_finished(
                                    epoch,
                                    actor_ctx.id,
                                    self.associated_table_id,
                                    core.source_id,
                                );
                                // Reset the flags to avoid duplicate reports
                                list_finished = false;
                                is_refreshing = false;
                            }

                            // Propagate the barrier AFTER reporting progress.
                            yield Message::Barrier(barrier);

                            // Rebuild reader when all current files are processed
                            if files_in_progress == 0 || need_rebuild_reader {
                                Self::replace_with_new_batch_reader(
                                    &mut files_in_progress,
                                    &mut file_queue,
                                    &mut stream,
                                    properties.clone(),
                                    parser_config.clone(),
                                    source_ctx.clone(),
                                )?;
                            }
                        }
                        // Receiving file assignments from upstream list executor,
                        // store into in-memory queue (no persistent state).
                        Message::Chunk(chunk) => {
                            for row in chunk.data_chunk().rows() {
                                let file_name = row.datum_at(0).unwrap().into_utf8().to_owned();
                                let split = row.datum_at(1).unwrap().into_jsonb().to_owned_scalar();
                                file_queue.push_back((file_name, split));
                            }

                            tracing::debug!(
                                actor_id = %actor_ctx.id,
                                queue_len = file_queue.len(),
                                "Added file assignments to queue"
                            );
                        }
                        Message::Watermark(_) => unreachable!(),
                    },
                    // Data from file reader
                    Either::Right(FileData { chunks, file_path }) => {
                        // Decrement counter after processing a file
                        files_in_progress -= 1;
                        tracing::debug!(
                            file_path = ?file_path,
                            "Processed file"
                        );

                        // Yield all chunks from the file
                        for chunk in chunks {
                            let chunk = prune_additional_cols(
                                &chunk,
                                &[split_idx, offset_idx],
                                &source_desc.columns,
                            );
                            yield Message::Chunk(chunk);
                        }
                    }
                },
            }
        }
    }
}

impl<S: StateStore> Execute for BatchPosixFsFetchExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for BatchPosixFsFetchExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("BatchPosixFsFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("BatchPosixFsFetchExecutor").finish()
        }
    }
}