risingwave_connector/source/filesystem/opendal_source/posix_fs_source.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::marker::PhantomData;
16
17use anyhow::Context;
18use opendal::Operator;
19use opendal::layers::{LoggingLayer, RetryLayer};
20use opendal::services::Fs;
21
22use super::opendal_enumerator::OpendalEnumerator;
23use super::{OpendalSource, PosixFsProperties};
24use crate::error::ConnectorResult;
25
// The POSIX fs source should only be used for testing.
// On a single-CN cluster its behavior is well-defined: it reads from the local file system.
// On a multi-CN cluster, each CN reads from its own local file system under the given
// directory, so different CNs may see different files.

30impl<Src: OpendalSource> OpendalEnumerator<Src> {
31    /// create opendal posix fs source.
32    pub fn new_posix_fs_source(posix_fs_properties: PosixFsProperties) -> ConnectorResult<Self> {
33        // Create Fs builder.
34        let builder = Fs::default().root(&posix_fs_properties.root);
35        let op: Operator = Operator::new(builder)?
36            .layer(LoggingLayer::default())
37            .layer(RetryLayer::default())
38            .finish();
39
40        let (prefix, matcher) = if let Some(pattern) = posix_fs_properties.match_pattern.as_ref() {
41            // TODO(Kexiang): Currently, FsListnenr in opendal does not support a prefix. (Seems a bug in opendal)
42            // So we assign prefix to empty string.
43            let matcher = glob::Pattern::new(pattern)
44                .with_context(|| format!("Invalid match_pattern: {}", pattern))?;
45            (Some(String::new()), Some(matcher))
46        } else {
47            (None, None)
48        };
49        let compression_format = posix_fs_properties.compression_format;
50        Ok(Self {
51            op,
52            prefix,
53            matcher,
54            marker: PhantomData,
55            compression_format,
56        })
57    }
58}