risingwave_connector/source/filesystem/opendal_source/opendal_reader.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::IntoFuture;
use std::pin::Pin;

use async_compression::tokio::bufread::GzipDecoder;
use async_trait::async_trait;
use futures::TryStreamExt;
use futures_async_stream::try_stream;
use opendal::Operator;
use risingwave_common::array::StreamChunk;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
use tokio_util::io::StreamReader;

use super::OpendalSource;
use super::opendal_enumerator::OpendalEnumerator;
use crate::error::ConnectorResult;
use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParserConfig};
use crate::source::filesystem::OpendalFsSplit;
use crate::source::filesystem::file_common::CompressionFormat;
use crate::source::filesystem::nd_streaming::need_nd_streaming;
use crate::source::iceberg::read_parquet_file;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
    SplitReader,
};

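/// [`SplitReader`] for OpenDAL-backed file sources: reads the assigned
/// [`OpendalFsSplit`]s (one object per split) and converts their contents into
/// [`StreamChunk`]s according to the configured parser.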
#[derive(Debug, Clone)]
pub struct OpendalReader<Src: OpendalSource> {
    connector: OpendalEnumerator<Src>,
    splits: Vec<OpendalFsSplit<Src>>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
}

#[async_trait]
impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
    type Properties = Src::Properties;
    type Split = OpendalFsSplit<Src>;

    async fn new(
        properties: Src::Properties,
        splits: Vec<OpendalFsSplit<Src>>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        let connector = Src::new_enumerator(properties)?;
        let opendal_reader = OpendalReader {
            connector,
            splits,
            parser_config,
            source_ctx,
            columns,
        };
        Ok(opendal_reader)
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        self.into_stream_inner()
    }
}

impl<Src: OpendalSource> OpendalReader<Src> {
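    /// Streams [`StreamChunk`]s for each assigned split, one split after another. Parquet
    /// objects are read via [`read_parquet_file`]; every other supported encoding is read
    /// line by line through [`Self::stream_read_lines`] and fed into the configured parser.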
    #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
    async fn into_stream_inner(self) {
        for split in self.splits {
            let source_ctx = self.source_ctx.clone();

            let object_name = split.name.clone();
            let actor_id = source_ctx.actor_id.to_string();
            let fragment_id = source_ctx.fragment_id.to_string();
            let source_id = source_ctx.source_id.to_string();
            let source_name = source_ctx.source_name.clone();
            let file_source_input_row_count = self
                .source_ctx
                .metrics
                .file_source_input_row_count
                .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
            let chunk_stream;
            if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config {
                let parquet_source_skip_row_count_metrics: risingwave_common::metrics::LabelGuardedMetric<
                    prometheus::core::GenericCounter<prometheus::core::AtomicU64>,
                    4,
                > = self
                    .source_ctx
                    .metrics
                    .parquet_source_skip_row_count
                    .with_guarded_label_values(&[
                        &actor_id,
                        &source_id,
                        split.id().as_ref(),
                        &source_name,
                    ]);
                chunk_stream = read_parquet_file(
                    self.connector.op.clone(),
                    object_name.clone(),
                    self.columns.clone(),
                    Some(self.parser_config.common.rw_columns.clone()),
                    self.source_ctx.source_ctrl_opts.chunk_size,
                    split.offset,
                    Some(file_source_input_row_count.clone()),
                    Some(parquet_source_skip_row_count_metrics),
                )
                .await?;
            } else {
                assert!(
                    need_nd_streaming(&self.parser_config.specific.encoding_config),
                    "except for Parquet, the file source only supports newline-delimited formats for now"
                );

                let line_stream = Self::stream_read_lines(
                    self.connector.op.clone(),
                    split,
                    self.source_ctx.clone(),
                    self.connector.compression_format.clone(),
                    file_source_input_row_count.clone(),
                );

                let parser =
                    ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx)
                        .await?;
                chunk_stream = Box::pin(parser.parse_stream(line_stream));
            }

            #[for_await]
            for chunk in chunk_stream {
                yield chunk?;
            }
        }
    }

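    /// Reads one object line by line and yields batches of at most
    /// `source_ctrl_opts.chunk_size` [`SourceMessage`]s, where offsets are byte offsets
    /// into the decoded line stream. Objects ending in `.gz`/`.gzip` (or read with
    /// [`CompressionFormat::Gzip`]) are decompressed on the fly; gzip-suffixed objects are
    /// always re-read from the beginning, skipping lines that end at or before
    /// `start_offset`, while all other objects resume directly from `start_offset`.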
    #[try_stream(boxed, ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
    pub async fn stream_read_lines(
        op: Operator,
        split: OpendalFsSplit<Src>,
        source_ctx: SourceContextRef,
        compression_format: CompressionFormat,
        file_source_input_row_count_metrics: risingwave_common::metrics::LabelGuardedMetric<
            prometheus::core::GenericCounter<prometheus::core::AtomicU64>,
            4,
        >,
    ) {
        let actor_id = source_ctx.actor_id.to_string();
        let fragment_id = source_ctx.fragment_id.to_string();
        let source_id = source_ctx.source_id.to_string();
        let source_name = source_ctx.source_name.clone();
        let object_name = split.name.clone();
        let start_offset = split.offset;
        let is_gzipped_object = object_name.ends_with(".gz") || object_name.ends_with(".gzip");
        // After a recovery, gzip-compressed files must be read from the beginning again, because
        // `start_offset` counts decompressed bytes and cannot be used as a range into the
        // compressed object; other files can continue reading from the last read `start_offset`.
        let reader = match is_gzipped_object {
            true => op.read_with(&object_name).into_future().await?,
            false => {
                op.read_with(&object_name)
                    .range(start_offset as u64..)
                    .into_future()
                    .await?
            }
        };

        let stream_reader = StreamReader::new(reader.map_err(std::io::Error::other));

        let mut buf_reader: Pin<Box<dyn AsyncBufRead + Send>> = match compression_format {
            CompressionFormat::Gzip => {
                let gzip_decoder = GzipDecoder::new(stream_reader);
                Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
            }
            CompressionFormat::None => {
                // TODO: support automatic decompression of more compression types.
                if is_gzipped_object {
                    let gzip_decoder = GzipDecoder::new(stream_reader);
                    Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
                } else {
                    Box::pin(BufReader::new(stream_reader)) as Pin<Box<dyn AsyncBufRead + Send>>
                }
            }
        };

        // Gzip-compressed files restart from offset 0; other files already start at `start_offset`.
        let mut offset = match is_gzipped_object {
            true => 0,
            false => start_offset,
        };
        let partition_input_bytes_metrics = source_ctx
            .metrics
            .partition_input_bytes
            .with_guarded_label_values(&[
                &actor_id,
                &source_id,
                split.id().as_ref(),
                &source_name,
                &fragment_id,
            ]);

        let max_chunk_size = source_ctx.source_ctrl_opts.chunk_size;
        let mut batch = Vec::with_capacity(max_chunk_size);
        let mut line_buf = String::new();

        loop {
            let n_read = buf_reader.read_line(&mut line_buf).await?;
            if n_read == 0 {
                // EOF
                break;
            }
            let msg_offset = (offset + n_read).to_string();
            // Note that the buffer still contains the trailing newline character.
            debug_assert_eq!(n_read, line_buf.len());
            if is_gzipped_object && offset + n_read <= start_offset {
                // A gzip-compressed file is read from the beginning every time, so lines that
                // end at or before `start_offset` were already emitted before recovery and are
                // skipped rather than yielded. Clear the buffer so the skipped line is not
                // prepended to the next message.
                line_buf.clear();
            } else {
                batch.push(SourceMessage {
                    key: None,
                    payload: Some(std::mem::take(&mut line_buf).into_bytes()),
                    offset: msg_offset,
                    split_id: split.id(),
                    meta: SourceMeta::Empty,
                });
            }

            offset += n_read;
            partition_input_bytes_metrics.inc_by(n_read as _);

            if batch.len() >= max_chunk_size {
                file_source_input_row_count_metrics.inc_by(max_chunk_size as _);
                yield std::mem::replace(&mut batch, Vec::with_capacity(max_chunk_size));
            }
        }

        if !batch.is_empty() {
            batch.shrink_to_fit();
            file_source_input_row_count_metrics.inc_by(batch.len() as _);
            yield batch;
        }
    }
}