risingwave_connector/source/filesystem/opendal_source/
opendal_enumerator.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use chrono::{DateTime, Utc};
20use futures::stream::{self, BoxStream};
21use futures::{StreamExt, TryStreamExt};
22use opendal::Operator;
23use risingwave_common::types::Timestamptz;
24
25use super::OpendalSource;
26use crate::error::ConnectorResult;
27use crate::source::filesystem::file_common::CompressionFormat;
28use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
29use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
30
/// Enumerator over the objects reachable through an OpenDAL [`Operator`], generic over
/// the concrete [`OpendalSource`] `Src`.
#[derive(Debug, Clone)]
pub struct OpendalEnumerator<Src: OpendalSource> {
    /// Operator used for the list and stat calls issued by this enumerator.
    pub op: Operator,
    // prefix is used to reduce the number of objects to be listed;
    // when it is `None`, the methods in this file fall back to "/".
    pub(crate) prefix: Option<String>,
    /// Optional glob pattern, exposed through `get_matcher`. The listing code in this
    /// file does not apply it itself.
    pub(crate) matcher: Option<glob::Pattern>,
    /// Zero-sized marker that ties the enumerator to `Src` without storing a value.
    pub(crate) marker: PhantomData<Src>,
    /// Compression format setting. NOTE(review): not read anywhere in this file —
    /// presumably consumed by the reader side; confirm against callers.
    pub(crate) compression_format: CompressionFormat,
}
40
41#[async_trait]
42impl<Src: OpendalSource> SplitEnumerator for OpendalEnumerator<Src> {
43    type Properties = Src::Properties;
44    type Split = OpendalFsSplit<Src>;
45
46    async fn new(
47        properties: Src::Properties,
48        _context: SourceEnumeratorContextRef,
49    ) -> ConnectorResult<Self> {
50        Src::new_enumerator(properties)
51    }
52
53    async fn list_splits(&mut self) -> ConnectorResult<Vec<OpendalFsSplit<Src>>> {
54        let empty_split: OpendalFsSplit<Src> = OpendalFsSplit::empty_split();
55        let prefix = self.prefix.as_deref().unwrap_or("/");
56        let list_prefix = Self::extract_list_prefix(prefix);
57
58        let mut lister = self.op.lister(&list_prefix).await?;
59        // fetch one item as validation, no need to get all
60        match lister.try_next().await {
61            Ok(_) => return Ok(vec![empty_split]),
62            Err(e) => {
63                return Err(anyhow!(e)
64                    .context("fail to create source, please check your config.")
65                    .into());
66            }
67        }
68    }
69}
70
71impl<Src: OpendalSource> OpendalEnumerator<Src> {
72    /// Extract the directory to list from a prefix.
73    /// If prefix ends with "/", use it as-is (directory).
74    /// Otherwise, extract parent directory.
75    fn extract_list_prefix(prefix: &str) -> String {
76        if prefix.ends_with("/") {
77            prefix.to_owned()
78        } else if let Some(parent_pos) = prefix.rfind('/') {
79            prefix[..=parent_pos].to_owned()
80        } else {
81            "/".to_owned()
82        }
83    }
84
85    pub async fn list(&self) -> ConnectorResult<ObjectMetadataIter> {
86        let prefix = self.prefix.as_deref().unwrap_or("/");
87        let list_prefix = Self::extract_list_prefix(prefix);
88        let object_lister = self.op.lister_with(&list_prefix).recursive(true).await?;
89
90        let op = self.op.clone();
91        let full_capability = op.info().full_capability();
92        let stream = stream::unfold(object_lister, move |mut object_lister| {
93            let op = op.clone();
94
95            async move {
96                match object_lister.next().await {
97                    Some(Ok(object)) => {
98                        let name = object.path().to_owned();
99
100                        // Check if we need to call stat() to get complete metadata
101                        let (t, size) = if !full_capability.list_has_content_length
102                            || !full_capability.list_has_last_modified
103                        {
104                            // Need complete metadata, call stat()
105                            let stat_meta = op.stat(&name).await.ok()?;
106                            let t = match stat_meta.last_modified() {
107                                Some(t) => t,
108                                None => DateTime::<Utc>::from_timestamp(0, 0).unwrap_or_default(),
109                            };
110                            let size = stat_meta.content_length() as i64;
111                            (t, size)
112                        } else {
113                            // Use metadata from list operation
114                            let meta = object.metadata();
115                            let t = match meta.last_modified() {
116                                Some(t) => t,
117                                None => DateTime::<Utc>::from_timestamp(0, 0).unwrap_or_default(),
118                            };
119                            let size = meta.content_length() as i64;
120                            (t, size)
121                        };
122
123                        let timestamp = Timestamptz::from(t);
124                        let metadata = FsPageItem {
125                            name,
126                            size,
127                            timestamp,
128                        };
129                        Some((Ok(metadata), object_lister))
130                    }
131                    Some(Err(err)) => Some((Err(err.into()), object_lister)),
132                    None => {
133                        tracing::info!("list object completed.");
134                        None
135                    }
136                }
137            }
138        });
139
140        Ok(stream.boxed())
141    }
142
143    pub fn get_matcher(&self) -> &Option<glob::Pattern> {
144        &self.matcher
145    }
146
147    pub fn get_prefix(&self) -> &str {
148        self.prefix.as_deref().unwrap_or("/")
149    }
150}
/// Boxed `'static` stream of listing results: each item is either an [`FsPageItem`]
/// or the error encountered while producing it.
pub type ObjectMetadataIter = BoxStream<'static, ConnectorResult<FsPageItem>>;
152
#[cfg(test)]
mod tests {
    use super::*;
    use crate::source::filesystem::opendal_source::OpendalS3;
    use crate::source::filesystem::s3::enumerator::get_prefix;

    /// Calls `extract_list_prefix` through a concrete source type so the generic
    /// enumerator can be named.
    fn calculate_list_prefix(prefix: &str) -> String {
        OpendalEnumerator::<OpendalS3>::extract_list_prefix(prefix)
    }

    #[test]
    fn test_prefix_logic() {
        // Each case: (glob pattern, expected `get_prefix` output, expected list prefix).
        let cases = [
            ("a/b/c/hello*/*.json", "a/b/c/hello", "a/b/c/"),
            ("a/b/c.json", "a/b/c.json", "a/b/"),
            ("a/b/c/", "a/b/c/", "a/b/c/"),
            ("a/b/c", "a/b/c", "a/b/"),
            ("file.json", "file.json", "/"),
            ("*.json", "", "/"),
            ("a/b/c/[h]ello*/*.json", "a/b/c/", "a/b/c/"),
        ];

        for (pattern, want_prefix, want_list_prefix) in cases {
            let got_prefix = get_prefix(pattern);
            let got_list_prefix = calculate_list_prefix(&got_prefix);

            assert_eq!(
                got_prefix, want_prefix,
                "get_prefix failed for: {}",
                pattern
            );
            assert_eq!(
                got_list_prefix, want_list_prefix,
                "list_prefix failed for: {}",
                pattern
            );
        }
    }

    #[test]
    fn test_bug_fix() {
        // Regression case: a prefix that stops mid-segment used to fall back to "/";
        // it must resolve to the parent directory "a/b/c/" instead.
        let got_prefix = get_prefix("a/b/c/hello*/*.json");
        let got_list_prefix = calculate_list_prefix(&got_prefix);

        assert_eq!(got_prefix, "a/b/c/hello");
        assert_eq!(got_list_prefix, "a/b/c/");
    }
}