risingwave_connector/source/filesystem/opendal_source/
opendal_reader.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::IntoFuture;
16use std::pin::Pin;
17
18use async_compression::tokio::bufread::GzipDecoder;
19use async_trait::async_trait;
20use futures::{StreamExt, TryStreamExt};
21use futures_async_stream::try_stream;
22use opendal::Operator;
23use prometheus::core::GenericCounter;
24use risingwave_common::array::StreamChunk;
25use risingwave_common::metrics::LabelGuardedMetric;
26use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
27use tokio_util::io::StreamReader;
28
29use super::OpendalSource;
30use super::opendal_enumerator::OpendalEnumerator;
31use crate::error::ConnectorResult;
32use crate::parser::{
33    ByteStreamSourceParserImpl, EncodingProperties, ParserConfig, into_data_chunk_stream,
34};
35use crate::source::filesystem::OpendalFsSplit;
36use crate::source::filesystem::file_common::CompressionFormat;
37use crate::source::filesystem::nd_streaming::need_nd_streaming;
38use crate::source::iceberg::read_parquet_file;
39use crate::source::{
40    BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SourceMessageEvent, SourceMeta,
41    SplitMetaData, SplitReader,
42};
43
/// File-source reader that pulls whole objects from an opendal-backed store
/// (which backend depends on `Src`) and turns them into stream chunks.
#[derive(Debug, Clone)]
pub struct OpendalReader<Src: OpendalSource> {
    // Enumerator carrying the opendal `Operator` and the configured
    // compression format used when reading objects.
    connector: OpendalEnumerator<Src>,
    // Splits assigned to this reader; each names one object plus the byte
    // offset to resume from after recovery.
    splits: Vec<OpendalFsSplit<Src>>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    // Optional column selection; only consumed by the parquet read path.
    columns: Option<Vec<Column>>,
}
52
53#[async_trait]
54impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
55    type Properties = Src::Properties;
56    type Split = OpendalFsSplit<Src>;
57
58    async fn new(
59        properties: Src::Properties,
60        splits: Vec<OpendalFsSplit<Src>>,
61        parser_config: ParserConfig,
62        source_ctx: SourceContextRef,
63        columns: Option<Vec<Column>>,
64    ) -> ConnectorResult<Self> {
65        let connector = Src::new_enumerator(properties)?;
66        let opendal_reader = OpendalReader {
67            connector,
68            splits,
69            parser_config,
70            source_ctx,
71            columns,
72        };
73        Ok(opendal_reader)
74    }
75
76    fn into_stream(self) -> BoxSourceChunkStream {
77        self.into_stream_inner()
78    }
79}
80
81impl<Src: OpendalSource> OpendalReader<Src> {
82    #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
83    async fn into_stream_inner(self) {
84        for split in self.splits {
85            let source_ctx = self.source_ctx.clone();
86
87            let object_name = split.name.clone();
88            let actor_id = source_ctx.actor_id.to_string();
89            let fragment_id = source_ctx.fragment_id.to_string();
90            let source_id = source_ctx.source_id.to_string();
91            let source_name = source_ctx.source_name.clone();
92            let file_source_input_row_count = self
93                .source_ctx
94                .metrics
95                .file_source_input_row_count
96                .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
97            let chunk_stream;
98            if let EncodingProperties::Parquet(parquet_props) =
99                &self.parser_config.specific.encoding_config
100            {
101                let actor_id = source_ctx.actor_id.to_string();
102                let source_id = source_ctx.source_id.to_string();
103                let split_id = split.id();
104                let source_name = source_ctx.source_name.clone();
105                let parquet_source_skip_row_count_metrics: LabelGuardedMetric<
106                    GenericCounter<prometheus::core::AtomicU64>,
107                > = self
108                    .source_ctx
109                    .metrics
110                    .parquet_source_skip_row_count
111                    .with_guarded_label_values(&[
112                        actor_id.as_str(),
113                        source_id.as_str(),
114                        &split_id,
115                        source_name.as_str(),
116                    ]);
117                chunk_stream = read_parquet_file(
118                    self.connector.op.clone(),
119                    object_name.clone(),
120                    self.columns.clone(),
121                    Some(self.parser_config.common.rw_columns.clone()),
122                    parquet_props.case_insensitive,
123                    self.source_ctx.source_ctrl_opts.chunk_size,
124                    split.offset,
125                    Some(file_source_input_row_count.clone()),
126                    Some(parquet_source_skip_row_count_metrics),
127                )
128                .await?;
129            } else {
130                assert!(
131                    need_nd_streaming(&self.parser_config.specific.encoding_config),
132                    "except for parquet, file source only support split by newline for now"
133                );
134
135                let line_stream = Self::stream_read_lines(
136                    self.connector.op.clone(),
137                    split,
138                    self.source_ctx.clone(),
139                    self.connector.compression_format.clone(),
140                    file_source_input_row_count.clone(),
141                );
142
143                let parser =
144                    ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx)
145                        .await?;
146                chunk_stream = Box::pin(into_data_chunk_stream(parser.parse_stream_with_events(
147                    line_stream.map_ok(SourceMessageEvent::Data).boxed(),
148                )));
149            }
150
151            #[for_await]
152            for chunk in chunk_stream {
153                yield chunk?;
154            }
155        }
156    }
157
158    #[try_stream(boxed, ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
159    pub async fn stream_read_lines(
160        op: Operator,
161        split: OpendalFsSplit<Src>,
162        source_ctx: SourceContextRef,
163        compression_format: CompressionFormat,
164        file_source_input_row_count_metrics: LabelGuardedMetric<
165            GenericCounter<prometheus::core::AtomicU64>,
166        >,
167    ) {
168        let actor_id = source_ctx.actor_id.to_string();
169        let fragment_id = source_ctx.fragment_id.to_string();
170        let source_id = source_ctx.source_id.to_string();
171        let source_name = source_ctx.source_name.clone();
172        let split_id = split.id();
173        let object_name = split.name.clone();
174        let start_offset = split.offset;
175        // After a recovery occurs, for gzip-compressed files, it is necessary to read from the beginning each time,
176        // other files can continue reading from the last read `start_offset`.
177        let reader = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
178            true => op.read_with(&object_name).into_future().await?,
179
180            false => {
181                op.read_with(&object_name)
182                    .range(start_offset as u64..)
183                    .into_future()
184                    .await?
185            }
186        };
187
188        let stream_reader = StreamReader::new(reader.map_err(std::io::Error::other));
189
190        let mut buf_reader: Pin<Box<dyn AsyncBufRead + Send>> = match compression_format {
191            CompressionFormat::Gzip => {
192                let gzip_decoder = GzipDecoder::new(stream_reader);
193                Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
194            }
195            CompressionFormat::None => {
196                // todo: support automatic decompression of more compression types.
197                if object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
198                    let gzip_decoder = GzipDecoder::new(stream_reader);
199                    Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
200                } else {
201                    Box::pin(BufReader::new(stream_reader)) as Pin<Box<dyn AsyncBufRead + Send>>
202                }
203            }
204        };
205
206        let mut offset = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
207            true => 0,
208            false => start_offset,
209        };
210        let partition_input_bytes_metrics = source_ctx
211            .metrics
212            .partition_input_bytes
213            .with_guarded_label_values(&[
214                actor_id.as_str(),
215                source_id.as_str(),
216                &split_id,
217                source_name.as_str(),
218                fragment_id.as_str(),
219            ]);
220
221        let max_chunk_size = source_ctx.source_ctrl_opts.chunk_size;
222        let mut batch = Vec::with_capacity(max_chunk_size);
223        let mut line_buf = String::new();
224
225        loop {
226            let n_read = buf_reader.read_line(&mut line_buf).await?;
227            if n_read == 0 {
228                // EOF
229                break;
230            }
231            let msg_offset = (offset + n_read).to_string();
232            // note that the buffer contains the newline character
233            debug_assert_eq!(n_read, line_buf.len());
234            if (object_name.ends_with(".gz") || object_name.ends_with(".gzip"))
235                && offset + n_read <= start_offset
236            {
237                // For gzip compressed files, the reader needs to read from the beginning each time,
238                // but it needs to skip the previously read part and start yielding chunks from a position greater than or equal to start_offset.
239            } else {
240                batch.push(SourceMessage {
241                    key: None,
242                    payload: Some(std::mem::take(&mut line_buf).into_bytes()),
243                    offset: msg_offset,
244                    split_id: split.id(),
245                    meta: SourceMeta::Empty,
246                });
247            }
248
249            offset += n_read;
250            partition_input_bytes_metrics.inc_by(n_read as _);
251
252            if batch.len() >= max_chunk_size {
253                file_source_input_row_count_metrics.inc_by(max_chunk_size as _);
254                yield std::mem::replace(&mut batch, Vec::with_capacity(max_chunk_size));
255            }
256        }
257
258        if !batch.is_empty() {
259            batch.shrink_to_fit();
260            file_source_input_row_count_metrics.inc_by(batch.len() as _);
261            yield batch;
262        }
263    }
264}