risingwave_connector/source/filesystem/opendal_source/
opendal_enumerator.rs1use std::marker::PhantomData;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use chrono::{DateTime, Utc};
20use futures::stream::{self, BoxStream};
21use futures::{StreamExt, TryStreamExt};
22use opendal::Operator;
23use risingwave_common::types::Timestamptz;
24
25use super::OpendalSource;
26use crate::error::ConnectorResult;
27use crate::source::filesystem::file_common::CompressionFormat;
28use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
29use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
30
31#[derive(Debug, Clone)]
32pub struct OpendalEnumerator<Src: OpendalSource> {
33 pub op: Operator,
34 pub(crate) prefix: Option<String>,
36 pub(crate) matcher: Option<glob::Pattern>,
37 pub(crate) marker: PhantomData<Src>,
38 pub(crate) compression_format: CompressionFormat,
39}
40
41#[async_trait]
42impl<Src: OpendalSource> SplitEnumerator for OpendalEnumerator<Src> {
43 type Properties = Src::Properties;
44 type Split = OpendalFsSplit<Src>;
45
46 async fn new(
47 properties: Src::Properties,
48 _context: SourceEnumeratorContextRef,
49 ) -> ConnectorResult<Self> {
50 Src::new_enumerator(properties)
51 }
52
53 async fn list_splits(&mut self) -> ConnectorResult<Vec<OpendalFsSplit<Src>>> {
54 let empty_split: OpendalFsSplit<Src> = OpendalFsSplit::empty_split();
55 let prefix = self.prefix.as_deref().unwrap_or("/");
56 let list_prefix = Self::extract_list_prefix(prefix);
57
58 let mut lister = self.op.lister(&list_prefix).await?;
59 match lister.try_next().await {
61 Ok(_) => return Ok(vec![empty_split]),
62 Err(e) => {
63 return Err(anyhow!(e)
64 .context("fail to create source, please check your config.")
65 .into());
66 }
67 }
68 }
69}
70
71impl<Src: OpendalSource> OpendalEnumerator<Src> {
72 fn extract_list_prefix(prefix: &str) -> String {
76 if prefix.ends_with("/") {
77 prefix.to_owned()
78 } else if let Some(parent_pos) = prefix.rfind('/') {
79 prefix[..=parent_pos].to_owned()
80 } else {
81 "/".to_owned()
82 }
83 }
84
85 pub async fn list(&self) -> ConnectorResult<ObjectMetadataIter> {
86 let prefix = self.prefix.as_deref().unwrap_or("/");
87 let list_prefix = Self::extract_list_prefix(prefix);
88 let object_lister = self.op.lister_with(&list_prefix).recursive(true).await?;
89
90 let op = self.op.clone();
91 let full_capability = op.info().full_capability();
92 let stream = stream::unfold(object_lister, move |mut object_lister| {
93 let op = op.clone();
94
95 async move {
96 match object_lister.next().await {
97 Some(Ok(object)) => {
98 let name = object.path().to_owned();
99
100 let (t, size) = if !full_capability.list_has_content_length
102 || !full_capability.list_has_last_modified
103 {
104 let stat_meta = op.stat(&name).await.ok()?;
106 let t = match stat_meta.last_modified() {
107 Some(t) => t,
108 None => DateTime::<Utc>::from_timestamp(0, 0).unwrap_or_default(),
109 };
110 let size = stat_meta.content_length() as i64;
111 (t, size)
112 } else {
113 let meta = object.metadata();
115 let t = match meta.last_modified() {
116 Some(t) => t,
117 None => DateTime::<Utc>::from_timestamp(0, 0).unwrap_or_default(),
118 };
119 let size = meta.content_length() as i64;
120 (t, size)
121 };
122
123 let timestamp = Timestamptz::from(t);
124 let metadata = FsPageItem {
125 name,
126 size,
127 timestamp,
128 };
129 Some((Ok(metadata), object_lister))
130 }
131 Some(Err(err)) => Some((Err(err.into()), object_lister)),
132 None => {
133 tracing::info!("list object completed.");
134 None
135 }
136 }
137 }
138 });
139
140 Ok(stream.boxed())
141 }
142
143 pub fn get_matcher(&self) -> &Option<glob::Pattern> {
144 &self.matcher
145 }
146
147 pub fn get_prefix(&self) -> &str {
148 self.prefix.as_deref().unwrap_or("/")
149 }
150}
/// Boxed, `'static` stream of listing results: each item is either the
/// [`FsPageItem`] describing one object or the error hit while producing it.
/// This is the stream type returned by `OpendalEnumerator::list`.
pub type ObjectMetadataIter = BoxStream<'static, ConnectorResult<FsPageItem>>;
152
#[cfg(test)]
mod tests {
    use super::*;
    use crate::source::filesystem::opendal_source::OpendalS3;
    use crate::source::filesystem::s3::enumerator::get_prefix;

    /// Runs `extract_list_prefix` with the S3 backend as a concrete `Src`.
    fn calculate_list_prefix(prefix: &str) -> String {
        OpendalEnumerator::<OpendalS3>::extract_list_prefix(prefix)
    }

    /// Checks, for a range of glob patterns, both the prefix computed by
    /// `get_prefix` and the list prefix derived from it.
    #[test]
    fn test_prefix_logic() {
        // (glob pattern, expected `get_prefix` output, expected list prefix)
        let cases: [(&str, &str, &str); 7] = [
            ("a/b/c/hello*/*.json", "a/b/c/hello", "a/b/c/"),
            ("a/b/c.json", "a/b/c.json", "a/b/"),
            ("a/b/c/", "a/b/c/", "a/b/c/"),
            ("a/b/c", "a/b/c", "a/b/"),
            ("file.json", "file.json", "/"),
            ("*.json", "", "/"),
            ("a/b/c/[h]ello*/*.json", "a/b/c/", "a/b/c/"),
        ];

        for &(pattern, expected_prefix, expected_list_prefix) in &cases {
            let prefix = get_prefix(pattern);
            assert_eq!(
                prefix, expected_prefix,
                "get_prefix failed for: {}",
                pattern
            );

            let list_prefix = calculate_list_prefix(&prefix);
            assert_eq!(
                list_prefix, expected_list_prefix,
                "list_prefix failed for: {}",
                pattern
            );
        }
    }

    /// Regression check: a wildcard in the middle of a path segment must make
    /// the list prefix fall back to the segment's parent directory.
    #[test]
    fn test_bug_fix() {
        let prefix = get_prefix("a/b/c/hello*/*.json");
        assert_eq!(prefix, "a/b/c/hello");

        let list_prefix = calculate_list_prefix(&prefix);
        assert_eq!(list_prefix, "a/b/c/");
    }
}