// risingwave_connector/source/filesystem/opendal_source/opendal_reader.rs

use std::future::IntoFuture;
use std::pin::Pin;

use async_compression::tokio::bufread::GzipDecoder;
use async_trait::async_trait;
use futures::TryStreamExt;
use futures_async_stream::try_stream;
use opendal::Operator;
use prometheus::core::GenericCounter;
use risingwave_common::array::StreamChunk;
use risingwave_common::metrics::LabelGuardedMetric;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
use tokio_util::io::StreamReader;

use super::OpendalSource;
use super::opendal_enumerator::OpendalEnumerator;
use crate::error::ConnectorResult;
use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParserConfig};
use crate::source::filesystem::OpendalFsSplit;
use crate::source::filesystem::file_common::CompressionFormat;
use crate::source::filesystem::nd_streaming::need_nd_streaming;
use crate::source::iceberg::read_parquet_file;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
    SplitReader,
};

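/// Reader for opendal-backed file sources. Holds one `OpendalFsSplit` per file
/// to read, plus the parser configuration used to decode file contents into
/// `StreamChunk`s.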
#[derive(Debug, Clone)]
pub struct OpendalReader<Src: OpendalSource> {
    connector: OpendalEnumerator<Src>,
    splits: Vec<OpendalFsSplit<Src>>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
}

#[async_trait]
impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
    type Properties = Src::Properties;
    type Split = OpendalFsSplit<Src>;

    async fn new(
        properties: Src::Properties,
        splits: Vec<OpendalFsSplit<Src>>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        let connector = Src::new_enumerator(properties)?;
        let opendal_reader = OpendalReader {
            connector,
            splits,
            parser_config,
            source_ctx,
            columns,
        };
        Ok(opendal_reader)
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        self.into_stream_inner()
    }
}

impl<Src: OpendalSource> OpendalReader<Src> {
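    /// Processes the assigned splits one by one, yielding parsed `StreamChunk`s.
    /// Parquet files take a dedicated columnar path; everything else is read as
    /// newline-delimited text.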
    #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
    async fn into_stream_inner(self) {
        for split in self.splits {
            let source_ctx = self.source_ctx.clone();

            let object_name = split.name.clone();
            let actor_id = source_ctx.actor_id.to_string();
            let fragment_id = source_ctx.fragment_id.to_string();
            let source_id = source_ctx.source_id.to_string();
            let source_name = source_ctx.source_name.clone();
            let file_source_input_row_count = self
                .source_ctx
                .metrics
                .file_source_input_row_count
                .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
            let chunk_stream;
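            // Parquet bypasses the byte-stream parser; rows are decoded into chunks
            // directly by `read_parquet_file`.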
            if let EncodingProperties::Parquet(parquet_props) =
                &self.parser_config.specific.encoding_config
            {
                let split_id = split.id();
                let parquet_source_skip_row_count_metrics: LabelGuardedMetric<
                    GenericCounter<prometheus::core::AtomicU64>,
                > = self
                    .source_ctx
                    .metrics
                    .parquet_source_skip_row_count
                    .with_guarded_label_values(&[
                        actor_id.as_str(),
                        source_id.as_str(),
                        &split_id,
                        source_name.as_str(),
                    ]);
                chunk_stream = read_parquet_file(
                    self.connector.op.clone(),
                    object_name.clone(),
                    self.columns.clone(),
                    Some(self.parser_config.common.rw_columns.clone()),
                    parquet_props.case_insensitive,
                    self.source_ctx.source_ctrl_opts.chunk_size,
                    split.offset,
                    Some(file_source_input_row_count.clone()),
                    Some(parquet_source_skip_row_count_metrics),
                )
                .await?;
            } else {
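                // Every non-parquet encoding is treated as newline-delimited: read raw
                // lines here and let the byte-stream parser decode them.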
                assert!(
                    need_nd_streaming(&self.parser_config.specific.encoding_config),
                    "except for parquet, file source only supports splitting by newline for now"
                );

                let line_stream = Self::stream_read_lines(
                    self.connector.op.clone(),
                    split,
                    self.source_ctx.clone(),
                    self.connector.compression_format.clone(),
                    file_source_input_row_count.clone(),
                );

                let parser =
                    ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx)
                        .await?;
                chunk_stream = Box::pin(parser.parse_stream(line_stream));
            }

            #[for_await]
            for chunk in chunk_stream {
                yield chunk?;
            }
        }
    }

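    /// Streams one file as batches of newline-delimited `SourceMessage`s,
    /// resuming from `split.offset` and gunzipping `.gz`/`.gzip` objects on the
    /// fly.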
    #[try_stream(boxed, ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
    pub async fn stream_read_lines(
        op: Operator,
        split: OpendalFsSplit<Src>,
        source_ctx: SourceContextRef,
        compression_format: CompressionFormat,
        file_source_input_row_count_metrics: LabelGuardedMetric<
            GenericCounter<prometheus::core::AtomicU64>,
        >,
    ) {
        let actor_id = source_ctx.actor_id.to_string();
        let fragment_id = source_ctx.fragment_id.to_string();
        let source_id = source_ctx.source_id.to_string();
        let source_name = source_ctx.source_name.clone();
        let split_id = split.id();
        let object_name = split.name.clone();
        let start_offset = split.offset;
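        // Gzip streams are not seekable, so compressed objects are re-read from the
        // beginning after a recovery (already-emitted lines are skipped below), while
        // uncompressed objects resume at `start_offset` via a range read.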
        let reader = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
            true => op.read_with(&object_name).into_future().await?,
            false => {
                op.read_with(&object_name)
                    .range(start_offset as u64..)
                    .into_future()
                    .await?
            }
        };

        // Adapt the opendal byte stream into an `AsyncRead` for buffered line reads.
        let stream_reader = StreamReader::new(reader.map_err(std::io::Error::other));

        let mut buf_reader: Pin<Box<dyn AsyncBufRead + Send>> = match compression_format {
            CompressionFormat::Gzip => {
                let gzip_decoder = GzipDecoder::new(stream_reader);
                Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
            }
            CompressionFormat::None => {
                // No compression was configured, but still fall back to the file
                // extension so `.gz`/`.gzip` objects are decoded correctly.
                if object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
                    let gzip_decoder = GzipDecoder::new(stream_reader);
                    Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
                } else {
                    Box::pin(BufReader::new(stream_reader)) as Pin<Box<dyn AsyncBufRead + Send>>
                }
            }
        };

        // For gzip the decompressed byte offset starts at 0 (the whole file is
        // re-read); otherwise we continue counting from the resumed offset.
        let mut offset = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
            true => 0,
            false => start_offset,
        };
        let partition_input_bytes_metrics = source_ctx
            .metrics
            .partition_input_bytes
            .with_guarded_label_values(&[
                actor_id.as_str(),
                source_id.as_str(),
                &split_id,
                source_name.as_str(),
                fragment_id.as_str(),
            ]);

        let max_chunk_size = source_ctx.source_ctrl_opts.chunk_size;
        let mut batch = Vec::with_capacity(max_chunk_size);
        let mut line_buf = String::new();

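        // Read line by line, batching up to `max_chunk_size` messages per yield.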
        loop {
            let n_read = buf_reader.read_line(&mut line_buf).await?;
            if n_read == 0 {
                // EOF reached.
                break;
            }
            let msg_offset = (offset + n_read).to_string();
            // `read_line` keeps the trailing newline, so `n_read` covers the whole buffer.
            debug_assert_eq!(n_read, line_buf.len());
            if (object_name.ends_with(".gz") || object_name.ends_with(".gzip"))
                && offset + n_read <= start_offset
            {
                // Gzip files are re-read from the beginning, so drop lines that were
                // already emitted before `start_offset`.
                line_buf.clear();
            } else {
                batch.push(SourceMessage {
                    key: None,
                    payload: Some(std::mem::take(&mut line_buf).into_bytes()),
                    offset: msg_offset,
                    split_id: split.id(),
                    meta: SourceMeta::Empty,
                });
            }

            offset += n_read;
            partition_input_bytes_metrics.inc_by(n_read as _);

            if batch.len() >= max_chunk_size {
                file_source_input_row_count_metrics.inc_by(max_chunk_size as _);
                yield std::mem::replace(&mut batch, Vec::with_capacity(max_chunk_size));
            }
        }

        // Flush any remaining partial batch at EOF.
        if !batch.is_empty() {
            batch.shrink_to_fit();
            file_source_input_row_count_metrics.inc_by(batch.len() as _);
            yield batch;
        }
    }
}