risingwave_connector/source/filesystem/opendal_source/
opendal_enumerator.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use chrono::{DateTime, Utc};
20use futures::stream::{self, BoxStream};
21use futures::{StreamExt, TryStreamExt};
22use opendal::Operator;
23use risingwave_common::types::Timestamptz;
24
25use super::OpendalSource;
26use crate::error::ConnectorResult;
27use crate::source::filesystem::file_common::CompressionFormat;
28use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
29use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
30
31#[inline]
32fn to_timestamptz(ts: Option<opendal::raw::Timestamp>) -> Timestamptz {
33    let system_time = ts
34        .map(std::time::SystemTime::from)
35        .unwrap_or(std::time::UNIX_EPOCH);
36    Timestamptz::from(DateTime::<Utc>::from(system_time))
37}
38
/// Enumerator for file sources backed by an OpenDAL [`Operator`].
///
/// `Src` selects the concrete [`OpendalSource`] flavor; it is only carried at
/// the type level through `marker`.
#[derive(Debug, Clone)]
pub struct OpendalEnumerator<Src: OpendalSource> {
    /// Operator used to list objects and to `stat` individual objects.
    pub op: Operator,
    // prefix is used to reduce the number of objects to be listed
    // (when it is `None`, "/" is used instead)
    pub(crate) prefix: Option<String>,
    /// Optional glob pattern, exposed through `get_matcher`.
    // NOTE(review): the pattern is not applied anywhere in this file;
    // presumably callers filter listed objects with it — confirm.
    pub(crate) matcher: Option<glob::Pattern>,
    /// Zero-sized marker binding this enumerator to `Src`.
    pub(crate) marker: PhantomData<Src>,
    // NOTE(review): not read in this file; presumably consumed by the reader
    // side to decide how objects are decompressed — confirm.
    pub(crate) compression_format: CompressionFormat,
}
48
49#[async_trait]
50impl<Src: OpendalSource> SplitEnumerator for OpendalEnumerator<Src> {
51    type Properties = Src::Properties;
52    type Split = OpendalFsSplit<Src>;
53
54    async fn new(
55        properties: Src::Properties,
56        _context: SourceEnumeratorContextRef,
57    ) -> ConnectorResult<Self> {
58        Src::new_enumerator(properties)
59    }
60
61    async fn list_splits(&mut self) -> ConnectorResult<Vec<OpendalFsSplit<Src>>> {
62        let empty_split: OpendalFsSplit<Src> = OpendalFsSplit::empty_split();
63        let prefix = self.prefix.as_deref().unwrap_or("/");
64        let list_prefix = Self::extract_list_prefix(prefix);
65
66        let mut lister = self.op.lister(&list_prefix).await?;
67        // fetch one item as validation, no need to get all
68        match lister.try_next().await {
69            Ok(_) => return Ok(vec![empty_split]),
70            Err(e) => {
71                return Err(anyhow!(e)
72                    .context("fail to create source, please check your config.")
73                    .into());
74            }
75        }
76    }
77}
78
79impl<Src: OpendalSource> OpendalEnumerator<Src> {
80    /// Extract the directory to list from a prefix.
81    /// If prefix ends with "/", use it as-is (directory).
82    /// Otherwise, extract parent directory.
83    fn extract_list_prefix(prefix: &str) -> String {
84        if prefix.ends_with("/") {
85            prefix.to_owned()
86        } else if let Some(parent_pos) = prefix.rfind('/') {
87            prefix[..=parent_pos].to_owned()
88        } else {
89            "/".to_owned()
90        }
91    }
92
93    pub async fn list(&self) -> ConnectorResult<ObjectMetadataIter> {
94        let prefix = self.prefix.as_deref().unwrap_or("/");
95        let list_prefix = Self::extract_list_prefix(prefix);
96        let object_lister = self.op.lister_with(&list_prefix).recursive(true).await?;
97
98        let op = self.op.clone();
99        let stream = stream::unfold(object_lister, move |mut object_lister| {
100            let op = op.clone();
101
102            async move {
103                match object_lister.next().await {
104                    Some(Ok(object)) => {
105                        let name = object.path().to_owned();
106
107                        // OpenDAL 0.55 removed list metadata capability flags and reports
108                        // unknown content length as 0. Use listed metadata first, and call
109                        // stat() if timestamp is missing or size is 0 to avoid treating
110                        // unknown sizes as real zero-byte objects.
111                        let meta = object.metadata();
112                        let mut t = meta.last_modified();
113                        let mut size = meta.content_length() as i64;
114                        if t.is_none() || size == 0 {
115                            let stat_meta = op.stat(&name).await.ok()?;
116                            t = stat_meta.last_modified();
117                            size = stat_meta.content_length() as i64;
118                        }
119
120                        let timestamp = to_timestamptz(t);
121                        let metadata = FsPageItem {
122                            name,
123                            size,
124                            timestamp,
125                        };
126                        Some((Ok(metadata), object_lister))
127                    }
128                    Some(Err(err)) => Some((Err(err.into()), object_lister)),
129                    None => {
130                        tracing::info!("list object completed.");
131                        None
132                    }
133                }
134            }
135        });
136
137        Ok(stream.boxed())
138    }
139
140    pub fn get_matcher(&self) -> &Option<glob::Pattern> {
141        &self.matcher
142    }
143
144    pub fn get_prefix(&self) -> &str {
145        self.prefix.as_deref().unwrap_or("/")
146    }
147}
/// Boxed `'static` stream of listed objects, as returned by
/// `OpendalEnumerator::list`; each item is a `ConnectorResult<FsPageItem>`.
pub type ObjectMetadataIter = BoxStream<'static, ConnectorResult<FsPageItem>>;
149
#[cfg(test)]
mod tests {
    use super::*;
    use crate::source::filesystem::opendal_source::OpendalS3;
    use crate::source::filesystem::s3::enumerator::get_prefix;

    /// Concrete enumerator type used to reach the associated helper.
    type S3Enumerator = OpendalEnumerator<OpendalS3>;

    /// Checks both the glob-derived prefix and the directory that gets listed.
    #[test]
    fn test_prefix_logic() {
        // (glob pattern, expected `get_prefix` output, expected list prefix)
        let cases = [
            ("a/b/c/hello*/*.json", "a/b/c/hello", "a/b/c/"),
            ("a/b/c.json", "a/b/c.json", "a/b/"),
            ("a/b/c/", "a/b/c/", "a/b/c/"),
            ("a/b/c", "a/b/c", "a/b/"),
            ("file.json", "file.json", "/"),
            ("*.json", "", "/"),
            ("a/b/c/[h]ello*/*.json", "a/b/c/", "a/b/c/"),
        ];

        for (pattern, want_prefix, want_list_prefix) in cases.iter().copied() {
            let got_prefix = get_prefix(pattern);
            let got_list_prefix = S3Enumerator::extract_list_prefix(&got_prefix);

            assert_eq!(
                got_prefix, want_prefix,
                "get_prefix failed for: {}",
                pattern
            );
            assert_eq!(
                got_list_prefix, want_list_prefix,
                "list_prefix failed for: {}",
                pattern
            );
        }
    }

    /// Regression test: a prefix ending in a partial name must list its parent
    /// directory.
    #[test]
    fn test_bug_fix() {
        let got_prefix = get_prefix("a/b/c/hello*/*.json");
        let got_list_prefix = S3Enumerator::extract_list_prefix(&got_prefix);

        // Before fix: would fallback to "/"
        // After fix: should use parent directory "a/b/c/"
        assert_eq!(got_prefix, "a/b/c/hello");
        assert_eq!(got_list_prefix, "a/b/c/");
    }
}