risingwave_connector/source/filesystem/opendal_source/
opendal_enumerator.rsuse std::marker::PhantomData;
use anyhow::anyhow;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::stream::{self, BoxStream};
use futures::StreamExt;
use opendal::{Metakey, Operator};
use risingwave_common::types::Timestamptz;
use super::OpendalSource;
use crate::error::ConnectorResult;
use crate::source::filesystem::file_common::CompressionFormat;
use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
#[derive(Debug, Clone)]
pub struct OpendalEnumerator<Src: OpendalSource> {
pub op: Operator,
pub(crate) prefix: Option<String>,
pub(crate) matcher: Option<glob::Pattern>,
pub(crate) marker: PhantomData<Src>,
pub(crate) compression_format: CompressionFormat,
}
#[async_trait]
impl<Src: OpendalSource> SplitEnumerator for OpendalEnumerator<Src> {
type Properties = Src::Properties;
type Split = OpendalFsSplit<Src>;
async fn new(
properties: Src::Properties,
_context: SourceEnumeratorContextRef,
) -> ConnectorResult<Self> {
Src::new_enumerator(properties)
}
async fn list_splits(&mut self) -> ConnectorResult<Vec<OpendalFsSplit<Src>>> {
let empty_split: OpendalFsSplit<Src> = OpendalFsSplit::empty_split();
let prefix = self.prefix.as_deref().unwrap_or("/");
match self.op.list(prefix).await {
Ok(_) => return Ok(vec![empty_split]),
Err(e) => {
return Err(anyhow!(e)
.context("fail to create source, please check your config.")
.into())
}
}
}
}
impl<Src: OpendalSource> OpendalEnumerator<Src> {
pub async fn list(&self) -> ConnectorResult<ObjectMetadataIter> {
let prefix = self.prefix.as_deref().unwrap_or("/");
let object_lister = self
.op
.lister_with(prefix)
.recursive(true)
.metakey(Metakey::ContentLength | Metakey::LastModified)
.await?;
let stream = stream::unfold(object_lister, |mut object_lister| async move {
match object_lister.next().await {
Some(Ok(object)) => {
let name = object.path().to_string();
let om = object.metadata();
let t = match om.last_modified() {
Some(t) => t,
None => DateTime::<Utc>::from_timestamp(0, 0).unwrap_or_default(),
};
let timestamp = Timestamptz::from(t);
let size = om.content_length() as i64;
let metadata = FsPageItem {
name,
size,
timestamp,
};
Some((Ok(metadata), object_lister))
}
Some(Err(err)) => Some((Err(err.into()), object_lister)),
None => {
tracing::info!("list object completed.");
None
}
}
});
Ok(stream.boxed())
}
pub fn get_matcher(&self) -> &Option<glob::Pattern> {
&self.matcher
}
pub fn get_prefix(&self) -> &str {
self.prefix.as_deref().unwrap_or("/")
}
}
pub type ObjectMetadataIter = BoxStream<'static, ConnectorResult<FsPageItem>>;