risingwave_connector/source/filesystem/opendal_source/opendal_reader.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::IntoFuture;
use std::pin::Pin;

use async_compression::tokio::bufread::GzipDecoder;
use async_trait::async_trait;
use futures::TryStreamExt;
use futures_async_stream::try_stream;
use opendal::Operator;
use prometheus::core::GenericCounter;
use risingwave_common::array::StreamChunk;
use risingwave_common::metrics::LabelGuardedMetric;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
use tokio_util::io::StreamReader;

use super::OpendalSource;
use super::opendal_enumerator::OpendalEnumerator;
use crate::error::ConnectorResult;
use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParserConfig};
use crate::source::filesystem::OpendalFsSplit;
use crate::source::filesystem::file_common::CompressionFormat;
use crate::source::filesystem::nd_streaming::need_nd_streaming;
use crate::source::iceberg::read_parquet_file;
use crate::source::{
    BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
    SplitReader,
};

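/// Split reader for OpenDAL-backed file sources. It consumes the [`OpendalFsSplit`]s
/// assigned to this reader and turns each file into a stream of [`StreamChunk`]s,
/// either via the dedicated Parquet path or via the line-based parser.
///
/// A minimal sketch of how such a reader is typically driven (the `OpendalS3` source
/// type and the surrounding setup are illustrative assumptions, not the exact call
/// site in this crate):
///
/// ```ignore
/// use futures::StreamExt;
///
/// // Assumed to be constructed elsewhere: props, splits, parser_config, source_ctx.
/// let reader =
///     OpendalReader::<OpendalS3>::new(props, splits, parser_config, source_ctx, None).await?;
/// let mut stream = reader.into_stream();
/// while let Some(chunk) = stream.next().await {
///     let chunk: StreamChunk = chunk?;
///     // hand the chunk to the downstream executor
/// }
/// ```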
#[derive(Debug, Clone)]
pub struct OpendalReader<Src: OpendalSource> {
    connector: OpendalEnumerator<Src>,
    splits: Vec<OpendalFsSplit<Src>>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
}

#[async_trait]
impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
    type Properties = Src::Properties;
    type Split = OpendalFsSplit<Src>;

    async fn new(
        properties: Src::Properties,
        splits: Vec<OpendalFsSplit<Src>>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> ConnectorResult<Self> {
        let connector = Src::new_enumerator(properties)?;
        let opendal_reader = OpendalReader {
            connector,
            splits,
            parser_config,
            source_ctx,
            columns,
        };
        Ok(opendal_reader)
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        self.into_stream_inner()
    }
}

impl<Src: OpendalSource> OpendalReader<Src> {
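    /// Reads the assigned splits one after another: Parquet files go through the
    /// dedicated columnar reader, while every other encoding is read line by line
    /// and fed into the configured byte-stream parser.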
    #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
    async fn into_stream_inner(self) {
        for split in self.splits {
            let source_ctx = self.source_ctx.clone();

            let object_name = split.name.clone();
            let actor_id = source_ctx.actor_id.to_string();
            let fragment_id = source_ctx.fragment_id.to_string();
            let source_id = source_ctx.source_id.to_string();
            let source_name = source_ctx.source_name.clone();
            let file_source_input_row_count = self
                .source_ctx
                .metrics
                .file_source_input_row_count
                .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
            let chunk_stream;
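            // Parquet gets a dedicated columnar reader; all other encodings are
            // newline-delimited and go through the generic byte-stream parser below.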
            if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config {
                let actor_id = source_ctx.actor_id.to_string();
                let source_id = source_ctx.source_id.to_string();
                let split_id = split.id();
                let source_name = source_ctx.source_name.clone();
                let parquet_source_skip_row_count_metrics: LabelGuardedMetric<
                    GenericCounter<prometheus::core::AtomicU64>,
                > = self
                    .source_ctx
                    .metrics
                    .parquet_source_skip_row_count
                    .with_guarded_label_values(&[
                        actor_id.as_str(),
                        source_id.as_str(),
                        &split_id,
                        source_name.as_str(),
                    ]);
                chunk_stream = read_parquet_file(
                    self.connector.op.clone(),
                    object_name.clone(),
                    self.columns.clone(),
                    Some(self.parser_config.common.rw_columns.clone()),
                    self.source_ctx.source_ctrl_opts.chunk_size,
                    split.offset,
                    Some(file_source_input_row_count.clone()),
                    Some(parquet_source_skip_row_count_metrics),
                )
                .await?;
            } else {
                assert!(
                    need_nd_streaming(&self.parser_config.specific.encoding_config),
                    "except for Parquet, file source only supports splitting by newline for now"
                );

                let line_stream = Self::stream_read_lines(
                    self.connector.op.clone(),
                    split,
                    self.source_ctx.clone(),
                    self.connector.compression_format.clone(),
                    file_source_input_row_count.clone(),
                );

                let parser =
                    ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx)
                        .await?;
                chunk_stream = Box::pin(parser.parse_stream(line_stream));
            }

            #[for_await]
            for chunk in chunk_stream {
                yield chunk?;
            }
        }
    }

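    /// Streams a single object line by line, yielding batches of [`SourceMessage`]s.
    ///
    /// Plain objects are range-read starting from the split's `offset`; gzip objects
    /// must be decoded from the beginning of the stream, so after a recovery they are
    /// re-read in full and lines ending at or before `start_offset` are skipped.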
    #[try_stream(boxed, ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
    pub async fn stream_read_lines(
        op: Operator,
        split: OpendalFsSplit<Src>,
        source_ctx: SourceContextRef,
        compression_format: CompressionFormat,
        file_source_input_row_count_metrics: LabelGuardedMetric<
            GenericCounter<prometheus::core::AtomicU64>,
        >,
    ) {
        let actor_id = source_ctx.actor_id.to_string();
        let fragment_id = source_ctx.fragment_id.to_string();
        let source_id = source_ctx.source_id.to_string();
        let source_name = source_ctx.source_name.clone();
        let split_id = split.id();
        let object_name = split.name.clone();
        let start_offset = split.offset;
        // After a recovery, gzip-compressed files must be read from the beginning each time,
        // while other files can resume from the last read `start_offset`.
        let reader = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
            true => op.read_with(&object_name).into_future().await?,

            false => {
                op.read_with(&object_name)
                    .range(start_offset as u64..)
                    .into_future()
                    .await?
            }
        };

        let stream_reader = StreamReader::new(reader.map_err(std::io::Error::other));

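        // Wrap the byte stream in a buffered reader, decompressing on the fly when
        // the compression format (or the file extension) indicates gzip.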
        let mut buf_reader: Pin<Box<dyn AsyncBufRead + Send>> = match compression_format {
            CompressionFormat::Gzip => {
                let gzip_decoder = GzipDecoder::new(stream_reader);
                Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
            }
            CompressionFormat::None => {
                // todo: support automatic decompression of more compression types.
                if object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
                    let gzip_decoder = GzipDecoder::new(stream_reader);
                    Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
                } else {
                    Box::pin(BufReader::new(stream_reader)) as Pin<Box<dyn AsyncBufRead + Send>>
                }
            }
        };

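        // Offsets are tracked in decompressed bytes for gzip files (hence the restart
        // from 0), and in raw object bytes otherwise (resuming from `start_offset`).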
        let mut offset = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
            true => 0,
            false => start_offset,
        };
        let partition_input_bytes_metrics = source_ctx
            .metrics
            .partition_input_bytes
            .with_guarded_label_values(&[
                actor_id.as_str(),
                source_id.as_str(),
                &split_id,
                source_name.as_str(),
                fragment_id.as_str(),
            ]);

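        // Read the object line by line, batching up to `max_chunk_size` messages
        // before yielding them downstream.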
        let max_chunk_size = source_ctx.source_ctrl_opts.chunk_size;
        let mut batch = Vec::with_capacity(max_chunk_size);
        let mut line_buf = String::new();

        loop {
            let n_read = buf_reader.read_line(&mut line_buf).await?;
            if n_read == 0 {
                // EOF
                break;
            }
            let msg_offset = (offset + n_read).to_string();
            // note that the buffer contains the newline character
            debug_assert_eq!(n_read, line_buf.len());
            if (object_name.ends_with(".gz") || object_name.ends_with(".gzip"))
                && offset + n_read <= start_offset
            {
                // Gzip-compressed files are re-read from the beginning after a recovery,
                // so lines that end at or before `start_offset` have already been emitted;
                // skip them here instead of pushing them into the batch.
                line_buf.clear();
            } else {
                batch.push(SourceMessage {
                    key: None,
                    payload: Some(std::mem::take(&mut line_buf).into_bytes()),
                    offset: msg_offset,
                    split_id: split.id(),
                    meta: SourceMeta::Empty,
                });
            }

            offset += n_read;
            partition_input_bytes_metrics.inc_by(n_read as _);

            if batch.len() >= max_chunk_size {
                file_source_input_row_count_metrics.inc_by(max_chunk_size as _);
                yield std::mem::replace(&mut batch, Vec::with_capacity(max_chunk_size));
            }
        }

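        // Flush the final, possibly partial batch after EOF.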
        if !batch.is_empty() {
            batch.shrink_to_fit();
            file_source_input_row_count_metrics.inc_by(batch.len() as _);
            yield batch;
        }
    }
}