// risingwave_connector/source/filesystem/opendal_source/opendal_reader.rs

use std::future::IntoFuture;
use std::pin::Pin;
use async_compression::tokio::bufread::GzipDecoder;
use async_trait::async_trait;
use futures::TryStreamExt;
use futures_async_stream::try_stream;
use opendal::Operator;
use risingwave_common::array::StreamChunk;
use tokio::io::{AsyncRead, BufReader};
use tokio_util::io::{ReaderStream, StreamReader};
use super::opendal_enumerator::OpendalEnumerator;
use super::OpendalSource;
use crate::error::ConnectorResult;
use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParserConfig};
use crate::source::filesystem::file_common::CompressionFormat;
use crate::source::filesystem::nd_streaming::need_nd_streaming;
use crate::source::filesystem::{nd_streaming, OpendalFsSplit};
use crate::source::iceberg::read_parquet_file;
use crate::source::{
BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
SplitReader,
};
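
/// Capacity, in bytes, of each chunk yielded by the `ReaderStream` in
/// `stream_read_object` below.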
const STREAM_READER_CAPACITY: usize = 4096;
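
/// [`SplitReader`] implementation that reads file splits through an OpenDAL
/// [`Operator`] and parses the bytes into [`StreamChunk`]s.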
#[derive(Debug, Clone)]
pub struct OpendalReader<Src: OpendalSource> {
connector: OpendalEnumerator<Src>,
splits: Vec<OpendalFsSplit<Src>>,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
columns: Option<Vec<Column>>,
}
#[async_trait]
impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
type Properties = Src::Properties;
type Split = OpendalFsSplit<Src>;
async fn new(
properties: Src::Properties,
splits: Vec<OpendalFsSplit<Src>>,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
columns: Option<Vec<Column>>,
) -> ConnectorResult<Self> {
let connector = Src::new_enumerator(properties)?;
let opendal_reader = OpendalReader {
connector,
splits,
parser_config,
source_ctx,
columns,
};
Ok(opendal_reader)
}
fn into_stream(self) -> BoxChunkSourceStream {
self.into_stream_inner()
}
}
impl<Src: OpendalSource> OpendalReader<Src> {
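    /// Reads every assigned split in sequence. Parquet objects take a
    /// dedicated reader path (`read_parquet_file`); all other encodings are
    /// streamed as raw bytes and fed through the configured parser.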
#[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
async fn into_stream_inner(self) {
for split in self.splits {
let source_ctx = self.source_ctx.clone();
let object_name = split.name.clone();
let msg_stream;
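            // Parquet has a dedicated reader and skips the generic
            // byte-stream parser below.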
if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config {
msg_stream = read_parquet_file(
self.connector.op.clone(),
object_name,
self.columns.clone(),
Some(self.parser_config.common.rw_columns.clone()),
self.source_ctx.source_ctrl_opts.chunk_size,
split.offset,
)
.await?;
} else {
let data_stream = Self::stream_read_object(
self.connector.op.clone(),
split,
self.source_ctx.clone(),
self.connector.compression_format.clone(),
);
let parser =
ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx)
.await?;
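                // Encodings that need newline-delimited streaming (e.g. JSON
                // lines, CSV) are split on record boundaries before parsing.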
msg_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) {
Box::pin(parser.into_stream(nd_streaming::split_stream(data_stream)))
} else {
Box::pin(parser.into_stream(data_stream))
};
}
#[for_await]
for msg in msg_stream {
let msg = msg?;
yield msg;
}
}
}
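
    /// Streams one object as batches of [`SourceMessage`]s, starting from the
    /// split's byte offset. Gzip objects are decompressed transparently, so
    /// offsets advance by the number of bytes the (possibly decompressed)
    /// stream yields.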
#[try_stream(boxed, ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
pub async fn stream_read_object(
op: Operator,
split: OpendalFsSplit<Src>,
source_ctx: SourceContextRef,
compression_format: CompressionFormat,
) {
let actor_id = source_ctx.actor_id.to_string();
let fragment_id = source_ctx.fragment_id.to_string();
let source_id = source_ctx.source_id.to_string();
let source_name = source_ctx.source_name.to_string();
let max_chunk_size = source_ctx.source_ctrl_opts.chunk_size;
let split_id = split.id();
let object_name = split.name.clone();
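        // Start reading at the split's offset so a recovered reader skips
        // bytes it has already emitted.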
        let reader = op
            .read_with(&object_name)
            .range(split.offset as u64..)
            .into_future()
            .await?;
let stream_reader = StreamReader::new(
reader.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
);
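        // Pick a decoder: honor an explicit gzip setting, and otherwise fall
        // back to sniffing the `.gz`/`.gzip` file extension.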
let buf_reader: Pin<Box<dyn AsyncRead + Send>> = match compression_format {
CompressionFormat::Gzip => {
let gzip_decoder = GzipDecoder::new(stream_reader);
Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncRead + Send>>
}
CompressionFormat::None => {
if object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
let gzip_decoder = GzipDecoder::new(stream_reader);
Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncRead + Send>>
} else {
Box::pin(BufReader::new(stream_reader)) as Pin<Box<dyn AsyncRead + Send>>
}
}
};
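        // Track the byte offset (for checkpointing) and accumulate messages
        // into batches of at most `max_chunk_size`.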
let mut offset: usize = split.offset;
let mut batch_size: usize = 0;
let mut batch = Vec::new();
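        // Per-partition ingested-bytes counter, labeled by actor, source,
        // split, and fragment.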
let partition_input_bytes_metrics = source_ctx
.metrics
.partition_input_bytes
.with_guarded_label_values(&[
&actor_id,
&source_id,
&split_id,
&source_name,
&fragment_id,
]);
let stream = ReaderStream::with_capacity(buf_reader, STREAM_READER_CAPACITY);
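        // Drain the (possibly decompressed) byte stream, wrapping each chunk
        // in a `SourceMessage` stamped with its starting offset.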
#[for_await]
for read in stream {
let bytes = read?;
let len = bytes.len();
let msg = SourceMessage {
key: None,
payload: Some(bytes.as_ref().to_vec()),
offset: offset.to_string(),
split_id: split.id(),
meta: SourceMeta::Empty,
};
offset += len;
batch_size += len;
batch.push(msg);
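            // Yield once the batch reaches the configured chunk size, so
            // downstream chunk building stays bounded.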
if batch.len() >= max_chunk_size {
partition_input_bytes_metrics.inc_by(batch_size as u64);
let yield_batch = std::mem::take(&mut batch);
batch_size = 0;
yield yield_batch;
}
}
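        // Flush any trailing partial batch at end of object.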
if !batch.is_empty() {
partition_input_bytes_metrics.inc_by(batch_size as u64);
yield batch;
}
}
}