risingwave_connector/source/filesystem/opendal_source/posix_fs_source.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16
17use anyhow::Context;
18use opendal::Operator;
19use opendal::layers::{LoggingLayer, RetryLayer};
20use opendal::services::Fs;
21
22use super::opendal_enumerator::OpendalEnumerator;
23use super::{OpendalSource, PosixFsProperties};
24use crate::error::ConnectorResult;
25
26// Posix fs source should only be used for testing.
27// For a single-CN cluster, the behavior is well-defined. It will read from the local file system.
28// For a multi-CN cluster, each CN will read from its own local file system under the given directory.
29
30impl<Src: OpendalSource> OpendalEnumerator<Src> {
31 /// create opendal posix fs source.
32 pub fn new_posix_fs_source(posix_fs_properties: PosixFsProperties) -> ConnectorResult<Self> {
33 // Create Fs builder.
34 let builder = Fs::default().root(&posix_fs_properties.root);
35 let op: Operator = Operator::new(builder)?
36 .layer(LoggingLayer::default())
37 .layer(RetryLayer::default())
38 .finish();
39
40 let (prefix, matcher) = if let Some(pattern) = posix_fs_properties.match_pattern.as_ref() {
41 // TODO(Kexiang): Currently, FsListnenr in opendal does not support a prefix. (Seems a bug in opendal)
42 // So we assign prefix to empty string.
43 let matcher = glob::Pattern::new(pattern)
44 .with_context(|| format!("Invalid match_pattern: {}", pattern))?;
45 (Some(String::new()), Some(matcher))
46 } else {
47 (None, None)
48 };
49 let compression_format = posix_fs_properties.compression_format;
50 Ok(Self {
51 op,
52 prefix,
53 matcher,
54 marker: PhantomData,
55 compression_format,
56 })
57 }
58}