risingwave_connector/source/filesystem/opendal_source/
opendal_enumerator.rs1use std::marker::PhantomData;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use chrono::{DateTime, Utc};
20use futures::stream::{self, BoxStream};
21use futures::{StreamExt, TryStreamExt};
22use opendal::{Metakey, Operator};
23use risingwave_common::types::Timestamptz;
24
25use super::OpendalSource;
26use crate::error::ConnectorResult;
27use crate::source::filesystem::file_common::CompressionFormat;
28use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
29use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
30
31#[derive(Debug, Clone)]
32pub struct OpendalEnumerator<Src: OpendalSource> {
33 pub op: Operator,
34 pub(crate) prefix: Option<String>,
36 pub(crate) matcher: Option<glob::Pattern>,
37 pub(crate) marker: PhantomData<Src>,
38 pub(crate) compression_format: CompressionFormat,
39}
40
41#[async_trait]
42impl<Src: OpendalSource> SplitEnumerator for OpendalEnumerator<Src> {
43 type Properties = Src::Properties;
44 type Split = OpendalFsSplit<Src>;
45
46 async fn new(
47 properties: Src::Properties,
48 _context: SourceEnumeratorContextRef,
49 ) -> ConnectorResult<Self> {
50 Src::new_enumerator(properties)
51 }
52
53 async fn list_splits(&mut self) -> ConnectorResult<Vec<OpendalFsSplit<Src>>> {
54 let empty_split: OpendalFsSplit<Src> = OpendalFsSplit::empty_split();
55 let prefix = self.prefix.as_deref().unwrap_or("/");
56
57 let mut lister = self.op.lister(prefix).await?;
58 match lister.try_next().await {
60 Ok(_) => return Ok(vec![empty_split]),
61 Err(e) => {
62 return Err(anyhow!(e)
63 .context("fail to create source, please check your config.")
64 .into());
65 }
66 }
67 }
68}
69
70impl<Src: OpendalSource> OpendalEnumerator<Src> {
71 pub async fn list(&self) -> ConnectorResult<ObjectMetadataIter> {
72 let prefix = self.prefix.as_deref().unwrap_or("/");
73 let list_prefix = match prefix.ends_with("/") {
74 true => prefix,
75 false => "/",
76 };
77 let object_lister = self
78 .op
79 .lister_with(list_prefix)
80 .recursive(true)
81 .metakey(Metakey::ContentLength | Metakey::LastModified)
82 .await?;
83 let stream = stream::unfold(object_lister, |mut object_lister| async move {
84 match object_lister.next().await {
85 Some(Ok(object)) => {
86 let name = object.path().to_owned();
87 let om = object.metadata();
88
89 let t = match om.last_modified() {
90 Some(t) => t,
91 None => DateTime::<Utc>::from_timestamp(0, 0).unwrap_or_default(),
92 };
93 let timestamp = Timestamptz::from(t);
94 let size = om.content_length() as i64;
95
96 let metadata = FsPageItem {
97 name,
98 size,
99 timestamp,
100 };
101 Some((Ok(metadata), object_lister))
102 }
103 Some(Err(err)) => Some((Err(err.into()), object_lister)),
104 None => {
105 tracing::info!("list object completed.");
106 None
107 }
108 }
109 });
110
111 Ok(stream.boxed())
112 }
113
114 pub fn get_matcher(&self) -> &Option<glob::Pattern> {
115 &self.matcher
116 }
117
118 pub fn get_prefix(&self) -> &str {
119 self.prefix.as_deref().unwrap_or("/")
120 }
121}
122pub type ObjectMetadataIter = BoxStream<'static, ConnectorResult<FsPageItem>>;