// risingwave_connector/source/filesystem/opendal_source/opendal_reader.rs

use std::future::IntoFuture;
use std::pin::Pin;

use async_compression::tokio::bufread::GzipDecoder;
use async_trait::async_trait;
use futures::TryStreamExt;
use futures_async_stream::try_stream;
use opendal::Operator;
use risingwave_common::array::StreamChunk;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
use tokio_util::io::StreamReader;

use super::OpendalSource;
use super::opendal_enumerator::OpendalEnumerator;
use crate::error::ConnectorResult;
use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParserConfig};
use crate::source::filesystem::OpendalFsSplit;
use crate::source::filesystem::file_common::CompressionFormat;
use crate::source::filesystem::nd_streaming::need_nd_streaming;
use crate::source::iceberg::read_parquet_file;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
    SplitReader,
};

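/// Split reader for OpenDAL-backed file sources: reads each file split
/// assigned to this reader and converts it into a stream of [`StreamChunk`]s.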
#[derive(Debug, Clone)]
pub struct OpendalReader<Src: OpendalSource> {
    connector: OpendalEnumerator<Src>,
    splits: Vec<OpendalFsSplit<Src>>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
}

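// `SplitReader` is the connector-facing entry point: the source executor calls
// `new` with the user's properties and the splits assigned to this actor, then
// consumes the stream returned by `into_stream`.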
#[async_trait]
impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
    type Properties = Src::Properties;
    type Split = OpendalFsSplit<Src>;

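    /// Builds the reader: constructs the `Src` enumerator (which owns the
    /// OpenDAL operator) from the user-supplied properties and stores the
    /// splits to be read.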
    async fn new(
        properties: Src::Properties,
        splits: Vec<OpendalFsSplit<Src>>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        let connector = Src::new_enumerator(properties)?;
        let opendal_reader = OpendalReader {
            connector,
            splits,
            parser_config,
            source_ctx,
            columns,
        };
        Ok(opendal_reader)
    }

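    /// Entry point used by the source executor; delegates to the
    /// `#[try_stream]`-generated `into_stream_inner` below.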
    fn into_stream(self) -> BoxSourceChunkStream {
        self.into_stream_inner()
    }
}

impl<Src: OpendalSource> OpendalReader<Src> {
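    /// Reads the assigned splits one by one and yields parsed [`StreamChunk`]s.
    /// Parquet objects are decoded via `read_parquet_file`; every other
    /// encoding is read line by line and fed through the configured parser.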
    #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
    async fn into_stream_inner(self) {
        for split in self.splits {
            let source_ctx = self.source_ctx.clone();

            let object_name = split.name.clone();
            let actor_id = source_ctx.actor_id.to_string();
            let fragment_id = source_ctx.fragment_id.to_string();
            let source_id = source_ctx.source_id.to_string();
            let source_name = source_ctx.source_name.clone();
            let file_source_input_row_count = self
                .source_ctx
                .metrics
                .file_source_input_row_count
                .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
            let chunk_stream;
            if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config {
                let parquet_source_skip_row_count_metrics = self
                    .source_ctx
                    .metrics
                    .parquet_source_skip_row_count
                    .with_guarded_label_values(&[
                        &actor_id,
                        &source_id,
                        split.id().as_ref(),
                        &source_name,
                    ]);
                chunk_stream = read_parquet_file(
                    self.connector.op.clone(),
                    object_name.clone(),
                    self.columns.clone(),
                    Some(self.parser_config.common.rw_columns.clone()),
                    self.source_ctx.source_ctrl_opts.chunk_size,
                    split.offset,
                    Some(file_source_input_row_count.clone()),
                    Some(parquet_source_skip_row_count_metrics),
                )
                .await?;
            } else {
                assert!(
                    need_nd_streaming(&self.parser_config.specific.encoding_config),
                    "except for Parquet, the file source currently only supports splitting by newline"
                );

                let line_stream = Self::stream_read_lines(
                    self.connector.op.clone(),
                    split,
                    self.source_ctx.clone(),
                    self.connector.compression_format.clone(),
                    file_source_input_row_count.clone(),
                );

                let parser =
                    ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx)
                        .await?;
                chunk_stream = Box::pin(parser.parse_stream(line_stream));
            }

            #[for_await]
            for chunk in chunk_stream {
                yield chunk?;
            }
        }
    }

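    /// Streams the lines of one object as batches of [`SourceMessage`]s.
    ///
    /// Plain objects are read starting from the split's recorded offset. Gzip
    /// objects (`.gz`/`.gzip`) cannot be seeked into, so they are always read
    /// from the beginning and lines before the recorded offset are skipped.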
    #[try_stream(boxed, ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
    pub async fn stream_read_lines(
        op: Operator,
        split: OpendalFsSplit<Src>,
        source_ctx: SourceContextRef,
        compression_format: CompressionFormat,
        file_source_input_row_count_metrics: risingwave_common::metrics::LabelGuardedMetric<
            prometheus::core::GenericCounter<prometheus::core::AtomicU64>,
            4,
        >,
    ) {
        let actor_id = source_ctx.actor_id.to_string();
        let fragment_id = source_ctx.fragment_id.to_string();
        let source_id = source_ctx.source_id.to_string();
        let source_name = source_ctx.source_name.clone();
        let object_name = split.name.clone();
        let start_offset = split.offset;
        let is_gzipped = object_name.ends_with(".gz") || object_name.ends_with(".gzip");
        // Gzip objects must be decompressed from the start, so no byte range is
        // applied; plain objects resume directly at the split's recorded offset.
        let reader = match is_gzipped {
            true => op.read_with(&object_name).into_future().await?,
            false => {
                op.read_with(&object_name)
                    .range(start_offset as u64..)
                    .into_future()
                    .await?
            }
        };

        let stream_reader = StreamReader::new(reader.map_err(std::io::Error::other));

        let mut buf_reader: Pin<Box<dyn AsyncBufRead + Send>> = match compression_format {
            CompressionFormat::Gzip => {
                let gzip_decoder = GzipDecoder::new(stream_reader);
                Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
            }
            CompressionFormat::None => {
                // Fall back to gzip decoding when the extension indicates a gzip
                // object even though no compression format was configured.
                if is_gzipped {
                    let gzip_decoder = GzipDecoder::new(stream_reader);
                    Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
                } else {
                    Box::pin(BufReader::new(stream_reader)) as Pin<Box<dyn AsyncBufRead + Send>>
                }
            }
        };

        // Offsets refer to the uncompressed byte stream: gzip objects restart at
        // byte 0, plain objects resume at `start_offset`.
        let mut offset = if is_gzipped { 0 } else { start_offset };
        let partition_input_bytes_metrics = source_ctx
            .metrics
            .partition_input_bytes
            .with_guarded_label_values(&[
                &actor_id,
                &source_id,
                split.id().as_ref(),
                &source_name,
                &fragment_id,
            ]);

        let max_chunk_size = source_ctx.source_ctrl_opts.chunk_size;
        let mut batch = Vec::with_capacity(max_chunk_size);
        let mut line_buf = String::new();

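        // Read line by line, buffering up to `max_chunk_size` messages before
        // yielding a batch downstream.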
        loop {
            let n_read = buf_reader.read_line(&mut line_buf).await?;
            if n_read == 0 {
                // EOF: no more bytes in the (decompressed) object.
                break;
            }
            let msg_offset = (offset + n_read).to_string();
            // `read_line` keeps the trailing newline, so `n_read` covers the whole buffer.
            debug_assert_eq!(n_read, line_buf.len());
            if is_gzipped && offset + n_read <= start_offset {
                // Gzip objects are re-read from the beginning, so lines ending at
                // or before `start_offset` were already emitted; drop the line but
                // clear the buffer so the next `read_line` starts from empty.
                line_buf.clear();
            } else {
                batch.push(SourceMessage {
                    key: None,
                    payload: Some(std::mem::take(&mut line_buf).into_bytes()),
                    offset: msg_offset,
                    split_id: split.id(),
                    meta: SourceMeta::Empty,
                });
            }

            offset += n_read;
            partition_input_bytes_metrics.inc_by(n_read as _);

            if batch.len() >= max_chunk_size {
                file_source_input_row_count_metrics.inc_by(max_chunk_size as _);
                yield std::mem::replace(&mut batch, Vec::with_capacity(max_chunk_size));
            }
        }

        if !batch.is_empty() {
            batch.shrink_to_fit();
            file_source_input_row_count_metrics.inc_by(batch.len() as _);
            yield batch;
        }
    }
}