risingwave_connector/source/filesystem/opendal_source/batch_posix_fs_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::BufRead;
16use std::path::Path;
17
18use anyhow::{Context, anyhow};
19use async_trait::async_trait;
20use futures::future::BoxFuture;
21use futures_async_stream::try_stream;
22use glob;
23use risingwave_common::array::StreamChunk;
24use risingwave_common::types::JsonbVal;
25use serde::{Deserialize, Serialize};
26use tokio::fs;
27
28use super::BatchPosixFsProperties;
29use crate::error::ConnectorResult;
30use crate::parser::{ByteStreamSourceParserImpl, ParserConfig};
31use crate::source::batch::BatchSourceSplit;
32use crate::source::{
33    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceMessage,
34    SourceMeta, SplitEnumerator, SplitId, SplitMetaData, SplitReader,
35};
36
/// Batch Posix fs source for refreshable tables. (For testing only)
/// Unlike regular `posix_fs`, this connector only lists files on demand (during refresh),
/// not continuously. This makes it suitable for refreshable table functionality.
///
/// Split for the batch posix fs source. Despite being a "split", it covers *all*
/// files under the root directory: the enumerator emits exactly one of these and
/// the reader scans the whole directory tree for it.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct BatchPosixFsSplit {
    /// For batch posix fs, this is always the root directory. The reader will
    /// scan all files in this directory.
    pub file_path: String,
    /// Identifier for the split. The enumerator uses a fixed value since there
    /// is only ever one split.
    pub split_id: SplitId,
    /// Whether this split has finished reading all data (used for batch sources).
    /// Not serialized, so it resets to `false` after recovery.
    /// See [`BatchSourceSplit`] for details about recovery.
    #[serde(skip)]
    pub finished: bool,
}
54
55impl SplitMetaData for BatchPosixFsSplit {
56    fn id(&self) -> SplitId {
57        self.split_id.clone()
58    }
59
60    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
61        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
62    }
63
64    fn encode_to_json(&self) -> JsonbVal {
65        serde_json::to_value(self.clone()).unwrap().into()
66    }
67
68    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
69        // Batch source doesn't use offsets - each file is read completely once
70        Ok(())
71    }
72}
73
impl BatchSourceSplit for BatchPosixFsSplit {
    /// Returns whether this split has already been fully read.
    fn finished(&self) -> bool {
        self.finished
    }

    /// Marks the split as fully read, so the reader stops producing data for it.
    fn finish(&mut self) {
        self.finished = true;
    }

    /// Clears the finished flag so the split can be read again on the next refresh.
    fn refresh(&mut self) {
        self.finished = false;
    }
}
87
impl BatchPosixFsSplit {
    /// Creates a new, unfinished split. `file_path` is the root directory the
    /// reader will scan; `split_id` identifies the split.
    pub fn new(file_path: String, split_id: SplitId) -> Self {
        Self {
            file_path,
            split_id,
            finished: false,
        }
    }

    /// Marks the split as finished. Same effect as [`BatchSourceSplit::finish`],
    /// available without importing the trait.
    pub fn mark_finished(&mut self) {
        self.finished = true;
    }
}
101
/// Enumerator for batch posix fs source
#[derive(Debug)]
pub struct BatchPosixFsEnumerator {
    /// Connector properties; `root` is the directory checked and advertised
    /// as the single split in `list_splits`.
    properties: BatchPosixFsProperties,
}
107
108#[async_trait]
109impl SplitEnumerator for BatchPosixFsEnumerator {
110    type Properties = BatchPosixFsProperties;
111    type Split = BatchPosixFsSplit;
112
113    async fn new(
114        properties: Self::Properties,
115        _context: SourceEnumeratorContextRef,
116    ) -> ConnectorResult<Self> {
117        Ok(Self { properties })
118    }
119
120    async fn list_splits(&mut self) -> ConnectorResult<Vec<BatchPosixFsSplit>> {
121        let root_path = Path::new(&self.properties.root);
122
123        if !root_path.exists() {
124            return Err(anyhow!("Root directory does not exist: {}", self.properties.root).into());
125        }
126
127        // For batch source, we return exactly one split representing all files to be processed.
128        Ok(vec![BatchPosixFsSplit::new(
129            self.properties.root.clone(), // file_path is the root
130            "114514".into(),              // split_id does not matter
131        )])
132    }
133}
134
/// Reader for batch posix fs source
#[derive(Debug)]
pub struct BatchPosixFsReader {
    /// Connector properties: root directory and optional `match_pattern` glob.
    properties: BatchPosixFsProperties,
    /// Splits assigned to this reader; each one is read to completion once.
    splits: Vec<BatchPosixFsSplit>,
    /// Parser configuration used to turn raw lines into stream chunks.
    parser_config: ParserConfig,
    /// Source context handed to the parser instances.
    source_ctx: SourceContextRef,
}
143
#[async_trait]
impl SplitReader for BatchPosixFsReader {
    type Properties = BatchPosixFsProperties;
    type Split = BatchPosixFsSplit;

    /// Builds a reader over the given splits. No I/O happens here; files are
    /// only listed and read when the stream is consumed.
    async fn new(
        properties: Self::Properties,
        splits: Vec<Self::Split>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        Ok(Self {
            properties,
            splits,
            parser_config,
            source_ctx,
        })
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        self.into_stream_inner()
    }
}
168
169impl BatchPosixFsReader {
170    #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
171    async fn into_stream_inner(self) {
172        for split in &self.splits {
173            let files = self.collect_files_for_split(split).await?;
174            tracing::debug!(?files, ?split, "BatchPosixFsReader: colleted files");
175
176            for file_path in files {
177                let full_path = Path::new(&self.properties.root).join(&file_path);
178
179                // Read the entire file at once
180                let content = fs::read(&full_path)
181                    .await
182                    .with_context(|| format!("Failed to read file: {}", full_path.display()))?;
183
184                if content.is_empty() {
185                    continue;
186                }
187
188                // This is test-only, so we just be simple here...
189                for line in content.lines() {
190                    let line = line?;
191                    // Create a message for each line
192                    let message = SourceMessage {
193                        key: None,
194                        payload: Some(line.as_bytes().to_vec()),
195                        offset: "0".to_owned(), // Single read, no offset needed
196                        split_id: split.id(),
197                        meta: SourceMeta::Empty,
198                    };
199
200                    // Parse the content
201                    let parser = ByteStreamSourceParserImpl::create(
202                        self.parser_config.clone(),
203                        self.source_ctx.clone(),
204                    )
205                    .await?;
206                    let chunk_stream = parser
207                        .parse_stream(Box::pin(futures::stream::once(async { Ok(vec![message]) })));
208
209                    #[for_await]
210                    for chunk in chunk_stream {
211                        yield chunk?;
212                    }
213                }
214            }
215        }
216
217        // Log completion for debugging
218        tracing::info!("BatchPosixFs has finished reading all files");
219    }
220
221    async fn collect_files_for_split(
222        &self,
223        _split: &BatchPosixFsSplit,
224    ) -> ConnectorResult<Vec<String>> {
225        let root_path = Path::new(&self.properties.root);
226        let mut files = Vec::new();
227
228        let pattern = self
229            .properties
230            .match_pattern
231            .as_ref()
232            .map(|p| glob::Pattern::new(p))
233            .transpose()
234            .with_context(|| {
235                format!("Invalid match_pattern: {:?}", self.properties.match_pattern)
236            })?;
237
238        Self::collect_files_recursive(root_path, root_path, &pattern, &mut files).await?;
239        Ok(files)
240    }
241
242    fn collect_files_recursive<'a>(
243        current_dir: &'a Path,
244        root_path: &'a Path,
245        pattern: &'a Option<glob::Pattern>,
246        files: &'a mut Vec<String>,
247    ) -> BoxFuture<'a, ConnectorResult<()>> {
248        Box::pin(async move {
249            let mut entries = fs::read_dir(current_dir)
250                .await
251                .with_context(|| format!("Failed to read directory: {}", current_dir.display()))?;
252
253            while let Some(entry) = entries.next_entry().await.with_context(|| {
254                format!(
255                    "Failed to read directory entry in: {}",
256                    current_dir.display()
257                )
258            })? {
259                let path = entry.path();
260
261                if path.is_dir() {
262                    // Recursively process subdirectories
263                    Self::collect_files_recursive(&path, root_path, pattern, files).await?;
264                } else if path.is_file() {
265                    let relative_path = path.strip_prefix(root_path).with_context(|| {
266                        format!("Failed to get relative path for: {}", path.display())
267                    })?;
268
269                    let relative_path_str = relative_path.to_string_lossy().to_string();
270
271                    // Check if file matches the pattern (if specified)
272                    if let Some(pattern) = pattern {
273                        if pattern.matches(&relative_path_str) {
274                            files.push(relative_path_str);
275                        }
276                    } else {
277                        files.push(relative_path_str);
278                    }
279                }
280            }
281
282            Ok(())
283        })
284    }
285}