risingwave_connector/source/filesystem/opendal_source/
batch_posix_fs_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use risingwave_common::types::JsonbVal;
20use serde::{Deserialize, Serialize};
21
22use super::BatchPosixFsProperties;
23use crate::error::ConnectorResult;
24use crate::parser::ParserConfig;
25use crate::source::batch::BatchSourceSplit;
26use crate::source::{
27    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SplitEnumerator,
28    SplitId, SplitMetaData, SplitReader,
29};
30
/// Batch Posix fs source for refreshable tables. (For testing only)
/// Unlike regular `posix_fs`, this connector only lists files on demand (during refresh),
/// not continuously. This makes it suitable for refreshable table functionality.
///
/// Split handed out by the batch posix fs source. Despite the `file_path` field name, a
/// split does not identify one individual file: it carries the root directory, and the
/// reader scans all files in that directory.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct BatchPosixFsSplit {
    /// For batch posix fs, this is always the root directory. The reader will
    /// scan all files in this directory.
    pub file_path: String,
    /// A unique identifier for the split; this is the value returned by `SplitMetaData::id`.
    /// NOTE(review): described as typically including a timestamp to force refresh, but
    /// `BatchPosixFsEnumerator::list_splits` in this file assigns a fixed literal — confirm
    /// which behavior is intended.
    pub split_id: SplitId,
    /// Whether this split has finished reading all data (used for batch sources).
    /// See [`BatchSourceSplit`] for details about recovery.
    ///
    /// Excluded from (de)serialization by `#[serde(skip)]`, so a split restored from JSON
    /// always starts with `finished == false` (the `bool` default).
    #[serde(skip)]
    pub finished: bool,
}
48
49impl SplitMetaData for BatchPosixFsSplit {
50    fn id(&self) -> SplitId {
51        self.split_id.clone()
52    }
53
54    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
55        serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
56    }
57
58    fn encode_to_json(&self) -> JsonbVal {
59        serde_json::to_value(self.clone()).unwrap().into()
60    }
61
62    fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
63        // Batch source doesn't use offsets - each file is read completely once
64        Ok(())
65    }
66}
67
/// Batch-source lifecycle hooks, all backed by the in-memory `finished` flag.
impl BatchSourceSplit for BatchPosixFsSplit {
    /// Returns the current value of the `finished` flag.
    fn finished(&self) -> bool {
        self.finished
    }

    /// Sets the `finished` flag to `true`.
    fn finish(&mut self) {
        self.finished = true;
    }

    /// Resets the `finished` flag to `false`.
    fn refresh(&mut self) {
        self.finished = false;
    }
}
81
82impl BatchPosixFsSplit {
83    pub fn new(file_path: String, split_id: SplitId) -> Self {
84        Self {
85            file_path,
86            split_id,
87            finished: false,
88        }
89    }
90
91    pub fn mark_finished(&mut self) {
92        self.finished = true;
93    }
94}
95
/// Enumerator for batch posix fs source
///
/// Holds the connector properties it was created with; `properties.root` is the path
/// that `list_splits` checks for existence and hands out as the split's `file_path`.
#[derive(Debug)]
pub struct BatchPosixFsEnumerator {
    /// Connector properties supplied at construction time.
    properties: BatchPosixFsProperties,
}
101
102#[async_trait]
103impl SplitEnumerator for BatchPosixFsEnumerator {
104    type Properties = BatchPosixFsProperties;
105    type Split = BatchPosixFsSplit;
106
107    async fn new(
108        properties: Self::Properties,
109        _context: SourceEnumeratorContextRef,
110    ) -> ConnectorResult<Self> {
111        Ok(Self { properties })
112    }
113
114    async fn list_splits(&mut self) -> ConnectorResult<Vec<BatchPosixFsSplit>> {
115        // dummy list, just return one split
116        let root_path = Path::new(&self.properties.root);
117
118        if !root_path.exists() {
119            return Err(anyhow!("Root directory does not exist: {}", self.properties.root).into());
120        }
121
122        // For batch source, we return exactly one split representing all files to be processed.
123        Ok(vec![BatchPosixFsSplit::new(
124            self.properties.root.clone(), // file_path is the root
125            "114514".into(),              // split_id does not matter
126        )])
127    }
128}
129
/// Reader for batch posix fs source
///
/// Placeholder type with no state: its `SplitReader::new` always returns an error and
/// `into_stream` is marked unreachable, so no instance is ever produced through the trait.
#[derive(Debug)]
pub struct BatchPosixFsReader {}
133
134#[async_trait]
135impl SplitReader for BatchPosixFsReader {
136    type Properties = BatchPosixFsProperties;
137    type Split = BatchPosixFsSplit;
138
139    async fn new(
140        _properties: Self::Properties,
141        _splits: Vec<Self::Split>,
142        _parser_config: ParserConfig,
143        _source_ctx: SourceContextRef,
144        _columns: Option<Vec<Column>>,
145    ) -> ConnectorResult<Self> {
146        return Err(anyhow!("BatchPosixFsReader should not be used").into());
147    }
148
149    fn into_stream(self) -> BoxSourceChunkStream {
150        unreachable!(
151            "BatchPosixFsReader should not hit this branch. refer to `batch_posix_fs_list.rs`, `batch_posix_fs_fetch.rs`"
152        );
153    }
154}