// risingwave_connector/source/filesystem/opendal_source/opendal_reader.rs

use std::future::IntoFuture;
use std::pin::Pin;

use async_compression::tokio::bufread::GzipDecoder;
use async_trait::async_trait;
use futures::TryStreamExt;
use futures_async_stream::try_stream;
use opendal::Operator;
use prometheus::core::GenericCounter;
use risingwave_common::array::StreamChunk;
use risingwave_common::metrics::LabelGuardedMetric;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
use tokio_util::io::StreamReader;

use super::OpendalSource;
use super::opendal_enumerator::OpendalEnumerator;
use crate::error::ConnectorResult;
use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParserConfig};
use crate::source::filesystem::OpendalFsSplit;
use crate::source::filesystem::file_common::CompressionFormat;
use crate::source::filesystem::nd_streaming::need_nd_streaming;
use crate::source::iceberg::read_parquet_file;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
    SplitReader,
};

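/// Reads a batch of OpenDAL file splits and converts them into `StreamChunk`s.
/// Parquet objects go through a dedicated columnar reader; all other encodings are
/// streamed line by line and fed into the configured byte-stream parser.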
#[derive(Debug, Clone)]
pub struct OpendalReader<Src: OpendalSource> {
    connector: OpendalEnumerator<Src>,
    splits: Vec<OpendalFsSplit<Src>>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
}

#[async_trait]
impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
    type Properties = Src::Properties;
    type Split = OpendalFsSplit<Src>;

    async fn new(
        properties: Src::Properties,
        splits: Vec<OpendalFsSplit<Src>>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        let connector = Src::new_enumerator(properties)?;
        let opendal_reader = OpendalReader {
            connector,
            splits,
            parser_config,
            source_ctx,
            columns,
        };
        Ok(opendal_reader)
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        self.into_stream_inner()
    }
}

impl<Src: OpendalSource> OpendalReader<Src> {
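    /// Reads every assigned split in sequence and yields the parsed `StreamChunk`s.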
    #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
    async fn into_stream_inner(self) {
        for split in self.splits {
            let source_ctx = self.source_ctx.clone();

            let object_name = split.name.clone();
            let actor_id = source_ctx.actor_id.to_string();
            let fragment_id = source_ctx.fragment_id.to_string();
            let source_id = source_ctx.source_id.to_string();
            let source_name = source_ctx.source_name.clone();
            let file_source_input_row_count = self
                .source_ctx
                .metrics
                .file_source_input_row_count
                .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
            let chunk_stream;
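            // Parquet is handled by the dedicated columnar reader below; every other
            // encoding is streamed line by line and pushed through the parser.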
            if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config {
                let actor_id = source_ctx.actor_id.to_string();
                let source_id = source_ctx.source_id.to_string();
                let split_id = split.id();
                let source_name = source_ctx.source_name.clone();
                let parquet_source_skip_row_count_metrics: LabelGuardedMetric<
                    GenericCounter<prometheus::core::AtomicU64>,
                > = self
                    .source_ctx
                    .metrics
                    .parquet_source_skip_row_count
                    .with_guarded_label_values(&[
                        actor_id.as_str(),
                        source_id.as_str(),
                        &split_id,
                        source_name.as_str(),
                    ]);
                chunk_stream = read_parquet_file(
                    self.connector.op.clone(),
                    object_name.clone(),
                    self.columns.clone(),
                    Some(self.parser_config.common.rw_columns.clone()),
                    self.source_ctx.source_ctrl_opts.chunk_size,
                    split.offset,
                    Some(file_source_input_row_count.clone()),
                    Some(parquet_source_skip_row_count_metrics),
                )
                .await?;
            } else {
                assert!(
                    need_nd_streaming(&self.parser_config.specific.encoding_config),
                    "except for parquet, file source only supports splitting by newline for now"
                );

                let line_stream = Self::stream_read_lines(
                    self.connector.op.clone(),
                    split,
                    self.source_ctx.clone(),
                    self.connector.compression_format.clone(),
                    file_source_input_row_count.clone(),
                );

                let parser =
                    ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx)
                        .await?;
                chunk_stream = Box::pin(parser.parse_stream(line_stream));
            }

            #[for_await]
            for chunk in chunk_stream {
                yield chunk?;
            }
        }
    }

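    /// Streams one file object as batches of newline-delimited `SourceMessage`s.
    ///
    /// Gzip-compressed objects cannot be read from an arbitrary byte offset, so they are
    /// re-read from the beginning and lines before the recorded offset are skipped; plain
    /// objects are read starting directly from `split.offset`.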
    #[try_stream(boxed, ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
    pub async fn stream_read_lines(
        op: Operator,
        split: OpendalFsSplit<Src>,
        source_ctx: SourceContextRef,
        compression_format: CompressionFormat,
        file_source_input_row_count_metrics: LabelGuardedMetric<
            GenericCounter<prometheus::core::AtomicU64>,
        >,
    ) {
        let actor_id = source_ctx.actor_id.to_string();
        let fragment_id = source_ctx.fragment_id.to_string();
        let source_id = source_ctx.source_id.to_string();
        let source_name = source_ctx.source_name.clone();
        let split_id = split.id();
        let object_name = split.name.clone();
        let start_offset = split.offset;
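        // For gzip objects the whole file is fetched (decompression cannot start mid-stream);
        // for everything else the read begins at the recorded `start_offset`.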
        let reader = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
            true => op.read_with(&object_name).into_future().await?,
            false => {
                op.read_with(&object_name)
                    .range(start_offset as u64..)
                    .into_future()
                    .await?
            }
        };

        let stream_reader = StreamReader::new(reader.map_err(std::io::Error::other));

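        // Pick the reader: decode gzip when either the configured compression format or the
        // file extension says so, otherwise read the raw bytes.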
        let mut buf_reader: Pin<Box<dyn AsyncBufRead + Send>> = match compression_format {
            CompressionFormat::Gzip => {
                let gzip_decoder = GzipDecoder::new(stream_reader);
                Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
            }
            CompressionFormat::None => {
                if object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
                    let gzip_decoder = GzipDecoder::new(stream_reader);
                    Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
                } else {
                    Box::pin(BufReader::new(stream_reader)) as Pin<Box<dyn AsyncBufRead + Send>>
                }
            }
        };

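        // `offset` tracks how many bytes have been consumed: decompressed bytes starting from
        // 0 for gzip objects, raw bytes starting from `start_offset` otherwise.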
        let mut offset = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
            true => 0,
            false => start_offset,
        };
        let partition_input_bytes_metrics = source_ctx
            .metrics
            .partition_input_bytes
            .with_guarded_label_values(&[
                actor_id.as_str(),
                source_id.as_str(),
                &split_id,
                source_name.as_str(),
                fragment_id.as_str(),
            ]);

        let max_chunk_size = source_ctx.source_ctrl_opts.chunk_size;
        let mut batch = Vec::with_capacity(max_chunk_size);
        let mut line_buf = String::new();

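        // Read the object line by line, batching up to `max_chunk_size` messages per yield.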
        loop {
            let n_read = buf_reader.read_line(&mut line_buf).await?;
            if n_read == 0 {
                // EOF: nothing more to read from this object.
                break;
            }
            let msg_offset = (offset + n_read).to_string();
            // `line_buf` is emptied on every iteration below, so it holds exactly the line
            // that `read_line` just appended.
            debug_assert_eq!(n_read, line_buf.len());
            if (object_name.ends_with(".gz") || object_name.ends_with(".gzip"))
                && offset + n_read <= start_offset
            {
                // This line lies before the recovery offset and was already delivered;
                // drop it so the buffer is empty for the next `read_line`.
                line_buf.clear();
            } else {
                batch.push(SourceMessage {
                    key: None,
                    payload: Some(std::mem::take(&mut line_buf).into_bytes()),
                    offset: msg_offset,
                    split_id: split.id(),
                    meta: SourceMeta::Empty,
                });
            }

            offset += n_read;
            partition_input_bytes_metrics.inc_by(n_read as _);

            if batch.len() >= max_chunk_size {
                file_source_input_row_count_metrics.inc_by(max_chunk_size as _);
                yield std::mem::replace(&mut batch, Vec::with_capacity(max_chunk_size));
            }
        }

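        // Flush whatever is left in the final, partially filled batch.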
        if !batch.is_empty() {
            batch.shrink_to_fit();
            file_source_input_row_count_metrics.inc_by(batch.len() as _);
            yield batch;
        }
    }
}