risingwave_connector/source/filesystem/opendal_source/
opendal_reader.rs1use std::future::IntoFuture;
16use std::pin::Pin;
17
18use async_compression::tokio::bufread::GzipDecoder;
19use async_trait::async_trait;
20use futures::{StreamExt, TryStreamExt};
21use futures_async_stream::try_stream;
22use opendal::Operator;
23use prometheus::core::GenericCounter;
24use risingwave_common::array::StreamChunk;
25use risingwave_common::metrics::LabelGuardedMetric;
26use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
27use tokio_util::io::StreamReader;
28
29use super::OpendalSource;
30use super::opendal_enumerator::OpendalEnumerator;
31use crate::error::ConnectorResult;
32use crate::parser::{
33 ByteStreamSourceParserImpl, EncodingProperties, ParserConfig, into_data_chunk_stream,
34};
35use crate::source::filesystem::OpendalFsSplit;
36use crate::source::filesystem::file_common::CompressionFormat;
37use crate::source::filesystem::nd_streaming::need_nd_streaming;
38use crate::source::iceberg::read_parquet_file;
39use crate::source::{
40 BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SourceMessageEvent, SourceMeta,
41 SplitMetaData, SplitReader,
42};
43
/// File-source split reader backed by an opendal [`Operator`].
///
/// One reader is created per batch of assigned splits (files); turning it into
/// a stream reads the splits sequentially and yields parsed [`StreamChunk`]s.
#[derive(Debug, Clone)]
pub struct OpendalReader<Src: OpendalSource> {
    // Enumerator for the source; also carries the opendal `Operator` (`connector.op`)
    // used to actually read the objects.
    connector: OpendalEnumerator<Src>,
    // Splits (one per file) assigned to this reader; consumed in order.
    splits: Vec<OpendalFsSplit<Src>>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    // Requested output columns. Only consulted on the Parquet path below;
    // presumably `None` means "all columns" — TODO confirm with `read_parquet_file`.
    columns: Option<Vec<Column>>,
}
52
53#[async_trait]
54impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
55 type Properties = Src::Properties;
56 type Split = OpendalFsSplit<Src>;
57
58 async fn new(
59 properties: Src::Properties,
60 splits: Vec<OpendalFsSplit<Src>>,
61 parser_config: ParserConfig,
62 source_ctx: SourceContextRef,
63 columns: Option<Vec<Column>>,
64 ) -> ConnectorResult<Self> {
65 let connector = Src::new_enumerator(properties)?;
66 let opendal_reader = OpendalReader {
67 connector,
68 splits,
69 parser_config,
70 source_ctx,
71 columns,
72 };
73 Ok(opendal_reader)
74 }
75
76 fn into_stream(self) -> BoxSourceChunkStream {
77 self.into_stream_inner()
78 }
79}
80
81impl<Src: OpendalSource> OpendalReader<Src> {
82 #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
83 async fn into_stream_inner(self) {
84 for split in self.splits {
85 let source_ctx = self.source_ctx.clone();
86
87 let object_name = split.name.clone();
88 let actor_id = source_ctx.actor_id.to_string();
89 let fragment_id = source_ctx.fragment_id.to_string();
90 let source_id = source_ctx.source_id.to_string();
91 let source_name = source_ctx.source_name.clone();
92 let file_source_input_row_count = self
93 .source_ctx
94 .metrics
95 .file_source_input_row_count
96 .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
97 let chunk_stream;
98 if let EncodingProperties::Parquet(parquet_props) =
99 &self.parser_config.specific.encoding_config
100 {
101 let actor_id = source_ctx.actor_id.to_string();
102 let source_id = source_ctx.source_id.to_string();
103 let split_id = split.id();
104 let source_name = source_ctx.source_name.clone();
105 let parquet_source_skip_row_count_metrics: LabelGuardedMetric<
106 GenericCounter<prometheus::core::AtomicU64>,
107 > = self
108 .source_ctx
109 .metrics
110 .parquet_source_skip_row_count
111 .with_guarded_label_values(&[
112 actor_id.as_str(),
113 source_id.as_str(),
114 &split_id,
115 source_name.as_str(),
116 ]);
117 chunk_stream = read_parquet_file(
118 self.connector.op.clone(),
119 object_name.clone(),
120 self.columns.clone(),
121 Some(self.parser_config.common.rw_columns.clone()),
122 parquet_props.case_insensitive,
123 self.source_ctx.source_ctrl_opts.chunk_size,
124 split.offset,
125 Some(file_source_input_row_count.clone()),
126 Some(parquet_source_skip_row_count_metrics),
127 )
128 .await?;
129 } else {
130 assert!(
131 need_nd_streaming(&self.parser_config.specific.encoding_config),
132 "except for parquet, file source only support split by newline for now"
133 );
134
135 let line_stream = Self::stream_read_lines(
136 self.connector.op.clone(),
137 split,
138 self.source_ctx.clone(),
139 self.connector.compression_format.clone(),
140 file_source_input_row_count.clone(),
141 );
142
143 let parser =
144 ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx)
145 .await?;
146 chunk_stream = Box::pin(into_data_chunk_stream(parser.parse_stream_with_events(
147 line_stream.map_ok(SourceMessageEvent::Data).boxed(),
148 )));
149 }
150
151 #[for_await]
152 for chunk in chunk_stream {
153 yield chunk?;
154 }
155 }
156 }
157
158 #[try_stream(boxed, ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
159 pub async fn stream_read_lines(
160 op: Operator,
161 split: OpendalFsSplit<Src>,
162 source_ctx: SourceContextRef,
163 compression_format: CompressionFormat,
164 file_source_input_row_count_metrics: LabelGuardedMetric<
165 GenericCounter<prometheus::core::AtomicU64>,
166 >,
167 ) {
168 let actor_id = source_ctx.actor_id.to_string();
169 let fragment_id = source_ctx.fragment_id.to_string();
170 let source_id = source_ctx.source_id.to_string();
171 let source_name = source_ctx.source_name.clone();
172 let split_id = split.id();
173 let object_name = split.name.clone();
174 let start_offset = split.offset;
175 let reader = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
178 true => op.read_with(&object_name).into_future().await?,
179
180 false => {
181 op.read_with(&object_name)
182 .range(start_offset as u64..)
183 .into_future()
184 .await?
185 }
186 };
187
188 let stream_reader = StreamReader::new(reader.map_err(std::io::Error::other));
189
190 let mut buf_reader: Pin<Box<dyn AsyncBufRead + Send>> = match compression_format {
191 CompressionFormat::Gzip => {
192 let gzip_decoder = GzipDecoder::new(stream_reader);
193 Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
194 }
195 CompressionFormat::None => {
196 if object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
198 let gzip_decoder = GzipDecoder::new(stream_reader);
199 Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
200 } else {
201 Box::pin(BufReader::new(stream_reader)) as Pin<Box<dyn AsyncBufRead + Send>>
202 }
203 }
204 };
205
206 let mut offset = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
207 true => 0,
208 false => start_offset,
209 };
210 let partition_input_bytes_metrics = source_ctx
211 .metrics
212 .partition_input_bytes
213 .with_guarded_label_values(&[
214 actor_id.as_str(),
215 source_id.as_str(),
216 &split_id,
217 source_name.as_str(),
218 fragment_id.as_str(),
219 ]);
220
221 let max_chunk_size = source_ctx.source_ctrl_opts.chunk_size;
222 let mut batch = Vec::with_capacity(max_chunk_size);
223 let mut line_buf = String::new();
224
225 loop {
226 let n_read = buf_reader.read_line(&mut line_buf).await?;
227 if n_read == 0 {
228 break;
230 }
231 let msg_offset = (offset + n_read).to_string();
232 debug_assert_eq!(n_read, line_buf.len());
234 if (object_name.ends_with(".gz") || object_name.ends_with(".gzip"))
235 && offset + n_read <= start_offset
236 {
237 } else {
240 batch.push(SourceMessage {
241 key: None,
242 payload: Some(std::mem::take(&mut line_buf).into_bytes()),
243 offset: msg_offset,
244 split_id: split.id(),
245 meta: SourceMeta::Empty,
246 });
247 }
248
249 offset += n_read;
250 partition_input_bytes_metrics.inc_by(n_read as _);
251
252 if batch.len() >= max_chunk_size {
253 file_source_input_row_count_metrics.inc_by(max_chunk_size as _);
254 yield std::mem::replace(&mut batch, Vec::with_capacity(max_chunk_size));
255 }
256 }
257
258 if !batch.is_empty() {
259 batch.shrink_to_fit();
260 file_source_input_row_count_metrics.inc_by(batch.len() as _);
261 yield batch;
262 }
263 }
264}