risingwave_connector/source/filesystem/opendal_source/
opendal_enumerator.rs1use std::marker::PhantomData;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use chrono::{DateTime, Utc};
20use futures::stream::{self, BoxStream};
21use futures::{StreamExt, TryStreamExt};
22use opendal::Operator;
23use risingwave_common::types::Timestamptz;
24
25use super::OpendalSource;
26use crate::error::ConnectorResult;
27use crate::source::filesystem::file_common::CompressionFormat;
28use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
29use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
30
31#[inline]
32fn to_timestamptz(ts: Option<opendal::raw::Timestamp>) -> Timestamptz {
33 let system_time = ts
34 .map(std::time::SystemTime::from)
35 .unwrap_or(std::time::UNIX_EPOCH);
36 Timestamptz::from(DateTime::<Utc>::from(system_time))
37}
38
/// Enumerator for file sources that are accessed through an OpenDAL [`Operator`].
///
/// `Src` selects the concrete [`OpendalSource`] backend; it is only carried at the type level
/// (see `marker`).
#[derive(Debug, Clone)]
pub struct OpendalEnumerator<Src: OpendalSource> {
    /// OpenDAL operator used to list and stat objects.
    pub op: Operator,
    /// Optional path prefix; when it is `None`, `"/"` is used in its place.
    pub(crate) prefix: Option<String>,
    /// Optional glob pattern, exposed through `get_matcher`.
    // NOTE(review): the visible code never applies this pattern itself — presumably callers
    // use it to filter the listed paths; confirm against the call sites.
    pub(crate) matcher: Option<glob::Pattern>,
    /// Zero-sized marker that ties the enumerator to its `Src` backend type.
    pub(crate) marker: PhantomData<Src>,
    /// Compression format setting for the source.
    // NOTE(review): not read anywhere in the visible code — presumably consumed by the file
    // reader; confirm.
    pub(crate) compression_format: CompressionFormat,
}
48
49#[async_trait]
50impl<Src: OpendalSource> SplitEnumerator for OpendalEnumerator<Src> {
51 type Properties = Src::Properties;
52 type Split = OpendalFsSplit<Src>;
53
54 async fn new(
55 properties: Src::Properties,
56 _context: SourceEnumeratorContextRef,
57 ) -> ConnectorResult<Self> {
58 Src::new_enumerator(properties)
59 }
60
61 async fn list_splits(&mut self) -> ConnectorResult<Vec<OpendalFsSplit<Src>>> {
62 let empty_split: OpendalFsSplit<Src> = OpendalFsSplit::empty_split();
63 let prefix = self.prefix.as_deref().unwrap_or("/");
64 let list_prefix = Self::extract_list_prefix(prefix);
65
66 let mut lister = self.op.lister(&list_prefix).await?;
67 match lister.try_next().await {
69 Ok(_) => return Ok(vec![empty_split]),
70 Err(e) => {
71 return Err(anyhow!(e)
72 .context("fail to create source, please check your config.")
73 .into());
74 }
75 }
76 }
77}
78
79impl<Src: OpendalSource> OpendalEnumerator<Src> {
80 fn extract_list_prefix(prefix: &str) -> String {
84 if prefix.ends_with("/") {
85 prefix.to_owned()
86 } else if let Some(parent_pos) = prefix.rfind('/') {
87 prefix[..=parent_pos].to_owned()
88 } else {
89 "/".to_owned()
90 }
91 }
92
93 pub async fn list(&self) -> ConnectorResult<ObjectMetadataIter> {
94 let prefix = self.prefix.as_deref().unwrap_or("/");
95 let list_prefix = Self::extract_list_prefix(prefix);
96 let object_lister = self.op.lister_with(&list_prefix).recursive(true).await?;
97
98 let op = self.op.clone();
99 let stream = stream::unfold(object_lister, move |mut object_lister| {
100 let op = op.clone();
101
102 async move {
103 match object_lister.next().await {
104 Some(Ok(object)) => {
105 let name = object.path().to_owned();
106
107 let meta = object.metadata();
112 let mut t = meta.last_modified();
113 let mut size = meta.content_length() as i64;
114 if t.is_none() || size == 0 {
115 let stat_meta = op.stat(&name).await.ok()?;
116 t = stat_meta.last_modified();
117 size = stat_meta.content_length() as i64;
118 }
119
120 let timestamp = to_timestamptz(t);
121 let metadata = FsPageItem {
122 name,
123 size,
124 timestamp,
125 };
126 Some((Ok(metadata), object_lister))
127 }
128 Some(Err(err)) => Some((Err(err.into()), object_lister)),
129 None => {
130 tracing::info!("list object completed.");
131 None
132 }
133 }
134 }
135 });
136
137 Ok(stream.boxed())
138 }
139
140 pub fn get_matcher(&self) -> &Option<glob::Pattern> {
141 &self.matcher
142 }
143
144 pub fn get_prefix(&self) -> &str {
145 self.prefix.as_deref().unwrap_or("/")
146 }
147}
/// Boxed stream of listed objects: every item is either an [`FsPageItem`] or the error that
/// occurred while producing it.
pub type ObjectMetadataIter = BoxStream<'static, ConnectorResult<FsPageItem>>;
149
#[cfg(test)]
mod tests {
    use super::*;
    use crate::source::filesystem::opendal_source::OpendalS3;
    use crate::source::filesystem::s3::enumerator::get_prefix;

    /// Calls `extract_list_prefix`, using `OpendalS3` only to pick a concrete `Src` type.
    fn calculate_list_prefix(prefix: &str) -> String {
        OpendalEnumerator::<OpendalS3>::extract_list_prefix(prefix)
    }

    /// Table-driven check of `get_prefix` followed by `extract_list_prefix`.
    #[test]
    fn test_prefix_logic() {
        // (glob pattern, expected `get_prefix` output, expected list prefix)
        let cases = [
            ("a/b/c/hello*/*.json", "a/b/c/hello", "a/b/c/"),
            ("a/b/c.json", "a/b/c.json", "a/b/"),
            ("a/b/c/", "a/b/c/", "a/b/c/"),
            ("a/b/c", "a/b/c", "a/b/"),
            ("file.json", "file.json", "/"),
            ("*.json", "", "/"),
            ("a/b/c/[h]ello*/*.json", "a/b/c/", "a/b/c/"),
        ];

        for (pattern, want_prefix, want_list_prefix) in cases {
            let got_prefix = get_prefix(pattern);
            assert_eq!(got_prefix, want_prefix, "get_prefix failed for: {}", pattern);

            let got_list_prefix = calculate_list_prefix(&got_prefix);
            assert_eq!(
                got_list_prefix, want_list_prefix,
                "list_prefix failed for: {}",
                pattern
            );
        }
    }

    /// Pins the single case where the glob-free prefix stops in the middle of a path segment.
    #[test]
    fn test_bug_fix() {
        let pattern = "a/b/c/hello*/*.json";
        let got_prefix = get_prefix(pattern);
        let got_list_prefix = calculate_list_prefix(&got_prefix);

        // The prefix ends inside the `hello*` segment ...
        assert_eq!(got_prefix, "a/b/c/hello");
        // ... so the list prefix must fall back to the enclosing directory.
        assert_eq!(got_list_prefix, "a/b/c/");
    }
}