risingwave_connector/source/filesystem/opendal_source/
opendal_enumerator.rs1use std::marker::PhantomData;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use chrono::{DateTime, Utc};
20use futures::stream::{self, BoxStream};
21use futures::{StreamExt, TryStreamExt};
22use opendal::{Metakey, Operator};
23use risingwave_common::types::Timestamptz;
24
25use super::OpendalSource;
26use crate::error::ConnectorResult;
27use crate::source::filesystem::file_common::CompressionFormat;
28use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
29use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
30
31#[derive(Debug, Clone)]
32pub struct OpendalEnumerator<Src: OpendalSource> {
33 pub op: Operator,
34 pub(crate) prefix: Option<String>,
36 pub(crate) matcher: Option<glob::Pattern>,
37 pub(crate) marker: PhantomData<Src>,
38 pub(crate) compression_format: CompressionFormat,
39}
40
41#[async_trait]
42impl<Src: OpendalSource> SplitEnumerator for OpendalEnumerator<Src> {
43 type Properties = Src::Properties;
44 type Split = OpendalFsSplit<Src>;
45
46 async fn new(
47 properties: Src::Properties,
48 _context: SourceEnumeratorContextRef,
49 ) -> ConnectorResult<Self> {
50 Src::new_enumerator(properties)
51 }
52
53 async fn list_splits(&mut self) -> ConnectorResult<Vec<OpendalFsSplit<Src>>> {
54 let empty_split: OpendalFsSplit<Src> = OpendalFsSplit::empty_split();
55 let prefix = self.prefix.as_deref().unwrap_or("/");
56 let list_prefix = Self::extract_list_prefix(prefix);
57
58 let mut lister = self.op.lister(&list_prefix).await?;
59 match lister.try_next().await {
61 Ok(_) => return Ok(vec![empty_split]),
62 Err(e) => {
63 return Err(anyhow!(e)
64 .context("fail to create source, please check your config.")
65 .into());
66 }
67 }
68 }
69}
70
71impl<Src: OpendalSource> OpendalEnumerator<Src> {
72 fn extract_list_prefix(prefix: &str) -> String {
76 if prefix.ends_with("/") {
77 prefix.to_owned()
78 } else if let Some(parent_pos) = prefix.rfind('/') {
79 prefix[..=parent_pos].to_owned()
80 } else {
81 "/".to_owned()
82 }
83 }
84
85 pub async fn list(&self) -> ConnectorResult<ObjectMetadataIter> {
86 let prefix = self.prefix.as_deref().unwrap_or("/");
87 let list_prefix = Self::extract_list_prefix(prefix);
88 let object_lister = self
89 .op
90 .lister_with(&list_prefix)
91 .recursive(true)
92 .metakey(Metakey::ContentLength | Metakey::LastModified)
93 .await?;
94 let stream = stream::unfold(object_lister, |mut object_lister| async move {
95 match object_lister.next().await {
96 Some(Ok(object)) => {
97 let name = object.path().to_owned();
98 let om = object.metadata();
99
100 let t = match om.last_modified() {
101 Some(t) => t,
102 None => DateTime::<Utc>::from_timestamp(0, 0).unwrap_or_default(),
103 };
104 let timestamp = Timestamptz::from(t);
105 let size = om.content_length() as i64;
106
107 let metadata = FsPageItem {
108 name,
109 size,
110 timestamp,
111 };
112 Some((Ok(metadata), object_lister))
113 }
114 Some(Err(err)) => Some((Err(err.into()), object_lister)),
115 None => {
116 tracing::info!("list object completed.");
117 None
118 }
119 }
120 });
121
122 Ok(stream.boxed())
123 }
124
125 pub fn get_matcher(&self) -> &Option<glob::Pattern> {
126 &self.matcher
127 }
128
129 pub fn get_prefix(&self) -> &str {
130 self.prefix.as_deref().unwrap_or("/")
131 }
132}
133pub type ObjectMetadataIter = BoxStream<'static, ConnectorResult<FsPageItem>>;
134
#[cfg(test)]
mod tests {
    use super::*;
    use crate::source::filesystem::opendal_source::OpendalS3;
    use crate::source::filesystem::s3::enumerator::get_prefix;

    /// Glob pattern, the prefix `get_prefix` should derive from it, and the list prefix that
    /// `extract_list_prefix` should derive from that prefix.
    const CASES: [(&str, &str, &str); 7] = [
        ("a/b/c/hello*/*.json", "a/b/c/hello", "a/b/c/"),
        ("a/b/c.json", "a/b/c.json", "a/b/"),
        ("a/b/c/", "a/b/c/", "a/b/c/"),
        ("a/b/c", "a/b/c", "a/b/"),
        ("file.json", "file.json", "/"),
        ("*.json", "", "/"),
        ("a/b/c/[h]ello*/*.json", "a/b/c/", "a/b/c/"),
    ];

    /// Calls `extract_list_prefix` through a concrete source type so the generic is fixed.
    fn calculate_list_prefix(prefix: &str) -> String {
        OpendalEnumerator::<OpendalS3>::extract_list_prefix(prefix)
    }

    /// Checks both derivation steps for every entry of the case table.
    #[test]
    fn test_prefix_logic() {
        for &(pattern, expected_prefix, expected_list_prefix) in &CASES {
            let prefix = get_prefix(pattern);
            let list_prefix = calculate_list_prefix(&prefix);

            assert_eq!(
                prefix, expected_prefix,
                "get_prefix failed for: {}",
                pattern
            );
            assert_eq!(
                list_prefix, expected_list_prefix,
                "list_prefix failed for: {}",
                pattern
            );
        }
    }

    /// A prefix that stops in the middle of a path segment must be listed from the parent
    /// directory of that segment.
    #[test]
    fn test_bug_fix() {
        let prefix = get_prefix("a/b/c/hello*/*.json");
        assert_eq!(prefix, "a/b/c/hello");

        let list_prefix = calculate_list_prefix(&prefix);
        assert_eq!(list_prefix, "a/b/c/");
    }
}